]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/cassandra/format.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / cassandra / format.cc
1 // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "format.h"
7
8 #include <algorithm>
9 #include <map>
10 #include <memory>
11
12 #include "utilities/cassandra/serialize.h"
13
14 namespace rocksdb {
15 namespace cassandra {
namespace {
// Row-level deletion sentinels. A row whose marked-for-delete-at still equals
// kDefaultMarkedForDeleteAt is not a row tombstone (see RowValue::IsTombstone);
// rows built from columns are given both defaults.
constexpr int32_t kDefaultLocalDeletionTime =
    std::numeric_limits<int32_t>::max();
constexpr int64_t kDefaultMarkedForDeleteAt =
    std::numeric_limits<int64_t>::min();
}  // namespace
22
23 ColumnBase::ColumnBase(int8_t mask, int8_t index)
24 : mask_(mask), index_(index) {}
25
26 std::size_t ColumnBase::Size() const {
27 return sizeof(mask_) + sizeof(index_);
28 }
29
30 int8_t ColumnBase::Mask() const {
31 return mask_;
32 }
33
34 int8_t ColumnBase::Index() const {
35 return index_;
36 }
37
38 void ColumnBase::Serialize(std::string* dest) const {
39 rocksdb::cassandra::Serialize<int8_t>(mask_, dest);
40 rocksdb::cassandra::Serialize<int8_t>(index_, dest);
41 }
42
43 std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
44 std::size_t offset) {
45 int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
46 if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
47 return Tombstone::Deserialize(src, offset);
48 } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
49 return ExpiringColumn::Deserialize(src, offset);
50 } else {
51 return Column::Deserialize(src, offset);
52 }
53 }
54
55 Column::Column(
56 int8_t mask,
57 int8_t index,
58 int64_t timestamp,
59 int32_t value_size,
60 const char* value
61 ) : ColumnBase(mask, index), timestamp_(timestamp),
62 value_size_(value_size), value_(value) {}
63
64 int64_t Column::Timestamp() const {
65 return timestamp_;
66 }
67
68 std::size_t Column::Size() const {
69 return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_)
70 + value_size_;
71 }
72
73 void Column::Serialize(std::string* dest) const {
74 ColumnBase::Serialize(dest);
75 rocksdb::cassandra::Serialize<int64_t>(timestamp_, dest);
76 rocksdb::cassandra::Serialize<int32_t>(value_size_, dest);
77 dest->append(value_, value_size_);
78 }
79
80 std::shared_ptr<Column> Column::Deserialize(const char *src,
81 std::size_t offset) {
82 int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
83 offset += sizeof(mask);
84 int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
85 offset += sizeof(index);
86 int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset);
87 offset += sizeof(timestamp);
88 int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
89 offset += sizeof(value_size);
90 return std::make_shared<Column>(
91 mask, index, timestamp, value_size, src + offset);
92 }
93
94 ExpiringColumn::ExpiringColumn(
95 int8_t mask,
96 int8_t index,
97 int64_t timestamp,
98 int32_t value_size,
99 const char* value,
100 int32_t ttl
101 ) : Column(mask, index, timestamp, value_size, value),
102 ttl_(ttl) {}
103
104 std::size_t ExpiringColumn::Size() const {
105 return Column::Size() + sizeof(ttl_);
106 }
107
108 void ExpiringColumn::Serialize(std::string* dest) const {
109 Column::Serialize(dest);
110 rocksdb::cassandra::Serialize<int32_t>(ttl_, dest);
111 }
112
113 std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const {
114 return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp()));
115 }
116
117 std::chrono::seconds ExpiringColumn::Ttl() const {
118 return std::chrono::seconds(ttl_);
119 }
120
121 bool ExpiringColumn::Expired() const {
122 return TimePoint() + Ttl() < std::chrono::system_clock::now();
123 }
124
125 std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
126 auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
127 int32_t local_deletion_time = static_cast<int32_t>(
128 std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
129 int64_t marked_for_delete_at =
130 std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
131 return std::make_shared<Tombstone>(
132 static_cast<int8_t>(ColumnTypeMask::DELETION_MASK),
133 Index(),
134 local_deletion_time,
135 marked_for_delete_at);
136 }
137
138 std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
139 const char *src,
140 std::size_t offset) {
141 int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
142 offset += sizeof(mask);
143 int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
144 offset += sizeof(index);
145 int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset);
146 offset += sizeof(timestamp);
147 int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
148 offset += sizeof(value_size);
149 const char* value = src + offset;
150 offset += value_size;
151 int32_t ttl = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
152 return std::make_shared<ExpiringColumn>(
153 mask, index, timestamp, value_size, value, ttl);
154 }
155
156 Tombstone::Tombstone(
157 int8_t mask,
158 int8_t index,
159 int32_t local_deletion_time,
160 int64_t marked_for_delete_at
161 ) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time),
162 marked_for_delete_at_(marked_for_delete_at) {}
163
164 int64_t Tombstone::Timestamp() const {
165 return marked_for_delete_at_;
166 }
167
168 std::size_t Tombstone::Size() const {
169 return ColumnBase::Size() + sizeof(local_deletion_time_)
170 + sizeof(marked_for_delete_at_);
171 }
172
173 void Tombstone::Serialize(std::string* dest) const {
174 ColumnBase::Serialize(dest);
175 rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
176 rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
177 }
178
179 bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
180 auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
181 std::chrono::seconds(local_deletion_time_));
182 auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
183 return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
184 }
185
186 std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
187 std::size_t offset) {
188 int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
189 offset += sizeof(mask);
190 int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
191 offset += sizeof(index);
192 int32_t local_deletion_time =
193 rocksdb::cassandra::Deserialize<int32_t>(src, offset);
194 offset += sizeof(int32_t);
195 int64_t marked_for_delete_at =
196 rocksdb::cassandra::Deserialize<int64_t>(src, offset);
197 return std::make_shared<Tombstone>(
198 mask, index, local_deletion_time, marked_for_delete_at);
199 }
200
201 RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
202 : local_deletion_time_(local_deletion_time),
203 marked_for_delete_at_(marked_for_delete_at), columns_(),
204 last_modified_time_(0) {}
205
206 RowValue::RowValue(Columns columns,
207 int64_t last_modified_time)
208 : local_deletion_time_(kDefaultLocalDeletionTime),
209 marked_for_delete_at_(kDefaultMarkedForDeleteAt),
210 columns_(std::move(columns)), last_modified_time_(last_modified_time) {}
211
212 std::size_t RowValue::Size() const {
213 std::size_t size = sizeof(local_deletion_time_)
214 + sizeof(marked_for_delete_at_);
215 for (const auto& column : columns_) {
216 size += column -> Size();
217 }
218 return size;
219 }
220
221 int64_t RowValue::LastModifiedTime() const {
222 if (IsTombstone()) {
223 return marked_for_delete_at_;
224 } else {
225 return last_modified_time_;
226 }
227 }
228
229 bool RowValue::IsTombstone() const {
230 return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
231 }
232
233 void RowValue::Serialize(std::string* dest) const {
234 rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
235 rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
236 for (const auto& column : columns_) {
237 column -> Serialize(dest);
238 }
239 }
240
241 RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
242 *changed = false;
243 Columns new_columns;
244 for (auto& column : columns_) {
245 if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
246 std::shared_ptr<ExpiringColumn> expiring_column =
247 std::static_pointer_cast<ExpiringColumn>(column);
248
249 if(expiring_column->Expired()){
250 *changed = true;
251 continue;
252 }
253 }
254
255 new_columns.push_back(column);
256 }
257 return RowValue(std::move(new_columns), last_modified_time_);
258 }
259
260 RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
261 *changed = false;
262 Columns new_columns;
263 for (auto& column : columns_) {
264 if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
265 std::shared_ptr<ExpiringColumn> expiring_column =
266 std::static_pointer_cast<ExpiringColumn>(column);
267
268 if(expiring_column->Expired()) {
269 std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
270 new_columns.push_back(tombstone);
271 *changed = true;
272 continue;
273 }
274 }
275 new_columns.push_back(column);
276 }
277 return RowValue(std::move(new_columns), last_modified_time_);
278 }
279
280 RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
281 Columns new_columns;
282 for (auto& column : columns_) {
283 if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
284 std::shared_ptr<Tombstone> tombstone =
285 std::static_pointer_cast<Tombstone>(column);
286
287 if (tombstone->Collectable(gc_grace_period)) {
288 continue;
289 }
290 }
291
292 new_columns.push_back(column);
293 }
294 return RowValue(std::move(new_columns), last_modified_time_);
295 }
296
297 bool RowValue::Empty() const {
298 return columns_.empty();
299 }
300
301 RowValue RowValue::Deserialize(const char *src, std::size_t size) {
302 std::size_t offset = 0;
303 assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
304 int32_t local_deletion_time =
305 rocksdb::cassandra::Deserialize<int32_t>(src, offset);
306 offset += sizeof(int32_t);
307 int64_t marked_for_delete_at =
308 rocksdb::cassandra::Deserialize<int64_t>(src, offset);
309 offset += sizeof(int64_t);
310 if (offset == size) {
311 return RowValue(local_deletion_time, marked_for_delete_at);
312 }
313
314 assert(local_deletion_time == kDefaultLocalDeletionTime);
315 assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
316 Columns columns;
317 int64_t last_modified_time = 0;
318 while (offset < size) {
319 auto c = ColumnBase::Deserialize(src, offset);
320 offset += c -> Size();
321 assert(offset <= size);
322 last_modified_time = std::max(last_modified_time, c -> Timestamp());
323 columns.push_back(std::move(c));
324 }
325
326 return RowValue(std::move(columns), last_modified_time);
327 }
328
329 // Merge multiple row values into one.
330 // For each column in rows with same index, we pick the one with latest
331 // timestamp. And we also take row tombstone into consideration, by iterating
332 // each row from reverse timestamp order, and stop once we hit the first
333 // row tombstone.
334 RowValue RowValue::Merge(std::vector<RowValue>&& values) {
335 assert(values.size() > 0);
336 if (values.size() == 1) {
337 return std::move(values[0]);
338 }
339
340 // Merge columns by their last modified time, and skip once we hit
341 // a row tombstone.
342 std::sort(values.begin(), values.end(),
343 [](const RowValue& r1, const RowValue& r2) {
344 return r1.LastModifiedTime() > r2.LastModifiedTime();
345 });
346
347 std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
348 int64_t tombstone_timestamp = 0;
349
350 for (auto& value : values) {
351 if (value.IsTombstone()) {
352 if (merged_columns.size() == 0) {
353 return std::move(value);
354 }
355 tombstone_timestamp = value.LastModifiedTime();
356 break;
357 }
358 for (auto& column : value.columns_) {
359 int8_t index = column->Index();
360 if (merged_columns.find(index) == merged_columns.end()) {
361 merged_columns[index] = column;
362 } else {
363 if (column->Timestamp() > merged_columns[index]->Timestamp()) {
364 merged_columns[index] = column;
365 }
366 }
367 }
368 }
369
370 int64_t last_modified_time = 0;
371 Columns columns;
372 for (auto& pair: merged_columns) {
373 // For some row, its last_modified_time > row tombstone_timestamp, but
374 // it might have rows whose timestamp is ealier than tombstone, so we
375 // ned to filter these rows.
376 if (pair.second->Timestamp() <= tombstone_timestamp) {
377 continue;
378 }
379 last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
380 columns.push_back(std::move(pair.second));
381 }
382 return RowValue(std::move(columns), last_modified_time);
383 }
384
} // namespace cassandra
386 } // namespace rocksdb