]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/parquet/encryption/two_level_cache_with_expiration.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / parquet / encryption / two_level_cache_with_expiration.h
diff --git a/ceph/src/arrow/cpp/src/parquet/encryption/two_level_cache_with_expiration.h b/ceph/src/arrow/cpp/src/parquet/encryption/two_level_cache_with_expiration.h
new file mode 100644 (file)
index 0000000..fbd06dc
--- /dev/null
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <chrono>
+#include <unordered_map>
+
+#include "arrow/util/concurrent_map.h"
+#include "arrow/util/mutex.h"
+
+namespace parquet {
+namespace encryption {
+
+using ::arrow::util::ConcurrentMap;
+
+namespace internal {
+
+using TimePoint =
+    std::chrono::time_point<std::chrono::system_clock, std::chrono::duration<double>>;
+
+inline TimePoint CurrentTimePoint() { return std::chrono::system_clock::now(); }
+
+template <typename E>
+class ExpiringCacheEntry {
+ public:
+  ExpiringCacheEntry() = default;
+
+  ExpiringCacheEntry(E cached_item, double expiration_interval_seconds)
+      : expiration_timestamp_(CurrentTimePoint() +
+                              std::chrono::duration<double>(expiration_interval_seconds)),
+        cached_item_(std::move(cached_item)) {}
+
+  bool IsExpired() const {
+    const auto now = CurrentTimePoint();
+    return (now > expiration_timestamp_);
+  }
+
+  E cached_item() { return cached_item_; }
+
+ private:
+  const TimePoint expiration_timestamp_;
+  E cached_item_;
+};
+
+// This class is to avoid the below warning when compiling KeyToolkit class with VS2015
+// warning C4503: decorated name length exceeded, name was truncated
+template <typename V>
+class ExpiringCacheMapEntry {
+ public:
+  ExpiringCacheMapEntry() = default;
+
+  explicit ExpiringCacheMapEntry(
+      std::shared_ptr<ConcurrentMap<std::string, V>> cached_item,
+      double expiration_interval_seconds)
+      : map_cache_(cached_item, expiration_interval_seconds) {}
+
+  bool IsExpired() { return map_cache_.IsExpired(); }
+
+  std::shared_ptr<ConcurrentMap<std::string, V>> cached_item() {
+    return map_cache_.cached_item();
+  }
+
+ private:
+  // ConcurrentMap object may be accessed and modified at many places at the same time,
+  // from multiple threads, or even removed from cache.
+  ExpiringCacheEntry<std::shared_ptr<ConcurrentMap<std::string, V>>> map_cache_;
+};
+
+}  // namespace internal
+
+// Two-level cache with expiration of internal caches according to token lifetime.
+// External cache is per token, internal is per string key.
+// Wrapper class around:
+//    std::unordered_map<std::string,
+//    internal::ExpiringCacheEntry<std::unordered_map<std::string, V>>>
+// This cache is safe to be shared between threads.
+template <typename V>
+class TwoLevelCacheWithExpiration {
+ public:
+  TwoLevelCacheWithExpiration() {
+    last_cache_cleanup_timestamp_ = internal::CurrentTimePoint();
+  }
+
+  std::shared_ptr<ConcurrentMap<std::string, V>> GetOrCreateInternalCache(
+      const std::string& access_token, double cache_entry_lifetime_seconds) {
+    auto lock = mutex_.Lock();
+
+    auto external_cache_entry = cache_.find(access_token);
+    if (external_cache_entry == cache_.end() ||
+        external_cache_entry->second.IsExpired()) {
+      cache_.insert({access_token, internal::ExpiringCacheMapEntry<V>(
+                                       std::shared_ptr<ConcurrentMap<std::string, V>>(
+                                           new ConcurrentMap<std::string, V>()),
+                                       cache_entry_lifetime_seconds)});
+    }
+
+    return cache_[access_token].cached_item();
+  }
+
+  void CheckCacheForExpiredTokens(double cache_cleanup_period_seconds) {
+    auto lock = mutex_.Lock();
+
+    const auto now = internal::CurrentTimePoint();
+    if (now > (last_cache_cleanup_timestamp_ +
+               std::chrono::duration<double>(cache_cleanup_period_seconds))) {
+      RemoveExpiredEntriesNoMutex();
+      last_cache_cleanup_timestamp_ =
+          now + std::chrono::duration<double>(cache_cleanup_period_seconds);
+    }
+  }
+
+  void RemoveExpiredEntriesFromCache() {
+    auto lock = mutex_.Lock();
+
+    RemoveExpiredEntriesNoMutex();
+  }
+
+  void Remove(const std::string& access_token) {
+    auto lock = mutex_.Lock();
+    cache_.erase(access_token);
+  }
+
+  void Clear() {
+    auto lock = mutex_.Lock();
+    cache_.clear();
+  }
+
+ private:
+  void RemoveExpiredEntriesNoMutex() {
+    for (auto it = cache_.begin(); it != cache_.end();) {
+      if (it->second.IsExpired()) {
+        it = cache_.erase(it);
+      } else {
+        ++it;
+      }
+    }
+  }
+  std::unordered_map<std::string, internal::ExpiringCacheMapEntry<V>> cache_;
+  internal::TimePoint last_cache_cleanup_timestamp_;
+  ::arrow::util::Mutex mutex_;
+};
+
+}  // namespace encryption
+}  // namespace parquet