// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_OBJECTCACHER_H
#define CEPH_OBJECTCACHER_H

#include "include/types.h"
#include "include/lru.h"
#include "include/Context.h"
#include "include/xlist.h"
#include "include/common_fwd.h"

#include "common/Cond.h"
#include "common/Finisher.h"
#include "common/Thread.h"
#include "common/zipkin_trace.h"

#include "Objecter.h"
#include "Striper.h"

class WritebackHandler;

22 | enum { | |
23 | l_objectcacher_first = 25000, | |
24 | ||
25 | l_objectcacher_cache_ops_hit, // ops we satisfy completely from cache | |
26 | l_objectcacher_cache_ops_miss, // ops we don't satisfy completely from cache | |
27 | ||
28 | l_objectcacher_cache_bytes_hit, // bytes read directly from cache | |
29 | ||
30 | l_objectcacher_cache_bytes_miss, // bytes we couldn't read directly | |
31 | ||
32 | // from cache | |
33 | ||
34 | l_objectcacher_data_read, // total bytes read out | |
35 | l_objectcacher_data_written, // bytes written to cache | |
36 | l_objectcacher_data_flushed, // bytes flushed to WritebackHandler | |
37 | l_objectcacher_overwritten_in_flush, // bytes overwritten while | |
38 | // flushing is in progress | |
39 | ||
40 | l_objectcacher_write_ops_blocked, // total write ops we delayed due | |
41 | // to dirty limits | |
42 | l_objectcacher_write_bytes_blocked, // total number of write bytes | |
43 | // we delayed due to dirty | |
44 | // limits | |
45 | l_objectcacher_write_time_blocked, // total time in seconds spent | |
46 | // blocking a write due to dirty | |
47 | // limits | |
48 | ||
49 | l_objectcacher_last, | |
50 | }; | |
51 | ||
52 | class ObjectCacher { | |
53 | PerfCounters *perfcounter; | |
54 | public: | |
55 | CephContext *cct; | |
56 | class Object; | |
57 | struct ObjectSet; | |
58 | class C_ReadFinish; | |
59 | ||
60 | typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset); | |
61 | ||
62 | // read scatter/gather | |
63 | struct OSDRead { | |
64 | vector<ObjectExtent> extents; | |
65 | snapid_t snap; | |
66 | bufferlist *bl; | |
67 | int fadvise_flags; | |
68 | OSDRead(snapid_t s, bufferlist *b, int f) | |
69 | : snap(s), bl(b), fadvise_flags(f) {} | |
70 | }; | |
71 | ||
72 | OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) const { | |
73 | return new OSDRead(snap, b, f); | |
74 | } | |
75 | ||
76 | // write scatter/gather | |
77 | struct OSDWrite { | |
78 | vector<ObjectExtent> extents; | |
79 | SnapContext snapc; | |
80 | bufferlist bl; | |
81 | ceph::real_time mtime; | |
82 | int fadvise_flags; | |
83 | ceph_tid_t journal_tid; | |
84 | OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt, | |
85 | int f, ceph_tid_t _journal_tid) | |
86 | : snapc(sc), bl(b), mtime(mt), fadvise_flags(f), | |
87 | journal_tid(_journal_tid) {} | |
88 | }; | |
89 | ||
90 | OSDWrite *prepare_write(const SnapContext& sc, | |
91 | const bufferlist &b, | |
92 | ceph::real_time mt, | |
93 | int f, | |
94 | ceph_tid_t journal_tid) const { | |
95 | return new OSDWrite(sc, b, mt, f, journal_tid); | |
96 | } | |
97 | ||
98 | ||
99 | ||
100 | // ******* BufferHead ********* | |
101 | class BufferHead : public LRUObject { | |
102 | public: | |
103 | // states | |
104 | static const int STATE_MISSING = 0; | |
105 | static const int STATE_CLEAN = 1; | |
106 | static const int STATE_ZERO = 2; // NOTE: these are *clean* zeros | |
107 | static const int STATE_DIRTY = 3; | |
108 | static const int STATE_RX = 4; | |
109 | static const int STATE_TX = 5; | |
110 | static const int STATE_ERROR = 6; // a read error occurred | |
111 | ||
112 | private: | |
113 | // my fields | |
114 | int state; | |
115 | int ref; | |
116 | struct { | |
117 | loff_t start, length; // bh extent in object | |
118 | } ex; | |
119 | bool dontneed; //indicate bh don't need by anyone | |
120 | bool nocache; //indicate bh don't need by this caller | |
121 | ||
122 | public: | |
123 | Object *ob; | |
124 | bufferlist bl; | |
125 | ceph_tid_t last_write_tid; // version of bh (if non-zero) | |
126 | ceph_tid_t last_read_tid; // tid of last read op (if any) | |
127 | ceph::real_time last_write; | |
128 | SnapContext snapc; | |
129 | ceph_tid_t journal_tid; | |
130 | int error; // holds return value for failed reads | |
131 | ||
132 | map<loff_t, list<Context*> > waitfor_read; | |
133 | ||
134 | // cons | |
135 | explicit BufferHead(Object *o) : | |
136 | state(STATE_MISSING), | |
137 | ref(0), | |
138 | dontneed(false), | |
139 | nocache(false), | |
140 | ob(o), | |
141 | last_write_tid(0), | |
142 | last_read_tid(0), | |
143 | journal_tid(0), | |
144 | error(0) { | |
145 | ex.start = ex.length = 0; | |
146 | } | |
147 | ||
148 | // extent | |
149 | loff_t start() const { return ex.start; } | |
150 | void set_start(loff_t s) { ex.start = s; } | |
151 | loff_t length() const { return ex.length; } | |
152 | void set_length(loff_t l) { ex.length = l; } | |
153 | loff_t end() const { return ex.start + ex.length; } | |
154 | loff_t last() const { return end() - 1; } | |
155 | ||
156 | // states | |
157 | void set_state(int s) { | |
158 | if (s == STATE_RX || s == STATE_TX) get(); | |
159 | if (state == STATE_RX || state == STATE_TX) put(); | |
160 | state = s; | |
161 | } | |
162 | int get_state() const { return state; } | |
163 | ||
164 | inline ceph_tid_t get_journal_tid() const { | |
165 | return journal_tid; | |
166 | } | |
167 | inline void set_journal_tid(ceph_tid_t _journal_tid) { | |
168 | journal_tid = _journal_tid; | |
169 | } | |
170 | ||
171 | bool is_missing() const { return state == STATE_MISSING; } | |
172 | bool is_dirty() const { return state == STATE_DIRTY; } | |
173 | bool is_clean() const { return state == STATE_CLEAN; } | |
174 | bool is_zero() const { return state == STATE_ZERO; } | |
175 | bool is_tx() const { return state == STATE_TX; } | |
176 | bool is_rx() const { return state == STATE_RX; } | |
177 | bool is_error() const { return state == STATE_ERROR; } | |
178 | ||
179 | // reference counting | |
180 | int get() { | |
181 | ceph_assert(ref >= 0); | |
182 | if (ref == 0) lru_pin(); | |
183 | return ++ref; | |
184 | } | |
185 | int put() { | |
186 | ceph_assert(ref > 0); | |
187 | if (ref == 1) lru_unpin(); | |
188 | --ref; | |
189 | return ref; | |
190 | } | |
191 | ||
192 | void set_dontneed(bool v) { | |
193 | dontneed = v; | |
194 | } | |
195 | bool get_dontneed() const { | |
196 | return dontneed; | |
197 | } | |
198 | ||
199 | void set_nocache(bool v) { | |
200 | nocache = v; | |
201 | } | |
202 | bool get_nocache() const { | |
203 | return nocache; | |
204 | } | |
205 | ||
206 | inline bool can_merge_journal(BufferHead *bh) const { | |
207 | return (get_journal_tid() == bh->get_journal_tid()); | |
208 | } | |
209 | ||
210 | struct ptr_lt { | |
211 | bool operator()(const BufferHead* l, const BufferHead* r) const { | |
212 | const Object *lob = l->ob; | |
213 | const Object *rob = r->ob; | |
214 | const ObjectSet *loset = lob->oset; | |
215 | const ObjectSet *roset = rob->oset; | |
216 | if (loset != roset) | |
217 | return loset < roset; | |
218 | if (lob != rob) | |
219 | return lob < rob; | |
220 | if (l->start() != r->start()) | |
221 | return l->start() < r->start(); | |
222 | return l < r; | |
223 | } | |
224 | }; | |
225 | }; | |
226 | ||
227 | // ******* Object ********* | |
228 | class Object : public LRUObject { | |
229 | private: | |
230 | // ObjectCacher::Object fields | |
231 | int ref; | |
232 | ObjectCacher *oc; | |
233 | sobject_t oid; | |
234 | friend struct ObjectSet; | |
235 | ||
236 | public: | |
237 | uint64_t object_no; | |
238 | ObjectSet *oset; | |
239 | xlist<Object*>::item set_item; | |
240 | object_locator_t oloc; | |
241 | uint64_t truncate_size, truncate_seq; | |
242 | ||
243 | bool complete; | |
244 | bool exists; | |
245 | ||
246 | map<loff_t, BufferHead*> data; | |
247 | ||
248 | ceph_tid_t last_write_tid; // version of bh (if non-zero) | |
249 | ceph_tid_t last_commit_tid; // last update committed. | |
250 | ||
251 | int dirty_or_tx; | |
252 | ||
253 | map< ceph_tid_t, list<Context*> > waitfor_commit; | |
254 | xlist<C_ReadFinish*> reads; | |
255 | ||
256 | Object(const Object&) = delete; | |
257 | Object& operator=(const Object&) = delete; | |
258 | ||
259 | Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os, | |
260 | object_locator_t& l, uint64_t ts, uint64_t tq) : | |
261 | ref(0), | |
262 | oc(_oc), | |
263 | oid(o), object_no(ono), oset(os), set_item(this), oloc(l), | |
264 | truncate_size(ts), truncate_seq(tq), | |
265 | complete(false), exists(true), | |
266 | last_write_tid(0), last_commit_tid(0), | |
267 | dirty_or_tx(0) { | |
268 | // add to set | |
269 | os->objects.push_back(&set_item); | |
270 | } | |
271 | ~Object() { | |
272 | reads.clear(); | |
273 | ceph_assert(ref == 0); | |
274 | ceph_assert(data.empty()); | |
275 | ceph_assert(dirty_or_tx == 0); | |
276 | set_item.remove_myself(); | |
277 | } | |
278 | ||
279 | sobject_t get_soid() const { return oid; } | |
280 | object_t get_oid() { return oid.oid; } | |
281 | snapid_t get_snap() { return oid.snap; } | |
282 | ObjectSet *get_object_set() const { return oset; } | |
283 | string get_namespace() { return oloc.nspace; } | |
284 | uint64_t get_object_number() const { return object_no; } | |
285 | ||
286 | const object_locator_t& get_oloc() const { return oloc; } | |
287 | void set_object_locator(object_locator_t& l) { oloc = l; } | |
288 | ||
289 | bool can_close() const { | |
290 | if (lru_is_expireable()) { | |
291 | ceph_assert(data.empty()); | |
292 | ceph_assert(waitfor_commit.empty()); | |
293 | return true; | |
294 | } | |
295 | return false; | |
296 | } | |
297 | ||
298 | /** | |
299 | * Check buffers and waiters for consistency | |
300 | * - no overlapping buffers | |
301 | * - index in map matches BH | |
302 | * - waiters fall within BH | |
303 | */ | |
304 | void audit_buffers(); | |
305 | ||
306 | /** | |
307 | * find first buffer that includes or follows an offset | |
308 | * | |
309 | * @param offset object byte offset | |
310 | * @return iterator pointing to buffer, or data.end() | |
311 | */ | |
312 | map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const { | |
313 | map<loff_t,BufferHead*>::const_iterator p = data.lower_bound(offset); | |
314 | if (p != data.begin() && | |
315 | (p == data.end() || p->first > offset)) { | |
316 | --p; // might overlap! | |
317 | if (p->first + p->second->length() <= offset) | |
318 | ++p; // doesn't overlap. | |
319 | } | |
320 | return p; | |
321 | } | |
322 | ||
323 | // bh | |
324 | // add to my map | |
325 | void add_bh(BufferHead *bh) { | |
326 | if (data.empty()) | |
327 | get(); | |
328 | ceph_assert(data.count(bh->start()) == 0); | |
329 | data[bh->start()] = bh; | |
330 | } | |
331 | void remove_bh(BufferHead *bh) { | |
332 | ceph_assert(data.count(bh->start())); | |
333 | data.erase(bh->start()); | |
334 | if (data.empty()) | |
335 | put(); | |
336 | } | |
337 | ||
338 | bool is_empty() const { return data.empty(); } | |
339 | ||
340 | // mid-level | |
341 | BufferHead *split(BufferHead *bh, loff_t off); | |
342 | void merge_left(BufferHead *left, BufferHead *right); | |
343 | bool can_merge_bh(BufferHead *left, BufferHead *right); | |
344 | void try_merge_bh(BufferHead *bh); | |
345 | void maybe_rebuild_buffer(BufferHead *bh); | |
346 | ||
347 | bool is_cached(loff_t off, loff_t len) const; | |
348 | bool include_all_cached_data(loff_t off, loff_t len); | |
349 | int map_read(ObjectExtent &ex, | |
350 | map<loff_t, BufferHead*>& hits, | |
351 | map<loff_t, BufferHead*>& missing, | |
352 | map<loff_t, BufferHead*>& rx, | |
353 | map<loff_t, BufferHead*>& errors); | |
354 | BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid); | |
355 | ||
356 | void replace_journal_tid(BufferHead *bh, ceph_tid_t tid); | |
357 | void truncate(loff_t s); | |
358 | void discard(loff_t off, loff_t len, C_GatherBuilder* commit_gather); | |
359 | ||
360 | // reference counting | |
361 | int get() { | |
362 | ceph_assert(ref >= 0); | |
363 | if (ref == 0) lru_pin(); | |
364 | return ++ref; | |
365 | } | |
366 | int put() { | |
367 | ceph_assert(ref > 0); | |
368 | if (ref == 1) lru_unpin(); | |
369 | --ref; | |
370 | return ref; | |
371 | } | |
372 | }; | |
373 | ||
374 | ||
375 | struct ObjectSet { | |
376 | void *parent; | |
377 | ||
378 | inodeno_t ino; | |
379 | uint64_t truncate_seq, truncate_size; | |
380 | ||
381 | int64_t poolid; | |
382 | xlist<Object*> objects; | |
383 | ||
384 | int dirty_or_tx; | |
385 | bool return_enoent; | |
386 | ||
387 | ObjectSet(void *p, int64_t _poolid, inodeno_t i) | |
388 | : parent(p), ino(i), truncate_seq(0), | |
389 | truncate_size(0), poolid(_poolid), dirty_or_tx(0), | |
390 | return_enoent(false) {} | |
391 | ||
392 | }; | |
393 | ||
394 | ||
395 | // ******* ObjectCacher ********* | |
396 | // ObjectCacher fields | |
397 | private: | |
398 | WritebackHandler& writeback_handler; | |
399 | bool scattered_write; | |
400 | ||
401 | string name; | |
402 | ceph::mutex& lock; | |
403 | ||
404 | uint64_t max_dirty, target_dirty, max_size, max_objects; | |
405 | ceph::timespan max_dirty_age; | |
406 | bool block_writes_upfront; | |
407 | ||
408 | ZTracer::Endpoint trace_endpoint; | |
409 | ||
410 | flush_set_callback_t flush_set_callback; | |
411 | void *flush_set_callback_arg; | |
412 | ||
413 | // indexed by pool_id | |
414 | vector<ceph::unordered_map<sobject_t, Object*> > objects; | |
415 | ||
416 | list<Context*> waitfor_read; | |
417 | ||
418 | ceph_tid_t last_read_tid; | |
419 | ||
420 | set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh; | |
421 | LRU bh_lru_dirty, bh_lru_rest; | |
422 | LRU ob_lru; | |
423 | ||
424 | ceph::condition_variable flusher_cond; | |
425 | bool flusher_stop; | |
426 | void flusher_entry(); | |
427 | class FlusherThread : public Thread { | |
428 | ObjectCacher *oc; | |
429 | public: | |
430 | explicit FlusherThread(ObjectCacher *o) : oc(o) {} | |
431 | void *entry() override { | |
432 | oc->flusher_entry(); | |
433 | return 0; | |
434 | } | |
435 | } flusher_thread; | |
436 | ||
437 | Finisher finisher; | |
438 | ||
439 | // objects | |
440 | Object *get_object_maybe(sobject_t oid, object_locator_t &l) { | |
441 | // have it? | |
442 | if (((uint32_t)l.pool < objects.size()) && | |
443 | (objects[l.pool].count(oid))) | |
444 | return objects[l.pool][oid]; | |
445 | return NULL; | |
446 | } | |
447 | ||
448 | Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset, | |
449 | object_locator_t &l, uint64_t truncate_size, | |
450 | uint64_t truncate_seq); | |
451 | void close_object(Object *ob); | |
452 | ||
453 | // bh stats | |
454 | ceph::condition_variable stat_cond; | |
455 | ||
456 | loff_t stat_clean; | |
457 | loff_t stat_zero; | |
458 | loff_t stat_dirty; | |
459 | loff_t stat_rx; | |
460 | loff_t stat_tx; | |
461 | loff_t stat_missing; | |
462 | loff_t stat_error; | |
463 | loff_t stat_dirty_waiting; // bytes that writers are waiting on to write | |
464 | ||
465 | size_t stat_nr_dirty_waiters; | |
466 | ||
467 | void verify_stats() const; | |
468 | ||
469 | void bh_stat_add(BufferHead *bh); | |
470 | void bh_stat_sub(BufferHead *bh); | |
471 | loff_t get_stat_tx() const { return stat_tx; } | |
472 | loff_t get_stat_rx() const { return stat_rx; } | |
473 | loff_t get_stat_dirty() const { return stat_dirty; } | |
474 | loff_t get_stat_clean() const { return stat_clean; } | |
475 | loff_t get_stat_zero() const { return stat_zero; } | |
476 | loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; } | |
477 | size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; } | |
478 | ||
479 | void touch_bh(BufferHead *bh) { | |
480 | if (bh->is_dirty()) | |
481 | bh_lru_dirty.lru_touch(bh); | |
482 | else | |
483 | bh_lru_rest.lru_touch(bh); | |
484 | ||
485 | bh->set_dontneed(false); | |
486 | bh->set_nocache(false); | |
487 | touch_ob(bh->ob); | |
488 | } | |
489 | void touch_ob(Object *ob) { | |
490 | ob_lru.lru_touch(ob); | |
491 | } | |
492 | void bottouch_ob(Object *ob) { | |
493 | ob_lru.lru_bottouch(ob); | |
494 | } | |
495 | ||
496 | // bh states | |
497 | void bh_set_state(BufferHead *bh, int s); | |
498 | void copy_bh_state(BufferHead *bh1, BufferHead *bh2) { | |
499 | bh_set_state(bh2, bh1->get_state()); | |
500 | } | |
501 | ||
502 | void mark_missing(BufferHead *bh) { | |
503 | bh_set_state(bh,BufferHead::STATE_MISSING); | |
504 | } | |
505 | void mark_clean(BufferHead *bh) { | |
506 | bh_set_state(bh, BufferHead::STATE_CLEAN); | |
507 | } | |
508 | void mark_zero(BufferHead *bh) { | |
509 | bh_set_state(bh, BufferHead::STATE_ZERO); | |
510 | } | |
511 | void mark_rx(BufferHead *bh) { | |
512 | bh_set_state(bh, BufferHead::STATE_RX); | |
513 | } | |
514 | void mark_tx(BufferHead *bh) { | |
515 | bh_set_state(bh, BufferHead::STATE_TX); } | |
516 | void mark_error(BufferHead *bh) { | |
517 | bh_set_state(bh, BufferHead::STATE_ERROR); | |
518 | } | |
519 | void mark_dirty(BufferHead *bh) { | |
520 | bh_set_state(bh, BufferHead::STATE_DIRTY); | |
521 | bh_lru_dirty.lru_touch(bh); | |
522 | //bh->set_dirty_stamp(ceph_clock_now()); | |
523 | } | |
524 | ||
525 | void bh_add(Object *ob, BufferHead *bh); | |
526 | void bh_remove(Object *ob, BufferHead *bh); | |
527 | ||
528 | // io | |
529 | void bh_read(BufferHead *bh, int op_flags, | |
530 | const ZTracer::Trace &parent_trace); | |
531 | void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace); | |
532 | void bh_write_scattered(list<BufferHead*>& blist); | |
533 | void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff, | |
534 | int64_t *amount, int *max_count); | |
535 | ||
536 | void trim(); | |
537 | void flush(ZTracer::Trace *trace, loff_t amount=0); | |
538 | ||
539 | /** | |
540 | * flush a range of buffers | |
541 | * | |
542 | * Flush any buffers that intersect the specified extent. If len==0, | |
543 | * flush *all* buffers for the object. | |
544 | * | |
545 | * @param o object | |
546 | * @param off start offset | |
547 | * @param len extent length, or 0 for entire object | |
548 | * @return true if object was already clean/flushed. | |
549 | */ | |
550 | bool flush(Object *o, loff_t off, loff_t len, | |
551 | ZTracer::Trace *trace); | |
552 | loff_t release(Object *o); | |
553 | void purge(Object *o); | |
554 | ||
555 | int64_t reads_outstanding; | |
556 | ceph::condition_variable read_cond; | |
557 | ||
558 | int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, | |
559 | bool external_call, ZTracer::Trace *trace); | |
560 | void retry_waiting_reads(); | |
561 | ||
562 | public: | |
563 | void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, | |
564 | loff_t offset, uint64_t length, | |
565 | bufferlist &bl, int r, | |
566 | bool trust_enoent); | |
567 | void bh_write_commit(int64_t poolid, sobject_t oid, | |
568 | vector<pair<loff_t, uint64_t> >& ranges, | |
569 | ceph_tid_t t, int r); | |
570 | ||
571 | class C_WriteCommit; | |
572 | class C_WaitForWrite; | |
573 | ||
574 | void perf_start(); | |
575 | void perf_stop(); | |
576 | ||
577 | ||
578 | ||
579 | ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, ceph::mutex& l, | |
580 | flush_set_callback_t flush_callback, | |
581 | void *flush_callback_arg, | |
582 | uint64_t max_bytes, uint64_t max_objects, | |
583 | uint64_t max_dirty, uint64_t target_dirty, double max_age, | |
584 | bool block_writes_upfront); | |
585 | ~ObjectCacher(); | |
586 | ||
587 | void start() { | |
588 | flusher_thread.create("flusher"); | |
589 | } | |
590 | void stop() { | |
591 | ceph_assert(flusher_thread.is_started()); | |
592 | lock.lock(); // hmm.. watch out for deadlock! | |
593 | flusher_stop = true; | |
594 | flusher_cond.notify_all(); | |
595 | lock.unlock(); | |
596 | flusher_thread.join(); | |
597 | } | |
598 | ||
599 | ||
600 | class C_RetryRead; | |
601 | ||
602 | ||
603 | // non-blocking. async. | |
604 | ||
605 | /** | |
606 | * @note total read size must be <= INT_MAX, since | |
607 | * the return value is total bytes read | |
608 | */ | |
609 | int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, | |
610 | ZTracer::Trace *parent_trace = nullptr); | |
611 | int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace, | |
612 | ZTracer::Trace *parent_trace = nullptr); | |
613 | bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, | |
614 | snapid_t snapid); | |
615 | ||
616 | private: | |
617 | // write blocking | |
618 | int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, | |
619 | ZTracer::Trace *trace, Context *onfreespace); | |
620 | void _maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace); | |
621 | bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish); | |
622 | ||
623 | void _discard(ObjectSet *oset, const vector<ObjectExtent>& exls, | |
624 | C_GatherBuilder* gather); | |
625 | void _discard_finish(ObjectSet *oset, bool was_dirty, Context* on_finish); | |
626 | ||
627 | public: | |
628 | bool set_is_empty(ObjectSet *oset); | |
629 | bool set_is_cached(ObjectSet *oset); | |
630 | bool set_is_dirty_or_committing(ObjectSet *oset); | |
631 | ||
632 | bool flush_set(ObjectSet *oset, Context *onfinish=0); | |
633 | bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex, | |
634 | ZTracer::Trace *trace, Context *onfinish = 0); | |
635 | bool flush_all(Context *onfinish = 0); | |
636 | ||
637 | void purge_set(ObjectSet *oset); | |
638 | ||
639 | // returns # of bytes not released (ie non-clean) | |
640 | loff_t release_set(ObjectSet *oset); | |
641 | uint64_t release_all(); | |
642 | ||
643 | void discard_set(ObjectSet *oset, const vector<ObjectExtent>& ex); | |
644 | void discard_writeback(ObjectSet *oset, const vector<ObjectExtent>& ex, | |
645 | Context* on_finish); | |
646 | ||
647 | /** | |
648 | * Retry any in-flight reads that get -ENOENT instead of marking | |
649 | * them zero, and get rid of any cached -ENOENTs. | |
650 | * After this is called and the cache's lock is unlocked, | |
651 | * any new requests will treat -ENOENT normally. | |
652 | */ | |
653 | void clear_nonexistence(ObjectSet *oset); | |
654 | ||
655 | ||
656 | // cache sizes | |
657 | void set_max_dirty(uint64_t v) { | |
658 | max_dirty = v; | |
659 | } | |
660 | void set_target_dirty(int64_t v) { | |
661 | target_dirty = v; | |
662 | } | |
663 | void set_max_size(int64_t v) { | |
664 | max_size = v; | |
665 | } | |
666 | void set_max_dirty_age(double a) { | |
667 | max_dirty_age = make_timespan(a); | |
668 | } | |
669 | void set_max_objects(int64_t v) { | |
670 | max_objects = v; | |
671 | } | |
672 | ||
673 | ||
674 | // file functions | |
675 | ||
676 | /*** async+caching (non-blocking) file interface ***/ | |
677 | int file_is_cached(ObjectSet *oset, file_layout_t *layout, | |
678 | snapid_t snapid, loff_t offset, uint64_t len) { | |
679 | vector<ObjectExtent> extents; | |
680 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, | |
681 | oset->truncate_size, extents); | |
682 | return is_cached(oset, extents, snapid); | |
683 | } | |
684 | ||
685 | int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid, | |
686 | loff_t offset, uint64_t len, bufferlist *bl, int flags, | |
687 | Context *onfinish) { | |
688 | OSDRead *rd = prepare_read(snapid, bl, flags); | |
689 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, | |
690 | oset->truncate_size, rd->extents); | |
691 | return readx(rd, oset, onfinish); | |
692 | } | |
693 | ||
694 | int file_write(ObjectSet *oset, file_layout_t *layout, | |
695 | const SnapContext& snapc, loff_t offset, uint64_t len, | |
696 | bufferlist& bl, ceph::real_time mtime, int flags) { | |
697 | OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0); | |
698 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, | |
699 | oset->truncate_size, wr->extents); | |
700 | return writex(wr, oset, nullptr); | |
701 | } | |
702 | ||
703 | bool file_flush(ObjectSet *oset, file_layout_t *layout, | |
704 | const SnapContext& snapc, loff_t offset, uint64_t len, | |
705 | Context *onfinish) { | |
706 | vector<ObjectExtent> extents; | |
707 | Striper::file_to_extents(cct, oset->ino, layout, offset, len, | |
708 | oset->truncate_size, extents); | |
709 | ZTracer::Trace trace; | |
710 | return flush_set(oset, extents, &trace, onfinish); | |
711 | } | |
712 | }; | |
713 | ||
714 | ||
715 | inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh) | |
716 | { | |
717 | out << "bh[ " << &bh << " " | |
718 | << bh.start() << "~" << bh.length() | |
719 | << " " << bh.ob | |
720 | << " (" << bh.bl.length() << ")" | |
721 | << " v " << bh.last_write_tid; | |
722 | if (bh.get_journal_tid() != 0) { | |
723 | out << " j " << bh.get_journal_tid(); | |
724 | } | |
725 | if (bh.is_tx()) out << " tx"; | |
726 | if (bh.is_rx()) out << " rx"; | |
727 | if (bh.is_dirty()) out << " dirty"; | |
728 | if (bh.is_clean()) out << " clean"; | |
729 | if (bh.is_zero()) out << " zero"; | |
730 | if (bh.is_missing()) out << " missing"; | |
731 | if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0]; | |
732 | if (bh.error) out << " error=" << bh.error; | |
733 | out << "]"; | |
734 | out << " waiters = {"; | |
735 | for (map<loff_t, list<Context*> >::const_iterator it | |
736 | = bh.waitfor_read.begin(); | |
737 | it != bh.waitfor_read.end(); ++it) { | |
738 | out << " " << it->first << "->["; | |
739 | for (list<Context*>::const_iterator lit = it->second.begin(); | |
740 | lit != it->second.end(); ++lit) { | |
741 | out << *lit << ", "; | |
742 | } | |
743 | out << "]"; | |
744 | } | |
745 | out << "}"; | |
746 | return out; | |
747 | } | |
748 | ||
749 | inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os) | |
750 | { | |
751 | return out << "objectset[" << os.ino | |
752 | << " ts " << os.truncate_seq << "/" << os.truncate_size | |
753 | << " objects " << os.objects.size() | |
754 | << " dirty_or_tx " << os.dirty_or_tx | |
755 | << "]"; | |
756 | } | |
757 | ||
758 | inline ostream& operator<<(ostream &out, const ObjectCacher::Object &ob) | |
759 | { | |
760 | out << "object[" | |
761 | << ob.get_soid() << " oset " << ob.oset << dec | |
762 | << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid; | |
763 | ||
764 | if (ob.complete) | |
765 | out << " COMPLETE"; | |
766 | if (!ob.exists) | |
767 | out << " !EXISTS"; | |
768 | ||
769 | out << "]"; | |
770 | return out; | |
771 | } | |
772 | ||
773 | #endif |