]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #ifndef CEPH_RGWD3NDATACACHE_H | |
5 | #define CEPH_RGWD3NDATACACHE_H | |
6 | ||
7 | #include "rgw_rados.h" | |
8 | #include <curl/curl.h> | |
9 | ||
10 | #include "rgw_common.h" | |
11 | ||
12 | #include <unistd.h> | |
13 | #include <signal.h> | |
14 | #include "include/Context.h" | |
15 | #include "include/lru.h" | |
16 | #include "rgw_d3n_cacherequest.h" | |
17 | ||
18 | ||
19 | /*D3nDataCache*/ | |
20 | struct D3nDataCache; | |
21 | ||
22 | ||
23 | struct D3nChunkDataInfo : public LRUObject { | |
24 | CephContext *cct; | |
25 | uint64_t size; | |
26 | time_t access_time; | |
27 | std::string address; | |
28 | std::string oid; | |
29 | bool complete; | |
30 | struct D3nChunkDataInfo* lru_prev; | |
31 | struct D3nChunkDataInfo* lru_next; | |
32 | ||
33 | D3nChunkDataInfo(): size(0) {} | |
34 | ||
35 | void set_ctx(CephContext *_cct) { | |
36 | cct = _cct; | |
37 | } | |
38 | ||
39 | void dump(Formatter *f) const; | |
40 | static void generate_test_instances(std::list<D3nChunkDataInfo*>& o); | |
41 | }; | |
42 | ||
43 | struct D3nCacheAioWriteRequest { | |
44 | std::string oid; | |
45 | void *data; | |
46 | int fd; | |
47 | struct aiocb *cb; | |
48 | D3nDataCache *priv_data; | |
49 | CephContext *cct; | |
50 | ||
51 | D3nCacheAioWriteRequest(CephContext *_cct) : cct(_cct) {} | |
52 | int d3n_prepare_libaio_write_op(bufferlist& bl, unsigned int len, std::string oid, std::string cache_location); | |
53 | ||
54 | ~D3nCacheAioWriteRequest() { | |
55 | ::close(fd); | |
56 | cb->aio_buf = nullptr; | |
57 | free(data); | |
58 | data = nullptr; | |
59 | delete(cb); | |
60 | } | |
61 | }; | |
62 | ||
63 | struct D3nDataCache { | |
64 | ||
65 | private: | |
66 | std::unordered_map<std::string, D3nChunkDataInfo*> d3n_cache_map; | |
67 | std::set<std::string> d3n_outstanding_write_list; | |
68 | std::mutex d3n_cache_lock; | |
69 | std::mutex d3n_eviction_lock; | |
70 | ||
71 | CephContext *cct; | |
72 | enum class _io_type { | |
73 | SYNC_IO = 1, | |
74 | ASYNC_IO = 2, | |
75 | SEND_FILE = 3 | |
76 | } io_type; | |
77 | enum class _eviction_policy { | |
78 | LRU=0, RANDOM=1 | |
79 | } eviction_policy; | |
80 | ||
81 | struct sigaction action; | |
82 | uint64_t free_data_cache_size = 0; | |
83 | uint64_t outstanding_write_size = 0; | |
84 | struct D3nChunkDataInfo* head; | |
85 | struct D3nChunkDataInfo* tail; | |
86 | ||
87 | private: | |
88 | void add_io(); | |
89 | ||
90 | public: | |
91 | D3nDataCache(); | |
92 | ~D3nDataCache() { | |
93 | while (lru_eviction() > 0); | |
94 | } | |
95 | ||
96 | std::string cache_location; | |
97 | ||
98 | bool get(const std::string& oid, const off_t len); | |
99 | void put(bufferlist& bl, unsigned int len, std::string& obj_key); | |
100 | int d3n_io_write(bufferlist& bl, unsigned int len, std::string oid); | |
101 | int d3n_libaio_create_write_request(bufferlist& bl, unsigned int len, std::string oid); | |
102 | void d3n_libaio_write_completion_cb(D3nCacheAioWriteRequest* c); | |
103 | size_t random_eviction(); | |
104 | size_t lru_eviction(); | |
105 | ||
106 | void init(CephContext *_cct); | |
107 | ||
108 | void lru_insert_head(struct D3nChunkDataInfo* o) { | |
109 | lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl; | |
110 | o->lru_next = head; | |
111 | o->lru_prev = nullptr; | |
112 | if (head) { | |
113 | head->lru_prev = o; | |
114 | } else { | |
115 | tail = o; | |
116 | } | |
117 | head = o; | |
118 | } | |
119 | ||
120 | void lru_insert_tail(struct D3nChunkDataInfo* o) { | |
121 | lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl; | |
122 | o->lru_next = nullptr; | |
123 | o->lru_prev = tail; | |
124 | if (tail) { | |
125 | tail->lru_next = o; | |
126 | } else { | |
127 | head = o; | |
128 | } | |
129 | tail = o; | |
130 | } | |
131 | ||
132 | void lru_remove(struct D3nChunkDataInfo* o) { | |
133 | lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl; | |
134 | if (o->lru_next) | |
135 | o->lru_next->lru_prev = o->lru_prev; | |
136 | else | |
137 | tail = o->lru_prev; | |
138 | if (o->lru_prev) | |
139 | o->lru_prev->lru_next = o->lru_next; | |
140 | else | |
141 | head = o->lru_next; | |
142 | o->lru_next = o->lru_prev = nullptr; | |
143 | } | |
144 | }; | |
145 | ||
146 | ||
147 | template <class T> | |
148 | class D3nRGWDataCache : public T { | |
149 | ||
150 | public: | |
151 | D3nRGWDataCache() {} | |
152 | ||
153 | int init_rados() override { | |
154 | int ret; | |
155 | ret = T::init_rados(); | |
156 | if (ret < 0) | |
157 | return ret; | |
158 | ||
159 | return 0; | |
160 | } | |
161 | ||
162 | int get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs, | |
163 | off_t read_ofs, off_t len, bool is_head_obj, | |
164 | RGWObjState *astate, void *arg) override; | |
165 | }; | |
166 | ||
167 | template<typename T> | |
168 | int D3nRGWDataCache<T>::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs, | |
169 | off_t read_ofs, off_t len, bool is_head_obj, | |
170 | RGWObjState *astate, void *arg) { | |
171 | lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache::" << __func__ << "(): is head object : " << is_head_obj << dendl; | |
172 | librados::ObjectReadOperation op; | |
173 | struct get_obj_data* d = static_cast<struct get_obj_data*>(arg); | |
174 | std::string oid, key; | |
175 | ||
176 | if (is_head_obj) { | |
177 | // only when reading from the head object do we need to do the atomic test | |
178 | int r = T::append_atomic_test(dpp, astate, op); | |
179 | if (r < 0) | |
180 | return r; | |
181 | ||
182 | if (astate && | |
183 | obj_ofs < astate->data.length()) { | |
184 | unsigned chunk_len = std::min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len); | |
185 | ||
186 | r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len); | |
187 | if (r < 0) | |
188 | return r; | |
189 | ||
190 | len -= chunk_len; | |
191 | d->offset += chunk_len; | |
192 | read_ofs += chunk_len; | |
193 | obj_ofs += chunk_len; | |
194 | if (!len) | |
195 | return 0; | |
196 | } | |
197 | ||
198 | auto obj = d->rgwrados->svc.rados->obj(read_obj); | |
199 | r = obj.open(dpp); | |
200 | if (r < 0) { | |
201 | lsubdout(g_ceph_context, rgw, 4) << "failed to open rados context for " << read_obj << dendl; | |
202 | return r; | |
203 | } | |
204 | ||
205 | ldpp_dout(dpp, 20) << "D3nDataCache::" << __func__ << "(): oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl; | |
206 | op.read(read_ofs, len, nullptr, nullptr); | |
207 | ||
208 | const uint64_t cost = len; | |
209 | const uint64_t id = obj_ofs; // use logical object offset for sorting replies | |
210 | ||
211 | auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id); | |
212 | return d->flush(std::move(completed)); | |
213 | } else { | |
214 | ldpp_dout(dpp, 20) << "D3nDataCache::" << __func__ << "(): oid=" << read_obj.oid << ", is_head_obj=" << is_head_obj << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl; | |
215 | int r; | |
216 | ||
217 | op.read(read_ofs, len, nullptr, nullptr); | |
218 | ||
219 | const uint64_t cost = len; | |
220 | const uint64_t id = obj_ofs; // use logical object offset for sorting replies | |
221 | oid = read_obj.oid; | |
222 | ||
223 | auto obj = d->rgwrados->svc.rados->obj(read_obj); | |
224 | r = obj.open(dpp); | |
225 | if (r < 0) { | |
226 | lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: Error: failed to open rados context for " << read_obj << ", r=" << r << dendl; | |
227 | return r; | |
228 | } | |
229 | ||
230 | const bool is_compressed = (astate->attrset.find(RGW_ATTR_COMPRESSION) != astate->attrset.end()); | |
231 | const bool is_encrypted = (astate->attrset.find(RGW_ATTR_CRYPT_MODE) != astate->attrset.end()); | |
232 | if (read_ofs != 0 || astate->size != astate->accounted_size || is_compressed || is_encrypted) { | |
233 | d->d3n_bypass_cache_write = true; | |
234 | lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: " << __func__ << "(): Note - bypassing datacache: oid=" << read_obj.oid << ", read_ofs!=0 = " << read_ofs << ", size=" << astate->size << " != accounted_size=" << astate->accounted_size << ", is_compressed=" << is_compressed << ", is_encrypted=" << is_encrypted << dendl; | |
235 | auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id); | |
236 | r = d->flush(std::move(completed)); | |
237 | return r; | |
238 | } | |
239 | ||
240 | if (d->rgwrados->d3n_data_cache->get(oid, len)) { | |
241 | // Read From Cache | |
242 | ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): READ FROM CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl; | |
243 | auto completed = d->aio->get(obj, rgw::Aio::d3n_cache_op(dpp, d->yield, read_ofs, len, d->rgwrados->d3n_data_cache->cache_location), cost, id); | |
244 | r = d->flush(std::move(completed)); | |
245 | if (r < 0) { | |
246 | lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: " << __func__ << "(): Error: failed to drain/flush, r= " << r << dendl; | |
247 | } | |
248 | return r; | |
249 | } else { | |
250 | // Write To Cache | |
251 | ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): WRITE TO CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << " len=" << len << dendl; | |
252 | auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id); | |
253 | return d->flush(std::move(completed)); | |
254 | } | |
255 | } | |
256 | lsubdout(g_ceph_context, rgw, 1) << "D3nDataCache: " << __func__ << "(): Warning: Check head object cache handling flow, oid=" << read_obj.oid << dendl; | |
257 | ||
258 | return 0; | |
259 | } | |
260 | ||
261 | #endif |