]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "ObjectCacheStore.h" | |
5 | #include "Utils.h" | |
6 | #include <experimental/filesystem> | |
7 | ||
8 | #define dout_context g_ceph_context | |
9 | #define dout_subsys ceph_subsys_immutable_obj_cache | |
10 | #undef dout_prefix | |
11 | #define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \ | |
12 | << __func__ << ": " | |
13 | ||
14 | namespace efs = std::experimental::filesystem; | |
15 | ||
16 | namespace ceph { | |
17 | namespace immutable_obj_cache { | |
18 | ||
19 | ObjectCacheStore::ObjectCacheStore(CephContext *cct) | |
20 | : m_cct(cct), m_rados(new librados::Rados()) { | |
21 | ||
22 | m_cache_root_dir = | |
23 | m_cct->_conf.get_val<std::string>("immutable_object_cache_path"); | |
24 | ||
25 | if (m_cache_root_dir.back() != '/') { | |
26 | m_cache_root_dir += "/"; | |
27 | } | |
28 | ||
29 | uint64_t cache_max_size = | |
30 | m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size"); | |
31 | ||
32 | double cache_watermark = | |
33 | m_cct->_conf.get_val<double>("immutable_object_cache_watermark"); | |
34 | ||
35 | uint64_t max_inflight_ops = | |
36 | m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops"); | |
37 | ||
38 | m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops, | |
39 | cache_watermark); | |
40 | } | |
41 | ||
42 | ObjectCacheStore::~ObjectCacheStore() { | |
43 | delete m_policy; | |
44 | } | |
45 | ||
46 | int ObjectCacheStore::init(bool reset) { | |
47 | ldout(m_cct, 20) << dendl; | |
48 | ||
49 | int ret = m_rados->init_with_context(m_cct); | |
50 | if (ret < 0) { | |
51 | lderr(m_cct) << "fail to init Ceph context" << dendl; | |
52 | return ret; | |
53 | } | |
54 | ||
55 | ret = m_rados->connect(); | |
56 | if (ret < 0) { | |
57 | lderr(m_cct) << "fail to connect to cluster" << dendl; | |
58 | return ret; | |
59 | } | |
60 | ||
61 | // TODO(dehao): fsck and reuse existing cache objects | |
62 | if (reset) { | |
f6b5b4d7 TL |
63 | try { |
64 | if (efs::exists(m_cache_root_dir)) { | |
65 | // remove all sub folders | |
66 | for (auto& p : efs::directory_iterator(m_cache_root_dir)) { | |
67 | efs::remove_all(p.path()); | |
68 | } | |
69 | } else { | |
70 | efs::create_directories(m_cache_root_dir); | |
9f95a23c | 71 | } |
f6b5b4d7 TL |
72 | } catch (const efs::filesystem_error& e) { |
73 | lderr(m_cct) << "failed to initialize cache store directory: " | |
74 | << e.what() << dendl; | |
75 | return -e.code().value(); | |
9f95a23c TL |
76 | } |
77 | } | |
78 | return 0; | |
79 | } | |
80 | ||
81 | int ObjectCacheStore::shutdown() { | |
82 | ldout(m_cct, 20) << dendl; | |
83 | ||
84 | m_rados->shutdown(); | |
85 | return 0; | |
86 | } | |
87 | ||
88 | int ObjectCacheStore::init_cache() { | |
89 | ldout(m_cct, 20) << dendl; | |
90 | std::string cache_dir = m_cache_root_dir; | |
91 | ||
92 | return 0; | |
93 | } | |
94 | ||
95 | int ObjectCacheStore::do_promote(std::string pool_nspace, | |
96 | uint64_t pool_id, uint64_t snap_id, | |
97 | std::string object_name) { | |
98 | ldout(m_cct, 20) << "to promote object: " << object_name | |
99 | << " from pool id: " << pool_id | |
100 | << " namespace: " << pool_nspace | |
101 | << " snapshot: " << snap_id << dendl; | |
102 | ||
103 | int ret = 0; | |
104 | std::string cache_file_name = get_cache_file_name(pool_nspace, pool_id, snap_id, object_name); | |
105 | librados::IoCtx ioctx; | |
106 | { | |
107 | std::lock_guard _locker{m_ioctx_map_lock}; | |
108 | if (m_ioctx_map.find(pool_id) == m_ioctx_map.end()) { | |
109 | ret = m_rados->ioctx_create2(pool_id, ioctx); | |
110 | if (ret < 0) { | |
111 | lderr(m_cct) << "fail to create ioctx" << dendl; | |
112 | return ret; | |
113 | } | |
114 | m_ioctx_map.emplace(pool_id, ioctx); | |
115 | } else { | |
116 | ioctx = m_ioctx_map[pool_id]; | |
117 | } | |
118 | } | |
119 | ||
120 | ioctx.set_namespace(pool_nspace); | |
121 | ioctx.snap_set_read(snap_id); | |
122 | ||
123 | librados::bufferlist* read_buf = new librados::bufferlist(); | |
124 | ||
125 | auto ctx = new LambdaContext([this, read_buf, cache_file_name](int ret) { | |
126 | handle_promote_callback(ret, read_buf, cache_file_name); | |
127 | }); | |
128 | ||
129 | return promote_object(&ioctx, object_name, read_buf, ctx); | |
130 | } | |
131 | ||
132 | int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, | |
133 | std::string cache_file_name) { | |
134 | ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl; | |
135 | ||
136 | // rados read error | |
137 | if (ret != -ENOENT && ret < 0) { | |
138 | lderr(m_cct) << "fail to read from rados" << dendl; | |
139 | ||
140 | m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); | |
141 | delete read_buf; | |
142 | return ret; | |
143 | } | |
144 | ||
f91f0fd5 | 145 | auto state = OBJ_CACHE_PROMOTED; |
9f95a23c TL |
146 | if (ret == -ENOENT) { |
147 | // object is empty | |
f91f0fd5 | 148 | state = OBJ_CACHE_DNE; |
9f95a23c | 149 | ret = 0; |
f91f0fd5 TL |
150 | } else { |
151 | std::string cache_file_path = get_cache_file_path(cache_file_name, true); | |
152 | if (cache_file_path == "") { | |
153 | lderr(m_cct) << "fail to write cache file" << dendl; | |
154 | m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); | |
155 | delete read_buf; | |
156 | return -ENOSPC; | |
157 | } | |
9f95a23c | 158 | |
f91f0fd5 TL |
159 | ret = read_buf->write_file(cache_file_path.c_str()); |
160 | if (ret < 0) { | |
161 | lderr(m_cct) << "fail to write cache file" << dendl; | |
9f95a23c | 162 | |
f91f0fd5 TL |
163 | m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); |
164 | delete read_buf; | |
165 | return ret; | |
166 | } | |
9f95a23c TL |
167 | } |
168 | ||
169 | // update metadata | |
170 | ceph_assert(OBJ_CACHE_SKIP == m_policy->get_status(cache_file_name)); | |
f91f0fd5 TL |
171 | m_policy->update_status(cache_file_name, state, read_buf->length()); |
172 | ceph_assert(state == m_policy->get_status(cache_file_name)); | |
9f95a23c TL |
173 | |
174 | delete read_buf; | |
175 | ||
176 | evict_objects(); | |
177 | ||
178 | return ret; | |
179 | } | |
180 | ||
181 | int ObjectCacheStore::lookup_object(std::string pool_nspace, | |
182 | uint64_t pool_id, uint64_t snap_id, | |
183 | std::string object_name, | |
f91f0fd5 | 184 | bool return_dne_path, |
9f95a23c TL |
185 | std::string& target_cache_file_path) { |
186 | ldout(m_cct, 20) << "object name = " << object_name | |
187 | << " in pool ID : " << pool_id << dendl; | |
188 | ||
189 | int pret = -1; | |
190 | std::string cache_file_name = get_cache_file_name(pool_nspace, pool_id, snap_id, object_name); | |
191 | ||
192 | cache_status_t ret = m_policy->lookup_object(cache_file_name); | |
193 | ||
194 | switch (ret) { | |
195 | case OBJ_CACHE_NONE: { | |
196 | pret = do_promote(pool_nspace, pool_id, snap_id, object_name); | |
197 | if (pret < 0) { | |
198 | lderr(m_cct) << "fail to start promote" << dendl; | |
199 | } | |
200 | return ret; | |
201 | } | |
202 | case OBJ_CACHE_PROMOTED: | |
203 | target_cache_file_path = get_cache_file_path(cache_file_name); | |
204 | return ret; | |
f91f0fd5 TL |
205 | case OBJ_CACHE_DNE: |
206 | if (return_dne_path) { | |
207 | target_cache_file_path = get_cache_file_path(cache_file_name); | |
208 | } | |
209 | return ret; | |
9f95a23c TL |
210 | case OBJ_CACHE_SKIP: |
211 | return ret; | |
212 | default: | |
213 | lderr(m_cct) << "unrecognized object cache status" << dendl; | |
214 | ceph_assert(0); | |
215 | } | |
216 | } | |
217 | ||
218 | int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, | |
219 | std::string object_name, | |
220 | librados::bufferlist* read_buf, | |
221 | Context* on_finish) { | |
222 | ldout(m_cct, 20) << "object name = " << object_name << dendl; | |
223 | ||
224 | librados::AioCompletion* read_completion = create_rados_callback(on_finish); | |
225 | // issue a zero-sized read req to get the entire obj | |
226 | int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0); | |
227 | if (ret < 0) { | |
228 | lderr(m_cct) << "failed to read from rados" << dendl; | |
229 | } | |
230 | read_completion->release(); | |
231 | ||
232 | return ret; | |
233 | } | |
234 | ||
235 | int ObjectCacheStore::evict_objects() { | |
236 | ldout(m_cct, 20) << dendl; | |
237 | ||
238 | std::list<std::string> obj_list; | |
239 | m_policy->get_evict_list(&obj_list); | |
240 | for (auto& obj : obj_list) { | |
241 | do_evict(obj); | |
242 | } | |
243 | return 0; | |
244 | } | |
245 | ||
246 | int ObjectCacheStore::do_evict(std::string cache_file) { | |
247 | ldout(m_cct, 20) << "file = " << cache_file << dendl; | |
248 | ||
249 | if (cache_file == "") { | |
250 | return 0; | |
251 | } | |
252 | ||
253 | std::string cache_file_path = get_cache_file_path(cache_file); | |
254 | ||
255 | ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl; | |
256 | ||
257 | // TODO(dehao): possible race on read? | |
258 | int ret = std::remove(cache_file_path.c_str()); | |
259 | // evict metadata | |
260 | if (ret == 0) { | |
261 | m_policy->update_status(cache_file, OBJ_CACHE_SKIP); | |
262 | m_policy->evict_entry(cache_file); | |
263 | } | |
264 | ||
265 | return ret; | |
266 | } | |
267 | ||
268 | std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace, | |
269 | uint64_t pool_id, | |
270 | uint64_t snap_id, | |
271 | std::string oid) { | |
272 | return pool_nspace + ":" + std::to_string(pool_id) + ":" + | |
273 | std::to_string(snap_id) + ":" + oid; | |
274 | } | |
275 | ||
276 | std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name, | |
277 | bool mkdir) { | |
278 | ldout(m_cct, 20) << cache_file_name <<dendl; | |
279 | ||
280 | uint32_t crc = 0; | |
281 | crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(), | |
282 | cache_file_name.length()); | |
283 | ||
284 | std::string cache_file_dir = std::to_string(crc % 100) + "/"; | |
285 | ||
286 | if (mkdir) { | |
287 | ldout(m_cct, 20) << "creating cache dir: " << cache_file_dir <<dendl; | |
288 | std::error_code ec; | |
289 | std::string new_dir = m_cache_root_dir + cache_file_dir; | |
290 | if (efs::exists(new_dir, ec)) { | |
291 | ldout(m_cct, 20) << "cache dir exists: " << cache_file_dir <<dendl; | |
292 | return new_dir + cache_file_name; | |
293 | } | |
294 | ||
295 | if (!efs::create_directories(new_dir, ec)) { | |
296 | ldout(m_cct, 5) << "fail to create cache dir: " << new_dir | |
297 | << "error: " << ec.message() << dendl; | |
298 | return ""; | |
299 | } | |
300 | } | |
301 | ||
302 | return m_cache_root_dir + cache_file_dir + cache_file_name; | |
303 | } | |
304 | ||
305 | } // namespace immutable_obj_cache | |
306 | } // namespace ceph |