]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/immutable_object_cache/ObjectCacheStore.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / tools / immutable_object_cache / ObjectCacheStore.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "ObjectCacheStore.h"
5#include "Utils.h"
6#include <experimental/filesystem>
7
8#define dout_context g_ceph_context
9#define dout_subsys ceph_subsys_immutable_obj_cache
10#undef dout_prefix
11#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
12 << __func__ << ": "
13
14namespace efs = std::experimental::filesystem;
15
16namespace ceph {
17namespace immutable_obj_cache {
18
19ObjectCacheStore::ObjectCacheStore(CephContext *cct)
20 : m_cct(cct), m_rados(new librados::Rados()) {
21
22 m_cache_root_dir =
23 m_cct->_conf.get_val<std::string>("immutable_object_cache_path");
24
25 if (m_cache_root_dir.back() != '/') {
26 m_cache_root_dir += "/";
27 }
28
29 uint64_t cache_max_size =
30 m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size");
31
32 double cache_watermark =
33 m_cct->_conf.get_val<double>("immutable_object_cache_watermark");
34
35 uint64_t max_inflight_ops =
36 m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
37
38 m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops,
39 cache_watermark);
40}
41
42ObjectCacheStore::~ObjectCacheStore() {
43 delete m_policy;
44}
45
46int ObjectCacheStore::init(bool reset) {
47 ldout(m_cct, 20) << dendl;
48
49 int ret = m_rados->init_with_context(m_cct);
50 if (ret < 0) {
51 lderr(m_cct) << "fail to init Ceph context" << dendl;
52 return ret;
53 }
54
55 ret = m_rados->connect();
56 if (ret < 0) {
57 lderr(m_cct) << "fail to connect to cluster" << dendl;
58 return ret;
59 }
60
61 // TODO(dehao): fsck and reuse existing cache objects
62 if (reset) {
f6b5b4d7
TL
63 try {
64 if (efs::exists(m_cache_root_dir)) {
65 // remove all sub folders
66 for (auto& p : efs::directory_iterator(m_cache_root_dir)) {
67 efs::remove_all(p.path());
68 }
69 } else {
70 efs::create_directories(m_cache_root_dir);
9f95a23c 71 }
f6b5b4d7
TL
72 } catch (const efs::filesystem_error& e) {
73 lderr(m_cct) << "failed to initialize cache store directory: "
74 << e.what() << dendl;
75 return -e.code().value();
9f95a23c
TL
76 }
77 }
78 return 0;
79}
80
81int ObjectCacheStore::shutdown() {
82 ldout(m_cct, 20) << dendl;
83
84 m_rados->shutdown();
85 return 0;
86}
87
88int ObjectCacheStore::init_cache() {
89 ldout(m_cct, 20) << dendl;
90 std::string cache_dir = m_cache_root_dir;
91
92 return 0;
93}
94
95int ObjectCacheStore::do_promote(std::string pool_nspace,
96 uint64_t pool_id, uint64_t snap_id,
97 std::string object_name) {
98 ldout(m_cct, 20) << "to promote object: " << object_name
99 << " from pool id: " << pool_id
100 << " namespace: " << pool_nspace
101 << " snapshot: " << snap_id << dendl;
102
103 int ret = 0;
104 std::string cache_file_name = get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);
105 librados::IoCtx ioctx;
106 {
107 std::lock_guard _locker{m_ioctx_map_lock};
108 if (m_ioctx_map.find(pool_id) == m_ioctx_map.end()) {
109 ret = m_rados->ioctx_create2(pool_id, ioctx);
110 if (ret < 0) {
111 lderr(m_cct) << "fail to create ioctx" << dendl;
112 return ret;
113 }
114 m_ioctx_map.emplace(pool_id, ioctx);
115 } else {
116 ioctx = m_ioctx_map[pool_id];
117 }
118 }
119
120 ioctx.set_namespace(pool_nspace);
121 ioctx.snap_set_read(snap_id);
122
123 librados::bufferlist* read_buf = new librados::bufferlist();
124
125 auto ctx = new LambdaContext([this, read_buf, cache_file_name](int ret) {
126 handle_promote_callback(ret, read_buf, cache_file_name);
127 });
128
129 return promote_object(&ioctx, object_name, read_buf, ctx);
130}
131
132int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
133 std::string cache_file_name) {
134 ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl;
135
136 // rados read error
137 if (ret != -ENOENT && ret < 0) {
138 lderr(m_cct) << "fail to read from rados" << dendl;
139
140 m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
141 delete read_buf;
142 return ret;
143 }
144
f91f0fd5 145 auto state = OBJ_CACHE_PROMOTED;
9f95a23c
TL
146 if (ret == -ENOENT) {
147 // object is empty
f91f0fd5 148 state = OBJ_CACHE_DNE;
9f95a23c 149 ret = 0;
f91f0fd5
TL
150 } else {
151 std::string cache_file_path = get_cache_file_path(cache_file_name, true);
152 if (cache_file_path == "") {
153 lderr(m_cct) << "fail to write cache file" << dendl;
154 m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
155 delete read_buf;
156 return -ENOSPC;
157 }
9f95a23c 158
f91f0fd5
TL
159 ret = read_buf->write_file(cache_file_path.c_str());
160 if (ret < 0) {
161 lderr(m_cct) << "fail to write cache file" << dendl;
9f95a23c 162
f91f0fd5
TL
163 m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
164 delete read_buf;
165 return ret;
166 }
9f95a23c
TL
167 }
168
169 // update metadata
170 ceph_assert(OBJ_CACHE_SKIP == m_policy->get_status(cache_file_name));
f91f0fd5
TL
171 m_policy->update_status(cache_file_name, state, read_buf->length());
172 ceph_assert(state == m_policy->get_status(cache_file_name));
9f95a23c
TL
173
174 delete read_buf;
175
176 evict_objects();
177
178 return ret;
179}
180
181int ObjectCacheStore::lookup_object(std::string pool_nspace,
182 uint64_t pool_id, uint64_t snap_id,
183 std::string object_name,
f91f0fd5 184 bool return_dne_path,
9f95a23c
TL
185 std::string& target_cache_file_path) {
186 ldout(m_cct, 20) << "object name = " << object_name
187 << " in pool ID : " << pool_id << dendl;
188
189 int pret = -1;
190 std::string cache_file_name = get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);
191
192 cache_status_t ret = m_policy->lookup_object(cache_file_name);
193
194 switch (ret) {
195 case OBJ_CACHE_NONE: {
196 pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
197 if (pret < 0) {
198 lderr(m_cct) << "fail to start promote" << dendl;
199 }
200 return ret;
201 }
202 case OBJ_CACHE_PROMOTED:
203 target_cache_file_path = get_cache_file_path(cache_file_name);
204 return ret;
f91f0fd5
TL
205 case OBJ_CACHE_DNE:
206 if (return_dne_path) {
207 target_cache_file_path = get_cache_file_path(cache_file_name);
208 }
209 return ret;
9f95a23c
TL
210 case OBJ_CACHE_SKIP:
211 return ret;
212 default:
213 lderr(m_cct) << "unrecognized object cache status" << dendl;
214 ceph_assert(0);
215 }
216}
217
218int ObjectCacheStore::promote_object(librados::IoCtx* ioctx,
219 std::string object_name,
220 librados::bufferlist* read_buf,
221 Context* on_finish) {
222 ldout(m_cct, 20) << "object name = " << object_name << dendl;
223
224 librados::AioCompletion* read_completion = create_rados_callback(on_finish);
225 // issue a zero-sized read req to get the entire obj
226 int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0);
227 if (ret < 0) {
228 lderr(m_cct) << "failed to read from rados" << dendl;
229 }
230 read_completion->release();
231
232 return ret;
233}
234
235int ObjectCacheStore::evict_objects() {
236 ldout(m_cct, 20) << dendl;
237
238 std::list<std::string> obj_list;
239 m_policy->get_evict_list(&obj_list);
240 for (auto& obj : obj_list) {
241 do_evict(obj);
242 }
243 return 0;
244}
245
246int ObjectCacheStore::do_evict(std::string cache_file) {
247 ldout(m_cct, 20) << "file = " << cache_file << dendl;
248
249 if (cache_file == "") {
250 return 0;
251 }
252
253 std::string cache_file_path = get_cache_file_path(cache_file);
254
255 ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl;
256
257 // TODO(dehao): possible race on read?
258 int ret = std::remove(cache_file_path.c_str());
259 // evict metadata
260 if (ret == 0) {
261 m_policy->update_status(cache_file, OBJ_CACHE_SKIP);
262 m_policy->evict_entry(cache_file);
263 }
264
265 return ret;
266}
267
268std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace,
269 uint64_t pool_id,
270 uint64_t snap_id,
271 std::string oid) {
272 return pool_nspace + ":" + std::to_string(pool_id) + ":" +
273 std::to_string(snap_id) + ":" + oid;
274}
275
276std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name,
277 bool mkdir) {
278 ldout(m_cct, 20) << cache_file_name <<dendl;
279
280 uint32_t crc = 0;
281 crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(),
282 cache_file_name.length());
283
284 std::string cache_file_dir = std::to_string(crc % 100) + "/";
285
286 if (mkdir) {
287 ldout(m_cct, 20) << "creating cache dir: " << cache_file_dir <<dendl;
288 std::error_code ec;
289 std::string new_dir = m_cache_root_dir + cache_file_dir;
290 if (efs::exists(new_dir, ec)) {
291 ldout(m_cct, 20) << "cache dir exists: " << cache_file_dir <<dendl;
292 return new_dir + cache_file_name;
293 }
294
295 if (!efs::create_directories(new_dir, ec)) {
296 ldout(m_cct, 5) << "fail to create cache dir: " << new_dir
297 << "error: " << ec.message() << dendl;
298 return "";
299 }
300 }
301
302 return m_cache_root_dir + cache_file_dir + cache_file_name;
303}
304
305} // namespace immutable_obj_cache
306} // namespace ceph