1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef CEPH_OBJECTCACHER_H
4 #define CEPH_OBJECTCACHER_H
6 #include "include/types.h"
7 #include "include/lru.h"
8 #include "include/Context.h"
9 #include "include/xlist.h"
11 #include "common/Cond.h"
12 #include "common/Finisher.h"
13 #include "common/Thread.h"
19 class WritebackHandler
;
23 l_objectcacher_first
= 25000,
25 l_objectcacher_cache_ops_hit
, // ops we satisfy completely from cache
26 l_objectcacher_cache_ops_miss
, // ops we don't satisfy completely from cache
28 l_objectcacher_cache_bytes_hit
, // bytes read directly from cache
30 l_objectcacher_cache_bytes_miss
, // bytes we couldn't read directly
34 l_objectcacher_data_read
, // total bytes read out
35 l_objectcacher_data_written
, // bytes written to cache
36 l_objectcacher_data_flushed
, // bytes flushed to WritebackHandler
37 l_objectcacher_overwritten_in_flush
, // bytes overwritten while
38 // flushing is in progress
40 l_objectcacher_write_ops_blocked
, // total write ops we delayed due
42 l_objectcacher_write_bytes_blocked
, // total number of write bytes
43 // we delayed due to dirty
45 l_objectcacher_write_time_blocked
, // total time in seconds spent
46 // blocking a write due to dirty
53 PerfCounters
*perfcounter
;
60 typedef void (*flush_set_callback_t
) (void *p
, ObjectSet
*oset
);
62 // read scatter/gather
64 vector
<ObjectExtent
> extents
;
68 OSDRead(snapid_t s
, bufferlist
*b
, int f
)
69 : snap(s
), bl(b
), fadvise_flags(f
) {}
72 OSDRead
*prepare_read(snapid_t snap
, bufferlist
*b
, int f
) const {
73 return new OSDRead(snap
, b
, f
);
76 // write scatter/gather
78 vector
<ObjectExtent
> extents
;
81 ceph::real_time mtime
;
83 ceph_tid_t journal_tid
;
84 OSDWrite(const SnapContext
& sc
, const bufferlist
& b
, ceph::real_time mt
,
85 int f
, ceph_tid_t _journal_tid
)
86 : snapc(sc
), bl(b
), mtime(mt
), fadvise_flags(f
),
87 journal_tid(_journal_tid
) {}
90 OSDWrite
*prepare_write(const SnapContext
& sc
,
94 ceph_tid_t journal_tid
) const {
95 return new OSDWrite(sc
, b
, mt
, f
, journal_tid
);
100 // ******* BufferHead *********
101 class BufferHead
: public LRUObject
{
// Buffer states; RX/TX states pin the buffer via get()/put() (see set_state()).
static const int STATE_MISSING = 0;
static const int STATE_CLEAN = 1;
static const int STATE_ZERO = 2;   // NOTE: these are *clean* zeros
static const int STATE_DIRTY = 3;
static const int STATE_RX = 4;
static const int STATE_TX = 5;
static const int STATE_ERROR = 6;  // a read error occurred
117 loff_t start
, length
; // bh extent in object
119 bool dontneed
; //indicate bh don't need by anyone
120 bool nocache
; //indicate bh don't need by this caller
125 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
126 ceph_tid_t last_read_tid
; // tid of last read op (if any)
127 ceph::real_time last_write
;
129 ceph_tid_t journal_tid
;
130 int error
; // holds return value for failed reads
132 map
<loff_t
, list
<Context
*> > waitfor_read
;
135 explicit BufferHead(Object
*o
) :
136 state(STATE_MISSING
),
145 ex
.start
= ex
.length
= 0;
149 loff_t
start() const { return ex
.start
; }
150 void set_start(loff_t s
) { ex
.start
= s
; }
151 loff_t
length() const { return ex
.length
; }
152 void set_length(loff_t l
) { ex
.length
= l
; }
153 loff_t
end() const { return ex
.start
+ ex
.length
; }
154 loff_t
last() const { return end() - 1; }
157 void set_state(int s
) {
158 if (s
== STATE_RX
|| s
== STATE_TX
) get();
159 if (state
== STATE_RX
|| state
== STATE_TX
) put();
162 int get_state() const { return state
; }
164 inline ceph_tid_t
get_journal_tid() const {
167 inline void set_journal_tid(ceph_tid_t _journal_tid
) {
168 journal_tid
= _journal_tid
;
171 bool is_missing() const { return state
== STATE_MISSING
; }
172 bool is_dirty() const { return state
== STATE_DIRTY
; }
173 bool is_clean() const { return state
== STATE_CLEAN
; }
174 bool is_zero() const { return state
== STATE_ZERO
; }
175 bool is_tx() const { return state
== STATE_TX
; }
176 bool is_rx() const { return state
== STATE_RX
; }
177 bool is_error() const { return state
== STATE_ERROR
; }
179 // reference counting
182 if (ref
== 0) lru_pin();
187 if (ref
== 1) lru_unpin();
192 void set_dontneed(bool v
) {
195 bool get_dontneed() const {
199 void set_nocache(bool v
) {
202 bool get_nocache() const {
206 inline bool can_merge_journal(BufferHead
*bh
) const {
207 return (get_journal_tid() == bh
->get_journal_tid());
211 bool operator()(const BufferHead
* l
, const BufferHead
* r
) const {
212 const Object
*lob
= l
->ob
;
213 const Object
*rob
= r
->ob
;
214 const ObjectSet
*loset
= lob
->oset
;
215 const ObjectSet
*roset
= rob
->oset
;
217 return loset
< roset
;
220 if (l
->start() != r
->start())
221 return l
->start() < r
->start();
227 // ******* Object *********
228 class Object
: public LRUObject
{
230 // ObjectCacher::Object fields
234 friend struct ObjectSet
;
239 xlist
<Object
*>::item set_item
;
240 object_locator_t oloc
;
241 uint64_t truncate_size
, truncate_seq
;
246 map
<loff_t
, BufferHead
*> data
;
248 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
249 ceph_tid_t last_commit_tid
; // last update commited.
253 map
< ceph_tid_t
, list
<Context
*> > waitfor_commit
;
254 xlist
<C_ReadFinish
*> reads
;
256 Object(const Object
&) = delete;
257 Object
& operator=(const Object
&) = delete;
259 Object(ObjectCacher
*_oc
, sobject_t o
, uint64_t ono
, ObjectSet
*os
,
260 object_locator_t
& l
, uint64_t ts
, uint64_t tq
) :
263 oid(o
), object_no(ono
), oset(os
), set_item(this), oloc(l
),
264 truncate_size(ts
), truncate_seq(tq
),
265 complete(false), exists(true),
266 last_write_tid(0), last_commit_tid(0),
269 os
->objects
.push_back(&set_item
);
274 assert(data
.empty());
275 assert(dirty_or_tx
== 0);
276 set_item
.remove_myself();
279 sobject_t
get_soid() const { return oid
; }
280 object_t
get_oid() { return oid
.oid
; }
281 snapid_t
get_snap() { return oid
.snap
; }
282 ObjectSet
*get_object_set() const { return oset
; }
283 string
get_namespace() { return oloc
.nspace
; }
284 uint64_t get_object_number() const { return object_no
; }
286 const object_locator_t
& get_oloc() const { return oloc
; }
287 void set_object_locator(object_locator_t
& l
) { oloc
= l
; }
289 bool can_close() const {
290 if (lru_is_expireable()) {
291 assert(data
.empty());
292 assert(waitfor_commit
.empty());
299 * Check buffers and waiters for consistency
300 * - no overlapping buffers
301 * - index in map matches BH
302 * - waiters fall within BH
304 void audit_buffers();
307 * find first buffer that includes or follows an offset
309 * @param offset object byte offset
310 * @return iterator pointing to buffer, or data.end()
312 map
<loff_t
,BufferHead
*>::const_iterator
data_lower_bound(loff_t offset
) const {
313 map
<loff_t
,BufferHead
*>::const_iterator p
= data
.lower_bound(offset
);
314 if (p
!= data
.begin() &&
315 (p
== data
.end() || p
->first
> offset
)) {
316 --p
; // might overlap!
317 if (p
->first
+ p
->second
->length() <= offset
)
318 ++p
; // doesn't overlap.
325 void add_bh(BufferHead
*bh
) {
328 assert(data
.count(bh
->start()) == 0);
329 data
[bh
->start()] = bh
;
331 void remove_bh(BufferHead
*bh
) {
332 assert(data
.count(bh
->start()));
333 data
.erase(bh
->start());
338 bool is_empty() const { return data
.empty(); }
341 BufferHead
*split(BufferHead
*bh
, loff_t off
);
342 void merge_left(BufferHead
*left
, BufferHead
*right
);
343 void try_merge_bh(BufferHead
*bh
);
345 bool is_cached(loff_t off
, loff_t len
) const;
346 bool include_all_cached_data(loff_t off
, loff_t len
);
347 int map_read(ObjectExtent
&ex
,
348 map
<loff_t
, BufferHead
*>& hits
,
349 map
<loff_t
, BufferHead
*>& missing
,
350 map
<loff_t
, BufferHead
*>& rx
,
351 map
<loff_t
, BufferHead
*>& errors
);
352 BufferHead
*map_write(ObjectExtent
&ex
, ceph_tid_t tid
);
354 void replace_journal_tid(BufferHead
*bh
, ceph_tid_t tid
);
355 void truncate(loff_t s
);
356 void discard(loff_t off
, loff_t len
);
358 // reference counting
361 if (ref
== 0) lru_pin();
366 if (ref
== 1) lru_unpin();
377 uint64_t truncate_seq
, truncate_size
;
380 xlist
<Object
*> objects
;
385 ObjectSet(void *p
, int64_t _poolid
, inodeno_t i
)
386 : parent(p
), ino(i
), truncate_seq(0),
387 truncate_size(0), poolid(_poolid
), dirty_or_tx(0),
388 return_enoent(false) {}
393 // ******* ObjectCacher *********
394 // ObjectCacher fields
396 WritebackHandler
& writeback_handler
;
397 bool scattered_write
;
402 uint64_t max_dirty
, target_dirty
, max_size
, max_objects
;
403 ceph::timespan max_dirty_age
;
404 bool block_writes_upfront
;
406 flush_set_callback_t flush_set_callback
;
407 void *flush_set_callback_arg
;
409 // indexed by pool_id
410 vector
<ceph::unordered_map
<sobject_t
, Object
*> > objects
;
412 list
<Context
*> waitfor_read
;
414 ceph_tid_t last_read_tid
;
416 set
<BufferHead
*, BufferHead::ptr_lt
> dirty_or_tx_bh
;
417 LRU bh_lru_dirty
, bh_lru_rest
;
422 void flusher_entry();
423 class FlusherThread
: public Thread
{
426 explicit FlusherThread(ObjectCacher
*o
) : oc(o
) {}
427 void *entry() override
{
436 Object
*get_object_maybe(sobject_t oid
, object_locator_t
&l
) {
438 if (((uint32_t)l
.pool
< objects
.size()) &&
439 (objects
[l
.pool
].count(oid
)))
440 return objects
[l
.pool
][oid
];
444 Object
*get_object(sobject_t oid
, uint64_t object_no
, ObjectSet
*oset
,
445 object_locator_t
&l
, uint64_t truncate_size
,
446 uint64_t truncate_seq
);
447 void close_object(Object
*ob
);
459 loff_t stat_dirty_waiting
; // bytes that writers are waiting on to write
461 void verify_stats() const;
463 void bh_stat_add(BufferHead
*bh
);
464 void bh_stat_sub(BufferHead
*bh
);
465 loff_t
get_stat_tx() const { return stat_tx
; }
466 loff_t
get_stat_rx() const { return stat_rx
; }
467 loff_t
get_stat_dirty() const { return stat_dirty
; }
468 loff_t
get_stat_dirty_waiting() const { return stat_dirty_waiting
; }
469 loff_t
get_stat_clean() const { return stat_clean
; }
470 loff_t
get_stat_zero() const { return stat_zero
; }
472 void touch_bh(BufferHead
*bh
) {
474 bh_lru_dirty
.lru_touch(bh
);
476 bh_lru_rest
.lru_touch(bh
);
478 bh
->set_dontneed(false);
479 bh
->set_nocache(false);
482 void touch_ob(Object
*ob
) {
483 ob_lru
.lru_touch(ob
);
485 void bottouch_ob(Object
*ob
) {
486 ob_lru
.lru_bottouch(ob
);
490 void bh_set_state(BufferHead
*bh
, int s
);
491 void copy_bh_state(BufferHead
*bh1
, BufferHead
*bh2
) {
492 bh_set_state(bh2
, bh1
->get_state());
495 void mark_missing(BufferHead
*bh
) {
496 bh_set_state(bh
,BufferHead::STATE_MISSING
);
498 void mark_clean(BufferHead
*bh
) {
499 bh_set_state(bh
, BufferHead::STATE_CLEAN
);
501 void mark_zero(BufferHead
*bh
) {
502 bh_set_state(bh
, BufferHead::STATE_ZERO
);
504 void mark_rx(BufferHead
*bh
) {
505 bh_set_state(bh
, BufferHead::STATE_RX
);
507 void mark_tx(BufferHead
*bh
) {
508 bh_set_state(bh
, BufferHead::STATE_TX
); }
509 void mark_error(BufferHead
*bh
) {
510 bh_set_state(bh
, BufferHead::STATE_ERROR
);
512 void mark_dirty(BufferHead
*bh
) {
513 bh_set_state(bh
, BufferHead::STATE_DIRTY
);
514 bh_lru_dirty
.lru_touch(bh
);
515 //bh->set_dirty_stamp(ceph_clock_now());
518 void bh_add(Object
*ob
, BufferHead
*bh
);
519 void bh_remove(Object
*ob
, BufferHead
*bh
);
522 void bh_read(BufferHead
*bh
, int op_flags
);
523 void bh_write(BufferHead
*bh
);
524 void bh_write_scattered(list
<BufferHead
*>& blist
);
525 void bh_write_adjacencies(BufferHead
*bh
, ceph::real_time cutoff
,
526 int64_t *amount
, int *max_count
);
529 void flush(loff_t amount
=0);
532 * flush a range of buffers
534 * Flush any buffers that intersect the specified extent. If len==0,
535 * flush *all* buffers for the object.
538 * @param off start offset
539 * @param len extent length, or 0 for entire object
540 * @return true if object was already clean/flushed.
542 bool flush(Object
*o
, loff_t off
, loff_t len
);
543 loff_t
release(Object
*o
);
544 void purge(Object
*o
);
546 int64_t reads_outstanding
;
549 int _readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
551 void retry_waiting_reads();
554 void bh_read_finish(int64_t poolid
, sobject_t oid
, ceph_tid_t tid
,
555 loff_t offset
, uint64_t length
,
556 bufferlist
&bl
, int r
,
558 void bh_write_commit(int64_t poolid
, sobject_t oid
,
559 vector
<pair
<loff_t
, uint64_t> >& ranges
,
560 ceph_tid_t t
, int r
);
563 class C_WaitForWrite
;
570 ObjectCacher(CephContext
*cct_
, string name
, WritebackHandler
& wb
, Mutex
& l
,
571 flush_set_callback_t flush_callback
,
572 void *flush_callback_arg
,
573 uint64_t max_bytes
, uint64_t max_objects
,
574 uint64_t max_dirty
, uint64_t target_dirty
, double max_age
,
575 bool block_writes_upfront
);
579 flusher_thread
.create("flusher");
582 assert(flusher_thread
.is_started());
583 lock
.Lock(); // hmm.. watch out for deadlock!
585 flusher_cond
.Signal();
587 flusher_thread
.join();
594 // non-blocking. async.
597 * @note total read size must be <= INT_MAX, since
598 * the return value is total bytes read
600 int readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
);
601 int writex(OSDWrite
*wr
, ObjectSet
*oset
, Context
*onfreespace
);
602 bool is_cached(ObjectSet
*oset
, vector
<ObjectExtent
>& extents
,
607 int _wait_for_write(OSDWrite
*wr
, uint64_t len
, ObjectSet
*oset
,
608 Context
*onfreespace
);
609 void maybe_wait_for_writeback(uint64_t len
);
610 bool _flush_set_finish(C_GatherBuilder
*gather
, Context
*onfinish
);
613 bool set_is_empty(ObjectSet
*oset
);
614 bool set_is_cached(ObjectSet
*oset
);
615 bool set_is_dirty_or_committing(ObjectSet
*oset
);
617 bool flush_set(ObjectSet
*oset
, Context
*onfinish
=0);
618 bool flush_set(ObjectSet
*oset
, vector
<ObjectExtent
>& ex
,
619 Context
*onfinish
= 0);
620 bool flush_all(Context
*onfinish
= 0);
622 void purge_set(ObjectSet
*oset
);
624 // returns # of bytes not released (ie non-clean)
625 loff_t
release_set(ObjectSet
*oset
);
626 uint64_t release_all();
628 void discard_set(ObjectSet
*oset
, const vector
<ObjectExtent
>& ex
);
631 * Retry any in-flight reads that get -ENOENT instead of marking
632 * them zero, and get rid of any cached -ENOENTs.
633 * After this is called and the cache's lock is unlocked,
634 * any new requests will treat -ENOENT normally.
636 void clear_nonexistence(ObjectSet
*oset
);
640 void set_max_dirty(uint64_t v
) {
643 void set_target_dirty(int64_t v
) {
646 void set_max_size(int64_t v
) {
649 void set_max_dirty_age(double a
) {
650 max_dirty_age
= make_timespan(a
);
652 void set_max_objects(int64_t v
) {
659 /*** async+caching (non-blocking) file interface ***/
660 int file_is_cached(ObjectSet
*oset
, file_layout_t
*layout
,
661 snapid_t snapid
, loff_t offset
, uint64_t len
) {
662 vector
<ObjectExtent
> extents
;
663 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
664 oset
->truncate_size
, extents
);
665 return is_cached(oset
, extents
, snapid
);
668 int file_read(ObjectSet
*oset
, file_layout_t
*layout
, snapid_t snapid
,
669 loff_t offset
, uint64_t len
, bufferlist
*bl
, int flags
,
671 OSDRead
*rd
= prepare_read(snapid
, bl
, flags
);
672 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
673 oset
->truncate_size
, rd
->extents
);
674 return readx(rd
, oset
, onfinish
);
677 int file_write(ObjectSet
*oset
, file_layout_t
*layout
,
678 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
679 bufferlist
& bl
, ceph::real_time mtime
, int flags
) {
680 OSDWrite
*wr
= prepare_write(snapc
, bl
, mtime
, flags
, 0);
681 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
682 oset
->truncate_size
, wr
->extents
);
683 return writex(wr
, oset
, NULL
);
686 bool file_flush(ObjectSet
*oset
, file_layout_t
*layout
,
687 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
689 vector
<ObjectExtent
> extents
;
690 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
691 oset
->truncate_size
, extents
);
692 return flush_set(oset
, extents
, onfinish
);
697 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::BufferHead
&bh
)
699 out
<< "bh[ " << &bh
<< " "
700 << bh
.start() << "~" << bh
.length()
702 << " (" << bh
.bl
.length() << ")"
703 << " v " << bh
.last_write_tid
;
704 if (bh
.get_journal_tid() != 0) {
705 out
<< " j " << bh
.get_journal_tid();
707 if (bh
.is_tx()) out
<< " tx";
708 if (bh
.is_rx()) out
<< " rx";
709 if (bh
.is_dirty()) out
<< " dirty";
710 if (bh
.is_clean()) out
<< " clean";
711 if (bh
.is_zero()) out
<< " zero";
712 if (bh
.is_missing()) out
<< " missing";
713 if (bh
.bl
.length() > 0) out
<< " firstbyte=" << (int)bh
.bl
[0];
714 if (bh
.error
) out
<< " error=" << bh
.error
;
716 out
<< " waiters = {";
717 for (map
<loff_t
, list
<Context
*> >::const_iterator it
718 = bh
.waitfor_read
.begin();
719 it
!= bh
.waitfor_read
.end(); ++it
) {
720 out
<< " " << it
->first
<< "->[";
721 for (list
<Context
*>::const_iterator lit
= it
->second
.begin();
722 lit
!= it
->second
.end(); ++lit
) {
731 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::ObjectSet
&os
)
733 return out
<< "objectset[" << os
.ino
734 << " ts " << os
.truncate_seq
<< "/" << os
.truncate_size
735 << " objects " << os
.objects
.size()
736 << " dirty_or_tx " << os
.dirty_or_tx
740 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::Object
&ob
)
743 << ob
.get_soid() << " oset " << ob
.oset
<< dec
744 << " wr " << ob
.last_write_tid
<< "/" << ob
.last_commit_tid
;