]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/util/trace_replay.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / util / trace_replay.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include "util/trace_replay.h"
7
8#include <chrono>
9#include <sstream>
10#include <thread>
11#include "db/db_impl.h"
12#include "rocksdb/slice.h"
13#include "rocksdb/write_batch.h"
14#include "util/coding.h"
15#include "util/string_util.h"
16
17namespace rocksdb {
18
494da23a
TL
// Magic string placed at the start of the trace header payload; the replayer
// checks for it to recognize a valid trace file.
const std::string kTraceMagic("feedcafedeadbeef");
20
11fdf7f2
TL
21namespace {
22void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
23 PutFixed32(dst, cf_id);
24 PutLengthPrefixedSlice(dst, key);
25}
26
27void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
28 Slice buf(buffer);
29 GetFixed32(&buf, cf_id);
30 GetLengthPrefixedSlice(&buf, key);
31}
32} // namespace
33
494da23a
TL
34Tracer::Tracer(Env* env, const TraceOptions& trace_options,
35 std::unique_ptr<TraceWriter>&& trace_writer)
36 : env_(env),
37 trace_options_(trace_options),
38 trace_writer_(std::move(trace_writer)),
39 trace_request_count_ (0) {
11fdf7f2
TL
40 WriteHeader();
41}
42
43Tracer::~Tracer() { trace_writer_.reset(); }
44
45Status Tracer::Write(WriteBatch* write_batch) {
494da23a
TL
46 TraceType trace_type = kTraceWrite;
47 if (ShouldSkipTrace(trace_type)) {
48 return Status::OK();
49 }
11fdf7f2
TL
50 Trace trace;
51 trace.ts = env_->NowMicros();
494da23a 52 trace.type = trace_type;
11fdf7f2
TL
53 trace.payload = write_batch->Data();
54 return WriteTrace(trace);
55}
56
57Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
494da23a
TL
58 TraceType trace_type = kTraceGet;
59 if (ShouldSkipTrace(trace_type)) {
60 return Status::OK();
61 }
11fdf7f2
TL
62 Trace trace;
63 trace.ts = env_->NowMicros();
494da23a 64 trace.type = trace_type;
11fdf7f2
TL
65 EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
66 return WriteTrace(trace);
67}
68
69Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
494da23a
TL
70 TraceType trace_type = kTraceIteratorSeek;
71 if (ShouldSkipTrace(trace_type)) {
72 return Status::OK();
73 }
11fdf7f2
TL
74 Trace trace;
75 trace.ts = env_->NowMicros();
494da23a 76 trace.type = trace_type;
11fdf7f2
TL
77 EncodeCFAndKey(&trace.payload, cf_id, key);
78 return WriteTrace(trace);
79}
80
81Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
494da23a
TL
82 TraceType trace_type = kTraceIteratorSeekForPrev;
83 if (ShouldSkipTrace(trace_type)) {
84 return Status::OK();
85 }
11fdf7f2
TL
86 Trace trace;
87 trace.ts = env_->NowMicros();
494da23a 88 trace.type = trace_type;
11fdf7f2
TL
89 EncodeCFAndKey(&trace.payload, cf_id, key);
90 return WriteTrace(trace);
91}
92
494da23a
TL
93bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
94 if (IsTraceFileOverMax()) {
95 return true;
96 }
97 if ((trace_options_.filter & kTraceFilterGet
98 && trace_type == kTraceGet)
99 || (trace_options_.filter & kTraceFilterWrite
100 && trace_type == kTraceWrite)) {
101 return true;
102 }
103 ++trace_request_count_;
104 if (trace_request_count_ < trace_options_.sampling_frequency) {
105 return true;
106 }
107 trace_request_count_ = 0;
108 return false;
109}
110
111bool Tracer::IsTraceFileOverMax() {
112 uint64_t trace_file_size = trace_writer_->GetFileSize();
113 return (trace_file_size > trace_options_.max_trace_file_size);
114}
115
11fdf7f2
TL
116Status Tracer::WriteHeader() {
117 std::ostringstream s;
118 s << kTraceMagic << "\t"
119 << "Trace Version: 0.1\t"
120 << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
121 << "Format: Timestamp OpType Payload\n";
122 std::string header(s.str());
123
124 Trace trace;
125 trace.ts = env_->NowMicros();
126 trace.type = kTraceBegin;
127 trace.payload = header;
128 return WriteTrace(trace);
129}
130
131Status Tracer::WriteFooter() {
132 Trace trace;
133 trace.ts = env_->NowMicros();
134 trace.type = kTraceEnd;
135 trace.payload = "";
136 return WriteTrace(trace);
137}
138
139Status Tracer::WriteTrace(const Trace& trace) {
140 std::string encoded_trace;
141 PutFixed64(&encoded_trace, trace.ts);
142 encoded_trace.push_back(trace.type);
143 PutFixed32(&encoded_trace, static_cast<uint32_t>(trace.payload.size()));
144 encoded_trace.append(trace.payload);
145 return trace_writer_->Write(Slice(encoded_trace));
146}
147
148Status Tracer::Close() { return WriteFooter(); }
149
150Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
494da23a 151 std::unique_ptr<TraceReader>&& reader)
11fdf7f2
TL
152 : trace_reader_(std::move(reader)) {
153 assert(db != nullptr);
154 db_ = static_cast<DBImpl*>(db->GetRootDB());
155 for (ColumnFamilyHandle* cfh : handles) {
156 cf_map_[cfh->GetID()] = cfh;
157 }
158}
159
160Replayer::~Replayer() { trace_reader_.reset(); }
161
162Status Replayer::Replay() {
163 Status s;
164 Trace header;
165 s = ReadHeader(&header);
166 if (!s.ok()) {
167 return s;
168 }
169
170 std::chrono::system_clock::time_point replay_epoch =
171 std::chrono::system_clock::now();
172 WriteOptions woptions;
173 ReadOptions roptions;
174 Trace trace;
175 uint64_t ops = 0;
176 Iterator* single_iter = nullptr;
177 while (s.ok()) {
178 trace.reset();
179 s = ReadTrace(&trace);
180 if (!s.ok()) {
181 break;
182 }
183
184 std::this_thread::sleep_until(
185 replay_epoch + std::chrono::microseconds(trace.ts - header.ts));
186 if (trace.type == kTraceWrite) {
187 WriteBatch batch(trace.payload);
188 db_->Write(woptions, &batch);
189 ops++;
190 } else if (trace.type == kTraceGet) {
191 uint32_t cf_id = 0;
192 Slice key;
193 DecodeCFAndKey(trace.payload, &cf_id, &key);
194 if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
195 return Status::Corruption("Invalid Column Family ID.");
196 }
197
198 std::string value;
199 if (cf_id == 0) {
200 db_->Get(roptions, key, &value);
201 } else {
202 db_->Get(roptions, cf_map_[cf_id], key, &value);
203 }
204 ops++;
205 } else if (trace.type == kTraceIteratorSeek) {
206 uint32_t cf_id = 0;
207 Slice key;
208 DecodeCFAndKey(trace.payload, &cf_id, &key);
209 if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
210 return Status::Corruption("Invalid Column Family ID.");
211 }
212
213 if (cf_id == 0) {
214 single_iter = db_->NewIterator(roptions);
215 } else {
216 single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
217 }
218 single_iter->Seek(key);
219 ops++;
220 delete single_iter;
221 } else if (trace.type == kTraceIteratorSeekForPrev) {
222 // Currently, only support to call the Seek()
223 uint32_t cf_id = 0;
224 Slice key;
225 DecodeCFAndKey(trace.payload, &cf_id, &key);
226 if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
227 return Status::Corruption("Invalid Column Family ID.");
228 }
229
230 if (cf_id == 0) {
231 single_iter = db_->NewIterator(roptions);
232 } else {
233 single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
234 }
235 single_iter->SeekForPrev(key);
236 ops++;
237 delete single_iter;
238 } else if (trace.type == kTraceEnd) {
239 // Do nothing for now.
240 // TODO: Add some validations later.
241 break;
242 }
243 }
244
245 if (s.IsIncomplete()) {
246 // Reaching eof returns Incomplete status at the moment.
247 // Could happen when killing a process without calling EndTrace() API.
248 // TODO: Add better error handling.
249 return Status::OK();
250 }
251 return s;
252}
253
254Status Replayer::ReadHeader(Trace* header) {
255 assert(header != nullptr);
256 Status s = ReadTrace(header);
257 if (!s.ok()) {
258 return s;
259 }
260 if (header->type != kTraceBegin) {
261 return Status::Corruption("Corrupted trace file. Incorrect header.");
262 }
263 if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
264 return Status::Corruption("Corrupted trace file. Incorrect magic.");
265 }
266
267 return s;
268}
269
270Status Replayer::ReadFooter(Trace* footer) {
271 assert(footer != nullptr);
272 Status s = ReadTrace(footer);
273 if (!s.ok()) {
274 return s;
275 }
276 if (footer->type != kTraceEnd) {
277 return Status::Corruption("Corrupted trace file. Incorrect footer.");
278 }
279
280 // TODO: Add more validations later
281 return s;
282}
283
284Status Replayer::ReadTrace(Trace* trace) {
285 assert(trace != nullptr);
286 std::string encoded_trace;
287 Status s = trace_reader_->Read(&encoded_trace);
288 if (!s.ok()) {
289 return s;
290 }
291
292 Slice enc_slice = Slice(encoded_trace);
293 GetFixed64(&enc_slice, &trace->ts);
294 trace->type = static_cast<TraceType>(enc_slice[0]);
295 enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
296 trace->payload = enc_slice.ToString();
297 return s;
298}
299
300} // namespace rocksdb