1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef CEPH_OBJECTCACHER_H
4 #define CEPH_OBJECTCACHER_H
6 #include "include/types.h"
7 #include "include/lru.h"
8 #include "include/Context.h"
9 #include "include/xlist.h"
11 #include "common/Cond.h"
12 #include "common/Finisher.h"
13 #include "common/Thread.h"
14 #include "common/zipkin_trace.h"
20 class WritebackHandler
;
24 l_objectcacher_first
= 25000,
26 l_objectcacher_cache_ops_hit
, // ops we satisfy completely from cache
27 l_objectcacher_cache_ops_miss
, // ops we don't satisfy completely from cache
29 l_objectcacher_cache_bytes_hit
, // bytes read directly from cache
31 l_objectcacher_cache_bytes_miss
, // bytes we couldn't read directly
35 l_objectcacher_data_read
, // total bytes read out
36 l_objectcacher_data_written
, // bytes written to cache
37 l_objectcacher_data_flushed
, // bytes flushed to WritebackHandler
38 l_objectcacher_overwritten_in_flush
, // bytes overwritten while
39 // flushing is in progress
41 l_objectcacher_write_ops_blocked
, // total write ops we delayed due
43 l_objectcacher_write_bytes_blocked
, // total number of write bytes
44 // we delayed due to dirty
46 l_objectcacher_write_time_blocked
, // total time in seconds spent
47 // blocking a write due to dirty
54 PerfCounters
*perfcounter
;
61 typedef void (*flush_set_callback_t
) (void *p
, ObjectSet
*oset
);
63 // read scatter/gather
65 vector
<ObjectExtent
> extents
;
69 OSDRead(snapid_t s
, bufferlist
*b
, int f
)
70 : snap(s
), bl(b
), fadvise_flags(f
) {}
73 OSDRead
*prepare_read(snapid_t snap
, bufferlist
*b
, int f
) const {
74 return new OSDRead(snap
, b
, f
);
77 // write scatter/gather
79 vector
<ObjectExtent
> extents
;
82 ceph::real_time mtime
;
84 ceph_tid_t journal_tid
;
85 OSDWrite(const SnapContext
& sc
, const bufferlist
& b
, ceph::real_time mt
,
86 int f
, ceph_tid_t _journal_tid
)
87 : snapc(sc
), bl(b
), mtime(mt
), fadvise_flags(f
),
88 journal_tid(_journal_tid
) {}
91 OSDWrite
*prepare_write(const SnapContext
& sc
,
95 ceph_tid_t journal_tid
) const {
96 return new OSDWrite(sc
, b
, mt
, f
, journal_tid
);
101 // ******* BufferHead *********
102 class BufferHead
: public LRUObject
{
105 static const int STATE_MISSING
= 0;
106 static const int STATE_CLEAN
= 1;
107 static const int STATE_ZERO
= 2; // NOTE: these are *clean* zeros
108 static const int STATE_DIRTY
= 3;
109 static const int STATE_RX
= 4;
110 static const int STATE_TX
= 5;
111 static const int STATE_ERROR
= 6; // a read error occurred
118 loff_t start
, length
; // bh extent in object
120 bool dontneed
; // indicates no one needs this bh any more
121 bool nocache
; // indicates this caller does not need the bh
126 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
127 ceph_tid_t last_read_tid
; // tid of last read op (if any)
128 ceph::real_time last_write
;
130 ceph_tid_t journal_tid
;
131 int error
; // holds return value for failed reads
133 map
<loff_t
, list
<Context
*> > waitfor_read
;
136 explicit BufferHead(Object
*o
) :
137 state(STATE_MISSING
),
146 ex
.start
= ex
.length
= 0;
150 loff_t
start() const { return ex
.start
; }
151 void set_start(loff_t s
) { ex
.start
= s
; }
152 loff_t
length() const { return ex
.length
; }
153 void set_length(loff_t l
) { ex
.length
= l
; }
154 loff_t
end() const { return ex
.start
+ ex
.length
; }
155 loff_t
last() const { return end() - 1; }
// Transition this buffer toward state s.  Entering RX/TX takes a
// reference (get -> lru_pin) and leaving RX/TX drops one (put ->
// lru_unpin) so in-flight buffers stay pinned in the LRU.
// NOTE(review): the final `state = s;` assignment line appears to be
// lost in this extracted view — confirm against the upstream header.
158 void set_state(int s
) {
159 if (s
== STATE_RX
|| s
== STATE_TX
) get();
160 if (state
== STATE_RX
|| state
== STATE_TX
) put();
163 int get_state() const { return state
; }
165 inline ceph_tid_t
get_journal_tid() const {
168 inline void set_journal_tid(ceph_tid_t _journal_tid
) {
169 journal_tid
= _journal_tid
;
172 bool is_missing() const { return state
== STATE_MISSING
; }
173 bool is_dirty() const { return state
== STATE_DIRTY
; }
174 bool is_clean() const { return state
== STATE_CLEAN
; }
175 bool is_zero() const { return state
== STATE_ZERO
; }
176 bool is_tx() const { return state
== STATE_TX
; }
177 bool is_rx() const { return state
== STATE_RX
; }
178 bool is_error() const { return state
== STATE_ERROR
; }
180 // reference counting
183 if (ref
== 0) lru_pin();
188 if (ref
== 1) lru_unpin();
193 void set_dontneed(bool v
) {
196 bool get_dontneed() const {
200 void set_nocache(bool v
) {
203 bool get_nocache() const {
207 inline bool can_merge_journal(BufferHead
*bh
) const {
208 return (get_journal_tid() == bh
->get_journal_tid());
212 bool operator()(const BufferHead
* l
, const BufferHead
* r
) const {
213 const Object
*lob
= l
->ob
;
214 const Object
*rob
= r
->ob
;
215 const ObjectSet
*loset
= lob
->oset
;
216 const ObjectSet
*roset
= rob
->oset
;
218 return loset
< roset
;
221 if (l
->start() != r
->start())
222 return l
->start() < r
->start();
228 // ******* Object *********
229 class Object
: public LRUObject
{
231 // ObjectCacher::Object fields
235 friend struct ObjectSet
;
240 xlist
<Object
*>::item set_item
;
241 object_locator_t oloc
;
242 uint64_t truncate_size
, truncate_seq
;
247 map
<loff_t
, BufferHead
*> data
;
249 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
250 ceph_tid_t last_commit_tid
; // last update committed.
254 map
< ceph_tid_t
, list
<Context
*> > waitfor_commit
;
255 xlist
<C_ReadFinish
*> reads
;
257 Object(const Object
&) = delete;
258 Object
& operator=(const Object
&) = delete;
260 Object(ObjectCacher
*_oc
, sobject_t o
, uint64_t ono
, ObjectSet
*os
,
261 object_locator_t
& l
, uint64_t ts
, uint64_t tq
) :
264 oid(o
), object_no(ono
), oset(os
), set_item(this), oloc(l
),
265 truncate_size(ts
), truncate_seq(tq
),
266 complete(false), exists(true),
267 last_write_tid(0), last_commit_tid(0),
270 os
->objects
.push_back(&set_item
);
275 assert(data
.empty());
276 assert(dirty_or_tx
== 0);
277 set_item
.remove_myself();
280 sobject_t
get_soid() const { return oid
; }
281 object_t
get_oid() { return oid
.oid
; }
282 snapid_t
get_snap() { return oid
.snap
; }
283 ObjectSet
*get_object_set() const { return oset
; }
284 string
get_namespace() { return oloc
.nspace
; }
285 uint64_t get_object_number() const { return object_no
; }
287 const object_locator_t
& get_oloc() const { return oloc
; }
288 void set_object_locator(object_locator_t
& l
) { oloc
= l
; }
290 bool can_close() const {
291 if (lru_is_expireable()) {
292 assert(data
.empty());
293 assert(waitfor_commit
.empty());
300 * Check buffers and waiters for consistency
301 * - no overlapping buffers
302 * - index in map matches BH
303 * - waiters fall within BH
305 void audit_buffers();
308 * find first buffer that includes or follows an offset
310 * @param offset object byte offset
311 * @return iterator pointing to buffer, or data.end()
313 map
<loff_t
,BufferHead
*>::const_iterator
data_lower_bound(loff_t offset
) const {
314 map
<loff_t
,BufferHead
*>::const_iterator p
= data
.lower_bound(offset
);
315 if (p
!= data
.begin() &&
316 (p
== data
.end() || p
->first
> offset
)) {
317 --p
; // might overlap!
318 if (p
->first
+ p
->second
->length() <= offset
)
319 ++p
; // doesn't overlap.
326 void add_bh(BufferHead
*bh
) {
329 assert(data
.count(bh
->start()) == 0);
330 data
[bh
->start()] = bh
;
// Drop bh from this object's offset-indexed buffer map.  The buffer
// must currently be registered at its start offset.
// NOTE(review): the tail of this method (possibly an un-pin when the
// map becomes empty, plus the closing brace) is lost in this
// extracted view — confirm against the upstream header.
332 void remove_bh(BufferHead
*bh
) {
333 assert(data
.count(bh
->start()));
334 data
.erase(bh
->start());
339 bool is_empty() const { return data
.empty(); }
342 BufferHead
*split(BufferHead
*bh
, loff_t off
);
343 void merge_left(BufferHead
*left
, BufferHead
*right
);
344 bool can_merge_bh(BufferHead
*left
, BufferHead
*right
);
345 void try_merge_bh(BufferHead
*bh
);
346 void maybe_rebuild_buffer(BufferHead
*bh
);
348 bool is_cached(loff_t off
, loff_t len
) const;
349 bool include_all_cached_data(loff_t off
, loff_t len
);
350 int map_read(ObjectExtent
&ex
,
351 map
<loff_t
, BufferHead
*>& hits
,
352 map
<loff_t
, BufferHead
*>& missing
,
353 map
<loff_t
, BufferHead
*>& rx
,
354 map
<loff_t
, BufferHead
*>& errors
);
355 BufferHead
*map_write(ObjectExtent
&ex
, ceph_tid_t tid
);
357 void replace_journal_tid(BufferHead
*bh
, ceph_tid_t tid
);
358 void truncate(loff_t s
);
359 void discard(loff_t off
, loff_t len
, C_GatherBuilder
* commit_gather
);
361 // reference counting
364 if (ref
== 0) lru_pin();
369 if (ref
== 1) lru_unpin();
380 uint64_t truncate_seq
, truncate_size
;
383 xlist
<Object
*> objects
;
388 ObjectSet(void *p
, int64_t _poolid
, inodeno_t i
)
389 : parent(p
), ino(i
), truncate_seq(0),
390 truncate_size(0), poolid(_poolid
), dirty_or_tx(0),
391 return_enoent(false) {}
396 // ******* ObjectCacher *********
397 // ObjectCacher fields
399 WritebackHandler
& writeback_handler
;
400 bool scattered_write
;
405 uint64_t max_dirty
, target_dirty
, max_size
, max_objects
;
406 ceph::timespan max_dirty_age
;
407 bool block_writes_upfront
;
409 ZTracer::Endpoint trace_endpoint
;
411 flush_set_callback_t flush_set_callback
;
412 void *flush_set_callback_arg
;
414 // indexed by pool_id
415 vector
<ceph::unordered_map
<sobject_t
, Object
*> > objects
;
417 list
<Context
*> waitfor_read
;
419 ceph_tid_t last_read_tid
;
421 set
<BufferHead
*, BufferHead::ptr_lt
> dirty_or_tx_bh
;
422 LRU bh_lru_dirty
, bh_lru_rest
;
427 void flusher_entry();
428 class FlusherThread
: public Thread
{
431 explicit FlusherThread(ObjectCacher
*o
) : oc(o
) {}
432 void *entry() override
{
441 Object
*get_object_maybe(sobject_t oid
, object_locator_t
&l
) {
443 if (((uint32_t)l
.pool
< objects
.size()) &&
444 (objects
[l
.pool
].count(oid
)))
445 return objects
[l
.pool
][oid
];
449 Object
*get_object(sobject_t oid
, uint64_t object_no
, ObjectSet
*oset
,
450 object_locator_t
&l
, uint64_t truncate_size
,
451 uint64_t truncate_seq
);
452 void close_object(Object
*ob
);
464 loff_t stat_dirty_waiting
; // bytes that writers are waiting on to write
466 size_t stat_nr_dirty_waiters
;
468 void verify_stats() const;
470 void bh_stat_add(BufferHead
*bh
);
471 void bh_stat_sub(BufferHead
*bh
);
472 loff_t
get_stat_tx() const { return stat_tx
; }
473 loff_t
get_stat_rx() const { return stat_rx
; }
474 loff_t
get_stat_dirty() const { return stat_dirty
; }
475 loff_t
get_stat_clean() const { return stat_clean
; }
476 loff_t
get_stat_zero() const { return stat_zero
; }
477 loff_t
get_stat_dirty_waiting() const { return stat_dirty_waiting
; }
478 size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters
; }
480 void touch_bh(BufferHead
*bh
) {
482 bh_lru_dirty
.lru_touch(bh
);
484 bh_lru_rest
.lru_touch(bh
);
486 bh
->set_dontneed(false);
487 bh
->set_nocache(false);
490 void touch_ob(Object
*ob
) {
491 ob_lru
.lru_touch(ob
);
493 void bottouch_ob(Object
*ob
) {
494 ob_lru
.lru_bottouch(ob
);
498 void bh_set_state(BufferHead
*bh
, int s
);
499 void copy_bh_state(BufferHead
*bh1
, BufferHead
*bh2
) {
500 bh_set_state(bh2
, bh1
->get_state());
503 void mark_missing(BufferHead
*bh
) {
504 bh_set_state(bh
,BufferHead::STATE_MISSING
);
506 void mark_clean(BufferHead
*bh
) {
507 bh_set_state(bh
, BufferHead::STATE_CLEAN
);
509 void mark_zero(BufferHead
*bh
) {
510 bh_set_state(bh
, BufferHead::STATE_ZERO
);
512 void mark_rx(BufferHead
*bh
) {
513 bh_set_state(bh
, BufferHead::STATE_RX
);
515 void mark_tx(BufferHead
*bh
) {
516 bh_set_state(bh
, BufferHead::STATE_TX
); }
517 void mark_error(BufferHead
*bh
) {
518 bh_set_state(bh
, BufferHead::STATE_ERROR
);
520 void mark_dirty(BufferHead
*bh
) {
521 bh_set_state(bh
, BufferHead::STATE_DIRTY
);
522 bh_lru_dirty
.lru_touch(bh
);
523 //bh->set_dirty_stamp(ceph_clock_now());
526 void bh_add(Object
*ob
, BufferHead
*bh
);
527 void bh_remove(Object
*ob
, BufferHead
*bh
);
530 void bh_read(BufferHead
*bh
, int op_flags
,
531 const ZTracer::Trace
&parent_trace
);
532 void bh_write(BufferHead
*bh
, const ZTracer::Trace
&parent_trace
);
533 void bh_write_scattered(list
<BufferHead
*>& blist
);
534 void bh_write_adjacencies(BufferHead
*bh
, ceph::real_time cutoff
,
535 int64_t *amount
, int *max_count
);
538 void flush(ZTracer::Trace
*trace
, loff_t amount
=0);
541 * flush a range of buffers
543 * Flush any buffers that intersect the specified extent. If len==0,
544 * flush *all* buffers for the object.
547 * @param off start offset
548 * @param len extent length, or 0 for entire object
549 * @return true if object was already clean/flushed.
551 bool flush(Object
*o
, loff_t off
, loff_t len
,
552 ZTracer::Trace
*trace
);
553 loff_t
release(Object
*o
);
554 void purge(Object
*o
);
556 int64_t reads_outstanding
;
559 int _readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
560 bool external_call
, ZTracer::Trace
*trace
);
561 void retry_waiting_reads();
564 void bh_read_finish(int64_t poolid
, sobject_t oid
, ceph_tid_t tid
,
565 loff_t offset
, uint64_t length
,
566 bufferlist
&bl
, int r
,
568 void bh_write_commit(int64_t poolid
, sobject_t oid
,
569 vector
<pair
<loff_t
, uint64_t> >& ranges
,
570 ceph_tid_t t
, int r
);
573 class C_WaitForWrite
;
580 ObjectCacher(CephContext
*cct_
, string name
, WritebackHandler
& wb
, Mutex
& l
,
581 flush_set_callback_t flush_callback
,
582 void *flush_callback_arg
,
583 uint64_t max_bytes
, uint64_t max_objects
,
584 uint64_t max_dirty
, uint64_t target_dirty
, double max_age
,
585 bool block_writes_upfront
);
589 flusher_thread
.create("flusher");
592 assert(flusher_thread
.is_started());
593 lock
.Lock(); // hmm.. watch out for deadlock!
595 flusher_cond
.Signal();
597 flusher_thread
.join();
604 // non-blocking. async.
607 * @note total read size must be <= INT_MAX, since
608 * the return value is total bytes read
610 int readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
611 ZTracer::Trace
*parent_trace
= nullptr);
612 int writex(OSDWrite
*wr
, ObjectSet
*oset
, Context
*onfreespace
,
613 ZTracer::Trace
*parent_trace
= nullptr);
614 bool is_cached(ObjectSet
*oset
, vector
<ObjectExtent
>& extents
,
619 int _wait_for_write(OSDWrite
*wr
, uint64_t len
, ObjectSet
*oset
,
620 ZTracer::Trace
*trace
, Context
*onfreespace
);
621 void maybe_wait_for_writeback(uint64_t len
, ZTracer::Trace
*trace
);
622 bool _flush_set_finish(C_GatherBuilder
*gather
, Context
*onfinish
);
624 void _discard(ObjectSet
*oset
, const vector
<ObjectExtent
>& exls
,
625 C_GatherBuilder
* gather
);
626 void _discard_finish(ObjectSet
*oset
, bool was_dirty
, Context
* on_finish
);
629 bool set_is_empty(ObjectSet
*oset
);
630 bool set_is_cached(ObjectSet
*oset
);
631 bool set_is_dirty_or_committing(ObjectSet
*oset
);
633 bool flush_set(ObjectSet
*oset
, Context
*onfinish
=0);
634 bool flush_set(ObjectSet
*oset
, vector
<ObjectExtent
>& ex
,
635 ZTracer::Trace
*trace
, Context
*onfinish
= 0);
636 bool flush_all(Context
*onfinish
= 0);
638 void purge_set(ObjectSet
*oset
);
640 // returns # of bytes not released (ie non-clean)
641 loff_t
release_set(ObjectSet
*oset
);
642 uint64_t release_all();
644 void discard_set(ObjectSet
*oset
, const vector
<ObjectExtent
>& ex
);
645 void discard_writeback(ObjectSet
*oset
, const vector
<ObjectExtent
>& ex
,
649 * Retry any in-flight reads that get -ENOENT instead of marking
650 * them zero, and get rid of any cached -ENOENTs.
651 * After this is called and the cache's lock is unlocked,
652 * any new requests will treat -ENOENT normally.
654 void clear_nonexistence(ObjectSet
*oset
);
658 void set_max_dirty(uint64_t v
) {
661 void set_target_dirty(int64_t v
) {
664 void set_max_size(int64_t v
) {
667 void set_max_dirty_age(double a
) {
668 max_dirty_age
= make_timespan(a
);
670 void set_max_objects(int64_t v
) {
677 /*** async+caching (non-blocking) file interface ***/
678 int file_is_cached(ObjectSet
*oset
, file_layout_t
*layout
,
679 snapid_t snapid
, loff_t offset
, uint64_t len
) {
680 vector
<ObjectExtent
> extents
;
681 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
682 oset
->truncate_size
, extents
);
683 return is_cached(oset
, extents
, snapid
);
686 int file_read(ObjectSet
*oset
, file_layout_t
*layout
, snapid_t snapid
,
687 loff_t offset
, uint64_t len
, bufferlist
*bl
, int flags
,
689 OSDRead
*rd
= prepare_read(snapid
, bl
, flags
);
690 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
691 oset
->truncate_size
, rd
->extents
);
692 return readx(rd
, oset
, onfinish
);
695 int file_write(ObjectSet
*oset
, file_layout_t
*layout
,
696 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
697 bufferlist
& bl
, ceph::real_time mtime
, int flags
) {
698 OSDWrite
*wr
= prepare_write(snapc
, bl
, mtime
, flags
, 0);
699 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
700 oset
->truncate_size
, wr
->extents
);
701 return writex(wr
, oset
, NULL
);
704 bool file_flush(ObjectSet
*oset
, file_layout_t
*layout
,
705 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
707 vector
<ObjectExtent
> extents
;
708 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
709 oset
->truncate_size
, extents
);
710 ZTracer::Trace trace
;
711 return flush_set(oset
, extents
, &trace
, onfinish
);
716 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::BufferHead
&bh
)
718 out
<< "bh[ " << &bh
<< " "
719 << bh
.start() << "~" << bh
.length()
721 << " (" << bh
.bl
.length() << ")"
722 << " v " << bh
.last_write_tid
;
723 if (bh
.get_journal_tid() != 0) {
724 out
<< " j " << bh
.get_journal_tid();
726 if (bh
.is_tx()) out
<< " tx";
727 if (bh
.is_rx()) out
<< " rx";
728 if (bh
.is_dirty()) out
<< " dirty";
729 if (bh
.is_clean()) out
<< " clean";
730 if (bh
.is_zero()) out
<< " zero";
731 if (bh
.is_missing()) out
<< " missing";
732 if (bh
.bl
.length() > 0) out
<< " firstbyte=" << (int)bh
.bl
[0];
733 if (bh
.error
) out
<< " error=" << bh
.error
;
735 out
<< " waiters = {";
736 for (map
<loff_t
, list
<Context
*> >::const_iterator it
737 = bh
.waitfor_read
.begin();
738 it
!= bh
.waitfor_read
.end(); ++it
) {
739 out
<< " " << it
->first
<< "->[";
740 for (list
<Context
*>::const_iterator lit
= it
->second
.begin();
741 lit
!= it
->second
.end(); ++lit
) {
750 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::ObjectSet
&os
)
752 return out
<< "objectset[" << os
.ino
753 << " ts " << os
.truncate_seq
<< "/" << os
.truncate_size
754 << " objects " << os
.objects
.size()
755 << " dirty_or_tx " << os
.dirty_or_tx
759 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::Object
&ob
)
762 << ob
.get_soid() << " oset " << ob
.oset
<< dec
763 << " wr " << ob
.last_write_tid
<< "/" << ob
.last_commit_tid
;