1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "trace_replay/trace_record_handler.h"
8 #include "rocksdb/iterator.h"
9 #include "rocksdb/trace_record_result.h"
10 #include "rocksdb/write_batch.h"
12 namespace ROCKSDB_NAMESPACE
{
14 // TraceExecutionHandler
15 TraceExecutionHandler::TraceExecutionHandler(
16 DB
* db
, const std::vector
<ColumnFamilyHandle
*>& handles
)
17 : TraceRecord::Handler(),
19 write_opts_(WriteOptions()),
20 read_opts_(ReadOptions()) {
21 assert(db
!= nullptr);
22 assert(!handles
.empty());
23 cf_map_
.reserve(handles
.size());
24 for (ColumnFamilyHandle
* handle
: handles
) {
25 assert(handle
!= nullptr);
26 cf_map_
.insert({handle
->GetID(), handle
});
28 clock_
= db_
->GetEnv()->GetSystemClock().get();
31 TraceExecutionHandler::~TraceExecutionHandler() { cf_map_
.clear(); }
33 Status
TraceExecutionHandler::Handle(
34 const WriteQueryTraceRecord
& record
,
35 std::unique_ptr
<TraceRecordResult
>* result
) {
36 if (result
!= nullptr) {
37 result
->reset(nullptr);
39 uint64_t start
= clock_
->NowMicros();
41 WriteBatch
batch(record
.GetWriteBatchRep().ToString());
42 Status s
= db_
->Write(write_opts_
, &batch
);
44 uint64_t end
= clock_
->NowMicros();
46 if (s
.ok() && result
!= nullptr) {
47 result
->reset(new StatusOnlyTraceExecutionResult(s
, start
, end
,
48 record
.GetTraceType()));
54 Status
TraceExecutionHandler::Handle(
55 const GetQueryTraceRecord
& record
,
56 std::unique_ptr
<TraceRecordResult
>* result
) {
57 if (result
!= nullptr) {
58 result
->reset(nullptr);
60 auto it
= cf_map_
.find(record
.GetColumnFamilyID());
61 if (it
== cf_map_
.end()) {
62 return Status::Corruption("Invalid Column Family ID.");
65 uint64_t start
= clock_
->NowMicros();
68 Status s
= db_
->Get(read_opts_
, it
->second
, record
.GetKey(), &value
);
70 uint64_t end
= clock_
->NowMicros();
72 // Treat not found as ok, return other errors.
73 if (!s
.ok() && !s
.IsNotFound()) {
77 if (result
!= nullptr) {
78 // Report the actual opetation status in TraceExecutionResult
79 result
->reset(new SingleValueTraceExecutionResult(
80 std::move(s
), std::move(value
), start
, end
, record
.GetTraceType()));
85 Status
TraceExecutionHandler::Handle(
86 const IteratorSeekQueryTraceRecord
& record
,
87 std::unique_ptr
<TraceRecordResult
>* result
) {
88 if (result
!= nullptr) {
89 result
->reset(nullptr);
91 auto it
= cf_map_
.find(record
.GetColumnFamilyID());
92 if (it
== cf_map_
.end()) {
93 return Status::Corruption("Invalid Column Family ID.");
96 ReadOptions r_opts
= read_opts_
;
97 Slice lower
= record
.GetLowerBound();
99 r_opts
.iterate_lower_bound
= &lower
;
101 Slice upper
= record
.GetUpperBound();
102 if (!upper
.empty()) {
103 r_opts
.iterate_upper_bound
= &upper
;
105 Iterator
* single_iter
= db_
->NewIterator(r_opts
, it
->second
);
107 uint64_t start
= clock_
->NowMicros();
109 switch (record
.GetSeekType()) {
110 case IteratorSeekQueryTraceRecord::kSeekForPrev
: {
111 single_iter
->SeekForPrev(record
.GetKey());
115 single_iter
->Seek(record
.GetKey());
120 uint64_t end
= clock_
->NowMicros();
122 Status s
= single_iter
->status();
123 if (s
.ok() && result
!= nullptr) {
124 if (single_iter
->Valid()) {
125 PinnableSlice ps_key
;
126 ps_key
.PinSelf(single_iter
->key());
127 PinnableSlice ps_value
;
128 ps_value
.PinSelf(single_iter
->value());
129 result
->reset(new IteratorTraceExecutionResult(
130 true, s
, std::move(ps_key
), std::move(ps_value
), start
, end
,
131 record
.GetTraceType()));
133 result
->reset(new IteratorTraceExecutionResult(
134 false, s
, "", "", start
, end
, record
.GetTraceType()));
142 Status
TraceExecutionHandler::Handle(
143 const MultiGetQueryTraceRecord
& record
,
144 std::unique_ptr
<TraceRecordResult
>* result
) {
145 if (result
!= nullptr) {
146 result
->reset(nullptr);
148 std::vector
<ColumnFamilyHandle
*> handles
;
149 handles
.reserve(record
.GetColumnFamilyIDs().size());
150 for (uint32_t cf_id
: record
.GetColumnFamilyIDs()) {
151 auto it
= cf_map_
.find(cf_id
);
152 if (it
== cf_map_
.end()) {
153 return Status::Corruption("Invalid Column Family ID.");
155 handles
.push_back(it
->second
);
158 std::vector
<Slice
> keys
= record
.GetKeys();
160 if (handles
.empty() || keys
.empty()) {
161 return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
163 if (handles
.size() != keys
.size()) {
164 return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
167 uint64_t start
= clock_
->NowMicros();
169 std::vector
<std::string
> values
;
170 std::vector
<Status
> ss
= db_
->MultiGet(read_opts_
, handles
, keys
, &values
);
172 uint64_t end
= clock_
->NowMicros();
174 // Treat not found as ok, return other errors.
175 for (const Status
& s
: ss
) {
176 if (!s
.ok() && !s
.IsNotFound()) {
181 if (result
!= nullptr) {
182 // Report the actual opetation status in TraceExecutionResult
183 result
->reset(new MultiValuesTraceExecutionResult(
184 std::move(ss
), std::move(values
), start
, end
, record
.GetTraceType()));
190 } // namespace ROCKSDB_NAMESPACE