]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
#include "ObjectCacheStore.h"
#include "Utils.h"

#include <memory>

#if __has_include(<filesystem>)
#include <filesystem>
namespace fs = std::filesystem;
#else
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
#endif
9f95a23c TL |
13 | |
14 | #define dout_context g_ceph_context | |
15 | #define dout_subsys ceph_subsys_immutable_obj_cache | |
16 | #undef dout_prefix | |
17 | #define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \ | |
18 | << __func__ << ": " | |
19 | ||
9f95a23c TL |
20 | |
21 | namespace ceph { | |
22 | namespace immutable_obj_cache { | |
23 | ||
f67539c2 TL |
24 | namespace { |
25 | ||
// Process-wide timer used to drive the QoS token-bucket throttles.
// RAII wrapper: the timer thread is started in the constructor and
// stopped (under its own lock, as SafeTimer::shutdown requires the
// lock held) in the destructor.
class SafeTimerSingleton : public SafeTimer {
 public:
  // Lock guarding the timer; also handed to TokenBucketThrottle below.
  ceph::mutex lock = ceph::make_mutex
    ("ceph::immutable_object_cache::SafeTimerSingleton::lock");

  explicit SafeTimerSingleton(CephContext *cct)
    : SafeTimer(cct, lock, true) {
    init();
  }
  ~SafeTimerSingleton() {
    std::lock_guard locker{lock};
    shutdown();
  }
};
40 | ||
41 | } // anonymous namespace | |
42 | ||
// Bit flags selecting which QoS throttle an operation targets; also
// used as keys into m_throttles and in m_qos_enabled_flag.
enum ThrottleTargetCode {
  ROC_QOS_IOPS_THROTTLE = 1,  // limits promote operations per second
  ROC_QOS_BPS_THROTTLE = 2    // limits promoted bytes per second
};
47 | ||
9f95a23c TL |
48 | ObjectCacheStore::ObjectCacheStore(CephContext *cct) |
49 | : m_cct(cct), m_rados(new librados::Rados()) { | |
50 | ||
51 | m_cache_root_dir = | |
52 | m_cct->_conf.get_val<std::string>("immutable_object_cache_path"); | |
53 | ||
54 | if (m_cache_root_dir.back() != '/') { | |
55 | m_cache_root_dir += "/"; | |
56 | } | |
57 | ||
58 | uint64_t cache_max_size = | |
59 | m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size"); | |
60 | ||
61 | double cache_watermark = | |
62 | m_cct->_conf.get_val<double>("immutable_object_cache_watermark"); | |
63 | ||
64 | uint64_t max_inflight_ops = | |
65 | m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops"); | |
66 | ||
f67539c2 TL |
67 | uint64_t limit = 0; |
68 | if ((limit = m_cct->_conf.get_val<uint64_t> | |
69 | ("immutable_object_cache_qos_iops_limit")) != 0) { | |
70 | apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE, | |
71 | m_cct->_conf.get_val<std::chrono::milliseconds> | |
72 | ("immutable_object_cache_qos_schedule_tick_min"), | |
73 | limit, | |
74 | m_cct->_conf.get_val<uint64_t> | |
75 | ("immutable_object_cache_qos_iops_burst"), | |
76 | m_cct->_conf.get_val<std::chrono::seconds> | |
77 | ("immutable_object_cache_qos_iops_burst_seconds")); | |
78 | } | |
79 | if ((limit = m_cct->_conf.get_val<uint64_t> | |
80 | ("immutable_object_cache_qos_bps_limit")) != 0) { | |
81 | apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE, | |
82 | m_cct->_conf.get_val<std::chrono::milliseconds> | |
83 | ("immutable_object_cache_qos_schedule_tick_min"), | |
84 | limit, | |
85 | m_cct->_conf.get_val<uint64_t> | |
86 | ("immutable_object_cache_qos_bps_burst"), | |
87 | m_cct->_conf.get_val<std::chrono::seconds> | |
88 | ("immutable_object_cache_qos_bps_burst_seconds")); | |
89 | } | |
90 | ||
91 | if ((cache_watermark <= 0) || (cache_watermark > 1)) { | |
92 | lderr(m_cct) << "Invalid water mark provided, set it to default." << dendl; | |
93 | cache_watermark = 0.9; | |
94 | } | |
9f95a23c TL |
95 | m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops, |
96 | cache_watermark); | |
97 | } | |
98 | ||
99 | ObjectCacheStore::~ObjectCacheStore() { | |
100 | delete m_policy; | |
f67539c2 TL |
101 | if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) { |
102 | ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr); | |
103 | delete m_throttles[ROC_QOS_IOPS_THROTTLE]; | |
104 | } | |
105 | if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) { | |
106 | ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr); | |
107 | delete m_throttles[ROC_QOS_BPS_THROTTLE]; | |
108 | } | |
9f95a23c TL |
109 | } |
110 | ||
111 | int ObjectCacheStore::init(bool reset) { | |
112 | ldout(m_cct, 20) << dendl; | |
113 | ||
114 | int ret = m_rados->init_with_context(m_cct); | |
115 | if (ret < 0) { | |
116 | lderr(m_cct) << "fail to init Ceph context" << dendl; | |
117 | return ret; | |
118 | } | |
119 | ||
120 | ret = m_rados->connect(); | |
121 | if (ret < 0) { | |
122 | lderr(m_cct) << "fail to connect to cluster" << dendl; | |
123 | return ret; | |
124 | } | |
125 | ||
126 | // TODO(dehao): fsck and reuse existing cache objects | |
127 | if (reset) { | |
f6b5b4d7 | 128 | try { |
f67539c2 | 129 | if (fs::exists(m_cache_root_dir)) { |
f6b5b4d7 | 130 | // remove all sub folders |
f67539c2 TL |
131 | for (auto& p : fs::directory_iterator(m_cache_root_dir)) { |
132 | fs::remove_all(p.path()); | |
f6b5b4d7 TL |
133 | } |
134 | } else { | |
f67539c2 | 135 | fs::create_directories(m_cache_root_dir); |
9f95a23c | 136 | } |
f67539c2 | 137 | } catch (const fs::filesystem_error& e) { |
f6b5b4d7 TL |
138 | lderr(m_cct) << "failed to initialize cache store directory: " |
139 | << e.what() << dendl; | |
140 | return -e.code().value(); | |
9f95a23c TL |
141 | } |
142 | } | |
143 | return 0; | |
144 | } | |
145 | ||
// Disconnect from the RADOS cluster. Always returns 0.
int ObjectCacheStore::shutdown() {
  ldout(m_cct, 20) << dendl;

  m_rados->shutdown();
  return 0;
}
152 | ||
153 | int ObjectCacheStore::init_cache() { | |
154 | ldout(m_cct, 20) << dendl; | |
155 | std::string cache_dir = m_cache_root_dir; | |
156 | ||
157 | return 0; | |
158 | } | |
159 | ||
f67539c2 TL |
// Kick off an asynchronous promotion of one RADOS object into the
// local cache. Looks up (or lazily creates and caches) an IoCtx for
// the pool, then issues an async full-object read whose completion
// is handled by handle_promote_callback().
// Returns 0 if the read was issued, a negative errno otherwise.
int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id,
                                 uint64_t snap_id, std::string object_name) {
  ldout(m_cct, 20) << "to promote object: " << object_name
                   << " from pool id: " << pool_id
                   << " namespace: " << pool_nspace
                   << " snapshot: " << snap_id << dendl;

  int ret = 0;
  std::string cache_file_name =
    get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);
  librados::IoCtx ioctx;
  {
    // m_ioctx_map caches one IoCtx per pool; guard map access only,
    // the per-call namespace/snap setup below is done on a copy.
    std::lock_guard _locker{m_ioctx_map_lock};
    if (m_ioctx_map.find(pool_id) == m_ioctx_map.end()) {
      ret = m_rados->ioctx_create2(pool_id, ioctx);
      if (ret < 0) {
        lderr(m_cct) << "fail to create ioctx" << dendl;
        return ret;
      }
      m_ioctx_map.emplace(pool_id, ioctx);
    } else {
      ioctx = m_ioctx_map[pool_id];
    }
  }

  ioctx.set_namespace(pool_nspace);
  ioctx.snap_set_read(snap_id);

  // Ownership of read_buf passes to the completion context, which
  // hands it to handle_promote_callback() for deletion.
  // NOTE(review): if promote_object() fails without firing the
  // completion, read_buf/ctx would leak — confirm librados aio_read
  // error semantics.
  librados::bufferlist* read_buf = new librados::bufferlist();

  auto ctx = new LambdaContext([this, read_buf, cache_file_name](int ret) {
    handle_promote_callback(ret, read_buf, cache_file_name);
  });

  return promote_object(&ioctx, object_name, read_buf, ctx);
}
196 | ||
197 | int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, | |
198 | std::string cache_file_name) { | |
199 | ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl; | |
200 | ||
201 | // rados read error | |
202 | if (ret != -ENOENT && ret < 0) { | |
203 | lderr(m_cct) << "fail to read from rados" << dendl; | |
204 | ||
205 | m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); | |
206 | delete read_buf; | |
207 | return ret; | |
208 | } | |
209 | ||
f91f0fd5 | 210 | auto state = OBJ_CACHE_PROMOTED; |
9f95a23c TL |
211 | if (ret == -ENOENT) { |
212 | // object is empty | |
f91f0fd5 | 213 | state = OBJ_CACHE_DNE; |
9f95a23c | 214 | ret = 0; |
f91f0fd5 TL |
215 | } else { |
216 | std::string cache_file_path = get_cache_file_path(cache_file_name, true); | |
217 | if (cache_file_path == "") { | |
218 | lderr(m_cct) << "fail to write cache file" << dendl; | |
219 | m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); | |
220 | delete read_buf; | |
221 | return -ENOSPC; | |
222 | } | |
9f95a23c | 223 | |
f91f0fd5 TL |
224 | ret = read_buf->write_file(cache_file_path.c_str()); |
225 | if (ret < 0) { | |
226 | lderr(m_cct) << "fail to write cache file" << dendl; | |
9f95a23c | 227 | |
f91f0fd5 TL |
228 | m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); |
229 | delete read_buf; | |
230 | return ret; | |
231 | } | |
9f95a23c TL |
232 | } |
233 | ||
234 | // update metadata | |
235 | ceph_assert(OBJ_CACHE_SKIP == m_policy->get_status(cache_file_name)); | |
f91f0fd5 TL |
236 | m_policy->update_status(cache_file_name, state, read_buf->length()); |
237 | ceph_assert(state == m_policy->get_status(cache_file_name)); | |
9f95a23c TL |
238 | |
239 | delete read_buf; | |
240 | ||
241 | evict_objects(); | |
242 | ||
243 | return ret; | |
244 | } | |
245 | ||
f67539c2 TL |
// Look up an object in the cache and react according to its state:
//  - OBJ_CACHE_NONE: not cached; start an async promotion if the QoS
//    throttles grant tokens, otherwise roll the entry back to NONE.
//  - OBJ_CACHE_PROMOTED: cached; return the cache file path.
//  - OBJ_CACHE_DNE: object known not to exist; optionally return the
//    (nonexistent-object) path when return_dne_path is set.
//  - OBJ_CACHE_SKIP: promotion already in flight; nothing to do.
// Returns the cache state as an int; target_cache_file_path is only
// set in the PROMOTED / DNE cases described above.
int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id,
                                    uint64_t snap_id, uint64_t object_size,
                                    std::string object_name,
                                    bool return_dne_path,
                                    std::string& target_cache_file_path) {
  ldout(m_cct, 20) << "object name = " << object_name
                   << " in pool ID : " << pool_id << dendl;

  int pret = -1;
  std::string cache_file_name =
    get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);

  // lookup_object() also transitions NONE -> SKIP internally
  // (presumably — confirm against SimplePolicy) so a promotion slot
  // is reserved before we decide to promote.
  cache_status_t ret = m_policy->lookup_object(cache_file_name);

  switch (ret) {
    case OBJ_CACHE_NONE: {
      // one IOPS token and object_size BPS tokens per promotion
      if (take_token_from_throttle(object_size, 1)) {
        pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
        if (pret < 0) {
          lderr(m_cct) << "fail to start promote" << dendl;
        }
      } else {
        // throttled: undo the reservation so a later lookup retries
        m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
      }
      return ret;
    }
    case OBJ_CACHE_PROMOTED:
      target_cache_file_path = get_cache_file_path(cache_file_name);
      return ret;
    case OBJ_CACHE_DNE:
      if (return_dne_path) {
        target_cache_file_path = get_cache_file_path(cache_file_name);
      }
      return ret;
    case OBJ_CACHE_SKIP:
      return ret;
    default:
      // ceph_assert(0) aborts, so falling off the switch is unreachable
      lderr(m_cct) << "unrecognized object cache status" << dendl;
      ceph_assert(0);
  }
}
287 | ||
// Issue the async RADOS read that fetches a whole object into
// read_buf; on_finish fires when the read completes. Returns the
// aio_read() submission result (negative errno if the request could
// not be issued).
int ObjectCacheStore::promote_object(librados::IoCtx* ioctx,
                                     std::string object_name,
                                     librados::bufferlist* read_buf,
                                     Context* on_finish) {
  ldout(m_cct, 20) << "object name = " << object_name << dendl;

  librados::AioCompletion* read_completion = create_rados_callback(on_finish);
  // issue a zero-sized read req to get the entire obj
  int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0);
  if (ret < 0) {
    lderr(m_cct) << "failed to read from rados" << dendl;
  }
  // drop our ref; the in-flight op holds its own
  read_completion->release();

  return ret;
}
304 | ||
305 | int ObjectCacheStore::evict_objects() { | |
306 | ldout(m_cct, 20) << dendl; | |
307 | ||
308 | std::list<std::string> obj_list; | |
309 | m_policy->get_evict_list(&obj_list); | |
310 | for (auto& obj : obj_list) { | |
311 | do_evict(obj); | |
312 | } | |
313 | return 0; | |
314 | } | |
315 | ||
316 | int ObjectCacheStore::do_evict(std::string cache_file) { | |
317 | ldout(m_cct, 20) << "file = " << cache_file << dendl; | |
318 | ||
319 | if (cache_file == "") { | |
320 | return 0; | |
321 | } | |
322 | ||
323 | std::string cache_file_path = get_cache_file_path(cache_file); | |
324 | ||
325 | ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl; | |
326 | ||
327 | // TODO(dehao): possible race on read? | |
328 | int ret = std::remove(cache_file_path.c_str()); | |
329 | // evict metadata | |
330 | if (ret == 0) { | |
331 | m_policy->update_status(cache_file, OBJ_CACHE_SKIP); | |
332 | m_policy->evict_entry(cache_file); | |
333 | } | |
334 | ||
335 | return ret; | |
336 | } | |
337 | ||
338 | std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace, | |
339 | uint64_t pool_id, | |
340 | uint64_t snap_id, | |
341 | std::string oid) { | |
342 | return pool_nspace + ":" + std::to_string(pool_id) + ":" + | |
343 | std::to_string(snap_id) + ":" + oid; | |
344 | } | |
345 | ||
346 | std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name, | |
347 | bool mkdir) { | |
348 | ldout(m_cct, 20) << cache_file_name <<dendl; | |
349 | ||
350 | uint32_t crc = 0; | |
351 | crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(), | |
352 | cache_file_name.length()); | |
353 | ||
354 | std::string cache_file_dir = std::to_string(crc % 100) + "/"; | |
355 | ||
356 | if (mkdir) { | |
357 | ldout(m_cct, 20) << "creating cache dir: " << cache_file_dir <<dendl; | |
358 | std::error_code ec; | |
359 | std::string new_dir = m_cache_root_dir + cache_file_dir; | |
f67539c2 | 360 | if (fs::exists(new_dir, ec)) { |
9f95a23c TL |
361 | ldout(m_cct, 20) << "cache dir exists: " << cache_file_dir <<dendl; |
362 | return new_dir + cache_file_name; | |
363 | } | |
364 | ||
f67539c2 | 365 | if (!fs::create_directories(new_dir, ec)) { |
9f95a23c TL |
366 | ldout(m_cct, 5) << "fail to create cache dir: " << new_dir |
367 | << "error: " << ec.message() << dendl; | |
368 | return ""; | |
369 | } | |
370 | } | |
371 | ||
372 | return m_cache_root_dir + cache_file_dir + cache_file_name; | |
373 | } | |
374 | ||
f67539c2 TL |
375 | void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) { |
376 | m_io_throttled = false; | |
377 | std::lock_guard lock(m_throttle_lock); | |
378 | if (type & ROC_QOS_IOPS_THROTTLE){ | |
379 | m_iops_tokens += tokens; | |
380 | } else if (type & ROC_QOS_BPS_THROTTLE){ | |
381 | m_bps_tokens += tokens; | |
382 | } else { | |
383 | lderr(m_cct) << "unknow throttle type." << dendl; | |
384 | } | |
385 | } | |
386 | ||
// Try to take object_num IOPS tokens and object_size BPS tokens for a
// promotion. Returns true when both enabled throttles had capacity;
// returns false (and sets m_io_throttled) when either throttle had to
// defer the grant to handle_throttle_ready(). If the IOPS tokens were
// already debited but the BPS throttle then deferred, the IOPS debit
// is rolled back so the retried request is not double-charged.
bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size,
                                                uint64_t object_num) {
  // fast-path reject while a deferred grant is outstanding
  // NOTE(review): read outside m_throttle_lock — benign only if
  // m_io_throttled tears atomically; confirm its declared type.
  if (m_io_throttled == true) {
    return false;
  }

  int flag = 0;  // 1 => IOPS tokens were debited locally below
  bool wait = false;
  if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) {
    std::lock_guard lock(m_throttle_lock);
    if (object_num > m_iops_tokens) {
      // not enough banked tokens: ask the bucket; wait==true means
      // the grant was deferred to handle_throttle_ready()
      wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this,
        &ObjectCacheStore::handle_throttle_ready, object_num,
        ROC_QOS_IOPS_THROTTLE);
    } else {
      m_iops_tokens -= object_num;
      flag = 1;
    }
  }
  if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) {
    std::lock_guard lock(m_throttle_lock);
    if (object_size > m_bps_tokens) {
      wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this,
        &ObjectCacheStore::handle_throttle_ready, object_size,
        ROC_QOS_BPS_THROTTLE);
    } else {
      m_bps_tokens -= object_size;
    }
  }

  if (wait) {
    m_io_throttled = true;
    // when passing iops throttle, but limit in bps throttle, recovery
    if (flag == 1) {
      std::lock_guard lock(m_throttle_lock);
      m_iops_tokens += object_num;
    }
  }

  return !wait;
}
428 | ||
// Display names for the throttle instances created in
// apply_qos_tick_and_limit(), keyed by ThrottleTargetCode.
static const std::map<uint64_t, std::string> THROTTLE_FLAGS = {
  { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" },
  { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" }
};
433 | ||
434 | void ObjectCacheStore::apply_qos_tick_and_limit( | |
435 | const uint64_t flag, | |
436 | std::chrono::milliseconds min_tick, | |
437 | uint64_t limit, | |
438 | uint64_t burst, | |
439 | std::chrono::seconds burst_seconds) { | |
440 | SafeTimerSingleton* safe_timer_singleton = nullptr; | |
441 | TokenBucketThrottle* throttle = nullptr; | |
442 | safe_timer_singleton = | |
443 | &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>( | |
444 | "tools::immutable_object_cache", false, m_cct); | |
445 | SafeTimer* timer = safe_timer_singleton; | |
446 | ceph::mutex* timer_lock = &safe_timer_singleton->lock; | |
447 | m_qos_enabled_flag |= flag; | |
448 | auto throttle_flags_it = THROTTLE_FLAGS.find(flag); | |
449 | ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end()); | |
450 | throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second, | |
451 | 0, 0, timer, timer_lock); | |
452 | throttle->set_schedule_tick_min(min_tick.count()); | |
453 | int ret = throttle->set_limit(limit, burst, burst_seconds.count()); | |
454 | if (ret < 0) { | |
455 | lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: " | |
456 | << "burst(" << burst << ") is less than " | |
457 | << "limit(" << limit << ")" << dendl; | |
458 | throttle->set_limit(limit, 0, 1); | |
459 | } | |
460 | ||
461 | ceph_assert(m_throttles.find(flag) == m_throttles.end()); | |
462 | m_throttles.insert({flag, throttle}); | |
463 | } | |
464 | ||
9f95a23c TL |
465 | } // namespace immutable_obj_cache |
466 | } // namespace ceph |