]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/trace_replay/trace_record_handler.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / trace_replay / trace_record_handler.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "trace_replay/trace_record_handler.h"
7
8 #include "rocksdb/iterator.h"
9 #include "rocksdb/trace_record_result.h"
10 #include "rocksdb/write_batch.h"
11
12 namespace ROCKSDB_NAMESPACE {
13
14 // TraceExecutionHandler
15 TraceExecutionHandler::TraceExecutionHandler(
16 DB* db, const std::vector<ColumnFamilyHandle*>& handles)
17 : TraceRecord::Handler(),
18 db_(db),
19 write_opts_(WriteOptions()),
20 read_opts_(ReadOptions()) {
21 assert(db != nullptr);
22 assert(!handles.empty());
23 cf_map_.reserve(handles.size());
24 for (ColumnFamilyHandle* handle : handles) {
25 assert(handle != nullptr);
26 cf_map_.insert({handle->GetID(), handle});
27 }
28 clock_ = db_->GetEnv()->GetSystemClock().get();
29 }
30
31 TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
32
33 Status TraceExecutionHandler::Handle(
34 const WriteQueryTraceRecord& record,
35 std::unique_ptr<TraceRecordResult>* result) {
36 if (result != nullptr) {
37 result->reset(nullptr);
38 }
39 uint64_t start = clock_->NowMicros();
40
41 WriteBatch batch(record.GetWriteBatchRep().ToString());
42 Status s = db_->Write(write_opts_, &batch);
43
44 uint64_t end = clock_->NowMicros();
45
46 if (s.ok() && result != nullptr) {
47 result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
48 record.GetTraceType()));
49 }
50
51 return s;
52 }
53
54 Status TraceExecutionHandler::Handle(
55 const GetQueryTraceRecord& record,
56 std::unique_ptr<TraceRecordResult>* result) {
57 if (result != nullptr) {
58 result->reset(nullptr);
59 }
60 auto it = cf_map_.find(record.GetColumnFamilyID());
61 if (it == cf_map_.end()) {
62 return Status::Corruption("Invalid Column Family ID.");
63 }
64
65 uint64_t start = clock_->NowMicros();
66
67 std::string value;
68 Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
69
70 uint64_t end = clock_->NowMicros();
71
72 // Treat not found as ok, return other errors.
73 if (!s.ok() && !s.IsNotFound()) {
74 return s;
75 }
76
77 if (result != nullptr) {
78 // Report the actual opetation status in TraceExecutionResult
79 result->reset(new SingleValueTraceExecutionResult(
80 std::move(s), std::move(value), start, end, record.GetTraceType()));
81 }
82 return Status::OK();
83 }
84
85 Status TraceExecutionHandler::Handle(
86 const IteratorSeekQueryTraceRecord& record,
87 std::unique_ptr<TraceRecordResult>* result) {
88 if (result != nullptr) {
89 result->reset(nullptr);
90 }
91 auto it = cf_map_.find(record.GetColumnFamilyID());
92 if (it == cf_map_.end()) {
93 return Status::Corruption("Invalid Column Family ID.");
94 }
95
96 ReadOptions r_opts = read_opts_;
97 Slice lower = record.GetLowerBound();
98 if (!lower.empty()) {
99 r_opts.iterate_lower_bound = &lower;
100 }
101 Slice upper = record.GetUpperBound();
102 if (!upper.empty()) {
103 r_opts.iterate_upper_bound = &upper;
104 }
105 Iterator* single_iter = db_->NewIterator(r_opts, it->second);
106
107 uint64_t start = clock_->NowMicros();
108
109 switch (record.GetSeekType()) {
110 case IteratorSeekQueryTraceRecord::kSeekForPrev: {
111 single_iter->SeekForPrev(record.GetKey());
112 break;
113 }
114 default: {
115 single_iter->Seek(record.GetKey());
116 break;
117 }
118 }
119
120 uint64_t end = clock_->NowMicros();
121
122 Status s = single_iter->status();
123 if (s.ok() && result != nullptr) {
124 if (single_iter->Valid()) {
125 PinnableSlice ps_key;
126 ps_key.PinSelf(single_iter->key());
127 PinnableSlice ps_value;
128 ps_value.PinSelf(single_iter->value());
129 result->reset(new IteratorTraceExecutionResult(
130 true, s, std::move(ps_key), std::move(ps_value), start, end,
131 record.GetTraceType()));
132 } else {
133 result->reset(new IteratorTraceExecutionResult(
134 false, s, "", "", start, end, record.GetTraceType()));
135 }
136 }
137 delete single_iter;
138
139 return s;
140 }
141
142 Status TraceExecutionHandler::Handle(
143 const MultiGetQueryTraceRecord& record,
144 std::unique_ptr<TraceRecordResult>* result) {
145 if (result != nullptr) {
146 result->reset(nullptr);
147 }
148 std::vector<ColumnFamilyHandle*> handles;
149 handles.reserve(record.GetColumnFamilyIDs().size());
150 for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
151 auto it = cf_map_.find(cf_id);
152 if (it == cf_map_.end()) {
153 return Status::Corruption("Invalid Column Family ID.");
154 }
155 handles.push_back(it->second);
156 }
157
158 std::vector<Slice> keys = record.GetKeys();
159
160 if (handles.empty() || keys.empty()) {
161 return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
162 }
163 if (handles.size() != keys.size()) {
164 return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
165 }
166
167 uint64_t start = clock_->NowMicros();
168
169 std::vector<std::string> values;
170 std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
171
172 uint64_t end = clock_->NowMicros();
173
174 // Treat not found as ok, return other errors.
175 for (const Status& s : ss) {
176 if (!s.ok() && !s.IsNotFound()) {
177 return s;
178 }
179 }
180
181 if (result != nullptr) {
182 // Report the actual opetation status in TraceExecutionResult
183 result->reset(new MultiValuesTraceExecutionResult(
184 std::move(ss), std::move(values), start, end, record.GetTraceType()));
185 }
186
187 return Status::OK();
188 }
189
190 } // namespace ROCKSDB_NAMESPACE