1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ObjectCacheStore.h"
6 #if __has_include(<filesystem>)
8 namespace fs
= std::filesystem
;
10 #include <experimental/filesystem>
11 namespace fs
= std::experimental::filesystem
;
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_immutable_obj_cache
17 #define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
22 namespace immutable_obj_cache
{
// File-local timer singleton (the trailing "} // anonymous namespace" below
// shows this lives in an anonymous namespace).  It is a CommonSafeTimer
// specialized on ceph::mutex that owns its own lock; it is obtained via
// lookup_or_create_singleton_object() in apply_qos_tick_and_limit().
// NOTE(review): the fused original line numbers jump here, so some class
// lines (e.g. access specifiers, ctor/dtor bodies) are missing from this
// fragment.
26 class SafeTimerSingleton
: public CommonSafeTimer
<ceph::mutex
> {
// Mutex guarding the timer; also passed to CommonSafeTimer's ctor below,
// and exposed to apply_qos_tick_and_limit() as the throttle's timer lock.
28 ceph::mutex lock
= ceph::make_mutex
29 ("ceph::immutable_object_cache::SafeTimerSingleton::lock");
// Third ctor argument 'true' — presumably enables safe callbacks; TODO
// confirm against CommonSafeTimer's constructor signature.
31 explicit SafeTimerSingleton(CephContext
*cct
)
32 : CommonSafeTimer(cct
, lock
, true) {
// Destructor serializes with timer callbacks by taking the lock; the
// actual shutdown call is on a line missing from this fragment.
35 ~SafeTimerSingleton() {
36 std::lock_guard locker
{lock
};
41 } // anonymous namespace
// Bit flags selecting which QoS throttle an operation targets.  Used both
// as keys into m_throttles/THROTTLE_FLAGS and as bits OR-ed into
// m_qos_enabled_flag (see apply_qos_tick_and_limit / the destructor).
43 enum ThrottleTargetCode
{
// Throttle on operation count (tokens taken per object promoted).
44 ROC_QOS_IOPS_THROTTLE
= 1,
// Throttle on bandwidth (tokens taken per byte of object size).
45 ROC_QOS_BPS_THROTTLE
= 2
// Constructor: reads cache configuration from the Ceph config, optionally
// arms the IOPS/BPS QoS throttles, validates the eviction watermark, and
// creates the SimplePolicy that tracks cached objects.
// NOTE(review): interleaved lines are missing from this fragment (e.g. the
// assignment target of the first get_val and the declaration of 'limit').
48 ObjectCacheStore::ObjectCacheStore(CephContext
*cct
)
49 : m_cct(cct
), m_rados(new librados::Rados()) {
// Cache root directory from config — presumably assigned to
// m_cache_root_dir (assignment line missing here; verify upstream).
52 m_cct
->_conf
.get_val
<std::string
>("immutable_object_cache_path");
// Normalize the root dir to end with a trailing '/'.
54 if (m_cache_root_dir
.back() != '/') {
55 m_cache_root_dir
+= "/";
// Capacity limit, eviction watermark and promotion concurrency limits.
58 uint64_t cache_max_size
=
59 m_cct
->_conf
.get_val
<Option::size_t>("immutable_object_cache_max_size");
61 double cache_watermark
=
62 m_cct
->_conf
.get_val
<double>("immutable_object_cache_watermark");
64 uint64_t max_inflight_ops
=
65 m_cct
->_conf
.get_val
<uint64_t>("immutable_object_cache_max_inflight_ops");
// Enable the IOPS throttle only when a non-zero limit is configured.
68 if ((limit
= m_cct
->_conf
.get_val
<uint64_t>
69 ("immutable_object_cache_qos_iops_limit")) != 0) {
70 apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE
,
71 m_cct
->_conf
.get_val
<std::chrono::milliseconds
>
72 ("immutable_object_cache_qos_schedule_tick_min"),
74 m_cct
->_conf
.get_val
<uint64_t>
75 ("immutable_object_cache_qos_iops_burst"),
76 m_cct
->_conf
.get_val
<std::chrono::seconds
>
77 ("immutable_object_cache_qos_iops_burst_seconds"));
// Likewise for the bytes-per-second throttle.
79 if ((limit
= m_cct
->_conf
.get_val
<uint64_t>
80 ("immutable_object_cache_qos_bps_limit")) != 0) {
81 apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE
,
82 m_cct
->_conf
.get_val
<std::chrono::milliseconds
>
83 ("immutable_object_cache_qos_schedule_tick_min"),
85 m_cct
->_conf
.get_val
<uint64_t>
86 ("immutable_object_cache_qos_bps_burst"),
87 m_cct
->_conf
.get_val
<std::chrono::seconds
>
88 ("immutable_object_cache_qos_bps_burst_seconds"));
// Watermark must be a fraction in (0, 1]; fall back to 0.9 otherwise.
91 if ((cache_watermark
<= 0) || (cache_watermark
> 1)) {
92 lderr(m_cct
) << "Invalid water mark provided, set it to default." << dendl
;
93 cache_watermark
= 0.9;
// Owned policy object; deleted elsewhere (deletion not visible in this
// fragment — confirm the destructor releases it).
95 m_policy
= new SimplePolicy(m_cct
, cache_max_size
, max_inflight_ops
,
// Destructor: frees the QoS throttles that the constructor created.  Each
// throttle is only deleted when its enable bit is set in m_qos_enabled_flag,
// mirroring the conditional creation in apply_qos_tick_and_limit().
99 ObjectCacheStore::~ObjectCacheStore() {
101 if (m_qos_enabled_flag
& ROC_QOS_IOPS_THROTTLE
) {
// The flag bit implies the map entry was inserted; assert it is non-null.
102 ceph_assert(m_throttles
[ROC_QOS_IOPS_THROTTLE
] != nullptr);
103 delete m_throttles
[ROC_QOS_IOPS_THROTTLE
];
105 if (m_qos_enabled_flag
& ROC_QOS_BPS_THROTTLE
) {
106 ceph_assert(m_throttles
[ROC_QOS_BPS_THROTTLE
] != nullptr);
107 delete m_throttles
[ROC_QOS_BPS_THROTTLE
];
// Initialize the store: set up the RADOS handle, connect to the cluster,
// and (re)create the on-disk cache directory tree.
// NOTE(review): the error-check lines after init_with_context()/connect()
// (presumably `if (ret < 0)`) and the `try {` opener before the filesystem
// calls are missing from this fragment.
111 int ObjectCacheStore::init(bool reset
) {
112 ldout(m_cct
, 20) << dendl
;
114 int ret
= m_rados
->init_with_context(m_cct
);
116 lderr(m_cct
) << "fail to init Ceph context" << dendl
;
120 ret
= m_rados
->connect();
122 lderr(m_cct
) << "fail to connect to cluster" << dendl
;
126 // TODO(dehao): fsck and reuse existing cache objects
// Wipe any pre-existing cache contents: remove every entry under the
// root, then (re)create the root directory itself.
129 if (fs::exists(m_cache_root_dir
)) {
130 // remove all sub folders
131 for (auto& p
: fs::directory_iterator(m_cache_root_dir
)) {
132 fs::remove_all(p
.path());
135 fs::create_directories(m_cache_root_dir
);
// Filesystem failures are reported and converted to a negative errno
// taken from the error_code.
137 } catch (const fs::filesystem_error
& e
) {
138 lderr(m_cct
) << "failed to initialize cache store directory: "
139 << e
.what() << dendl
;
140 return -e
.code().value();
// Shut down the store.  Only the entry trace is visible in this fragment;
// the teardown body (presumably RADOS shutdown) is on missing lines.
146 int ObjectCacheStore::shutdown() {
147 ldout(m_cct
, 20) << dendl
;
// Prepare the cache working directory.  Only the start of the body is
// visible here; the rest is on lines missing from this fragment.
153 int ObjectCacheStore::init_cache() {
154 ldout(m_cct
, 20) << dendl
;
155 std::string cache_dir
= m_cache_root_dir
;
// Start an asynchronous promotion of one RADOS object into the local cache:
// resolve/cache an IoCtx for the pool, point it at the namespace/snapshot,
// then kick off a full-object read whose completion is handled by
// handle_promote_callback().
160 int ObjectCacheStore::do_promote(std::string pool_nspace
, uint64_t pool_id
,
161 uint64_t snap_id
, std::string object_name
) {
162 ldout(m_cct
, 20) << "to promote object: " << object_name
163 << " from pool id: " << pool_id
164 << " namespace: " << pool_nspace
165 << " snapshot: " << snap_id
<< dendl
;
// Canonical cache key: "<nspace>:<pool>:<snap>:<oid>".
168 std::string cache_file_name
=
169 get_cache_file_name(pool_nspace
, pool_id
, snap_id
, object_name
);
170 librados::IoCtx ioctx
;
// Per-pool IoCtx cache, guarded by m_ioctx_map_lock: create on first use,
// otherwise reuse the stored context.  (The scope of _locker and the
// error-return after ioctx_create2 are on lines missing here.)
172 std::lock_guard _locker
{m_ioctx_map_lock
};
173 if (m_ioctx_map
.find(pool_id
) == m_ioctx_map
.end()) {
174 ret
= m_rados
->ioctx_create2(pool_id
, ioctx
);
176 lderr(m_cct
) << "fail to create ioctx" << dendl
;
179 m_ioctx_map
.emplace(pool_id
, ioctx
);
181 ioctx
= m_ioctx_map
[pool_id
];
// Read from the requested namespace and snapshot.
185 ioctx
.set_namespace(pool_nspace
);
186 ioctx
.snap_set_read(snap_id
);
// Buffer is heap-allocated because it must outlive this call; ownership
// passes to the completion path (handle_promote_callback) — confirm it is
// freed there (deletion not visible in this fragment).
188 librados::bufferlist
* read_buf
= new librados::bufferlist();
// Completion context: forwards the read result to handle_promote_callback.
// Captures read_buf by pointer and the file name by copy.
190 auto ctx
= new LambdaContext([this, read_buf
, cache_file_name
](int ret
) {
191 handle_promote_callback(ret
, read_buf
, cache_file_name
);
// Issue the async read; returns its result to the caller.
194 return promote_object(&ioctx
, object_name
, read_buf
, ctx
);
// Completion handler for a promotion read.  On success writes the data to
// the cache file and marks the entry PROMOTED; -ENOENT marks it DNE
// (does-not-exist); any other error or write failure reverts the entry to
// OBJ_CACHE_NONE.  Early-return lines between the error logs and the
// status updates are missing from this fragment.
197 int ObjectCacheStore::handle_promote_callback(int ret
, bufferlist
* read_buf
,
198 std::string cache_file_name
) {
199 ldout(m_cct
, 20) << " cache_file_name: " << cache_file_name
<< dendl
;
// Hard read error (anything negative except -ENOENT): give up and reset
// the policy entry so a later lookup can retry.
202 if (ret
!= -ENOENT
&& ret
< 0) {
203 lderr(m_cct
) << "fail to read from rados" << dendl
;
205 m_policy
->update_status(cache_file_name
, OBJ_CACHE_NONE
);
// -ENOENT is not an error here: record that the object does not exist so
// lookups can answer without re-reading RADOS.
210 auto state
= OBJ_CACHE_PROMOTED
;
211 if (ret
== -ENOENT
) {
213 state
= OBJ_CACHE_DNE
;
// Resolve (and create, per the 'true' flag) the on-disk path.
216 std::string cache_file_path
= get_cache_file_path(cache_file_name
, true);
217 if (cache_file_path
== "") {
218 lderr(m_cct
) << "fail to write cache file" << dendl
;
219 m_policy
->update_status(cache_file_name
, OBJ_CACHE_NONE
);
// Persist the object bytes to the cache file.
224 ret
= read_buf
->write_file(cache_file_path
.c_str());
226 lderr(m_cct
) << "fail to write cache file" << dendl
;
228 m_policy
->update_status(cache_file_name
, OBJ_CACHE_NONE
);
// The entry must still be in the in-flight SKIP state before we publish
// the final state (PROMOTED or DNE) together with its size.
235 ceph_assert(OBJ_CACHE_SKIP
== m_policy
->get_status(cache_file_name
));
236 m_policy
->update_status(cache_file_name
, state
, read_buf
->length());
237 ceph_assert(state
== m_policy
->get_status(cache_file_name
));
// Look an object up in the cache policy and dispatch on its state:
//  - OBJ_CACHE_NONE: try to take QoS tokens and start a promotion; on
//    failure, revert the entry so it can be retried.
//  - OBJ_CACHE_PROMOTED: return the cache file path to the caller.
//  - DNE case: return the path only when return_dne_path is set.
// The `switch` header, `break`/`return` lines and the DNE case label are
// on lines missing from this fragment.
246 int ObjectCacheStore::lookup_object(std::string pool_nspace
, uint64_t pool_id
,
247 uint64_t snap_id
, uint64_t object_size
,
248 std::string object_name
,
249 bool return_dne_path
,
250 std::string
& target_cache_file_path
) {
251 ldout(m_cct
, 20) << "object name = " << object_name
252 << " in pool ID : " << pool_id
<< dendl
;
255 std::string cache_file_name
=
256 get_cache_file_name(pool_nspace
, pool_id
, snap_id
, object_name
);
// Policy lookup may itself transition the entry (e.g. to in-flight) —
// TODO confirm against SimplePolicy::lookup_object.
258 cache_status_t ret
= m_policy
->lookup_object(cache_file_name
);
261 case OBJ_CACHE_NONE
: {
// Promotion consumes object_size bytes of BPS budget and 1 IOPS token.
262 if (take_token_from_throttle(object_size
, 1)) {
263 pret
= do_promote(pool_nspace
, pool_id
, snap_id
, object_name
);
265 lderr(m_cct
) << "fail to start promote" << dendl
;
// Promotion could not start: put the entry back to NONE for retry.
268 m_policy
->update_status(cache_file_name
, OBJ_CACHE_NONE
);
272 case OBJ_CACHE_PROMOTED
:
273 target_cache_file_path
= get_cache_file_path(cache_file_name
);
// Presumably the OBJ_CACHE_DNE branch (case label missing here): hand
// back the path only when the caller asked for DNE paths.
276 if (return_dne_path
) {
277 target_cache_file_path
= get_cache_file_path(cache_file_name
);
// default: unknown policy state.
283 lderr(m_cct
) << "unrecognized object cache status" << dendl
;
// Issue the asynchronous RADOS read that fetches the whole object into
// read_buf; on_finish is wrapped into an AioCompletion and fires when the
// read completes.  On immediate submission failure the completion is
// released here to avoid leaking it.
288 int ObjectCacheStore::promote_object(librados::IoCtx
* ioctx
,
289 std::string object_name
,
290 librados::bufferlist
* read_buf
,
291 Context
* on_finish
) {
292 ldout(m_cct
, 20) << "object name = " << object_name
<< dendl
;
294 librados::AioCompletion
* read_completion
= create_rados_callback(on_finish
);
295 // issue a zero-sized read req to get the entire obj
// len=0, off=0: librados treats a zero-length read as "read the entire
// object".
296 int ret
= ioctx
->aio_read(object_name
, read_completion
, read_buf
, 0, 0);
// Error path (the `if (ret < 0)` guard is on a line missing from this
// fragment): log and drop our ref on the completion.
298 lderr(m_cct
) << "failed to read from rados" << dendl
;
300 read_completion
->release();
// Ask the policy for the current eviction candidates and evict each one
// (the per-entry call inside the loop — presumably do_evict(obj) — is on a
// line missing from this fragment).
305 int ObjectCacheStore::evict_objects() {
306 ldout(m_cct
, 20) << dendl
;
308 std::list
<std::string
> obj_list
;
// Policy fills obj_list with cache-file names chosen for eviction.
309 m_policy
->get_evict_list(&obj_list
);
310 for (auto& obj
: obj_list
) {
// Evict one cached object: delete its backing file, then tell the policy
// the entry is being skipped and remove it from the policy's tracking.
// (The empty-name early return and the result check after std::remove are
// on lines missing from this fragment.)
316 int ObjectCacheStore::do_evict(std::string cache_file
) {
317 ldout(m_cct
, 20) << "file = " << cache_file
<< dendl
;
319 if (cache_file
== "") {
323 std::string cache_file_path
= get_cache_file_path(cache_file
);
325 ldout(m_cct
, 20) << "evict cache: " << cache_file_path
<< dendl
;
327 // TODO(dehao): possible race on read?
// Remove the on-disk file; std::remove returns 0 on success.
328 int ret
= std::remove(cache_file_path
.c_str());
// Mark in-flight/skip first, then drop the entry from the policy.
331 m_policy
->update_status(cache_file
, OBJ_CACHE_SKIP
);
332 m_policy
->evict_entry(cache_file
);
// Build the canonical cache key "<nspace>:<pool_id>:<snap_id>:<oid>".
// Only the first parameter is visible here; the remaining parameter lines
// (pool_id, snap_id, oid — names evident from the body) are missing from
// this fragment.
338 std::string
ObjectCacheStore::get_cache_file_name(std::string pool_nspace
,
342 return pool_nspace
+ ":" + std::to_string(pool_id
) + ":" +
343 std::to_string(snap_id
) + ":" + oid
;
// Map a cache-file name to its on-disk path.  Files are spread over 100
// bucket directories chosen by crc32c(name) % 100.  When the bucket dir
// already exists the path is returned directly; otherwise it is created
// (the second parameter — presumably a mkdir flag — and the declarations
// of 'crc'/'ec' are on lines missing from this fragment).
346 std::string
ObjectCacheStore::get_cache_file_path(std::string cache_file_name
,
348 ldout(m_cct
, 20) << cache_file_name
<<dendl
;
// Deterministic bucket selection from the file name.
351 crc
= ceph_crc32c(0, (unsigned char *)cache_file_name
.c_str(),
352 cache_file_name
.length());
354 std::string cache_file_dir
= std::to_string(crc
% 100) + "/";
357 ldout(m_cct
, 20) << "creating cache dir: " << cache_file_dir
<<dendl
;
359 std::string new_dir
= m_cache_root_dir
+ cache_file_dir
;
// Non-throwing filesystem overloads (error_code ec) are used throughout.
360 if (fs::exists(new_dir
, ec
)) {
361 ldout(m_cct
, 20) << "cache dir exists: " << cache_file_dir
<<dendl
;
362 return new_dir
+ cache_file_name
;
// Bucket dir missing: create it; on failure log at level 5 (the return
// after the log — presumably "" — is on a missing line).
365 if (!fs::create_directories(new_dir
, ec
)) {
366 ldout(m_cct
, 5) << "fail to create cache dir: " << new_dir
367 << "error: " << ec
.message() << dendl
;
372 return m_cache_root_dir
+ cache_file_dir
+ cache_file_name
;
// TokenBucketThrottle callback: tokens have become available for the
// throttle identified by 'type'; credit them to the matching counter and
// clear the global throttled flag so lookups may proceed again.
// NOTE(review): m_io_throttled is cleared BEFORE m_throttle_lock is taken,
// while take_token_from_throttle reads/sets it around the same lock —
// confirm this ordering is intentional and race-free.
375 void ObjectCacheStore::handle_throttle_ready(uint64_t tokens
, uint64_t type
) {
376 m_io_throttled
= false;
377 std::lock_guard
lock(m_throttle_lock
);
// 'type' carries a ThrottleTargetCode bit; credit the right bucket.
378 if (type
& ROC_QOS_IOPS_THROTTLE
){
379 m_iops_tokens
+= tokens
;
380 } else if (type
& ROC_QOS_BPS_THROTTLE
){
381 m_bps_tokens
+= tokens
;
// Fallback branch (its `else` line is missing from this fragment).
383 lderr(m_cct
) << "unknow throttle type." << dendl
;
// Try to take QoS tokens for one promotion: object_num tokens from the
// IOPS bucket and object_size tokens from the BPS bucket.  Returns false
// (promotion must wait) when already throttled or when either enabled
// bucket lacks tokens; in the blocked case the throttle's get() schedules
// handle_throttle_ready() to refill and resume.  The declaration of
// 'wait', several returns and the enclosing `if (wait)` around the final
// section are on lines missing from this fragment.
387 bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size
,
388 uint64_t object_num
) {
// Fast-path reject while a previous request is still waiting on tokens.
389 if (m_io_throttled
== true) {
// IOPS bucket: consume object_num tokens or register a wait.
395 if (!wait
&& (m_qos_enabled_flag
& ROC_QOS_IOPS_THROTTLE
)) {
396 std::lock_guard
lock(m_throttle_lock
);
397 if (object_num
> m_iops_tokens
) {
// Not enough cached tokens: ask the throttle for more; callback is
// handle_throttle_ready(object_num, ROC_QOS_IOPS_THROTTLE).
398 wait
= m_throttles
[ROC_QOS_IOPS_THROTTLE
]->get(object_num
, this,
399 &ObjectCacheStore::handle_throttle_ready
, object_num
,
400 ROC_QOS_IOPS_THROTTLE
);
402 m_iops_tokens
-= object_num
;
// BPS bucket: same scheme keyed on object_size bytes.
406 if (!wait
&& (m_qos_enabled_flag
& ROC_QOS_BPS_THROTTLE
)) {
407 std::lock_guard
lock(m_throttle_lock
);
408 if (object_size
> m_bps_tokens
) {
409 wait
= m_throttles
[ROC_QOS_BPS_THROTTLE
]->get(object_size
, this,
410 &ObjectCacheStore::handle_throttle_ready
, object_size
,
411 ROC_QOS_BPS_THROTTLE
);
413 m_bps_tokens
-= object_size
;
// Blocked: flag the store as throttled and refund the IOPS tokens that
// were already debited, since the whole request will be retried.
418 m_io_throttled
= true;
419 // when passing iops throttle, but limit in bps throttle, recovery
421 std::lock_guard
lock(m_throttle_lock
);
422 m_iops_tokens
+= object_num
;
// Human-readable names for each ThrottleTargetCode bit, used to label the
// TokenBucketThrottle instances created in apply_qos_tick_and_limit().
429 static const std::map
<uint64_t, std::string
> THROTTLE_FLAGS
= {
430 { ROC_QOS_IOPS_THROTTLE
, "roc_qos_iops_throttle" },
431 { ROC_QOS_BPS_THROTTLE
, "roc_qos_bps_throttle" }
// Create and register one TokenBucketThrottle for the given flag
// (ROC_QOS_IOPS_THROTTLE or ROC_QOS_BPS_THROTTLE), backed by the shared
// SafeTimerSingleton.  NOTE(review): the parameter lines for 'flag',
// 'limit' and 'burst' are missing from this fragment (only min_tick and
// burst_seconds are visible); names are evident from the body.
434 void ObjectCacheStore::apply_qos_tick_and_limit(
436 std::chrono::milliseconds min_tick
,
439 std::chrono::seconds burst_seconds
) {
440 SafeTimerSingleton
* safe_timer_singleton
= nullptr;
441 TokenBucketThrottle
* throttle
= nullptr;
// One process-wide timer shared by all throttles, keyed by this name.
442 safe_timer_singleton
=
443 &m_cct
->lookup_or_create_singleton_object
<SafeTimerSingleton
>(
444 "tools::immutable_object_cache", false, m_cct
);
445 SafeTimer
* timer
= safe_timer_singleton
;
// The throttle needs the timer's own mutex to schedule ticks safely.
446 ceph::mutex
* timer_lock
= &safe_timer_singleton
->lock
;
// Record that this throttle type is active (checked in the destructor and
// in take_token_from_throttle).
447 m_qos_enabled_flag
|= flag
;
448 auto throttle_flags_it
= THROTTLE_FLAGS
.find(flag
);
449 ceph_assert(throttle_flags_it
!= THROTTLE_FLAGS
.end());
450 throttle
= new TokenBucketThrottle(m_cct
, throttle_flags_it
->second
,
451 0, 0, timer
, timer_lock
);
452 throttle
->set_schedule_tick_min(min_tick
.count());
// Apply limit/burst; a non-zero failure (the `if (ret < 0)` guard is on a
// missing line) means burst < limit, so fall back to limit with no burst.
453 int ret
= throttle
->set_limit(limit
, burst
, burst_seconds
.count());
455 lderr(m_cct
) << throttle
->get_name() << ": invalid qos parameter: "
456 << "burst(" << burst
<< ") is less than "
457 << "limit(" << limit
<< ")" << dendl
;
458 throttle
->set_limit(limit
, 0, 1);
// Each flag may be registered only once.
461 ceph_assert(m_throttles
.find(flag
) == m_throttles
.end());
462 m_throttles
.insert({flag
, throttle
});
465 } // namespace immutable_obj_cache