]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/parquet/encryption/two_level_cache_with_expiration_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / parquet / encryption / two_level_cache_with_expiration_test.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <chrono>
19 #include <thread>
20
21 #include <gtest/gtest.h>
22
23 #include "arrow/testing/gtest_util.h"
24 #include "arrow/util/concurrent_map.h"
25
26 #include "parquet/encryption/two_level_cache_with_expiration.h"
27
28 namespace parquet {
29 namespace encryption {
30 namespace test {
31
32 using ::arrow::SleepFor;
33
34 class TwoLevelCacheWithExpirationTest : public ::testing::Test {
35 public:
36 void SetUp() {
37 // lifetime is 0.2s
38 std::shared_ptr<ConcurrentMap<std::string, int>> lifetime1 =
39 cache_.GetOrCreateInternalCache("lifetime1", 0.2);
40 lifetime1->Insert("item1", 1);
41 lifetime1->Insert("item2", 2);
42
43 // lifetime is 0.5s
44 std::shared_ptr<ConcurrentMap<std::string, int>> lifetime2 =
45 cache_.GetOrCreateInternalCache("lifetime2", 0.5);
46 lifetime2->Insert("item21", 21);
47 lifetime2->Insert("item22", 22);
48 }
49
50 protected:
51 void TaskInsert(int thread_no) {
52 for (int i = 0; i < 20; i++) {
53 std::string token = (i % 2 == 0) ? "lifetime1" : "lifetime2";
54 double lifetime = (i % 2 == 0) ? 0.2 : 0.5;
55 auto internal_cache = cache_.GetOrCreateInternalCache(token, lifetime);
56 std::stringstream ss;
57 ss << "item_" << thread_no << "_" << i;
58 internal_cache->Insert(ss.str(), i);
59 SleepFor(0.005);
60 }
61 }
62
63 void TaskClean() {
64 for (int i = 0; i < 20; i++) {
65 cache_.Clear();
66 SleepFor(0.008);
67 }
68 }
69
70 TwoLevelCacheWithExpiration<int> cache_;
71 };
72
73 TEST_F(TwoLevelCacheWithExpirationTest, RemoveExpiration) {
74 auto lifetime1_before_expiration = cache_.GetOrCreateInternalCache("lifetime1", 1);
75 ASSERT_EQ(lifetime1_before_expiration->size(), 2);
76
77 // wait for 0.3s, we expect:
78 // lifetime1 will be expired
79 // lifetime2 will not be expired
80 SleepFor(0.3);
81 // now clear expired items from the cache
82 cache_.RemoveExpiredEntriesFromCache();
83
84 // lifetime1 (with 2 items) is expired and has been removed from the cache.
85 // Now the cache create a new object which has no item.
86 auto lifetime1 = cache_.GetOrCreateInternalCache("lifetime1", 1);
87 ASSERT_EQ(lifetime1->size(), 0);
88
89 // However, lifetime1_before_expiration can still access normally and independently
90 // from the one in cache
91 lifetime1_before_expiration->Insert("item3", 3);
92 ASSERT_EQ(lifetime1_before_expiration->size(), 3);
93 ASSERT_EQ(lifetime1->size(), 0);
94
95 // lifetime2 is not expired and still contains 2 items.
96 std::shared_ptr<ConcurrentMap<std::string, int>> lifetime2 =
97 cache_.GetOrCreateInternalCache("lifetime2", 3);
98 ASSERT_EQ(lifetime2->size(), 2);
99 }
100
101 TEST_F(TwoLevelCacheWithExpirationTest, CleanupPeriodOk) {
102 // wait for 0.3s, now:
103 // lifetime1 is expired
104 // lifetime2 isn't expired
105 SleepFor(0.3);
106
107 // cleanup_period is 0.2s, less than or equals lifetime of both items, so the expired
108 // items will be removed from cache.
109 cache_.CheckCacheForExpiredTokens(0.2);
110
111 // lifetime1 (with 2 items) is expired and has been removed from the cache.
112 // Now the cache create a new object which has no item.
113 auto lifetime1 = cache_.GetOrCreateInternalCache("lifetime1", 1);
114 ASSERT_EQ(lifetime1->size(), 0);
115
116 // lifetime2 is not expired and still contains 2 items.
117 auto lifetime2 = cache_.GetOrCreateInternalCache("lifetime2", 3);
118 ASSERT_EQ(lifetime2->size(), 2);
119 }
120
121 TEST_F(TwoLevelCacheWithExpirationTest, RemoveByToken) {
122 cache_.Remove("lifetime1");
123
124 // lifetime1 (with 2 items) has been removed from the cache.
125 // Now the cache create a new object which has no item.
126 auto lifetime1 = cache_.GetOrCreateInternalCache("lifetime1", 1);
127 ASSERT_EQ(lifetime1->size(), 0);
128
129 // lifetime2 is still contains 2 items.
130 auto lifetime2 = cache_.GetOrCreateInternalCache("lifetime2", 3);
131 ASSERT_EQ(lifetime2->size(), 2);
132
133 cache_.Remove("lifetime2");
134 auto lifetime2_after_removed = cache_.GetOrCreateInternalCache("lifetime2", 3);
135 ASSERT_EQ(lifetime2_after_removed->size(), 0);
136 }
137
138 TEST_F(TwoLevelCacheWithExpirationTest, RemoveAllTokens) {
139 cache_.Clear();
140
141 // All tokens has been removed from the cache.
142 // Now the cache create a new object which has no item.
143 auto lifetime1 = cache_.GetOrCreateInternalCache("lifetime1", 1);
144 ASSERT_EQ(lifetime1->size(), 0);
145
146 auto lifetime2 = cache_.GetOrCreateInternalCache("lifetime2", 3);
147 ASSERT_EQ(lifetime2->size(), 0);
148 }
149
150 TEST_F(TwoLevelCacheWithExpirationTest, Clear) {
151 cache_.Clear();
152
153 // All tokens has been removed from the cache.
154 // Now the cache create a new object which has no item.
155 auto lifetime1 = cache_.GetOrCreateInternalCache("lifetime1", 1);
156 ASSERT_EQ(lifetime1->size(), 0);
157
158 auto lifetime2 = cache_.GetOrCreateInternalCache("lifetime2", 3);
159 ASSERT_EQ(lifetime2->size(), 0);
160 }
161
162 TEST_F(TwoLevelCacheWithExpirationTest, MultiThread) {
163 std::vector<std::thread> insert_threads;
164 for (int i = 0; i < 10; i++) {
165 insert_threads.emplace_back([this, i]() { this->TaskInsert(i); });
166 }
167 std::thread clean_thread([this]() { this->TaskClean(); });
168
169 for (auto& th : insert_threads) {
170 th.join();
171 }
172 clean_thread.join();
173 }
174
175 } // namespace test
176 } // namespace encryption
177 } // namespace parquet