]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/trace_replay/block_cache_tracer.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / trace_replay / block_cache_tracer.cc
CommitLineData
f67539c2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include "trace_replay/block_cache_tracer.h"
7
8#include <cinttypes>
9#include <cstdio>
10#include <cstdlib>
11
12#include "db/db_impl/db_impl.h"
13#include "db/dbformat.h"
14#include "rocksdb/slice.h"
15#include "util/coding.h"
16#include "util/hash.h"
17#include "util/string_util.h"
18
19namespace ROCKSDB_NAMESPACE {
20
21namespace {
f67539c2
TL
22bool ShouldTrace(const Slice& block_key, const TraceOptions& trace_options) {
23 if (trace_options.sampling_frequency == 0 ||
24 trace_options.sampling_frequency == 1) {
25 return true;
26 }
27 // We use spatial downsampling so that we have a complete access history for a
28 // block.
20effc67 29 return 0 == GetSliceRangedNPHash(block_key, trace_options.sampling_frequency);
f67539c2
TL
30}
31} // namespace
32
33const uint64_t kMicrosInSecond = 1000 * 1000;
34const uint64_t kSecondInMinute = 60;
35const uint64_t kSecondInHour = 3600;
36const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
37 "UnknownColumnFamily";
38const uint64_t BlockCacheTraceHelper::kReservedGetId = 0;
39
40bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
41 TraceType block_type, TableReaderCaller caller) {
42 return (block_type == TraceType::kBlockTraceDataBlock) &&
43 IsGetOrMultiGet(caller);
44}
45
46bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) {
47 return caller == TableReaderCaller::kUserGet ||
48 caller == TableReaderCaller::kUserMultiGet;
49}
50
51bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) {
52 return caller == TableReaderCaller::kUserGet ||
53 caller == TableReaderCaller::kUserMultiGet ||
54 caller == TableReaderCaller::kUserIterator ||
55 caller == TableReaderCaller::kUserApproximateSize ||
56 caller == TableReaderCaller::kUserVerifyChecksum;
57}
58
59std::string BlockCacheTraceHelper::ComputeRowKey(
60 const BlockCacheTraceRecord& access) {
61 if (!IsGetOrMultiGet(access.caller)) {
62 return "";
63 }
64 Slice key = ExtractUserKey(access.referenced_key);
65 return std::to_string(access.sst_fd_number) + "_" + key.ToString();
66}
67
68uint64_t BlockCacheTraceHelper::GetTableId(
69 const BlockCacheTraceRecord& access) {
70 if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) {
71 return 0;
72 }
73 return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1;
74}
75
76uint64_t BlockCacheTraceHelper::GetSequenceNumber(
77 const BlockCacheTraceRecord& access) {
78 if (!IsGetOrMultiGet(access.caller)) {
79 return 0;
80 }
81 return access.get_from_user_specified_snapshot == Boolean::kFalse
82 ? 0
83 : 1 + GetInternalKeySeqno(access.referenced_key);
84}
85
86uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
87 const BlockCacheTraceRecord& access) {
88 Slice input(access.block_key);
89 uint64_t offset = 0;
90 while (true) {
91 uint64_t tmp = 0;
92 if (GetVarint64(&input, &tmp)) {
93 offset = tmp;
94 } else {
95 break;
96 }
97 }
98 return offset;
99}
100
101BlockCacheTraceWriter::BlockCacheTraceWriter(
102 Env* env, const TraceOptions& trace_options,
103 std::unique_ptr<TraceWriter>&& trace_writer)
104 : env_(env),
105 trace_options_(trace_options),
106 trace_writer_(std::move(trace_writer)) {}
107
108Status BlockCacheTraceWriter::WriteBlockAccess(
109 const BlockCacheTraceRecord& record, const Slice& block_key,
110 const Slice& cf_name, const Slice& referenced_key) {
111 uint64_t trace_file_size = trace_writer_->GetFileSize();
112 if (trace_file_size > trace_options_.max_trace_file_size) {
113 return Status::OK();
114 }
115 Trace trace;
116 trace.ts = record.access_timestamp;
117 trace.type = record.block_type;
118 PutLengthPrefixedSlice(&trace.payload, block_key);
119 PutFixed64(&trace.payload, record.block_size);
120 PutFixed64(&trace.payload, record.cf_id);
121 PutLengthPrefixedSlice(&trace.payload, cf_name);
122 PutFixed32(&trace.payload, record.level);
123 PutFixed64(&trace.payload, record.sst_fd_number);
124 trace.payload.push_back(record.caller);
125 trace.payload.push_back(record.is_cache_hit);
126 trace.payload.push_back(record.no_insert);
127 if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) {
128 PutFixed64(&trace.payload, record.get_id);
129 trace.payload.push_back(record.get_from_user_specified_snapshot);
130 PutLengthPrefixedSlice(&trace.payload, referenced_key);
131 }
132 if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type,
133 record.caller)) {
134 PutFixed64(&trace.payload, record.referenced_data_size);
135 PutFixed64(&trace.payload, record.num_keys_in_block);
136 trace.payload.push_back(record.referenced_key_exist_in_block);
137 }
138 std::string encoded_trace;
139 TracerHelper::EncodeTrace(trace, &encoded_trace);
140 return trace_writer_->Write(encoded_trace);
141}
142
143Status BlockCacheTraceWriter::WriteHeader() {
144 Trace trace;
145 trace.ts = env_->NowMicros();
146 trace.type = TraceType::kTraceBegin;
147 PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
148 PutFixed32(&trace.payload, kMajorVersion);
149 PutFixed32(&trace.payload, kMinorVersion);
150 std::string encoded_trace;
151 TracerHelper::EncodeTrace(trace, &encoded_trace);
152 return trace_writer_->Write(encoded_trace);
153}
154
155BlockCacheTraceReader::BlockCacheTraceReader(
156 std::unique_ptr<TraceReader>&& reader)
157 : trace_reader_(std::move(reader)) {}
158
159Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
160 assert(header != nullptr);
161 std::string encoded_trace;
162 Status s = trace_reader_->Read(&encoded_trace);
163 if (!s.ok()) {
164 return s;
165 }
166 Trace trace;
167 s = TracerHelper::DecodeTrace(encoded_trace, &trace);
168 if (!s.ok()) {
169 return s;
170 }
171 header->start_time = trace.ts;
172 Slice enc_slice = Slice(trace.payload);
173 Slice magnic_number;
174 if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) {
175 return Status::Corruption(
176 "Corrupted header in the trace file: Failed to read the magic number.");
177 }
178 if (magnic_number.ToString() != kTraceMagic) {
179 return Status::Corruption(
180 "Corrupted header in the trace file: Magic number does not match.");
181 }
182 if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
183 return Status::Corruption(
184 "Corrupted header in the trace file: Failed to read rocksdb major "
185 "version number.");
186 }
187 if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
188 return Status::Corruption(
189 "Corrupted header in the trace file: Failed to read rocksdb minor "
190 "version number.");
191 }
192 // We should have retrieved all information in the header.
193 if (!enc_slice.empty()) {
194 return Status::Corruption(
195 "Corrupted header in the trace file: The length of header is too "
196 "long.");
197 }
198 return Status::OK();
199}
200
201Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
202 assert(record);
203 std::string encoded_trace;
204 Status s = trace_reader_->Read(&encoded_trace);
205 if (!s.ok()) {
206 return s;
207 }
208 Trace trace;
209 s = TracerHelper::DecodeTrace(encoded_trace, &trace);
210 if (!s.ok()) {
211 return s;
212 }
213 record->access_timestamp = trace.ts;
214 record->block_type = trace.type;
215 Slice enc_slice = Slice(trace.payload);
216
20effc67
TL
217 const unsigned int kCharSize = 1;
218
f67539c2
TL
219 Slice block_key;
220 if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
221 return Status::Incomplete(
222 "Incomplete access record: Failed to read block key.");
223 }
224 record->block_key = block_key.ToString();
225 if (!GetFixed64(&enc_slice, &record->block_size)) {
226 return Status::Incomplete(
227 "Incomplete access record: Failed to read block size.");
228 }
229 if (!GetFixed64(&enc_slice, &record->cf_id)) {
230 return Status::Incomplete(
231 "Incomplete access record: Failed to read column family ID.");
232 }
233 Slice cf_name;
234 if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
235 return Status::Incomplete(
236 "Incomplete access record: Failed to read column family name.");
237 }
238 record->cf_name = cf_name.ToString();
239 if (!GetFixed32(&enc_slice, &record->level)) {
240 return Status::Incomplete(
241 "Incomplete access record: Failed to read level.");
242 }
243 if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
244 return Status::Incomplete(
245 "Incomplete access record: Failed to read SST file number.");
246 }
247 if (enc_slice.empty()) {
248 return Status::Incomplete(
249 "Incomplete access record: Failed to read caller.");
250 }
251 record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
252 enc_slice.remove_prefix(kCharSize);
253 if (enc_slice.empty()) {
254 return Status::Incomplete(
255 "Incomplete access record: Failed to read is_cache_hit.");
256 }
257 record->is_cache_hit = static_cast<Boolean>(enc_slice[0]);
258 enc_slice.remove_prefix(kCharSize);
259 if (enc_slice.empty()) {
260 return Status::Incomplete(
261 "Incomplete access record: Failed to read no_insert.");
262 }
263 record->no_insert = static_cast<Boolean>(enc_slice[0]);
264 enc_slice.remove_prefix(kCharSize);
265 if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) {
266 if (!GetFixed64(&enc_slice, &record->get_id)) {
267 return Status::Incomplete(
268 "Incomplete access record: Failed to read the get id.");
269 }
270 if (enc_slice.empty()) {
271 return Status::Incomplete(
272 "Incomplete access record: Failed to read "
273 "get_from_user_specified_snapshot.");
274 }
275 record->get_from_user_specified_snapshot =
276 static_cast<Boolean>(enc_slice[0]);
277 enc_slice.remove_prefix(kCharSize);
278 Slice referenced_key;
279 if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
280 return Status::Incomplete(
281 "Incomplete access record: Failed to read the referenced key.");
282 }
283 record->referenced_key = referenced_key.ToString();
284 }
285 if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type,
286 record->caller)) {
287 if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
288 return Status::Incomplete(
289 "Incomplete access record: Failed to read the referenced data size.");
290 }
291 if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
292 return Status::Incomplete(
293 "Incomplete access record: Failed to read the number of keys in the "
294 "block.");
295 }
296 if (enc_slice.empty()) {
297 return Status::Incomplete(
298 "Incomplete access record: Failed to read "
299 "referenced_key_exist_in_block.");
300 }
301 record->referenced_key_exist_in_block = static_cast<Boolean>(enc_slice[0]);
302 }
303 return Status::OK();
304}
305
306BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
307 if (human_readable_trace_file_writer_) {
20effc67
TL
308 human_readable_trace_file_writer_->Flush().PermitUncheckedError();
309 human_readable_trace_file_writer_->Close().PermitUncheckedError();
f67539c2
TL
310 }
311}
312
313Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
314 const std::string& human_readable_trace_file_path,
315 ROCKSDB_NAMESPACE::Env* env) {
316 if (human_readable_trace_file_path.empty()) {
317 return Status::InvalidArgument(
318 "The provided human_readable_trace_file_path is null.");
319 }
320 return env->NewWritableFile(human_readable_trace_file_path,
321 &human_readable_trace_file_writer_, EnvOptions());
322}
323
324Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
325 const BlockCacheTraceRecord& access, uint64_t block_id,
326 uint64_t get_key_id) {
327 if (!human_readable_trace_file_writer_) {
328 return Status::OK();
329 }
330 int ret = snprintf(
331 trace_record_buffer_, sizeof(trace_record_buffer_),
332 "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
333 ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
334 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
335 access.access_timestamp, block_id, access.block_type, access.block_size,
336 access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
337 access.caller, access.no_insert, access.get_id, get_key_id,
338 access.referenced_data_size, access.is_cache_hit,
339 access.referenced_key_exist_in_block, access.num_keys_in_block,
340 BlockCacheTraceHelper::GetTableId(access),
341 BlockCacheTraceHelper::GetSequenceNumber(access),
342 static_cast<uint64_t>(access.block_key.size()),
343 static_cast<uint64_t>(access.referenced_key.size()),
344 BlockCacheTraceHelper::GetBlockOffsetInFile(access));
345 if (ret < 0) {
346 return Status::IOError("failed to format the output");
347 }
348 std::string printout(trace_record_buffer_);
349 return human_readable_trace_file_writer_->Append(printout);
350}
351
352BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
353 const std::string& trace_file_path)
354 : BlockCacheTraceReader(/*trace_reader=*/nullptr) {
355 human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
356}
357
358BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
359 human_readable_trace_reader_.close();
360}
361
362Status BlockCacheHumanReadableTraceReader::ReadHeader(
363 BlockCacheTraceHeader* /*header*/) {
364 return Status::OK();
365}
366
367Status BlockCacheHumanReadableTraceReader::ReadAccess(
368 BlockCacheTraceRecord* record) {
369 std::string line;
370 if (!std::getline(human_readable_trace_reader_, line)) {
371 return Status::Incomplete("No more records to read.");
372 }
373 std::stringstream ss(line);
374 std::vector<std::string> record_strs;
375 while (ss.good()) {
376 std::string substr;
377 getline(ss, substr, ',');
378 record_strs.push_back(substr);
379 }
380 if (record_strs.size() != 21) {
381 return Status::Incomplete("Records format is wrong.");
382 }
383
384 record->access_timestamp = ParseUint64(record_strs[0]);
385 uint64_t block_key = ParseUint64(record_strs[1]);
386 record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
387 record->block_size = ParseUint64(record_strs[3]);
388 record->cf_id = ParseUint64(record_strs[4]);
389 record->cf_name = record_strs[5];
390 record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
391 record->sst_fd_number = ParseUint64(record_strs[7]);
392 record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
393 record->no_insert = static_cast<Boolean>(ParseUint64(record_strs[9]));
394 record->get_id = ParseUint64(record_strs[10]);
395 uint64_t get_key_id = ParseUint64(record_strs[11]);
396
397 record->referenced_data_size = ParseUint64(record_strs[12]);
398 record->is_cache_hit = static_cast<Boolean>(ParseUint64(record_strs[13]));
399 record->referenced_key_exist_in_block =
400 static_cast<Boolean>(ParseUint64(record_strs[14]));
401 record->num_keys_in_block = ParseUint64(record_strs[15]);
402 uint64_t table_id = ParseUint64(record_strs[16]);
403 if (table_id > 0) {
404 // Decrement since valid table id in the trace file equals traced table id
405 // + 1.
406 table_id -= 1;
407 }
408 uint64_t get_sequence_number = ParseUint64(record_strs[17]);
409 if (get_sequence_number > 0) {
410 record->get_from_user_specified_snapshot = Boolean::kTrue;
411 // Decrement since valid seq number in the trace file equals traced seq
412 // number + 1.
413 get_sequence_number -= 1;
414 }
415 uint64_t block_key_size = ParseUint64(record_strs[18]);
416 uint64_t get_key_size = ParseUint64(record_strs[19]);
417 uint64_t block_offset = ParseUint64(record_strs[20]);
418
419 std::string tmp_block_key;
420 PutVarint64(&tmp_block_key, block_key);
421 PutVarint64(&tmp_block_key, block_offset);
422 // Append 1 until the size is the same as traced block key size.
423 while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
424 record->block_key += "1";
425 }
426 record->block_key += tmp_block_key;
427
428 if (get_key_id != 0) {
429 std::string tmp_get_key;
430 PutFixed64(&tmp_get_key, get_key_id);
431 PutFixed64(&tmp_get_key, get_sequence_number << 8);
432 PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
433 // Append 1 until the size is the same as traced key size.
434 while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
435 record->referenced_key += "1";
436 }
437 record->referenced_key += tmp_get_key;
438 }
439 return Status::OK();
440}
441
442BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
443
444BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
445
446Status BlockCacheTracer::StartTrace(
447 Env* env, const TraceOptions& trace_options,
448 std::unique_ptr<TraceWriter>&& trace_writer) {
449 InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
450 if (writer_.load()) {
451 return Status::Busy();
452 }
453 get_id_counter_.store(1);
454 trace_options_ = trace_options;
455 writer_.store(
456 new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
457 return writer_.load()->WriteHeader();
458}
459
460void BlockCacheTracer::EndTrace() {
461 InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
462 if (!writer_.load()) {
463 return;
464 }
465 delete writer_.load();
466 writer_.store(nullptr);
467}
468
469Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
470 const Slice& block_key,
471 const Slice& cf_name,
472 const Slice& referenced_key) {
473 if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
474 return Status::OK();
475 }
476 InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
477 if (!writer_.load()) {
478 return Status::OK();
479 }
480 return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
481 referenced_key);
482}
483
484uint64_t BlockCacheTracer::NextGetId() {
485 if (!writer_.load(std::memory_order_relaxed)) {
486 return BlockCacheTraceHelper::kReservedGetId;
487 }
488 uint64_t prev_value = get_id_counter_.fetch_add(1);
489 if (prev_value == BlockCacheTraceHelper::kReservedGetId) {
490 // fetch and add again.
491 return get_id_counter_.fetch_add(1);
492 }
493 return prev_value;
494}
495
496} // namespace ROCKSDB_NAMESPACE