]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #include "util/trace_replay.h" | |
7 | ||
8 | #include <chrono> | |
9 | #include <sstream> | |
10 | #include <thread> | |
11 | #include "db/db_impl.h" | |
12 | #include "rocksdb/slice.h" | |
13 | #include "rocksdb/write_batch.h" | |
14 | #include "util/coding.h" | |
15 | #include "util/string_util.h" | |
16 | ||
17 | namespace rocksdb { | |
18 | ||
494da23a TL |
// Magic string that Tracer::WriteHeader() places at the very start of the
// header payload; Replayer::ReadHeader() rejects a trace whose header payload
// does not begin with it.
const std::string kTraceMagic("feedcafedeadbeef");
20 | ||
11fdf7f2 TL |
21 | namespace { |
22 | void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) { | |
23 | PutFixed32(dst, cf_id); | |
24 | PutLengthPrefixedSlice(dst, key); | |
25 | } | |
26 | ||
27 | void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { | |
28 | Slice buf(buffer); | |
29 | GetFixed32(&buf, cf_id); | |
30 | GetLengthPrefixedSlice(&buf, key); | |
31 | } | |
32 | } // namespace | |
33 | ||
494da23a TL |
34 | Tracer::Tracer(Env* env, const TraceOptions& trace_options, |
35 | std::unique_ptr<TraceWriter>&& trace_writer) | |
36 | : env_(env), | |
37 | trace_options_(trace_options), | |
38 | trace_writer_(std::move(trace_writer)), | |
39 | trace_request_count_ (0) { | |
11fdf7f2 TL |
40 | WriteHeader(); |
41 | } | |
42 | ||
43 | Tracer::~Tracer() { trace_writer_.reset(); } | |
44 | ||
45 | Status Tracer::Write(WriteBatch* write_batch) { | |
494da23a TL |
46 | TraceType trace_type = kTraceWrite; |
47 | if (ShouldSkipTrace(trace_type)) { | |
48 | return Status::OK(); | |
49 | } | |
11fdf7f2 TL |
50 | Trace trace; |
51 | trace.ts = env_->NowMicros(); | |
494da23a | 52 | trace.type = trace_type; |
11fdf7f2 TL |
53 | trace.payload = write_batch->Data(); |
54 | return WriteTrace(trace); | |
55 | } | |
56 | ||
57 | Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { | |
494da23a TL |
58 | TraceType trace_type = kTraceGet; |
59 | if (ShouldSkipTrace(trace_type)) { | |
60 | return Status::OK(); | |
61 | } | |
11fdf7f2 TL |
62 | Trace trace; |
63 | trace.ts = env_->NowMicros(); | |
494da23a | 64 | trace.type = trace_type; |
11fdf7f2 TL |
65 | EncodeCFAndKey(&trace.payload, column_family->GetID(), key); |
66 | return WriteTrace(trace); | |
67 | } | |
68 | ||
69 | Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { | |
494da23a TL |
70 | TraceType trace_type = kTraceIteratorSeek; |
71 | if (ShouldSkipTrace(trace_type)) { | |
72 | return Status::OK(); | |
73 | } | |
11fdf7f2 TL |
74 | Trace trace; |
75 | trace.ts = env_->NowMicros(); | |
494da23a | 76 | trace.type = trace_type; |
11fdf7f2 TL |
77 | EncodeCFAndKey(&trace.payload, cf_id, key); |
78 | return WriteTrace(trace); | |
79 | } | |
80 | ||
81 | Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { | |
494da23a TL |
82 | TraceType trace_type = kTraceIteratorSeekForPrev; |
83 | if (ShouldSkipTrace(trace_type)) { | |
84 | return Status::OK(); | |
85 | } | |
11fdf7f2 TL |
86 | Trace trace; |
87 | trace.ts = env_->NowMicros(); | |
494da23a | 88 | trace.type = trace_type; |
11fdf7f2 TL |
89 | EncodeCFAndKey(&trace.payload, cf_id, key); |
90 | return WriteTrace(trace); | |
91 | } | |
92 | ||
494da23a TL |
93 | bool Tracer::ShouldSkipTrace(const TraceType& trace_type) { |
94 | if (IsTraceFileOverMax()) { | |
95 | return true; | |
96 | } | |
97 | if ((trace_options_.filter & kTraceFilterGet | |
98 | && trace_type == kTraceGet) | |
99 | || (trace_options_.filter & kTraceFilterWrite | |
100 | && trace_type == kTraceWrite)) { | |
101 | return true; | |
102 | } | |
103 | ++trace_request_count_; | |
104 | if (trace_request_count_ < trace_options_.sampling_frequency) { | |
105 | return true; | |
106 | } | |
107 | trace_request_count_ = 0; | |
108 | return false; | |
109 | } | |
110 | ||
111 | bool Tracer::IsTraceFileOverMax() { | |
112 | uint64_t trace_file_size = trace_writer_->GetFileSize(); | |
113 | return (trace_file_size > trace_options_.max_trace_file_size); | |
114 | } | |
115 | ||
11fdf7f2 TL |
116 | Status Tracer::WriteHeader() { |
117 | std::ostringstream s; | |
118 | s << kTraceMagic << "\t" | |
119 | << "Trace Version: 0.1\t" | |
120 | << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" | |
121 | << "Format: Timestamp OpType Payload\n"; | |
122 | std::string header(s.str()); | |
123 | ||
124 | Trace trace; | |
125 | trace.ts = env_->NowMicros(); | |
126 | trace.type = kTraceBegin; | |
127 | trace.payload = header; | |
128 | return WriteTrace(trace); | |
129 | } | |
130 | ||
131 | Status Tracer::WriteFooter() { | |
132 | Trace trace; | |
133 | trace.ts = env_->NowMicros(); | |
134 | trace.type = kTraceEnd; | |
135 | trace.payload = ""; | |
136 | return WriteTrace(trace); | |
137 | } | |
138 | ||
139 | Status Tracer::WriteTrace(const Trace& trace) { | |
140 | std::string encoded_trace; | |
141 | PutFixed64(&encoded_trace, trace.ts); | |
142 | encoded_trace.push_back(trace.type); | |
143 | PutFixed32(&encoded_trace, static_cast<uint32_t>(trace.payload.size())); | |
144 | encoded_trace.append(trace.payload); | |
145 | return trace_writer_->Write(Slice(encoded_trace)); | |
146 | } | |
147 | ||
148 | Status Tracer::Close() { return WriteFooter(); } | |
149 | ||
150 | Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles, | |
494da23a | 151 | std::unique_ptr<TraceReader>&& reader) |
11fdf7f2 TL |
152 | : trace_reader_(std::move(reader)) { |
153 | assert(db != nullptr); | |
154 | db_ = static_cast<DBImpl*>(db->GetRootDB()); | |
155 | for (ColumnFamilyHandle* cfh : handles) { | |
156 | cf_map_[cfh->GetID()] = cfh; | |
157 | } | |
158 | } | |
159 | ||
160 | Replayer::~Replayer() { trace_reader_.reset(); } | |
161 | ||
162 | Status Replayer::Replay() { | |
163 | Status s; | |
164 | Trace header; | |
165 | s = ReadHeader(&header); | |
166 | if (!s.ok()) { | |
167 | return s; | |
168 | } | |
169 | ||
170 | std::chrono::system_clock::time_point replay_epoch = | |
171 | std::chrono::system_clock::now(); | |
172 | WriteOptions woptions; | |
173 | ReadOptions roptions; | |
174 | Trace trace; | |
175 | uint64_t ops = 0; | |
176 | Iterator* single_iter = nullptr; | |
177 | while (s.ok()) { | |
178 | trace.reset(); | |
179 | s = ReadTrace(&trace); | |
180 | if (!s.ok()) { | |
181 | break; | |
182 | } | |
183 | ||
184 | std::this_thread::sleep_until( | |
185 | replay_epoch + std::chrono::microseconds(trace.ts - header.ts)); | |
186 | if (trace.type == kTraceWrite) { | |
187 | WriteBatch batch(trace.payload); | |
188 | db_->Write(woptions, &batch); | |
189 | ops++; | |
190 | } else if (trace.type == kTraceGet) { | |
191 | uint32_t cf_id = 0; | |
192 | Slice key; | |
193 | DecodeCFAndKey(trace.payload, &cf_id, &key); | |
194 | if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { | |
195 | return Status::Corruption("Invalid Column Family ID."); | |
196 | } | |
197 | ||
198 | std::string value; | |
199 | if (cf_id == 0) { | |
200 | db_->Get(roptions, key, &value); | |
201 | } else { | |
202 | db_->Get(roptions, cf_map_[cf_id], key, &value); | |
203 | } | |
204 | ops++; | |
205 | } else if (trace.type == kTraceIteratorSeek) { | |
206 | uint32_t cf_id = 0; | |
207 | Slice key; | |
208 | DecodeCFAndKey(trace.payload, &cf_id, &key); | |
209 | if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { | |
210 | return Status::Corruption("Invalid Column Family ID."); | |
211 | } | |
212 | ||
213 | if (cf_id == 0) { | |
214 | single_iter = db_->NewIterator(roptions); | |
215 | } else { | |
216 | single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); | |
217 | } | |
218 | single_iter->Seek(key); | |
219 | ops++; | |
220 | delete single_iter; | |
221 | } else if (trace.type == kTraceIteratorSeekForPrev) { | |
222 | // Currently, only support to call the Seek() | |
223 | uint32_t cf_id = 0; | |
224 | Slice key; | |
225 | DecodeCFAndKey(trace.payload, &cf_id, &key); | |
226 | if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { | |
227 | return Status::Corruption("Invalid Column Family ID."); | |
228 | } | |
229 | ||
230 | if (cf_id == 0) { | |
231 | single_iter = db_->NewIterator(roptions); | |
232 | } else { | |
233 | single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); | |
234 | } | |
235 | single_iter->SeekForPrev(key); | |
236 | ops++; | |
237 | delete single_iter; | |
238 | } else if (trace.type == kTraceEnd) { | |
239 | // Do nothing for now. | |
240 | // TODO: Add some validations later. | |
241 | break; | |
242 | } | |
243 | } | |
244 | ||
245 | if (s.IsIncomplete()) { | |
246 | // Reaching eof returns Incomplete status at the moment. | |
247 | // Could happen when killing a process without calling EndTrace() API. | |
248 | // TODO: Add better error handling. | |
249 | return Status::OK(); | |
250 | } | |
251 | return s; | |
252 | } | |
253 | ||
254 | Status Replayer::ReadHeader(Trace* header) { | |
255 | assert(header != nullptr); | |
256 | Status s = ReadTrace(header); | |
257 | if (!s.ok()) { | |
258 | return s; | |
259 | } | |
260 | if (header->type != kTraceBegin) { | |
261 | return Status::Corruption("Corrupted trace file. Incorrect header."); | |
262 | } | |
263 | if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { | |
264 | return Status::Corruption("Corrupted trace file. Incorrect magic."); | |
265 | } | |
266 | ||
267 | return s; | |
268 | } | |
269 | ||
270 | Status Replayer::ReadFooter(Trace* footer) { | |
271 | assert(footer != nullptr); | |
272 | Status s = ReadTrace(footer); | |
273 | if (!s.ok()) { | |
274 | return s; | |
275 | } | |
276 | if (footer->type != kTraceEnd) { | |
277 | return Status::Corruption("Corrupted trace file. Incorrect footer."); | |
278 | } | |
279 | ||
280 | // TODO: Add more validations later | |
281 | return s; | |
282 | } | |
283 | ||
284 | Status Replayer::ReadTrace(Trace* trace) { | |
285 | assert(trace != nullptr); | |
286 | std::string encoded_trace; | |
287 | Status s = trace_reader_->Read(&encoded_trace); | |
288 | if (!s.ok()) { | |
289 | return s; | |
290 | } | |
291 | ||
292 | Slice enc_slice = Slice(encoded_trace); | |
293 | GetFixed64(&enc_slice, &trace->ts); | |
294 | trace->type = static_cast<TraceType>(enc_slice[0]); | |
295 | enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize); | |
296 | trace->payload = enc_slice.ToString(); | |
297 | return s; | |
298 | } | |
299 | ||
300 | } // namespace rocksdb |