// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "ObjectCacheStore.h"
#include "Utils.h"
#if __has_include(<filesystem>)
#include <filesystem>
namespace fs = std::filesystem;
#else
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
#endif

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_immutable_obj_cache
#undef dout_prefix
#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
                           << __func__ << ": "


namespace ceph {
namespace immutable_obj_cache {

namespace {

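// Process-wide SafeTimer wrapped in a CephContext singleton so that the
// QoS token-bucket throttles created below can share one timer thread.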
class SafeTimerSingleton : public SafeTimer {
public:
  ceph::mutex lock = ceph::make_mutex
    ("ceph::immutable_object_cache::SafeTimerSingleton::lock");

  explicit SafeTimerSingleton(CephContext *cct)
      : SafeTimer(cct, lock, true) {
    init();
  }
  ~SafeTimerSingleton() {
    std::lock_guard locker{lock};
    shutdown();
  }
};

} // anonymous namespace

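// QoS throttle targets: reads can be limited independently by IOPS and by
// bandwidth; each flag selects one TokenBucketThrottle in m_throttles.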
enum ThrottleTargetCode {
  ROC_QOS_IOPS_THROTTLE = 1,
  ROC_QOS_BPS_THROTTLE = 2
};

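// The cache store is configured entirely through CephContext options:
// immutable_object_cache_path, _max_size, _watermark, _max_inflight_ops and
// the immutable_object_cache_qos_* limits read below.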
ObjectCacheStore::ObjectCacheStore(CephContext *cct)
      : m_cct(cct), m_rados(new librados::Rados()) {

  m_cache_root_dir =
    m_cct->_conf.get_val<std::string>("immutable_object_cache_path");

  if (m_cache_root_dir.back() != '/') {
    m_cache_root_dir += "/";
  }

  uint64_t cache_max_size =
    m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size");

  double cache_watermark =
    m_cct->_conf.get_val<double>("immutable_object_cache_watermark");

  uint64_t max_inflight_ops =
    m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");

  uint64_t limit = 0;
  if ((limit = m_cct->_conf.get_val<uint64_t>
               ("immutable_object_cache_qos_iops_limit")) != 0) {
    apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE,
                             m_cct->_conf.get_val<std::chrono::milliseconds>
                               ("immutable_object_cache_qos_schedule_tick_min"),
                             limit,
                             m_cct->_conf.get_val<uint64_t>
                               ("immutable_object_cache_qos_iops_burst"),
                             m_cct->_conf.get_val<std::chrono::seconds>
                               ("immutable_object_cache_qos_iops_burst_seconds"));
  }
  if ((limit = m_cct->_conf.get_val<uint64_t>
               ("immutable_object_cache_qos_bps_limit")) != 0) {
    apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE,
                             m_cct->_conf.get_val<std::chrono::milliseconds>
                               ("immutable_object_cache_qos_schedule_tick_min"),
                             limit,
                             m_cct->_conf.get_val<uint64_t>
                               ("immutable_object_cache_qos_bps_burst"),
                             m_cct->_conf.get_val<std::chrono::seconds>
                               ("immutable_object_cache_qos_bps_burst_seconds"));
  }

  if ((cache_watermark <= 0) || (cache_watermark > 1)) {
    lderr(m_cct) << "invalid watermark provided, falling back to the default"
                 << dendl;
    cache_watermark = 0.9;
  }
  m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops,
                              cache_watermark);
}

ObjectCacheStore::~ObjectCacheStore() {
  delete m_policy;
  if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) {
    ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr);
    delete m_throttles[ROC_QOS_IOPS_THROTTLE];
  }
  if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) {
    ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr);
    delete m_throttles[ROC_QOS_BPS_THROTTLE];
  }
}

int ObjectCacheStore::init(bool reset) {
  ldout(m_cct, 20) << dendl;

  int ret = m_rados->init_with_context(m_cct);
  if (ret < 0) {
    lderr(m_cct) << "fail to init Ceph context" << dendl;
    return ret;
  }

  ret = m_rados->connect();
  if (ret < 0) {
    lderr(m_cct) << "fail to connect to cluster" << dendl;
    return ret;
  }

  // TODO(dehao): fsck and reuse existing cache objects
  if (reset) {
    try {
      if (fs::exists(m_cache_root_dir)) {
        // remove all sub folders
        for (auto& p : fs::directory_iterator(m_cache_root_dir)) {
          fs::remove_all(p.path());
        }
      } else {
        fs::create_directories(m_cache_root_dir);
      }
    } catch (const fs::filesystem_error& e) {
      lderr(m_cct) << "failed to initialize cache store directory: "
                   << e.what() << dendl;
      return -e.code().value();
    }
  }
  return 0;
}

int ObjectCacheStore::shutdown() {
  ldout(m_cct, 20) << dendl;

  m_rados->shutdown();
  return 0;
}

int ObjectCacheStore::init_cache() {
  ldout(m_cct, 20) << dendl;
  std::string cache_dir = m_cache_root_dir;

  return 0;
}

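// Promote an object by reading it from RADOS asynchronously; on completion
// the data is written to a local cache file whose name encodes the pool
// namespace, pool id, snapshot id and object name.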
int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id,
                                 uint64_t snap_id, std::string object_name) {
  ldout(m_cct, 20) << "to promote object: " << object_name
                   << " from pool id: " << pool_id
                   << " namespace: " << pool_nspace
                   << " snapshot: " << snap_id << dendl;

  int ret = 0;
  std::string cache_file_name =
    get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);
  librados::IoCtx ioctx;
  {
    std::lock_guard _locker{m_ioctx_map_lock};
    if (m_ioctx_map.find(pool_id) == m_ioctx_map.end()) {
      ret = m_rados->ioctx_create2(pool_id, ioctx);
      if (ret < 0) {
        lderr(m_cct) << "fail to create ioctx" << dendl;
        return ret;
      }
      m_ioctx_map.emplace(pool_id, ioctx);
    } else {
      ioctx = m_ioctx_map[pool_id];
    }
  }

  ioctx.set_namespace(pool_nspace);
  ioctx.snap_set_read(snap_id);

  librados::bufferlist* read_buf = new librados::bufferlist();

  auto ctx = new LambdaContext([this, read_buf, cache_file_name](int ret) {
    handle_promote_callback(ret, read_buf, cache_file_name);
  });

  return promote_object(&ioctx, object_name, read_buf, ctx);
}

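// Completion handler for the promote read: -ENOENT marks the object as
// OBJ_CACHE_DNE, other read errors roll the entry back to OBJ_CACHE_NONE,
// and success persists the data and marks the entry OBJ_CACHE_PROMOTED.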
int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
                                              std::string cache_file_name) {
  ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl;

  // rados read error
  if (ret != -ENOENT && ret < 0) {
    lderr(m_cct) << "fail to read from rados" << dendl;

    m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
    delete read_buf;
    return ret;
  }

  auto state = OBJ_CACHE_PROMOTED;
  if (ret == -ENOENT) {
    // object does not exist in the cluster
    state = OBJ_CACHE_DNE;
    ret = 0;
  } else {
    std::string cache_file_path = get_cache_file_path(cache_file_name, true);
    if (cache_file_path == "") {
      lderr(m_cct) << "fail to write cache file" << dendl;
      m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
      delete read_buf;
      return -ENOSPC;
    }

    ret = read_buf->write_file(cache_file_path.c_str());
    if (ret < 0) {
      lderr(m_cct) << "fail to write cache file" << dendl;

      m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
      delete read_buf;
      return ret;
    }
  }

  // update metadata
  ceph_assert(OBJ_CACHE_SKIP == m_policy->get_status(cache_file_name));
  m_policy->update_status(cache_file_name, state, read_buf->length());
  ceph_assert(state == m_policy->get_status(cache_file_name));

  delete read_buf;

  evict_objects();

  return ret;
}

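// Look up an object in the cache policy.  A miss (OBJ_CACHE_NONE) may kick
// off an asynchronous promotion, subject to the QoS throttles; a file path
// is returned only for promoted entries (or DNE entries when
// return_dne_path is set).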
int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id,
                                    uint64_t snap_id, uint64_t object_size,
                                    std::string object_name,
                                    bool return_dne_path,
                                    std::string& target_cache_file_path) {
  ldout(m_cct, 20) << "object name = " << object_name
                   << " in pool ID : " << pool_id << dendl;

  int pret = -1;
  std::string cache_file_name =
    get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);

  cache_status_t ret = m_policy->lookup_object(cache_file_name);

  switch (ret) {
    case OBJ_CACHE_NONE: {
      if (take_token_from_throttle(object_size, 1)) {
        pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
        if (pret < 0) {
          lderr(m_cct) << "fail to start promote" << dendl;
        }
      } else {
        m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
      }
      return ret;
    }
    case OBJ_CACHE_PROMOTED:
      target_cache_file_path = get_cache_file_path(cache_file_name);
      return ret;
    case OBJ_CACHE_DNE:
      if (return_dne_path) {
        target_cache_file_path = get_cache_file_path(cache_file_name);
      }
      return ret;
    case OBJ_CACHE_SKIP:
      return ret;
    default:
      lderr(m_cct) << "unrecognized object cache status" << dendl;
      ceph_assert(0);
  }
}

int ObjectCacheStore::promote_object(librados::IoCtx* ioctx,
                                     std::string object_name,
                                     librados::bufferlist* read_buf,
                                     Context* on_finish) {
  ldout(m_cct, 20) << "object name = " << object_name << dendl;

  librados::AioCompletion* read_completion = create_rados_callback(on_finish);
  // issue a zero-sized read req to get the entire obj
  int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0);
  if (ret < 0) {
    lderr(m_cct) << "failed to read from rados" << dendl;
  }
  read_completion->release();

  return ret;
}

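// Eviction: ask the policy for entries that should be dropped, unlink their
// backing files, and only then purge the corresponding metadata.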
int ObjectCacheStore::evict_objects() {
  ldout(m_cct, 20) << dendl;

  std::list<std::string> obj_list;
  m_policy->get_evict_list(&obj_list);
  for (auto& obj : obj_list) {
    do_evict(obj);
  }
  return 0;
}

int ObjectCacheStore::do_evict(std::string cache_file) {
  ldout(m_cct, 20) << "file = " << cache_file << dendl;

  if (cache_file == "") {
    return 0;
  }

  std::string cache_file_path = get_cache_file_path(cache_file);

  ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl;

  // TODO(dehao): possible race on read?
  int ret = std::remove(cache_file_path.c_str());
  // evict metadata
  if (ret == 0) {
    m_policy->update_status(cache_file, OBJ_CACHE_SKIP);
    m_policy->evict_entry(cache_file);
  }

  return ret;
}

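// Cache file names have the form "<pool_nspace>:<pool_id>:<snap_id>:<oid>",
// e.g. (illustrative) "ns1:2:4:rbd_data.1234.0000000000000000".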
std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace,
                                                  uint64_t pool_id,
                                                  uint64_t snap_id,
                                                  std::string oid) {
  return pool_nspace + ":" + std::to_string(pool_id) + ":" +
         std::to_string(snap_id) + ":" + oid;
}

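// Cache files are sharded into 100 sub-directories keyed by
// crc32c(cache_file_name) % 100 to keep any single directory small.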
std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name,
                                                  bool mkdir) {
  ldout(m_cct, 20) << cache_file_name << dendl;

  uint32_t crc = 0;
  crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(),
                    cache_file_name.length());

  std::string cache_file_dir = std::to_string(crc % 100) + "/";

  if (mkdir) {
    ldout(m_cct, 20) << "creating cache dir: " << cache_file_dir << dendl;
    std::error_code ec;
    std::string new_dir = m_cache_root_dir + cache_file_dir;
    if (fs::exists(new_dir, ec)) {
      ldout(m_cct, 20) << "cache dir exists: " << cache_file_dir << dendl;
      return new_dir + cache_file_name;
    }

    if (!fs::create_directories(new_dir, ec)) {
      ldout(m_cct, 5) << "fail to create cache dir: " << new_dir
                      << " error: " << ec.message() << dendl;
      return "";
    }
  }

  return m_cache_root_dir + cache_file_dir + cache_file_name;
}

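// Throttle callback: credit the granted tokens to the matching local token
// pool and clear m_io_throttled so later lookups can try to promote again.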
void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) {
  m_io_throttled = false;
  std::lock_guard lock(m_throttle_lock);
  if (type & ROC_QOS_IOPS_THROTTLE) {
    m_iops_tokens += tokens;
  } else if (type & ROC_QOS_BPS_THROTTLE) {
    m_bps_tokens += tokens;
  } else {
    lderr(m_cct) << "unknown throttle type." << dendl;
  }
}

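// Try to take object_num tokens from the IOPS bucket and object_size tokens
// from the BPS bucket.  If either bucket runs dry the request is queued on
// the throttle, m_io_throttled is set, and any IOPS tokens already taken are
// returned; the caller then skips the promotion.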
bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size,
                                                uint64_t object_num) {
  if (m_io_throttled == true) {
    return false;
  }

  int flag = 0;
  bool wait = false;
  if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) {
    std::lock_guard lock(m_throttle_lock);
    if (object_num > m_iops_tokens) {
      wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this,
        &ObjectCacheStore::handle_throttle_ready, object_num,
        ROC_QOS_IOPS_THROTTLE);
    } else {
      m_iops_tokens -= object_num;
      flag = 1;
    }
  }
  if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) {
    std::lock_guard lock(m_throttle_lock);
    if (object_size > m_bps_tokens) {
      wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this,
        &ObjectCacheStore::handle_throttle_ready, object_size,
        ROC_QOS_BPS_THROTTLE);
    } else {
      m_bps_tokens -= object_size;
    }
  }

  if (wait) {
    m_io_throttled = true;
    // passed the iops throttle but blocked by the bps throttle:
    // return the iops tokens that were already taken
    if (flag == 1) {
      std::lock_guard lock(m_throttle_lock);
      m_iops_tokens += object_num;
    }
  }

  return !wait;
}

static const std::map<uint64_t, std::string> THROTTLE_FLAGS = {
  { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" },
  { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" }
};

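// Build a TokenBucketThrottle for the given flag and apply the configured
// limit/burst; if the burst is smaller than the limit, fall back to a plain
// limit with no burst.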
void ObjectCacheStore::apply_qos_tick_and_limit(
    const uint64_t flag,
    std::chrono::milliseconds min_tick,
    uint64_t limit,
    uint64_t burst,
    std::chrono::seconds burst_seconds) {
  SafeTimerSingleton* safe_timer_singleton = nullptr;
  TokenBucketThrottle* throttle = nullptr;
  safe_timer_singleton =
    &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
      "tools::immutable_object_cache", false, m_cct);
  SafeTimer* timer = safe_timer_singleton;
  ceph::mutex* timer_lock = &safe_timer_singleton->lock;
  m_qos_enabled_flag |= flag;
  auto throttle_flags_it = THROTTLE_FLAGS.find(flag);
  ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end());
  throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second,
                                     0, 0, timer, timer_lock);
  throttle->set_schedule_tick_min(min_tick.count());
  int ret = throttle->set_limit(limit, burst, burst_seconds.count());
  if (ret < 0) {
    lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: "
                 << "burst(" << burst << ") is less than "
                 << "limit(" << limit << ")" << dendl;
    throttle->set_limit(limit, 0, 1);
  }

  ceph_assert(m_throttles.find(flag) == m_throttles.end());
  m_throttles.insert({flag, throttle});
}

} // namespace immutable_obj_cache
} // namespace ceph