]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #ifndef CEPH_OBJECTCACHER_H | |
4 | #define CEPH_OBJECTCACHER_H | |
5 | ||
#include <functional>

#include "include/types.h"
#include "include/lru.h"
#include "include/Context.h"
#include "include/xlist.h"
#include "include/common_fwd.h"

#include "common/Cond.h"
#include "common/Finisher.h"
#include "common/Thread.h"
#include "common/zipkin_trace.h"

#include "Objecter.h"
#include "Striper.h"
19 | ||
7c673cae | 20 | class WritebackHandler; |
7c673cae FG |
21 | |
// Counter identifiers for ObjectCacher statistics.
// Values are assigned implicitly and consecutively after
// l_objectcacher_first, so the enumerator order must not change.
enum {
  l_objectcacher_first = 25000,

  // ops we satisfy completely from cache
  l_objectcacher_cache_ops_hit,
  // ops we don't satisfy completely from cache
  l_objectcacher_cache_ops_miss,

  // bytes read directly from cache
  l_objectcacher_cache_bytes_hit,
  // bytes we couldn't read directly from cache
  l_objectcacher_cache_bytes_miss,

  // total bytes read out
  l_objectcacher_data_read,
  // bytes written to cache
  l_objectcacher_data_written,
  // bytes flushed to WritebackHandler
  l_objectcacher_data_flushed,
  // bytes overwritten while flushing is in progress
  l_objectcacher_overwritten_in_flush,

  // total write ops we delayed due to dirty limits
  l_objectcacher_write_ops_blocked,
  // total number of write bytes we delayed due to dirty limits
  l_objectcacher_write_bytes_blocked,
  // total time in seconds spent blocking a write due to dirty limits
  l_objectcacher_write_time_blocked,

