1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef CEPH_OBJECTCACHER_H
4 #define CEPH_OBJECTCACHER_H
6 #include "include/types.h"
7 #include "include/lru.h"
8 #include "include/Context.h"
9 #include "include/xlist.h"
11 #include "common/Cond.h"
12 #include "common/Finisher.h"
13 #include "common/Thread.h"
14 #include "common/zipkin_trace.h"
20 class WritebackHandler
;
24 l_objectcacher_first
= 25000,
26 l_objectcacher_cache_ops_hit
, // ops we satisfy completely from cache
27 l_objectcacher_cache_ops_miss
, // ops we don't satisfy completely from cache
29 l_objectcacher_cache_bytes_hit
, // bytes read directly from cache
31 l_objectcacher_cache_bytes_miss
, // bytes we couldn't read directly
35 l_objectcacher_data_read
, // total bytes read out
36 l_objectcacher_data_written
, // bytes written to cache
37 l_objectcacher_data_flushed
, // bytes flushed to WritebackHandler
38 l_objectcacher_overwritten_in_flush
, // bytes overwritten while
39 // flushing is in progress
41 l_objectcacher_write_ops_blocked
, // total write ops we delayed due
43 l_objectcacher_write_bytes_blocked
, // total number of write bytes
44 // we delayed due to dirty
46 l_objectcacher_write_time_blocked
, // total time in seconds spent
47 // blocking a write due to dirty
54 PerfCounters
*perfcounter
;
61 typedef void (*flush_set_callback_t
) (void *p
, ObjectSet
*oset
);
63 // read scatter/gather
65 vector
<ObjectExtent
> extents
;
// Construct a scatter/gather read op.
// @param s snapshot id to read from
// @param b destination buffer for the data read
// @param f fadvise flags applied to the read
OSDRead(snapid_t s, bufferlist *b, int f)
  : snap(s), bl(b), fadvise_flags(f) {}
73 OSDRead
*prepare_read(snapid_t snap
, bufferlist
*b
, int f
) const {
74 return new OSDRead(snap
, b
, f
);
77 // write scatter/gather
79 vector
<ObjectExtent
> extents
;
82 ceph::real_time mtime
;
84 ceph_tid_t journal_tid
;
// Construct a scatter/gather write op.
// @param sc snap context the write is performed under
// @param b source data to write
// @param mt modification time to stamp the write with
// @param f fadvise flags applied to the write
// @param _journal_tid journal transaction id associated with this write
OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
         int f, ceph_tid_t _journal_tid)
  : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
    journal_tid(_journal_tid) {}
