]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "rocksdb/utilities/spatial_db.h" | |
9 | ||
10 | #ifndef __STDC_FORMAT_MACROS | |
11 | #define __STDC_FORMAT_MACROS | |
12 | #endif | |
13 | ||
14 | #include <algorithm> | |
15 | #include <condition_variable> | |
16 | #include <inttypes.h> | |
17 | #include <string> | |
18 | #include <vector> | |
19 | #include <mutex> | |
20 | #include <thread> | |
21 | #include <set> | |
22 | #include <unordered_set> | |
23 | ||
24 | #include "rocksdb/cache.h" | |
25 | #include "rocksdb/options.h" | |
26 | #include "rocksdb/memtablerep.h" | |
27 | #include "rocksdb/slice_transform.h" | |
28 | #include "rocksdb/statistics.h" | |
29 | #include "rocksdb/table.h" | |
30 | #include "rocksdb/db.h" | |
31 | #include "rocksdb/utilities/stackable_db.h" | |
32 | #include "util/coding.h" | |
33 | #include "utilities/spatialdb/utils.h" | |
34 | #include "port/port.h" | |
35 | ||
36 | namespace rocksdb { | |
37 | namespace spatial { | |
38 | ||
39 | // Column families are used to store element's data and spatial indexes. We use | |
40 | // [default] column family to store the element data. This is the format of | |
41 | // [default] column family: | |
42 | // * id (fixed 64 bit big endian) -> blob (length prefixed slice) feature_set | |
43 | // (serialized) | |
44 | // We have one additional column family for each spatial index. The name of the | |
45 | // column family is [spatial$<spatial_index_name>]. The format is: | |
46 | // * quad_key (fixed 64 bit big endian) id (fixed 64 bit big endian) -> "" | |
47 | // We store information about indexes in [metadata] column family. Format is: | |
48 | // * spatial$<spatial_index_name> -> bbox (4 double encodings) tile_bits | |
49 | // (varint32) | |
50 | ||
51 | namespace { | |
52 | const std::string kMetadataColumnFamilyName("metadata"); | |
53 | inline std::string GetSpatialIndexColumnFamilyName( | |
54 | const std::string& spatial_index_name) { | |
55 | return "spatial$" + spatial_index_name; | |
56 | } | |
57 | inline bool GetSpatialIndexName(const std::string& column_family_name, | |
58 | Slice* dst) { | |
59 | *dst = Slice(column_family_name); | |
60 | if (dst->starts_with("spatial$")) { | |
61 | dst->remove_prefix(8); // strlen("spatial$") | |
62 | return true; | |
63 | } | |
64 | return false; | |
65 | } | |
66 | ||
67 | } // namespace | |
68 | ||
69 | void Variant::Init(const Variant& v, Data& d) { | |
70 | switch (v.type_) { | |
71 | case kNull: | |
72 | break; | |
73 | case kBool: | |
74 | d.b = v.data_.b; | |
75 | break; | |
76 | case kInt: | |
77 | d.i = v.data_.i; | |
78 | break; | |
79 | case kDouble: | |
80 | d.d = v.data_.d; | |
81 | break; | |
82 | case kString: | |
83 | new (d.s) std::string(*GetStringPtr(v.data_)); | |
84 | break; | |
85 | default: | |
86 | assert(false); | |
87 | } | |
88 | } | |
89 | ||
90 | Variant& Variant::operator=(const Variant& v) { | |
91 | // Construct first a temp so exception from a string ctor | |
92 | // does not change this object | |
93 | Data tmp; | |
94 | Init(v, tmp); | |
95 | ||
96 | Type thisType = type_; | |
97 | // Boils down to copying bits so safe | |
98 | std::swap(tmp, data_); | |
99 | type_ = v.type_; | |
100 | ||
101 | Destroy(thisType, tmp); | |
102 | ||
103 | return *this; | |
104 | } | |
105 | ||
106 | Variant& Variant::operator=(Variant&& rhs) { | |
107 | Destroy(type_, data_); | |
108 | if (rhs.type_ == kString) { | |
109 | new (data_.s) std::string(std::move(*GetStringPtr(rhs.data_))); | |
110 | } else { | |
111 | data_ = rhs.data_; | |
112 | } | |
113 | type_ = rhs.type_; | |
114 | rhs.type_ = kNull; | |
115 | return *this; | |
116 | } | |
117 | ||
118 | bool Variant::operator==(const Variant& rhs) const { | |
119 | if (type_ != rhs.type_) { | |
120 | return false; | |
121 | } | |
122 | ||
123 | switch (type_) { | |
124 | case kNull: | |
125 | return true; | |
126 | case kBool: | |
127 | return data_.b == rhs.data_.b; | |
128 | case kInt: | |
129 | return data_.i == rhs.data_.i; | |
130 | case kDouble: | |
131 | return data_.d == rhs.data_.d; | |
132 | case kString: | |
133 | return *GetStringPtr(data_) == *GetStringPtr(rhs.data_); | |
134 | default: | |
135 | assert(false); | |
136 | } | |
137 | // it will never reach here, but otherwise the compiler complains | |
138 | return false; | |
139 | } | |
140 | ||
141 | FeatureSet* FeatureSet::Set(const std::string& key, const Variant& value) { | |
142 | map_.insert({key, value}); | |
143 | return this; | |
144 | } | |
145 | ||
146 | bool FeatureSet::Contains(const std::string& key) const { | |
147 | return map_.find(key) != map_.end(); | |
148 | } | |
149 | ||
150 | const Variant& FeatureSet::Get(const std::string& key) const { | |
151 | auto itr = map_.find(key); | |
152 | assert(itr != map_.end()); | |
153 | return itr->second; | |
154 | } | |
155 | ||
156 | FeatureSet::iterator FeatureSet::Find(const std::string& key) const { | |
157 | return iterator(map_.find(key)); | |
158 | } | |
159 | ||
160 | void FeatureSet::Clear() { map_.clear(); } | |
161 | ||
162 | void FeatureSet::Serialize(std::string* output) const { | |
163 | for (const auto& iter : map_) { | |
164 | PutLengthPrefixedSlice(output, iter.first); | |
165 | output->push_back(static_cast<char>(iter.second.type())); | |
166 | switch (iter.second.type()) { | |
167 | case Variant::kNull: | |
168 | break; | |
169 | case Variant::kBool: | |
170 | output->push_back(static_cast<char>(iter.second.get_bool())); | |
171 | break; | |
172 | case Variant::kInt: | |
173 | PutVarint64(output, iter.second.get_int()); | |
174 | break; | |
175 | case Variant::kDouble: { | |
176 | PutDouble(output, iter.second.get_double()); | |
177 | break; | |
178 | } | |
179 | case Variant::kString: | |
180 | PutLengthPrefixedSlice(output, iter.second.get_string()); | |
181 | break; | |
182 | default: | |
183 | assert(false); | |
184 | } | |
185 | } | |
186 | } | |
187 | ||
188 | bool FeatureSet::Deserialize(const Slice& input) { | |
189 | assert(map_.empty()); | |
190 | Slice s(input); | |
191 | while (s.size()) { | |
192 | Slice key; | |
193 | if (!GetLengthPrefixedSlice(&s, &key) || s.size() == 0) { | |
194 | return false; | |
195 | } | |
196 | char type = s[0]; | |
197 | s.remove_prefix(1); | |
198 | switch (type) { | |
199 | case Variant::kNull: { | |
200 | map_.insert({key.ToString(), Variant()}); | |
201 | break; | |
202 | } | |
203 | case Variant::kBool: { | |
204 | if (s.size() == 0) { | |
205 | return false; | |
206 | } | |
207 | map_.insert({key.ToString(), Variant(static_cast<bool>(s[0]))}); | |
208 | s.remove_prefix(1); | |
209 | break; | |
210 | } | |
211 | case Variant::kInt: { | |
212 | uint64_t v; | |
213 | if (!GetVarint64(&s, &v)) { | |
214 | return false; | |
215 | } | |
216 | map_.insert({key.ToString(), Variant(v)}); | |
217 | break; | |
218 | } | |
219 | case Variant::kDouble: { | |
220 | double d; | |
221 | if (!GetDouble(&s, &d)) { | |
222 | return false; | |
223 | } | |
224 | map_.insert({key.ToString(), Variant(d)}); | |
225 | break; | |
226 | } | |
227 | case Variant::kString: { | |
228 | Slice str; | |
229 | if (!GetLengthPrefixedSlice(&s, &str)) { | |
230 | return false; | |
231 | } | |
232 | map_.insert({key.ToString(), str.ToString()}); | |
233 | break; | |
234 | } | |
235 | default: | |
236 | return false; | |
237 | } | |
238 | } | |
239 | return true; | |
240 | } | |
241 | ||
242 | std::string FeatureSet::DebugString() const { | |
243 | std::string out = "{"; | |
244 | bool comma = false; | |
245 | for (const auto& iter : map_) { | |
246 | if (comma) { | |
247 | out.append(", "); | |
248 | } else { | |
249 | comma = true; | |
250 | } | |
251 | out.append("\"" + iter.first + "\": "); | |
252 | switch (iter.second.type()) { | |
253 | case Variant::kNull: | |
254 | out.append("null"); | |
255 | break; | |
256 | case Variant::kBool: | |
257 | if (iter.second.get_bool()) { | |
258 | out.append("true"); | |
259 | } else { | |
260 | out.append("false"); | |
261 | } | |
262 | break; | |
263 | case Variant::kInt: { | |
264 | char buf[32]; | |
265 | snprintf(buf, sizeof(buf), "%" PRIu64, iter.second.get_int()); | |
266 | out.append(buf); | |
267 | break; | |
268 | } | |
269 | case Variant::kDouble: { | |
270 | char buf[32]; | |
271 | snprintf(buf, sizeof(buf), "%lf", iter.second.get_double()); | |
272 | out.append(buf); | |
273 | break; | |
274 | } | |
275 | case Variant::kString: | |
276 | out.append("\"" + iter.second.get_string() + "\""); | |
277 | break; | |
278 | default: | |
279 | assert(false); | |
280 | } | |
281 | } | |
282 | return out + "}"; | |
283 | } | |
284 | ||
285 | class ValueGetter { | |
286 | public: | |
287 | ValueGetter() {} | |
288 | virtual ~ValueGetter() {} | |
289 | ||
290 | virtual bool Get(uint64_t id) = 0; | |
291 | virtual const Slice value() const = 0; | |
292 | ||
293 | virtual Status status() const = 0; | |
294 | }; | |
295 | ||
296 | class ValueGetterFromDB : public ValueGetter { | |
297 | public: | |
298 | ValueGetterFromDB(DB* db, ColumnFamilyHandle* cf) : db_(db), cf_(cf) {} | |
299 | ||
300 | virtual bool Get(uint64_t id) override { | |
301 | std::string encoded_id; | |
302 | PutFixed64BigEndian(&encoded_id, id); | |
303 | status_ = db_->Get(ReadOptions(), cf_, encoded_id, &value_); | |
304 | if (status_.IsNotFound()) { | |
305 | status_ = Status::Corruption("Index inconsistency"); | |
306 | return false; | |
307 | } | |
308 | ||
309 | return true; | |
310 | } | |
311 | ||
312 | virtual const Slice value() const override { return value_; } | |
313 | ||
314 | virtual Status status() const override { return status_; } | |
315 | ||
316 | private: | |
317 | std::string value_; | |
318 | DB* db_; | |
319 | ColumnFamilyHandle* cf_; | |
320 | Status status_; | |
321 | }; | |
322 | ||
323 | class ValueGetterFromIterator : public ValueGetter { | |
324 | public: | |
325 | explicit ValueGetterFromIterator(Iterator* iterator) : iterator_(iterator) {} | |
326 | ||
327 | virtual bool Get(uint64_t id) override { | |
328 | std::string encoded_id; | |
329 | PutFixed64BigEndian(&encoded_id, id); | |
330 | iterator_->Seek(encoded_id); | |
331 | ||
332 | if (!iterator_->Valid() || iterator_->key() != Slice(encoded_id)) { | |
333 | status_ = Status::Corruption("Index inconsistency"); | |
334 | return false; | |
335 | } | |
336 | ||
337 | return true; | |
338 | } | |
339 | ||
340 | virtual const Slice value() const override { return iterator_->value(); } | |
341 | ||
342 | virtual Status status() const override { return status_; } | |
343 | ||
344 | private: | |
345 | std::unique_ptr<Iterator> iterator_; | |
346 | Status status_; | |
347 | }; | |
348 | ||
349 | class SpatialIndexCursor : public Cursor { | |
350 | public: | |
351 | // tile_box is inclusive | |
352 | SpatialIndexCursor(Iterator* spatial_iterator, ValueGetter* value_getter, | |
353 | const BoundingBox<uint64_t>& tile_bbox, uint32_t tile_bits) | |
354 | : value_getter_(value_getter), valid_(true) { | |
355 | // calculate quad keys we'll need to query | |
356 | std::vector<uint64_t> quad_keys; | |
11fdf7f2 TL |
357 | quad_keys.reserve(static_cast<size_t>((tile_bbox.max_x - tile_bbox.min_x + 1) * |
358 | (tile_bbox.max_y - tile_bbox.min_y + 1))); | |
7c673cae FG |
359 | for (uint64_t x = tile_bbox.min_x; x <= tile_bbox.max_x; ++x) { |
360 | for (uint64_t y = tile_bbox.min_y; y <= tile_bbox.max_y; ++y) { | |
361 | quad_keys.push_back(GetQuadKeyFromTile(x, y, tile_bits)); | |
362 | } | |
363 | } | |
364 | std::sort(quad_keys.begin(), quad_keys.end()); | |
365 | ||
366 | // load primary key ids for all quad keys | |
367 | for (auto quad_key : quad_keys) { | |
368 | std::string encoded_quad_key; | |
369 | PutFixed64BigEndian(&encoded_quad_key, quad_key); | |
370 | Slice slice_quad_key(encoded_quad_key); | |
371 | ||
372 | // If CheckQuadKey is true, there is no need to reseek, since | |
373 | // spatial_iterator is already pointing at the correct quad key. This is | |
374 | // an optimization. | |
375 | if (!CheckQuadKey(spatial_iterator, slice_quad_key)) { | |
376 | spatial_iterator->Seek(slice_quad_key); | |
377 | } | |
378 | ||
379 | while (CheckQuadKey(spatial_iterator, slice_quad_key)) { | |
380 | // extract ID from spatial_iterator | |
381 | uint64_t id; | |
382 | bool ok = GetFixed64BigEndian( | |
383 | Slice(spatial_iterator->key().data() + sizeof(uint64_t), | |
384 | sizeof(uint64_t)), | |
385 | &id); | |
386 | if (!ok) { | |
387 | valid_ = false; | |
388 | status_ = Status::Corruption("Spatial index corruption"); | |
389 | break; | |
390 | } | |
391 | primary_key_ids_.insert(id); | |
392 | spatial_iterator->Next(); | |
393 | } | |
394 | } | |
395 | ||
396 | if (!spatial_iterator->status().ok()) { | |
397 | status_ = spatial_iterator->status(); | |
398 | valid_ = false; | |
399 | } | |
400 | delete spatial_iterator; | |
401 | ||
402 | valid_ = valid_ && !primary_key_ids_.empty(); | |
403 | ||
404 | if (valid_) { | |
405 | primary_keys_iterator_ = primary_key_ids_.begin(); | |
406 | ExtractData(); | |
407 | } | |
408 | } | |
409 | ||
410 | virtual bool Valid() const override { return valid_; } | |
411 | ||
412 | virtual void Next() override { | |
413 | assert(valid_); | |
414 | ||
415 | ++primary_keys_iterator_; | |
416 | if (primary_keys_iterator_ == primary_key_ids_.end()) { | |
417 | valid_ = false; | |
418 | return; | |
419 | } | |
420 | ||
421 | ExtractData(); | |
422 | } | |
423 | ||
424 | virtual const Slice blob() override { return current_blob_; } | |
425 | virtual const FeatureSet& feature_set() override { | |
426 | return current_feature_set_; | |
427 | } | |
428 | ||
429 | virtual Status status() const override { | |
430 | if (!status_.ok()) { | |
431 | return status_; | |
432 | } | |
433 | return value_getter_->status(); | |
434 | } | |
435 | ||
436 | private: | |
437 | // * returns true if spatial iterator is on the current quad key and all is | |
438 | // well | |
439 | // * returns false if spatial iterator is not on current, or iterator is | |
440 | // invalid or corruption | |
441 | bool CheckQuadKey(Iterator* spatial_iterator, const Slice& quad_key) { | |
442 | if (!spatial_iterator->Valid()) { | |
443 | return false; | |
444 | } | |
445 | if (spatial_iterator->key().size() != 2 * sizeof(uint64_t)) { | |
446 | status_ = Status::Corruption("Invalid spatial index key"); | |
447 | valid_ = false; | |
448 | return false; | |
449 | } | |
450 | Slice spatial_iterator_quad_key(spatial_iterator->key().data(), | |
451 | sizeof(uint64_t)); | |
452 | if (spatial_iterator_quad_key != quad_key) { | |
453 | // caller needs to reseek | |
454 | return false; | |
455 | } | |
456 | // if we come to here, we have found the quad key | |
457 | return true; | |
458 | } | |
459 | ||
460 | void ExtractData() { | |
461 | assert(valid_); | |
462 | valid_ = value_getter_->Get(*primary_keys_iterator_); | |
463 | ||
464 | if (valid_) { | |
465 | Slice data = value_getter_->value(); | |
466 | current_feature_set_.Clear(); | |
467 | if (!GetLengthPrefixedSlice(&data, ¤t_blob_) || | |
468 | !current_feature_set_.Deserialize(data)) { | |
469 | status_ = Status::Corruption("Primary key column family corruption"); | |
470 | valid_ = false; | |
471 | } | |
472 | } | |
473 | ||
474 | } | |
475 | ||
476 | unique_ptr<ValueGetter> value_getter_; | |
477 | bool valid_; | |
478 | Status status_; | |
479 | ||
480 | FeatureSet current_feature_set_; | |
481 | Slice current_blob_; | |
482 | ||
483 | // This is loaded from spatial iterator. | |
484 | std::unordered_set<uint64_t> primary_key_ids_; | |
485 | std::unordered_set<uint64_t>::iterator primary_keys_iterator_; | |
486 | }; | |
487 | ||
488 | class ErrorCursor : public Cursor { | |
489 | public: | |
490 | explicit ErrorCursor(Status s) : s_(s) { assert(!s.ok()); } | |
491 | virtual Status status() const override { return s_; } | |
492 | virtual bool Valid() const override { return false; } | |
493 | virtual void Next() override { assert(false); } | |
494 | ||
495 | virtual const Slice blob() override { | |
496 | assert(false); | |
497 | return Slice(); | |
498 | } | |
499 | virtual const FeatureSet& feature_set() override { | |
500 | assert(false); | |
501 | // compiler complains otherwise | |
502 | return trash_; | |
503 | } | |
504 | ||
505 | private: | |
506 | Status s_; | |
507 | FeatureSet trash_; | |
508 | }; | |
509 | ||
510 | class SpatialDBImpl : public SpatialDB { | |
511 | public: | |
512 | // * db -- base DB that needs to be forwarded to StackableDB | |
513 | // * data_column_family -- column family used to store the data | |
514 | // * spatial_indexes -- a list of spatial indexes together with column | |
515 | // families that correspond to those spatial indexes | |
516 | // * next_id -- next ID in auto-incrementing ID. This is usually | |
517 | // `max_id_currenty_in_db + 1` | |
518 | SpatialDBImpl( | |
519 | DB* db, ColumnFamilyHandle* data_column_family, | |
520 | const std::vector<std::pair<SpatialIndexOptions, ColumnFamilyHandle*>>& | |
521 | spatial_indexes, | |
522 | uint64_t next_id, bool read_only) | |
523 | : SpatialDB(db), | |
524 | data_column_family_(data_column_family), | |
525 | next_id_(next_id), | |
526 | read_only_(read_only) { | |
527 | for (const auto& index : spatial_indexes) { | |
528 | name_to_index_.insert( | |
529 | {index.first.name, IndexColumnFamily(index.first, index.second)}); | |
530 | } | |
531 | } | |
532 | ||
533 | ~SpatialDBImpl() { | |
534 | for (auto& iter : name_to_index_) { | |
535 | delete iter.second.column_family; | |
536 | } | |
537 | delete data_column_family_; | |
538 | } | |
539 | ||
540 | virtual Status Insert( | |
541 | const WriteOptions& write_options, const BoundingBox<double>& bbox, | |
542 | const Slice& blob, const FeatureSet& feature_set, | |
543 | const std::vector<std::string>& spatial_indexes) override { | |
544 | WriteBatch batch; | |
545 | ||
546 | if (spatial_indexes.size() == 0) { | |
547 | return Status::InvalidArgument("Spatial indexes can't be empty"); | |
548 | } | |
549 | ||
550 | const size_t kWriteOutEveryBytes = 1024 * 1024; // 1MB | |
551 | uint64_t id = next_id_.fetch_add(1); | |
552 | ||
553 | for (const auto& si : spatial_indexes) { | |
554 | auto itr = name_to_index_.find(si); | |
555 | if (itr == name_to_index_.end()) { | |
556 | return Status::InvalidArgument("Can't find index " + si); | |
557 | } | |
558 | const auto& spatial_index = itr->second.index; | |
559 | if (!spatial_index.bbox.Intersects(bbox)) { | |
560 | continue; | |
561 | } | |
562 | BoundingBox<uint64_t> tile_bbox = GetTileBoundingBox(spatial_index, bbox); | |
563 | ||
564 | for (uint64_t x = tile_bbox.min_x; x <= tile_bbox.max_x; ++x) { | |
565 | for (uint64_t y = tile_bbox.min_y; y <= tile_bbox.max_y; ++y) { | |
566 | // see above for format | |
567 | std::string key; | |
568 | PutFixed64BigEndian( | |
569 | &key, GetQuadKeyFromTile(x, y, spatial_index.tile_bits)); | |
570 | PutFixed64BigEndian(&key, id); | |
571 | batch.Put(itr->second.column_family, key, Slice()); | |
572 | if (batch.GetDataSize() >= kWriteOutEveryBytes) { | |
573 | Status s = Write(write_options, &batch); | |
574 | batch.Clear(); | |
575 | if (!s.ok()) { | |
576 | return s; | |
577 | } | |
578 | } | |
579 | } | |
580 | } | |
581 | } | |
582 | ||
583 | // see above for format | |
584 | std::string data_key; | |
585 | PutFixed64BigEndian(&data_key, id); | |
586 | std::string data_value; | |
587 | PutLengthPrefixedSlice(&data_value, blob); | |
588 | feature_set.Serialize(&data_value); | |
589 | batch.Put(data_column_family_, data_key, data_value); | |
590 | ||
591 | return Write(write_options, &batch); | |
592 | } | |
593 | ||
594 | virtual Status Compact(int num_threads) override { | |
595 | std::vector<ColumnFamilyHandle*> column_families; | |
596 | column_families.push_back(data_column_family_); | |
597 | ||
598 | for (auto& iter : name_to_index_) { | |
599 | column_families.push_back(iter.second.column_family); | |
600 | } | |
601 | ||
602 | std::mutex state_mutex; | |
603 | std::condition_variable cv; | |
604 | Status s; | |
605 | int threads_running = 0; | |
606 | ||
607 | std::vector<port::Thread> threads; | |
608 | ||
609 | for (auto cfh : column_families) { | |
610 | threads.emplace_back([&, cfh] { | |
611 | { | |
612 | std::unique_lock<std::mutex> lk(state_mutex); | |
613 | cv.wait(lk, [&] { return threads_running < num_threads; }); | |
614 | threads_running++; | |
615 | } | |
616 | ||
617 | Status t = Flush(FlushOptions(), cfh); | |
618 | if (t.ok()) { | |
619 | t = CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr); | |
620 | } | |
621 | ||
622 | { | |
623 | std::unique_lock<std::mutex> lk(state_mutex); | |
624 | threads_running--; | |
625 | if (s.ok() && !t.ok()) { | |
626 | s = t; | |
627 | } | |
628 | cv.notify_one(); | |
629 | } | |
630 | }); | |
631 | } | |
632 | ||
633 | for (auto& t : threads) { | |
634 | t.join(); | |
635 | } | |
636 | ||
637 | return s; | |
638 | } | |
639 | ||
640 | virtual Cursor* Query(const ReadOptions& read_options, | |
641 | const BoundingBox<double>& bbox, | |
642 | const std::string& spatial_index) override { | |
643 | auto itr = name_to_index_.find(spatial_index); | |
644 | if (itr == name_to_index_.end()) { | |
645 | return new ErrorCursor(Status::InvalidArgument( | |
646 | "Spatial index " + spatial_index + " not found")); | |
647 | } | |
648 | const auto& si = itr->second.index; | |
649 | Iterator* spatial_iterator; | |
650 | ValueGetter* value_getter; | |
651 | ||
652 | if (read_only_) { | |
653 | spatial_iterator = NewIterator(read_options, itr->second.column_family); | |
654 | value_getter = new ValueGetterFromDB(this, data_column_family_); | |
655 | } else { | |
656 | std::vector<Iterator*> iterators; | |
657 | Status s = NewIterators(read_options, | |
658 | {data_column_family_, itr->second.column_family}, | |
659 | &iterators); | |
660 | if (!s.ok()) { | |
661 | return new ErrorCursor(s); | |
662 | } | |
663 | ||
664 | spatial_iterator = iterators[1]; | |
665 | value_getter = new ValueGetterFromIterator(iterators[0]); | |
666 | } | |
667 | return new SpatialIndexCursor(spatial_iterator, value_getter, | |
668 | GetTileBoundingBox(si, bbox), si.tile_bits); | |
669 | } | |
670 | ||
671 | private: | |
672 | ColumnFamilyHandle* data_column_family_; | |
673 | struct IndexColumnFamily { | |
674 | SpatialIndexOptions index; | |
675 | ColumnFamilyHandle* column_family; | |
676 | IndexColumnFamily(const SpatialIndexOptions& _index, | |
677 | ColumnFamilyHandle* _cf) | |
678 | : index(_index), column_family(_cf) {} | |
679 | }; | |
680 | // constant after construction! | |
681 | std::unordered_map<std::string, IndexColumnFamily> name_to_index_; | |
682 | ||
683 | std::atomic<uint64_t> next_id_; | |
684 | bool read_only_; | |
685 | }; | |
686 | ||
687 | namespace { | |
688 | DBOptions GetDBOptionsFromSpatialDBOptions(const SpatialDBOptions& options) { | |
689 | DBOptions db_options; | |
690 | db_options.max_open_files = 50000; | |
691 | db_options.max_background_compactions = 3 * options.num_threads / 4; | |
692 | db_options.max_background_flushes = | |
693 | options.num_threads - db_options.max_background_compactions; | |
694 | db_options.env->SetBackgroundThreads(db_options.max_background_compactions, | |
695 | Env::LOW); | |
696 | db_options.env->SetBackgroundThreads(db_options.max_background_flushes, | |
697 | Env::HIGH); | |
698 | db_options.statistics = CreateDBStatistics(); | |
699 | if (options.bulk_load) { | |
700 | db_options.stats_dump_period_sec = 600; | |
701 | } else { | |
702 | db_options.stats_dump_period_sec = 1800; // 30min | |
703 | } | |
704 | return db_options; | |
705 | } | |
706 | ||
11fdf7f2 | 707 | ColumnFamilyOptions GetColumnFamilyOptions(const SpatialDBOptions& /*options*/, |
7c673cae FG |
708 | std::shared_ptr<Cache> block_cache) { |
709 | ColumnFamilyOptions column_family_options; | |
710 | column_family_options.write_buffer_size = 128 * 1024 * 1024; // 128MB | |
711 | column_family_options.max_write_buffer_number = 4; | |
712 | column_family_options.max_bytes_for_level_base = 256 * 1024 * 1024; // 256MB | |
713 | column_family_options.target_file_size_base = 64 * 1024 * 1024; // 64MB | |
714 | column_family_options.level0_file_num_compaction_trigger = 2; | |
715 | column_family_options.level0_slowdown_writes_trigger = 16; | |
716 | column_family_options.level0_stop_writes_trigger = 32; | |
717 | // only compress levels >= 2 | |
718 | column_family_options.compression_per_level.resize( | |
719 | column_family_options.num_levels); | |
720 | for (int i = 0; i < column_family_options.num_levels; ++i) { | |
721 | if (i < 2) { | |
722 | column_family_options.compression_per_level[i] = kNoCompression; | |
723 | } else { | |
724 | column_family_options.compression_per_level[i] = kLZ4Compression; | |
725 | } | |
726 | } | |
727 | BlockBasedTableOptions table_options; | |
728 | table_options.block_cache = block_cache; | |
729 | column_family_options.table_factory.reset( | |
730 | NewBlockBasedTableFactory(table_options)); | |
731 | return column_family_options; | |
732 | } | |
733 | ||
734 | ColumnFamilyOptions OptimizeOptionsForDataColumnFamily( | |
735 | ColumnFamilyOptions options, std::shared_ptr<Cache> block_cache) { | |
736 | options.prefix_extractor.reset(NewNoopTransform()); | |
737 | BlockBasedTableOptions block_based_options; | |
738 | block_based_options.index_type = BlockBasedTableOptions::kHashSearch; | |
739 | block_based_options.block_cache = block_cache; | |
740 | options.table_factory.reset(NewBlockBasedTableFactory(block_based_options)); | |
741 | return options; | |
742 | } | |
743 | ||
744 | } // namespace | |
745 | ||
746 | class MetadataStorage { | |
747 | public: | |
748 | MetadataStorage(DB* db, ColumnFamilyHandle* cf) : db_(db), cf_(cf) {} | |
749 | ~MetadataStorage() {} | |
750 | ||
751 | // format: <min_x double> <min_y double> <max_x double> <max_y double> | |
752 | // <tile_bits varint32> | |
753 | Status AddIndex(const SpatialIndexOptions& index) { | |
754 | std::string encoded_index; | |
755 | PutDouble(&encoded_index, index.bbox.min_x); | |
756 | PutDouble(&encoded_index, index.bbox.min_y); | |
757 | PutDouble(&encoded_index, index.bbox.max_x); | |
758 | PutDouble(&encoded_index, index.bbox.max_y); | |
759 | PutVarint32(&encoded_index, index.tile_bits); | |
760 | return db_->Put(WriteOptions(), cf_, | |
761 | GetSpatialIndexColumnFamilyName(index.name), encoded_index); | |
762 | } | |
763 | ||
764 | Status GetIndex(const std::string& name, SpatialIndexOptions* dst) { | |
765 | std::string value; | |
766 | Status s = db_->Get(ReadOptions(), cf_, | |
767 | GetSpatialIndexColumnFamilyName(name), &value); | |
768 | if (!s.ok()) { | |
769 | return s; | |
770 | } | |
771 | dst->name = name; | |
772 | Slice encoded_index(value); | |
773 | bool ok = GetDouble(&encoded_index, &(dst->bbox.min_x)); | |
774 | ok = ok && GetDouble(&encoded_index, &(dst->bbox.min_y)); | |
775 | ok = ok && GetDouble(&encoded_index, &(dst->bbox.max_x)); | |
776 | ok = ok && GetDouble(&encoded_index, &(dst->bbox.max_y)); | |
777 | ok = ok && GetVarint32(&encoded_index, &(dst->tile_bits)); | |
778 | return ok ? Status::OK() : Status::Corruption("Index encoding corrupted"); | |
779 | } | |
780 | ||
781 | private: | |
782 | DB* db_; | |
783 | ColumnFamilyHandle* cf_; | |
784 | }; | |
785 | ||
786 | Status SpatialDB::Create( | |
787 | const SpatialDBOptions& options, const std::string& name, | |
788 | const std::vector<SpatialIndexOptions>& spatial_indexes) { | |
789 | DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options); | |
790 | db_options.create_if_missing = true; | |
791 | db_options.create_missing_column_families = true; | |
792 | db_options.error_if_exists = true; | |
793 | ||
11fdf7f2 | 794 | auto block_cache = NewLRUCache(static_cast<size_t>(options.cache_size)); |
7c673cae FG |
795 | ColumnFamilyOptions column_family_options = |
796 | GetColumnFamilyOptions(options, block_cache); | |
797 | ||
798 | std::vector<ColumnFamilyDescriptor> column_families; | |
799 | column_families.push_back(ColumnFamilyDescriptor( | |
800 | kDefaultColumnFamilyName, | |
801 | OptimizeOptionsForDataColumnFamily(column_family_options, block_cache))); | |
802 | column_families.push_back( | |
803 | ColumnFamilyDescriptor(kMetadataColumnFamilyName, column_family_options)); | |
804 | ||
805 | for (const auto& index : spatial_indexes) { | |
806 | column_families.emplace_back(GetSpatialIndexColumnFamilyName(index.name), | |
807 | column_family_options); | |
808 | } | |
809 | ||
810 | std::vector<ColumnFamilyHandle*> handles; | |
811 | DB* base_db; | |
812 | Status s = DB::Open(db_options, name, column_families, &handles, &base_db); | |
813 | if (!s.ok()) { | |
814 | return s; | |
815 | } | |
816 | MetadataStorage metadata(base_db, handles[1]); | |
817 | for (const auto& index : spatial_indexes) { | |
818 | s = metadata.AddIndex(index); | |
819 | if (!s.ok()) { | |
820 | break; | |
821 | } | |
822 | } | |
823 | ||
824 | for (auto h : handles) { | |
825 | delete h; | |
826 | } | |
827 | delete base_db; | |
828 | ||
829 | return s; | |
830 | } | |
831 | ||
832 | Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name, | |
833 | SpatialDB** db, bool read_only) { | |
834 | DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options); | |
11fdf7f2 | 835 | auto block_cache = NewLRUCache(static_cast<size_t>(options.cache_size)); |
7c673cae FG |
836 | ColumnFamilyOptions column_family_options = |
837 | GetColumnFamilyOptions(options, block_cache); | |
838 | ||
839 | Status s; | |
840 | std::vector<std::string> existing_column_families; | |
841 | std::vector<std::string> spatial_indexes; | |
842 | s = DB::ListColumnFamilies(db_options, name, &existing_column_families); | |
843 | if (!s.ok()) { | |
844 | return s; | |
845 | } | |
846 | for (const auto& cf_name : existing_column_families) { | |
847 | Slice spatial_index; | |
848 | if (GetSpatialIndexName(cf_name, &spatial_index)) { | |
849 | spatial_indexes.emplace_back(spatial_index.data(), spatial_index.size()); | |
850 | } | |
851 | } | |
852 | ||
853 | std::vector<ColumnFamilyDescriptor> column_families; | |
854 | column_families.push_back(ColumnFamilyDescriptor( | |
855 | kDefaultColumnFamilyName, | |
856 | OptimizeOptionsForDataColumnFamily(column_family_options, block_cache))); | |
857 | column_families.push_back( | |
858 | ColumnFamilyDescriptor(kMetadataColumnFamilyName, column_family_options)); | |
859 | ||
860 | for (const auto& index : spatial_indexes) { | |
861 | column_families.emplace_back(GetSpatialIndexColumnFamilyName(index), | |
862 | column_family_options); | |
863 | } | |
864 | std::vector<ColumnFamilyHandle*> handles; | |
865 | DB* base_db; | |
866 | if (read_only) { | |
867 | s = DB::OpenForReadOnly(db_options, name, column_families, &handles, | |
868 | &base_db); | |
869 | } else { | |
870 | s = DB::Open(db_options, name, column_families, &handles, &base_db); | |
871 | } | |
872 | if (!s.ok()) { | |
873 | return s; | |
874 | } | |
875 | ||
876 | MetadataStorage metadata(base_db, handles[1]); | |
877 | ||
878 | std::vector<std::pair<SpatialIndexOptions, ColumnFamilyHandle*>> index_cf; | |
879 | assert(handles.size() == spatial_indexes.size() + 2); | |
880 | for (size_t i = 0; i < spatial_indexes.size(); ++i) { | |
881 | SpatialIndexOptions index_options; | |
882 | s = metadata.GetIndex(spatial_indexes[i], &index_options); | |
883 | if (!s.ok()) { | |
884 | break; | |
885 | } | |
886 | index_cf.emplace_back(index_options, handles[i + 2]); | |
887 | } | |
888 | uint64_t next_id = 1; | |
889 | if (s.ok()) { | |
890 | // find next_id | |
891 | Iterator* iter = base_db->NewIterator(ReadOptions(), handles[0]); | |
892 | iter->SeekToLast(); | |
893 | if (iter->Valid()) { | |
894 | uint64_t last_id = 0; | |
895 | if (!GetFixed64BigEndian(iter->key(), &last_id)) { | |
896 | s = Status::Corruption("Invalid key in data column family"); | |
897 | } else { | |
898 | next_id = last_id + 1; | |
899 | } | |
900 | } | |
901 | delete iter; | |
902 | } | |
903 | if (!s.ok()) { | |
904 | for (auto h : handles) { | |
905 | delete h; | |
906 | } | |
907 | delete base_db; | |
908 | return s; | |
909 | } | |
910 | ||
911 | // I don't need metadata column family any more, so delete it | |
912 | delete handles[1]; | |
913 | *db = new SpatialDBImpl(base_db, handles[0], index_cf, next_id, read_only); | |
914 | return Status::OK(); | |
915 | } | |
916 | ||
917 | } // namespace spatial | |
918 | } // namespace rocksdb | |
919 | #endif // ROCKSDB_LITE |