  l_objectcacher_last,
};
51 | ||
52 | class ObjectCacher { | |
53 | PerfCounters *perfcounter; | |
54 | public: | |
55 | CephContext *cct; | |
56 | class Object; | |
57 | struct ObjectSet; | |
58 | class C_ReadFinish; | |
59 | ||
60 | typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset); | |
61 | ||
62 | // read scatter/gather | |
63 | struct OSDRead { | |
f67539c2 | 64 | std::vector<ObjectExtent> extents; |
7c673cae | 65 | snapid_t snap; |
f67539c2 | 66 | ceph::buffer::list *bl; |
7c673cae | 67 | int fadvise_flags; |
f67539c2 | 68 | OSDRead(snapid_t s, ceph::buffer::list *b, int f) |
7c673cae FG |
69 | : snap(s), bl(b), fadvise_flags(f) {} |
70 | }; | |
71 | ||
f67539c2 | 72 | OSDRead *prepare_read(snapid_t snap, ceph::buffer::list *b, int f) const { |
7c673cae FG |
73 | return new OSDRead(snap, b, f); |
74 | } | |
75 | ||
76 | // write scatter/gather | |
77 | struct OSDWrite { | |
f67539c2 | 78 | std::vector<ObjectExtent> extents; |
7c673cae | 79 | SnapContext snapc; |
f67539c2 | 80 | ceph::buffer::list bl; |
7c673cae FG |
81 | ceph::real_time mtime; |
82 | int fadvise_flags; | |
83 | ceph_tid_t journal_tid; | |
f67539c2 | 84 | OSDWrite(const SnapContext& sc, const ceph::buffer::list& b, ceph::real_time mt, |
7c673cae FG |
85 | int f, ceph_tid_t _journal_tid) |
86 | : snapc(sc), bl(b), mtime(mt), fadvise_flags(f), | |
87 | journal_tid(_journal_tid) {} | |
88 | }; | |
89 | ||
90 | OSDWrite *prepare_write(const SnapContext& sc, | |
f67539c2 | 91 | const ceph::buffer::list &b, |
7c673cae FG |
92 | ceph::real_time mt, |
93 | int f, | |
94 | ceph_tid_t journal_tid) const { | |
95 | return new OSDWrite(sc, b, mt, f, journal_tid); | |
96 | } | |
97 | ||
98 | ||
99 | ||
100 | // ******* BufferHead ********* | |
101 | class BufferHead : public LRUObject { | |
102 | public: | |
103 | // states | |
104 | static const int STATE_MISSING = 0; | |
105 | static const int STATE_CLEAN = 1; | |
106 | static const int STATE_ZERO = 2; // NOTE: these are *clean* zeros | |
107 | static const int STATE_DIRTY = 3; | |
108 | static const int STATE_RX = 4; | |
109 | static const int STATE_TX = 5; | |
110 | static const int STATE_ERROR = 6; // a read error occurred | |
111 | ||
112 | private: | |
113 | // my fields | |
114 | int state; | |
115 | int ref; | |
116 | struct { | |
117 | loff_t start, length; // bh extent in object | |
118 | } ex; | |
119 | bool dontneed; //indicate bh don't need by anyone | |
120 | bool nocache; //indicate bh don't need by this caller | |
121 | ||
122 | public: | |
123 | Object *ob; | |
f67539c2 | 124 | ceph::buffer::list bl; |
7c673cae FG |
125 | ceph_tid_t last_write_tid; // version of bh (if non-zero) |
126 | ceph_tid_t last_read_tid; // tid of last read op (if any) | |
127 | ceph::real_time last_write; | |
128 | SnapContext snapc; | |
129 | ceph_tid_t journal_tid; | |
130 | int error; // holds return value for failed reads | |
131 | ||
f67539c2 | 132 | std::map<loff_t, std::list<Context*> > waitfor_read; |
7c673cae FG |
133 | |
134 | // cons | |
135 | explicit BufferHead(Object *o) : | |
136 | state(STATE_MISSING), | |
137 | ref(0), | |
138 | dontneed(false), | |
139 | nocache(false), | |
140 | ob(o), | |
141 | last_write_tid(0), | |
142 | last_read_tid(0), | |
143 | journal_tid(0), | |
144 | error(0) { | |
145 | ex.start = ex.length = 0; | |
146 | } | |
147 | ||
148 | // extent | |
149 | loff_t start() const { return ex.start; } | |
150 | void set_start(loff_t s) { ex.start = s; } | |
151 | loff_t length() const { return ex.length; } | |
152 | void set_length(loff_t l) { ex.length = l; } | |
153 | loff_t end() const { return ex.start + ex.length; } | |
154 | loff_t last() const { return end() - 1; } | |
155 | ||
156 | // states | |
157 | void set_state(int s) { | |
158 | if (s == STATE_RX || s == STATE_TX) get(); | |
159 | if (state == STATE_RX || state == STATE_TX) put(); | |
160 | state = s; | |
161 | } | |
162 | int get_state() const { return state; } | |
163 | ||
20effc67 TL |
164 | inline int get_error() const { |
165 | return error; | |
166 | } | |
167 | inline void set_error(int _error) { | |
168 | error = _error; | |
169 | } | |
170 | ||
7c673cae FG |
171 | inline ceph_tid_t get_journal_tid() const { |
172 | return journal_tid; | |
173 | } | |
174 | inline void set_journal_tid(ceph_tid_t _journal_tid) { | |
175 | journal_tid = _journal_tid; | |
176 | } | |
177 | ||
178 | bool is_missing() const { return state == STATE_MISSING; } | |
179 | bool is_dirty() const { return state == STATE_DIRTY; } | |
180 | bool is_clean() const { return state == STATE_CLEAN; } | |
181 | bool is_zero() const { return state == STATE_ZERO; } | |
182 | bool is_tx() const { return state == STATE_TX; } | |
183 | bool is_rx() const { return state == STATE_RX; } | |
184 | bool is_error() const { return state == STATE_ERROR; } | |
185 | ||
186 | // reference counting | |
187 | int get() { | |
11fdf7f2 | 188 | ceph_assert(ref >= 0); |
7c673cae FG |
189 | if (ref == 0) lru_pin(); |
190 | return ++ref; | |
191 | } | |
192 | int put() { | |
11fdf7f2 | 193 | ceph_assert(ref > 0); |
7c673cae FG |
194 | if (ref == 1) lru_unpin(); |
195 | --ref; | |
196 | return ref; | |
197 | } | |
198 | ||
199 | void set_dontneed(bool v) { | |
200 | dontneed = v; | |
201 | } | |
202 | bool get_dontneed() const { | |
203 | return dontneed; | |
204 | } | |
205 | ||
206 | void set_nocache(bool v) { | |
207 | nocache = v; | |
208 | } | |
209 | bool get_nocache() const { | |
210 | return nocache; | |
211 | } | |
212 | ||
213 | inline bool can_merge_journal(BufferHead *bh) const { | |
214 | return (get_journal_tid() == bh->get_journal_tid()); | |
215 | } | |
216 | ||
217 | struct ptr_lt { | |
218 | bool operator()(const BufferHead* l, const BufferHead* r) const { | |
219 | const Object *lob = l->ob; | |
220 | const Object *rob = r->ob; | |
221 | const ObjectSet *loset = lob->oset; | |
222 | const ObjectSet *roset = rob->oset; | |
223 | if (loset != roset) | |
224 | return loset < roset; | |
225 | if (lob != rob) | |
226 | return lob < rob; | |
227 | if (l->start() != r->start()) | |
228 | return l->start() < r->start(); | |
229 | return l < r; | |
230 | } | |
231 | }; | |
232 | }; | |
233 | ||
234 | // ******* Object ********* | |
235 | class Object : public LRUObject { | |
236 | private: | |
237 | // ObjectCacher::Object fields | |
238 | int ref; | |
239 | ObjectCacher *oc; | |
240 | sobject_t oid; | |
241 | friend struct ObjectSet; | |
242 | ||
243 | public: | |
244 | uint64_t object_no; | |
245 | ObjectSet *oset; | |
246 | xlist<Object*>::item set_item; | |
247 | object_locator_t oloc; | |
248 | uint64_t truncate_size, truncate_seq; | |
249 | ||
250 | bool complete; | |
251 | bool exists; | |
252 | ||
f67539c2 | 253 | std::map<loff_t, BufferHead*> data; |
7c673cae FG |
254 | |
255 | ceph_tid_t last_write_tid; // version of bh (if non-zero) | |
11fdf7f2 | 256 | ceph_tid_t last_commit_tid; // last update committed. |
7c673cae FG |
257 | |
258 | int dirty_or_tx; | |
259 | ||
f67539c2 | 260 | std::map< ceph_tid_t, std::list<Context*> > waitfor_commit; |
7c673cae FG |
261 | xlist<C_ReadFinish*> reads; |
262 | ||
263 | Object(const Object&) = delete; | |
264 | Object& operator=(const Object&) = delete; | |
265 | ||
266 | Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os, | |
267 | object_locator_t& l, uint64_t ts, uint64_t tq) : | |
268 | ref(0), | |
269 | oc(_oc), | |
270 | oid(o), object_no(ono), oset(os), set_item(this), oloc(l), | |
271 | truncate_size(ts), truncate_seq(tq), | |
272 | complete(false), exists(true), | |
273 | last_write_tid(0), last_commit_tid(0), | |
274 | dirty_or_tx(0) { | |
275 | // add to set | |
276 | os->objects.push_back(&set_item); | |
277 | } | |
278 | ~Object() { | |
279 | reads.clear(); | |
11fdf7f2 TL |
280 | ceph_assert(ref == 0); |
281 | ceph_assert(data.empty()); | |
282 | ceph_assert(dirty_or_tx == 0); | |
7c673cae FG |
283 | set_item.remove_myself(); |
284 | } | |
285 | ||
286 | sobject_t get_soid() const { return oid; } | |
287 | object_t get_oid() { return oid.oid; } | |
288 | snapid_t get_snap() { return oid.snap; } | |
289 | ObjectSet *get_object_set() const { return oset; } | |
f67539c2 | 290 | std::string get_namespace() { return oloc.nspace; } |
7c673cae FG |
291 | uint64_t get_object_number() const { return object_no; } |
292 | ||
293 | const object_locator_t& get_oloc() const { return oloc; } | |
294 | void set_object_locator(object_locator_t& l) { oloc = l; } | |
295 | ||
296 | bool can_close() const { | |
297 | if (lru_is_expireable()) { | |
11fdf7f2 TL |
298 | ceph_assert(data.empty()); |
299 | ceph_assert(waitfor_commit.empty()); | |
7c673cae FG |
300 | return true; |
301 | } | |
302 | return false; | |
303 | } | |
304 | ||
305 | /** | |
306 | * Check buffers and waiters for consistency | |
307 | * - no overlapping buffers | |
308 | * - index in map matches BH | |
309 | * - waiters fall within BH | |
310 | */ | |
311 | void audit_buffers(); | |
312 | ||
313 | /** | |
314 | * find first buffer that includes or follows an offset | |
315 | * | |
316 | * @param offset object byte offset | |
317 | * @return iterator pointing to buffer, or data.end() | |
318 | */ | |
f67539c2 TL |
319 | std::map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const { |
320 | auto p = data.lower_bound(offset); | |
7c673cae FG |
321 | if (p != data.begin() && |
322 | (p == data.end() || p->first > offset)) { | |
323 | --p; // might overlap! | |
324 | if (p->first + p->second->length() <= offset) | |
325 | ++p; // doesn't overlap. | |
326 | } | |
327 | return p; | |
328 | } | |
329 | ||
330 | // bh | |
331 | // add to my map | |
332 | void add_bh(BufferHead *bh) { | |
333 | if (data.empty()) | |
334 | get(); | |
11fdf7f2 | 335 | ceph_assert(data.count(bh->start()) == 0); |
7c673cae FG |
336 | data[bh->start()] = bh; |
337 | } | |
338 | void remove_bh(BufferHead *bh) { | |
11fdf7f2 | 339 | ceph_assert(data.count(bh->start())); |
7c673cae FG |
340 | data.erase(bh->start()); |
341 | if (data.empty()) | |
342 | put(); | |
343 | } | |
344 | ||
345 | bool is_empty() const { return data.empty(); } | |
346 | ||
347 | // mid-level | |
348 | BufferHead *split(BufferHead *bh, loff_t off); | |
349 | void merge_left(BufferHead *left, BufferHead *right); | |
b32b8144 | 350 | bool can_merge_bh(BufferHead *left, BufferHead *right); |
7c673cae | 351 | void try_merge_bh(BufferHead *bh); |
f64942e4 | 352 | void maybe_rebuild_buffer(BufferHead *bh); |
7c673cae FG |
353 | |
354 | bool is_cached(loff_t off, loff_t len) const; | |
355 | bool include_all_cached_data(loff_t off, loff_t len); | |
356 | int map_read(ObjectExtent &ex, | |
f67539c2 TL |
357 | std::map<loff_t, BufferHead*>& hits, |
358 | std::map<loff_t, BufferHead*>& missing, | |
359 | std::map<loff_t, BufferHead*>& rx, | |
360 | std::map<loff_t, BufferHead*>& errors); | |
7c673cae | 361 | BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid); |
31f18b77 | 362 | |
7c673cae FG |
363 | void replace_journal_tid(BufferHead *bh, ceph_tid_t tid); |
364 | void truncate(loff_t s); | |
28e407b8 | 365 | void discard(loff_t off, loff_t len, C_GatherBuilder* commit_gather); |
7c673cae FG |
366 | |
367 | // reference counting | |
368 | int get() { | |
11fdf7f2 | 369 | ceph_assert(ref >= 0); |
7c673cae FG |
370 | if (ref == 0) lru_pin(); |
371 | return ++ref; | |
372 | } | |
373 | int put() { | |
11fdf7f2 | 374 | ceph_assert(ref > 0); |
7c673cae FG |
375 | if (ref == 1) lru_unpin(); |
376 | --ref; | |
377 | return ref; | |
378 | } | |
379 | }; | |
380 | ||
381 | ||
382 | struct ObjectSet { | |
383 | void *parent; | |
384 | ||
385 | inodeno_t ino; | |
386 | uint64_t truncate_seq, truncate_size; | |
387 | ||
388 | int64_t poolid; | |
389 | xlist<Object*> objects; | |
390 | ||
391 | int dirty_or_tx; | |
392 | bool return_enoent; | |
393 | ||
394 | ObjectSet(void *p, int64_t _poolid, inodeno_t i) | |
395 | : parent(p), ino(i), truncate_seq(0), | |
396 | truncate_size(0), poolid(_poolid), dirty_or_tx(0), | |
397 | return_enoent(false) {} | |
398 | ||
399 | }; | |
400 | ||
401 | ||
402 | // ******* ObjectCacher ********* | |
403 | // ObjectCacher fields | |
404 | private: | |
405 | WritebackHandler& writeback_handler; | |
406 | bool scattered_write; | |
407 | ||
f67539c2 | 408 | std::string name; |
9f95a23c | 409 | ceph::mutex& lock; |
7c673cae FG |
410 | |
411 | uint64_t max_dirty, target_dirty, max_size, max_objects; | |
412 | ceph::timespan max_dirty_age; | |
413 | bool block_writes_upfront; | |
414 | ||
31f18b77 FG |
415 | ZTracer::Endpoint trace_endpoint; |
416 | ||
7c673cae FG |
417 | flush_set_callback_t flush_set_callback; |
418 | void *flush_set_callback_arg; | |
419 | ||
420 | // indexed by pool_id | |
f67539c2 | 421 | std::vector<ceph::unordered_map<sobject_t, Object*> > objects; |
7c673cae | 422 | |
f67539c2 | 423 | std::list<Context*> waitfor_read; |
7c673cae FG |
424 | |
425 | ceph_tid_t last_read_tid; | |
426 | ||
f67539c2 | 427 | std::set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh; |
7c673cae FG |
428 | LRU bh_lru_dirty, bh_lru_rest; |
429 | LRU ob_lru; | |
430 | ||
9f95a23c | 431 | ceph::condition_variable flusher_cond; |
7c673cae FG |
432 | bool flusher_stop; |
433 | void flusher_entry(); | |
434 | class FlusherThread : public Thread { | |
435 | ObjectCacher *oc; | |
436 | public: | |
437 | explicit FlusherThread(ObjectCacher *o) : oc(o) {} | |
438 | void *entry() override { | |
439 | oc->flusher_entry(); | |
440 | return 0; | |
441 | } | |
442 | } flusher_thread; | |
443 | ||
444 | Finisher finisher; | |
445 | ||
446 | // objects | |
447 | Object *get_object_maybe(sobject_t oid, object_locator_t &l) { | |
448 | // have it? | |
449 | if (((uint32_t)l.pool < objects.size()) && | |
450 | (objects[l.pool].count(oid))) | |
451 | return objects[l.pool][oid]; | |
452 | return NULL; | |
453 | } | |
454 | ||
455 | Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset, | |
456 | object_locator_t &l, uint64_t truncate_size, | |
457 | uint64_t truncate_seq); | |
458 | void close_object(Object *ob); | |
459 | ||
460 | // bh stats | |
9f95a23c | 461 | ceph::condition_variable stat_cond; |
7c673cae FG |
462 | |
463 | loff_t stat_clean; | |
464 | loff_t stat_zero; | |
465 | loff_t stat_dirty; | |
466 | loff_t stat_rx; | |
467 | loff_t stat_tx; | |
468 | loff_t stat_missing; | |
469 | loff_t stat_error; | |
470 | loff_t stat_dirty_waiting; // bytes that writers are waiting on to write | |
471 | ||
3efd9988 FG |
472 | size_t stat_nr_dirty_waiters; |
473 | ||
7c673cae FG |
474 | void verify_stats() const; |
475 | ||
476 | void bh_stat_add(BufferHead *bh); | |
477 | void bh_stat_sub(BufferHead *bh); | |
478 | loff_t get_stat_tx() const { return stat_tx; } | |
479 | loff_t get_stat_rx() const { return stat_rx; } | |
480 | loff_t get_stat_dirty() const { return stat_dirty; } | |
7c673cae FG |
481 | loff_t get_stat_clean() const { return stat_clean; } |
482 | loff_t get_stat_zero() const { return stat_zero; } | |
3efd9988 FG |
483 | loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; } |
484 | size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; } | |
7c673cae FG |
485 | |
486 | void touch_bh(BufferHead *bh) { | |
487 | if (bh->is_dirty()) | |
488 | bh_lru_dirty.lru_touch(bh); | |
489 | else | |
490 | bh_lru_rest.lru_touch(bh); | |
491 | ||
492 | bh->set_dontneed(false); | |
493 | bh->set_nocache(false); | |
494 | touch_ob(bh->ob); | |
495 | } | |
496 | void touch_ob(Object *ob) { | |
497 | ob_lru.lru_touch(ob); | |
498 | } | |
499 | void bottouch_ob(Object *ob) { | |
500 | ob_lru.lru_bottouch(ob); | |
501 | } | |
502 | ||
503 | // bh states | |
504 | void bh_set_state(BufferHead *bh, int s); | |
505 | void copy_bh_state(BufferHead *bh1, BufferHead *bh2) { | |
506 | bh_set_state(bh2, bh1->get_state()); | |
507 | } | |
508 | ||
509 | void mark_missing(BufferHead *bh) { | |
510 | bh_set_state(bh,BufferHead::STATE_MISSING); | |
511 | } | |
512 | void mark_clean(BufferHead *bh) { | |
513 | bh_set_state(bh, BufferHead::STATE_CLEAN); | |
514 | } | |
515 | void mark_zero(BufferHead *bh) { | |
516 | bh_set_state(bh, BufferHead::STATE_ZERO); | |
517 | } | |
518 | void mark_rx(BufferHead *bh) { | |
519 | bh_set_state(bh, BufferHead::STATE_RX); | |
520 | } | |
521 | void mark_tx(BufferHead *bh) { | |
522 | bh_set_state(bh, BufferHead::STATE_TX); } | |
523 | void mark_error(BufferHead *bh) { | |
524 | bh_set_state(bh, BufferHead::STATE_ERROR); | |
525 | } | |
526 | void mark_dirty(BufferHead *bh) { | |
527 | bh_set_state(bh, BufferHead::STATE_DIRTY); | |
528 | bh_lru_dirty.lru_touch(bh); | |
529 | //bh->set_dirty_stamp(ceph_clock_now()); | |
530 | } | |
531 | ||
532 | void bh_add(Object *ob, BufferHead *bh); | |
533 | void bh_remove(Object *ob, BufferHead *bh); | |
534 | ||
535 | // io | |
31f18b77 FG |
536 | void bh_read(BufferHead *bh, int op_flags, |
537 | const ZTracer::Trace &parent_trace); | |
538 | void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace); | |
f67539c2 | 539 | void bh_write_scattered(std::list<BufferHead*>& blist); |
7c673cae FG |
540 | void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff, |
541 | int64_t *amount, int *max_count); | |
542 | ||
543 | void trim(); | |
31f18b77 | 544 | void flush(ZTracer::Trace *trace, loff_t amount=0); |
7c673cae FG |
545 | |
546 | /** | |
547 | * flush a range of buffers | |
548 | * | |
549 | * Flush any buffers that intersect the specified extent. If len==0, | |
550 | * flush *all* buffers for the object. | |
551 | * | |
552 | * @param o object | |
553 | * @param off start offset | |
554 | * @param len extent length, or 0 for entire object | |
555 | * @return true if object was already clean/flushed. | |
556 | */ | |
31f18b77 FG |
557 | bool flush(Object *o, loff_t off, loff_t len, |
558 | ZTracer::Trace *trace); | |
7c673cae FG |
559 | loff_t release(Object *o); |
560 | void purge(Object *o); | |
561 | ||
562 | int64_t reads_outstanding; | |
9f95a23c | 563 | ceph::condition_variable read_cond; |
7c673cae FG |
564 | |
565 | int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, | |
31f18b77 | 566 | bool external_call, ZTracer::Trace *trace); |
7c673cae FG |
567 | void retry_waiting_reads(); |
568 | ||
569 | public: | |
570 | void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, | |
571 | loff_t offset, uint64_t length, | |
f67539c2 | 572 | ceph::buffer::list &bl, int r, |
7c673cae FG |
573 | bool trust_enoent); |
574 | void bh_write_commit(int64_t poolid, sobject_t oid, | |
f67539c2 | 575 | std::vector<std::pair<loff_t, uint64_t> >& ranges, |
7c673cae FG |
576 | ceph_tid_t t, int r); |
577 | ||
578 | class C_WriteCommit; | |
579 | class C_WaitForWrite; | |
580 | ||
581 | void perf_start(); | |
582 | void perf_stop(); | |
583 | ||
584 | ||
585 | ||
f67539c2 | 586 | ObjectCacher(CephContext *cct_, std::string name, WritebackHandler& wb, ceph::mutex& l, |
7c673cae FG |
587 | flush_set_callback_t flush_callback, |
588 | void *flush_callback_arg, | |
589 | uint64_t max_bytes, uint64_t max_objects, | |
590 | uint64_t max_dirty, uint64_t target_dirty, double max_age, | |
591 | bool block_writes_upfront); | |
592 | ~ObjectCacher(); | |
593 | ||
594 | void start() { | |
595 | flusher_thread.create("flusher"); | |
596 | } | |
597 | void stop() { | |
11fdf7f2 | 598 | ceph_assert(flusher_thread.is_started()); |
9f95a23c | 599 | lock.lock(); // hmm.. watch out for deadlock! |
7c673cae | 600 | flusher_stop = true; |
9f95a23c TL |
601 | flusher_cond.notify_all(); |
602 | lock.unlock(); | |
7c673cae FG |
603 | flusher_thread.join(); |
604 | } | |
605 | ||
606 | ||
607 | class C_RetryRead; | |
608 | ||
609 | ||
610 | // non-blocking. async. | |
611 | ||
612 | /** | |
613 | * @note total read size must be <= INT_MAX, since | |
614 | * the return value is total bytes read | |
615 | */ | |
31f18b77 FG |
616 | int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, |
617 | ZTracer::Trace *parent_trace = nullptr); | |
618 | int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace, | |
619 | ZTracer::Trace *parent_trace = nullptr); | |
f67539c2 | 620 | bool is_cached(ObjectSet *oset, std::vector<ObjectExtent>& extents, |
7c673cae FG |
621 | snapid_t snapid); |
622 | ||
623 | private: | |
624 | // write blocking | |
625 | int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, | |
31f18b77 | 626 | ZTracer::Trace *trace, Context *onfreespace); |
9f95a23c | 627 | void _maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace); |
7c673cae FG |
628 | bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish); |
629 | ||
f67539c2 | 630 | void _discard(ObjectSet *oset, const std::vector<ObjectExtent>& exls, |
28e407b8 AA |
631 | C_GatherBuilder* gather); |
632 | void _discard_finish(ObjectSet *oset, bool was_dirty, Context* on_finish); | |
633 | ||
7c673cae FG |
634 | public: |
635 | bool set_is_empty(ObjectSet *oset); | |
636 | bool set_is_cached(ObjectSet *oset); | |
637 | bool set_is_dirty_or_committing(ObjectSet *oset); | |
638 | ||
639 | bool flush_set(ObjectSet *oset, Context *onfinish=0); | |
f67539c2 | 640 | bool flush_set(ObjectSet *oset, std::vector<ObjectExtent>& ex, |
31f18b77 | 641 | ZTracer::Trace *trace, Context *onfinish = 0); |
7c673cae FG |
642 | bool flush_all(Context *onfinish = 0); |
643 | ||
644 | void purge_set(ObjectSet *oset); | |
645 | ||
646 | // returns # of bytes not released (ie non-clean) | |
647 | loff_t release_set(ObjectSet *oset); | |
648 | uint64_t release_all(); | |
649 | ||
f67539c2 TL |
650 | void discard_set(ObjectSet *oset, const std::vector<ObjectExtent>& ex); |
651 | void discard_writeback(ObjectSet *oset, const std::vector<ObjectExtent>& ex, | |
28e407b8 | 652 | Context* on_finish); |
7c673cae FG |
653 | |
654 | /** | |
655 | * Retry any in-flight reads that get -ENOENT instead of marking | |
656 | * them zero, and get rid of any cached -ENOENTs. | |
657 | * After this is called and the cache's lock is unlocked, | |
658 | * any new requests will treat -ENOENT normally. | |
659 | */ | |
660 | void clear_nonexistence(ObjectSet *oset); | |
661 | ||
662 | ||
663 | // cache sizes | |
664 | void set_max_dirty(uint64_t v) { | |
665 | max_dirty = v; | |
666 | } | |
667 | void set_target_dirty(int64_t v) { | |
668 | target_dirty = v; | |
669 | } | |
670 | void set_max_size(int64_t v) { | |
671 | max_size = v; | |
672 | } | |
673 | void set_max_dirty_age(double a) { | |
f67539c2 | 674 | max_dirty_age = ceph::make_timespan(a); |
7c673cae FG |
675 | } |
676 | void set_max_objects(int64_t v) { | |
677 | max_objects = v; | |
678 | } | |
679 | ||
680 | ||
681 | // file functions | |
682 | ||
683 | /*** async+caching (non-blocking) file interface ***/ | |
684 | int file_is_cached(ObjectSet *oset, file_layout_t *layout, | |
685 | snapid_t snapid, loff_t offset, uint64_t len) { | |
f67539c2 | 686 | std::vector<ObjectExtent> extents; |
7c673cae FG |
687 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, |
688 | oset->truncate_size, extents); | |
689 | return is_cached(oset, extents, snapid); | |
690 | } | |
691 | ||
692 | int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid, | |
f67539c2 | 693 | loff_t offset, uint64_t len, ceph::buffer::list *bl, int flags, |
7c673cae FG |
694 | Context *onfinish) { |
695 | OSDRead *rd = prepare_read(snapid, bl, flags); | |
696 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, | |
697 | oset->truncate_size, rd->extents); | |
698 | return readx(rd, oset, onfinish); | |
699 | } | |
700 | ||
701 | int file_write(ObjectSet *oset, file_layout_t *layout, | |
702 | const SnapContext& snapc, loff_t offset, uint64_t len, | |
f67539c2 | 703 | ceph::buffer::list& bl, ceph::real_time mtime, int flags) { |
7c673cae FG |
704 | OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0); |
705 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, | |
706 | oset->truncate_size, wr->extents); | |
9f95a23c | 707 | return writex(wr, oset, nullptr); |
7c673cae FG |
708 | } |
709 | ||
710 | bool file_flush(ObjectSet *oset, file_layout_t *layout, | |
711 | const SnapContext& snapc, loff_t offset, uint64_t len, | |
712 | Context *onfinish) { | |
f67539c2 | 713 | std::vector<ObjectExtent> extents; |
7c673cae FG |
714 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, |
715 | oset->truncate_size, extents); | |
31f18b77 FG |
716 | ZTracer::Trace trace; |
717 | return flush_set(oset, extents, &trace, onfinish); | |
7c673cae FG |
718 | } |
719 | }; | |
720 | ||
721 | ||
f67539c2 TL |
722 | inline std::ostream& operator<<(std::ostream &out, |
723 | const ObjectCacher::BufferHead &bh) | |
7c673cae FG |
724 | { |
725 | out << "bh[ " << &bh << " " | |
726 | << bh.start() << "~" << bh.length() | |
727 | << " " << bh.ob | |
728 | << " (" << bh.bl.length() << ")" | |
729 | << " v " << bh.last_write_tid; | |
730 | if (bh.get_journal_tid() != 0) { | |
731 | out << " j " << bh.get_journal_tid(); | |
732 | } | |
733 | if (bh.is_tx()) out << " tx"; | |
734 | if (bh.is_rx()) out << " rx"; | |
735 | if (bh.is_dirty()) out << " dirty"; | |
736 | if (bh.is_clean()) out << " clean"; | |
737 | if (bh.is_zero()) out << " zero"; | |
738 | if (bh.is_missing()) out << " missing"; | |
739 | if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0]; | |
740 | if (bh.error) out << " error=" << bh.error; | |
741 | out << "]"; | |
742 | out << " waiters = {"; | |
f67539c2 | 743 | for (auto it = bh.waitfor_read.begin(); it != bh.waitfor_read.end(); ++it) { |
7c673cae | 744 | out << " " << it->first << "->["; |
f67539c2 | 745 | for (auto lit = it->second.begin(); |
7c673cae FG |
746 | lit != it->second.end(); ++lit) { |
747 | out << *lit << ", "; | |
748 | } | |
749 | out << "]"; | |
750 | } | |
751 | out << "}"; | |
752 | return out; | |
753 | } | |
754 | ||
f67539c2 TL |
755 | inline std::ostream& operator<<(std::ostream &out, |
756 | const ObjectCacher::ObjectSet &os) | |
7c673cae FG |
757 | { |
758 | return out << "objectset[" << os.ino | |
759 | << " ts " << os.truncate_seq << "/" << os.truncate_size | |
760 | << " objects " << os.objects.size() | |
761 | << " dirty_or_tx " << os.dirty_or_tx | |
762 | << "]"; | |
763 | } | |
764 | ||
f67539c2 TL |
765 | inline std::ostream& operator<<(std::ostream &out, |
766 | const ObjectCacher::Object &ob) | |
7c673cae FG |
767 | { |
768 | out << "object[" | |
f67539c2 | 769 | << ob.get_soid() << " oset " << ob.oset << std::dec |
7c673cae FG |
770 | << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid; |
771 | ||
772 | if (ob.complete) | |
773 | out << " COMPLETE"; | |
774 | if (!ob.exists) | |
775 | out << " !EXISTS"; | |
776 | ||
777 | out << "]"; | |
778 | return out; | |
779 | } | |
780 | ||
781 | #endif |