91 OSDWrite
*prepare_write(const SnapContext
& sc
,
95 ceph_tid_t journal_tid
) const {
96 return new OSDWrite(sc
, b
, mt
, f
, journal_tid
);
101 // ******* BufferHead *********
102 class BufferHead
: public LRUObject
{
105 static const int STATE_MISSING
= 0;
106 static const int STATE_CLEAN
= 1;
107 static const int STATE_ZERO
= 2; // NOTE: these are *clean* zeros
108 static const int STATE_DIRTY
= 3;
109 static const int STATE_RX
= 4;
110 static const int STATE_TX
= 5;
111 static const int STATE_ERROR
= 6; // a read error occurred
118 loff_t start
, length
; // bh extent in object
120 bool dontneed
; // indicates this bh is not needed by anyone
121 bool nocache
; // indicates this bh is not needed by this caller
126 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
127 ceph_tid_t last_read_tid
; // tid of last read op (if any)
128 ceph::real_time last_write
;
130 ceph_tid_t journal_tid
;
131 int error
; // holds return value for failed reads
133 map
<loff_t
, list
<Context
*> > waitfor_read
;
136 explicit BufferHead(Object
*o
) :
137 state(STATE_MISSING
),
146 ex
.start
= ex
.length
= 0;
// Accessors for this buffer's extent within the object (byte offsets).
loff_t start() const { return ex.start; }            // first byte covered
void set_start(loff_t s) { ex.start = s; }
loff_t length() const { return ex.length; }          // extent length in bytes
void set_length(loff_t l) { ex.length = l; }
loff_t end() const { return ex.start + ex.length; }  // one past the last byte
loff_t last() const { return end() - 1; }            // last byte covered
158 void set_state(int s
) {
159 if (s
== STATE_RX
|| s
== STATE_TX
) get();
160 if (state
== STATE_RX
|| state
== STATE_TX
) put();
163 int get_state() const { return state
; }
165 inline ceph_tid_t
get_journal_tid() const {
168 inline void set_journal_tid(ceph_tid_t _journal_tid
) {
169 journal_tid
= _journal_tid
;
// State predicates: a BufferHead is in exactly one of the STATE_*
// states declared above; each predicate tests for one of them.
bool is_missing() const { return state == STATE_MISSING; }
bool is_dirty() const { return state == STATE_DIRTY; }
bool is_clean() const { return state == STATE_CLEAN; }
bool is_zero() const { return state == STATE_ZERO; }
bool is_tx() const { return state == STATE_TX; }
bool is_rx() const { return state == STATE_RX; }
bool is_error() const { return state == STATE_ERROR; }
180 // reference counting
183 if (ref
== 0) lru_pin();
188 if (ref
== 1) lru_unpin();
193 void set_dontneed(bool v
) {
196 bool get_dontneed() const {
200 void set_nocache(bool v
) {
203 bool get_nocache() const {
207 inline bool can_merge_journal(BufferHead
*bh
) const {
208 return (get_journal_tid() == bh
->get_journal_tid());
212 bool operator()(const BufferHead
* l
, const BufferHead
* r
) const {
213 const Object
*lob
= l
->ob
;
214 const Object
*rob
= r
->ob
;
215 const ObjectSet
*loset
= lob
->oset
;
216 const ObjectSet
*roset
= rob
->oset
;
218 return loset
< roset
;
221 if (l
->start() != r
->start())
222 return l
->start() < r
->start();
228 // ******* Object *********
229 class Object
: public LRUObject
{
231 // ObjectCacher::Object fields
235 friend struct ObjectSet
;
240 xlist
<Object
*>::item set_item
;
241 object_locator_t oloc
;
242 uint64_t truncate_size
, truncate_seq
;
247 map
<loff_t
, BufferHead
*> data
;
249 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
250 ceph_tid_t last_commit_tid
; // last update committed.
254 map
< ceph_tid_t
, list
<Context
*> > waitfor_commit
;
255 xlist
<C_ReadFinish
*> reads
;
257 Object(const Object
&) = delete;
258 Object
& operator=(const Object
&) = delete;
260 Object(ObjectCacher
*_oc
, sobject_t o
, uint64_t ono
, ObjectSet
*os
,
261 object_locator_t
& l
, uint64_t ts
, uint64_t tq
) :
264 oid(o
), object_no(ono
), oset(os
), set_item(this), oloc(l
),
265 truncate_size(ts
), truncate_seq(tq
),
266 complete(false), exists(true),
267 last_write_tid(0), last_commit_tid(0),
270 os
->objects
.push_back(&set_item
);
275 assert(data
.empty());
276 assert(dirty_or_tx
== 0);
277 set_item
.remove_myself();
// Identity and locator accessors.
sobject_t get_soid() const { return oid; }          // snapshotted object id
object_t get_oid() { return oid.oid; }              // base object name
snapid_t get_snap() { return oid.snap; }            // snapshot id component
ObjectSet *get_object_set() const { return oset; }  // owning object set
string get_namespace() { return oloc.nspace; }      // locator namespace
uint64_t get_object_number() const { return object_no; }
const object_locator_t& get_oloc() const { return oloc; }
void set_object_locator(object_locator_t& l) { oloc = l; }
290 bool can_close() const {
291 if (lru_is_expireable()) {
292 assert(data
.empty());
293 assert(waitfor_commit
.empty());
300 * Check buffers and waiters for consistency
301 * - no overlapping buffers
302 * - index in map matches BH
303 * - waiters fall within BH
305 void audit_buffers();
308 * find first buffer that includes or follows an offset
310 * @param offset object byte offset
311 * @return iterator pointing to buffer, or data.end()
313 map
<loff_t
,BufferHead
*>::const_iterator
data_lower_bound(loff_t offset
) const {
314 map
<loff_t
,BufferHead
*>::const_iterator p
= data
.lower_bound(offset
);
315 if (p
!= data
.begin() &&
316 (p
== data
.end() || p
->first
> offset
)) {
317 --p
; // might overlap!
318 if (p
->first
+ p
->second
->length() <= offset
)
319 ++p
; // doesn't overlap.
326 void add_bh(BufferHead
*bh
) {
329 assert(data
.count(bh
->start()) == 0);
330 data
[bh
->start()] = bh
;
332 void remove_bh(BufferHead
*bh
) {
333 assert(data
.count(bh
->start()));
334 data
.erase(bh
->start());
339 bool is_empty() const { return data
.empty(); }
342 BufferHead
*split(BufferHead
*bh
, loff_t off
);
343 void merge_left(BufferHead
*left
, BufferHead
*right
);
344 bool can_merge_bh(BufferHead
*left
, BufferHead
*right
);
345 void try_merge_bh(BufferHead
*bh
);
347 bool is_cached(loff_t off
, loff_t len
) const;
348 bool include_all_cached_data(loff_t off
, loff_t len
);
349 int map_read(ObjectExtent
&ex
,
350 map
<loff_t
, BufferHead
*>& hits
,
351 map
<loff_t
, BufferHead
*>& missing
,
352 map
<loff_t
, BufferHead
*>& rx
,
353 map
<loff_t
, BufferHead
*>& errors
);
354 BufferHead
*map_write(ObjectExtent
&ex
, ceph_tid_t tid
);
356 void replace_journal_tid(BufferHead
*bh
, ceph_tid_t tid
);
357 void truncate(loff_t s
);
358 void discard(loff_t off
, loff_t len
);
360 // reference counting
363 if (ref
== 0) lru_pin();
368 if (ref
== 1) lru_unpin();
379 uint64_t truncate_seq
, truncate_size
;
382 xlist
<Object
*> objects
;
// Construct an object set for inode i in pool _poolid; truncate state
// starts at zero, nothing is dirty or in flight, and ENOENT is not
// returned for reads until return_enoent is set.
// @param p opaque parent pointer (owned by the caller)
// @param _poolid pool the set's objects live in
// @param i inode number this set caches data for
ObjectSet(void *p, int64_t _poolid, inodeno_t i)
  : parent(p), ino(i), truncate_seq(0),
    truncate_size(0), poolid(_poolid), dirty_or_tx(0),
    return_enoent(false) {}
395 // ******* ObjectCacher *********
396 // ObjectCacher fields
398 WritebackHandler
& writeback_handler
;
399 bool scattered_write
;
404 uint64_t max_dirty
, target_dirty
, max_size
, max_objects
;
405 ceph::timespan max_dirty_age
;
406 bool block_writes_upfront
;
408 ZTracer::Endpoint trace_endpoint
;
410 flush_set_callback_t flush_set_callback
;
411 void *flush_set_callback_arg
;
413 // indexed by pool_id
414 vector
<ceph::unordered_map
<sobject_t
, Object
*> > objects
;
416 list
<Context
*> waitfor_read
;
418 ceph_tid_t last_read_tid
;
420 set
<BufferHead
*, BufferHead::ptr_lt
> dirty_or_tx_bh
;
421 LRU bh_lru_dirty
, bh_lru_rest
;
426 void flusher_entry();
427 class FlusherThread
: public Thread
{
// Bind the flusher thread to its owning ObjectCacher.
explicit FlusherThread(ObjectCacher *o) : oc(o) {}
431 void *entry() override
{
440 Object
*get_object_maybe(sobject_t oid
, object_locator_t
&l
) {
442 if (((uint32_t)l
.pool
< objects
.size()) &&
443 (objects
[l
.pool
].count(oid
)))
444 return objects
[l
.pool
][oid
];
448 Object
*get_object(sobject_t oid
, uint64_t object_no
, ObjectSet
*oset
,
449 object_locator_t
&l
, uint64_t truncate_size
,
450 uint64_t truncate_seq
);
451 void close_object(Object
*ob
);
463 loff_t stat_dirty_waiting
; // bytes that writers are waiting on to write
465 size_t stat_nr_dirty_waiters
;
467 void verify_stats() const;
469 void bh_stat_add(BufferHead
*bh
);
470 void bh_stat_sub(BufferHead
*bh
);
// Read-only cache statistics (byte counts, except the waiter count).
loff_t get_stat_tx() const { return stat_tx; }        // bytes being written out
loff_t get_stat_rx() const { return stat_rx; }        // bytes being read in
loff_t get_stat_dirty() const { return stat_dirty; }  // dirty bytes held
loff_t get_stat_clean() const { return stat_clean; }  // clean bytes held
loff_t get_stat_zero() const { return stat_zero; }    // zero-state bytes held
loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
479 void touch_bh(BufferHead
*bh
) {
481 bh_lru_dirty
.lru_touch(bh
);
483 bh_lru_rest
.lru_touch(bh
);
485 bh
->set_dontneed(false);
486 bh
->set_nocache(false);
489 void touch_ob(Object
*ob
) {
490 ob_lru
.lru_touch(ob
);
492 void bottouch_ob(Object
*ob
) {
493 ob_lru
.lru_bottouch(ob
);
497 void bh_set_state(BufferHead
*bh
, int s
);
498 void copy_bh_state(BufferHead
*bh1
, BufferHead
*bh2
) {
499 bh_set_state(bh2
, bh1
->get_state());
502 void mark_missing(BufferHead
*bh
) {
503 bh_set_state(bh
,BufferHead::STATE_MISSING
);
505 void mark_clean(BufferHead
*bh
) {
506 bh_set_state(bh
, BufferHead::STATE_CLEAN
);
508 void mark_zero(BufferHead
*bh
) {
509 bh_set_state(bh
, BufferHead::STATE_ZERO
);
511 void mark_rx(BufferHead
*bh
) {
512 bh_set_state(bh
, BufferHead::STATE_RX
);
// Transition bh to the TX (write in flight) state.
void mark_tx(BufferHead *bh) {
  bh_set_state(bh, BufferHead::STATE_TX); }
516 void mark_error(BufferHead
*bh
) {
517 bh_set_state(bh
, BufferHead::STATE_ERROR
);
519 void mark_dirty(BufferHead
*bh
) {
520 bh_set_state(bh
, BufferHead::STATE_DIRTY
);
521 bh_lru_dirty
.lru_touch(bh
);
522 //bh->set_dirty_stamp(ceph_clock_now());
525 void bh_add(Object
*ob
, BufferHead
*bh
);
526 void bh_remove(Object
*ob
, BufferHead
*bh
);
529 void bh_read(BufferHead
*bh
, int op_flags
,
530 const ZTracer::Trace
&parent_trace
);
531 void bh_write(BufferHead
*bh
, const ZTracer::Trace
&parent_trace
);
532 void bh_write_scattered(list
<BufferHead
*>& blist
);
533 void bh_write_adjacencies(BufferHead
*bh
, ceph::real_time cutoff
,
534 int64_t *amount
, int *max_count
);
537 void flush(ZTracer::Trace
*trace
, loff_t amount
=0);
540 * flush a range of buffers
542 * Flush any buffers that intersect the specified extent. If len==0,
543 * flush *all* buffers for the object.
546 * @param off start offset
547 * @param len extent length, or 0 for entire object
548 * @return true if object was already clean/flushed.
550 bool flush(Object
*o
, loff_t off
, loff_t len
,
551 ZTracer::Trace
*trace
);
552 loff_t
release(Object
*o
);
553 void purge(Object
*o
);
555 int64_t reads_outstanding
;
558 int _readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
559 bool external_call
, ZTracer::Trace
*trace
);
560 void retry_waiting_reads();
563 void bh_read_finish(int64_t poolid
, sobject_t oid
, ceph_tid_t tid
,
564 loff_t offset
, uint64_t length
,
565 bufferlist
&bl
, int r
,
567 void bh_write_commit(int64_t poolid
, sobject_t oid
,
568 vector
<pair
<loff_t
, uint64_t> >& ranges
,
569 ceph_tid_t t
, int r
);
572 class C_WaitForWrite
;
579 ObjectCacher(CephContext
*cct_
, string name
, WritebackHandler
& wb
, Mutex
& l
,
580 flush_set_callback_t flush_callback
,
581 void *flush_callback_arg
,
582 uint64_t max_bytes
, uint64_t max_objects
,
583 uint64_t max_dirty
, uint64_t target_dirty
, double max_age
,
584 bool block_writes_upfront
);
588 flusher_thread
.create("flusher");
591 assert(flusher_thread
.is_started());
592 lock
.Lock(); // hmm.. watch out for deadlock!
594 flusher_cond
.Signal();
596 flusher_thread
.join();
603 // non-blocking. async.
606 * @note total read size must be <= INT_MAX, since
607 * the return value is total bytes read
609 int readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
610 ZTracer::Trace
*parent_trace
= nullptr);
611 int writex(OSDWrite
*wr
, ObjectSet
*oset
, Context
*onfreespace
,
612 ZTracer::Trace
*parent_trace
= nullptr);
613 bool is_cached(ObjectSet
*oset
, vector
<ObjectExtent
>& extents
,
618 int _wait_for_write(OSDWrite
*wr
, uint64_t len
, ObjectSet
*oset
,
619 ZTracer::Trace
*trace
, Context
*onfreespace
);
620 void maybe_wait_for_writeback(uint64_t len
, ZTracer::Trace
*trace
);
621 bool _flush_set_finish(C_GatherBuilder
*gather
, Context
*onfinish
);
624 bool set_is_empty(ObjectSet
*oset
);
625 bool set_is_cached(ObjectSet
*oset
);
626 bool set_is_dirty_or_committing(ObjectSet
*oset
);
628 bool flush_set(ObjectSet
*oset
, Context
*onfinish
=0);
629 bool flush_set(ObjectSet
*oset
, vector
<ObjectExtent
>& ex
,
630 ZTracer::Trace
*trace
, Context
*onfinish
= 0);
631 bool flush_all(Context
*onfinish
= 0);
633 void purge_set(ObjectSet
*oset
);
635 // returns # of bytes not released (ie non-clean)
636 loff_t
release_set(ObjectSet
*oset
);
637 uint64_t release_all();
639 void discard_set(ObjectSet
*oset
, const vector
<ObjectExtent
>& ex
);
642 * Retry any in-flight reads that get -ENOENT instead of marking
643 * them zero, and get rid of any cached -ENOENTs.
644 * After this is called and the cache's lock is unlocked,
645 * any new requests will treat -ENOENT normally.
647 void clear_nonexistence(ObjectSet
*oset
);
651 void set_max_dirty(uint64_t v
) {
654 void set_target_dirty(int64_t v
) {
657 void set_max_size(int64_t v
) {
660 void set_max_dirty_age(double a
) {
661 max_dirty_age
= make_timespan(a
);
663 void set_max_objects(int64_t v
) {
670 /*** async+caching (non-blocking) file interface ***/
671 int file_is_cached(ObjectSet
*oset
, file_layout_t
*layout
,
672 snapid_t snapid
, loff_t offset
, uint64_t len
) {
673 vector
<ObjectExtent
> extents
;
674 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
675 oset
->truncate_size
, extents
);
676 return is_cached(oset
, extents
, snapid
);
679 int file_read(ObjectSet
*oset
, file_layout_t
*layout
, snapid_t snapid
,
680 loff_t offset
, uint64_t len
, bufferlist
*bl
, int flags
,
682 OSDRead
*rd
= prepare_read(snapid
, bl
, flags
);
683 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
684 oset
->truncate_size
, rd
->extents
);
685 return readx(rd
, oset
, onfinish
);
688 int file_write(ObjectSet
*oset
, file_layout_t
*layout
,
689 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
690 bufferlist
& bl
, ceph::real_time mtime
, int flags
) {
691 OSDWrite
*wr
= prepare_write(snapc
, bl
, mtime
, flags
, 0);
692 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
693 oset
->truncate_size
, wr
->extents
);
694 return writex(wr
, oset
, NULL
);
697 bool file_flush(ObjectSet
*oset
, file_layout_t
*layout
,
698 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
700 vector
<ObjectExtent
> extents
;
701 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
702 oset
->truncate_size
, extents
);
703 ZTracer::Trace trace
;
704 return flush_set(oset
, extents
, &trace
, onfinish
);
709 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::BufferHead
&bh
)
711 out
<< "bh[ " << &bh
<< " "
712 << bh
.start() << "~" << bh
.length()
714 << " (" << bh
.bl
.length() << ")"
715 << " v " << bh
.last_write_tid
;
716 if (bh
.get_journal_tid() != 0) {
717 out
<< " j " << bh
.get_journal_tid();
719 if (bh
.is_tx()) out
<< " tx";
720 if (bh
.is_rx()) out
<< " rx";
721 if (bh
.is_dirty()) out
<< " dirty";
722 if (bh
.is_clean()) out
<< " clean";
723 if (bh
.is_zero()) out
<< " zero";
724 if (bh
.is_missing()) out
<< " missing";
725 if (bh
.bl
.length() > 0) out
<< " firstbyte=" << (int)bh
.bl
[0];
726 if (bh
.error
) out
<< " error=" << bh
.error
;
728 out
<< " waiters = {";
729 for (map
<loff_t
, list
<Context
*> >::const_iterator it
730 = bh
.waitfor_read
.begin();
731 it
!= bh
.waitfor_read
.end(); ++it
) {
732 out
<< " " << it
->first
<< "->[";
733 for (list
<Context
*>::const_iterator lit
= it
->second
.begin();
734 lit
!= it
->second
.end(); ++lit
) {
743 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::ObjectSet
&os
)
745 return out
<< "objectset[" << os
.ino
746 << " ts " << os
.truncate_seq
<< "/" << os
.truncate_size
747 << " objects " << os
.objects
.size()
748 << " dirty_or_tx " << os
.dirty_or_tx
752 inline ostream
& operator<<(ostream
&out
, const ObjectCacher::Object
&ob
)
755 << ob
.get_soid() << " oset " << ob
.oset
<< dec
756 << " wr " << ob
.last_write_tid
<< "/" << ob
.last_commit_tid
;