1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef CEPH_OBJECTCACHER_H
4 #define CEPH_OBJECTCACHER_H
6 #include "include/types.h"
7 #include "include/lru.h"
8 #include "include/Context.h"
9 #include "include/xlist.h"
10 #include "include/common_fwd.h"
12 #include "common/Cond.h"
13 #include "common/Finisher.h"
14 #include "common/Thread.h"
15 #include "common/zipkin_trace.h"
// Forward declaration: the component that performs the actual reads and
// writeback I/O against the backing store on behalf of the cacher
// (referenced below as ObjectCacher::writeback_handler).
class WritebackHandler;
23 l_objectcacher_first
= 25000,
25 l_objectcacher_cache_ops_hit
, // ops we satisfy completely from cache
26 l_objectcacher_cache_ops_miss
, // ops we don't satisfy completely from cache
28 l_objectcacher_cache_bytes_hit
, // bytes read directly from cache
30 l_objectcacher_cache_bytes_miss
, // bytes we couldn't read directly
34 l_objectcacher_data_read
, // total bytes read out
35 l_objectcacher_data_written
, // bytes written to cache
36 l_objectcacher_data_flushed
, // bytes flushed to WritebackHandler
37 l_objectcacher_overwritten_in_flush
, // bytes overwritten while
38 // flushing is in progress
40 l_objectcacher_write_ops_blocked
, // total write ops we delayed due
42 l_objectcacher_write_bytes_blocked
, // total number of write bytes
43 // we delayed due to dirty
45 l_objectcacher_write_time_blocked
, // total time in seconds spent
46 // blocking a write due to dirty
53 PerfCounters
*perfcounter
;
60 typedef void (*flush_set_callback_t
) (void *p
, ObjectSet
*oset
);
62 // read scatter/gather
64 std::vector
<ObjectExtent
> extents
;
66 ceph::buffer::list
*bl
;
// Capture one read request: the snapshot to read at, the destination
// buffer the result is assembled into, and fadvise flags passed down.
OSDRead(snapid_t s, ceph::buffer::list *b, int f)
  : snap(s), bl(b), fadvise_flags(f) {}
72 OSDRead
*prepare_read(snapid_t snap
, ceph::buffer::list
*b
, int f
) const {
73 return new OSDRead(snap
, b
, f
);
76 // write scatter/gather
78 std::vector
<ObjectExtent
> extents
;
80 ceph::buffer::list bl
;
81 ceph::real_time mtime
;
83 ceph_tid_t journal_tid
;
// Capture one write request: snap context, payload, mtime, fadvise
// flags, and the journal tid this write is associated with.
OSDWrite(const SnapContext& sc, const ceph::buffer::list& b, ceph::real_time mt,
         int f, ceph_tid_t _journal_tid)
  : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
    journal_tid(_journal_tid) {}
90 OSDWrite
*prepare_write(const SnapContext
& sc
,
91 const ceph::buffer::list
&b
,
94 ceph_tid_t journal_tid
) const {
95 return new OSDWrite(sc
, b
, mt
, f
, journal_tid
);
100 // ******* BufferHead *********
101 class BufferHead
: public LRUObject
{
104 static const int STATE_MISSING
= 0;
105 static const int STATE_CLEAN
= 1;
106 static const int STATE_ZERO
= 2; // NOTE: these are *clean* zeros
107 static const int STATE_DIRTY
= 3;
108 static const int STATE_RX
= 4;
109 static const int STATE_TX
= 5;
110 static const int STATE_ERROR
= 6; // a read error occurred
117 loff_t start
, length
; // bh extent in object
119 bool dontneed
; //indicate bh don't need by anyone
120 bool nocache
; //indicate bh don't need by this caller
124 ceph::buffer::list bl
;
125 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
126 ceph_tid_t last_read_tid
; // tid of last read op (if any)
127 ceph::real_time last_write
;
129 ceph_tid_t journal_tid
;
130 int error
; // holds return value for failed reads
132 std::map
<loff_t
, std::list
<Context
*> > waitfor_read
;
135 explicit BufferHead(Object
*o
) :
136 state(STATE_MISSING
),
145 ex
.start
= ex
.length
= 0;
149 loff_t
start() const { return ex
.start
; }
150 void set_start(loff_t s
) { ex
.start
= s
; }
151 loff_t
length() const { return ex
.length
; }
152 void set_length(loff_t l
) { ex
.length
= l
; }
153 loff_t
end() const { return ex
.start
+ ex
.length
; }
154 loff_t
last() const { return end() - 1; }
157 void set_state(int s
) {
158 if (s
== STATE_RX
|| s
== STATE_TX
) get();
159 if (state
== STATE_RX
|| state
== STATE_TX
) put();
162 int get_state() const { return state
; }
164 inline int get_error() const {
167 inline void set_error(int _error
) {
171 inline ceph_tid_t
get_journal_tid() const {
174 inline void set_journal_tid(ceph_tid_t _journal_tid
) {
175 journal_tid
= _journal_tid
;
178 bool is_missing() const { return state
== STATE_MISSING
; }
179 bool is_dirty() const { return state
== STATE_DIRTY
; }
180 bool is_clean() const { return state
== STATE_CLEAN
; }
181 bool is_zero() const { return state
== STATE_ZERO
; }
182 bool is_tx() const { return state
== STATE_TX
; }
183 bool is_rx() const { return state
== STATE_RX
; }
184 bool is_error() const { return state
== STATE_ERROR
; }
186 // reference counting
188 ceph_assert(ref
>= 0);
189 if (ref
== 0) lru_pin();
193 ceph_assert(ref
> 0);
194 if (ref
== 1) lru_unpin();
199 void set_dontneed(bool v
) {
202 bool get_dontneed() const {
206 void set_nocache(bool v
) {
209 bool get_nocache() const {
213 inline bool can_merge_journal(BufferHead
*bh
) const {
214 return (get_journal_tid() == bh
->get_journal_tid());
218 bool operator()(const BufferHead
* l
, const BufferHead
* r
) const {
219 const Object
*lob
= l
->ob
;
220 const Object
*rob
= r
->ob
;
221 const ObjectSet
*loset
= lob
->oset
;
222 const ObjectSet
*roset
= rob
->oset
;
224 return loset
< roset
;
227 if (l
->start() != r
->start())
228 return l
->start() < r
->start();
234 // ******* Object *********
235 class Object
: public LRUObject
{
237 // ObjectCacher::Object fields
241 friend struct ObjectSet
;
246 xlist
<Object
*>::item set_item
;
247 object_locator_t oloc
;
248 uint64_t truncate_size
, truncate_seq
;
253 std::map
<loff_t
, BufferHead
*> data
;
255 ceph_tid_t last_write_tid
; // version of bh (if non-zero)
256 ceph_tid_t last_commit_tid
; // last update committed.
260 std::map
< ceph_tid_t
, std::list
<Context
*> > waitfor_commit
;
261 xlist
<C_ReadFinish
*> reads
;
263 Object(const Object
&) = delete;
264 Object
& operator=(const Object
&) = delete;
266 Object(ObjectCacher
*_oc
, sobject_t o
, uint64_t ono
, ObjectSet
*os
,
267 object_locator_t
& l
, uint64_t ts
, uint64_t tq
) :
270 oid(o
), object_no(ono
), oset(os
), set_item(this), oloc(l
),
271 truncate_size(ts
), truncate_seq(tq
),
272 complete(false), exists(true),
273 last_write_tid(0), last_commit_tid(0),
276 os
->objects
.push_back(&set_item
);
280 ceph_assert(ref
== 0);
281 ceph_assert(data
.empty());
282 ceph_assert(dirty_or_tx
== 0);
283 set_item
.remove_myself();
286 sobject_t
get_soid() const { return oid
; }
287 object_t
get_oid() { return oid
.oid
; }
288 snapid_t
get_snap() { return oid
.snap
; }
289 ObjectSet
*get_object_set() const { return oset
; }
290 std::string
get_namespace() { return oloc
.nspace
; }
291 uint64_t get_object_number() const { return object_no
; }
293 const object_locator_t
& get_oloc() const { return oloc
; }
294 void set_object_locator(object_locator_t
& l
) { oloc
= l
; }
296 bool can_close() const {
297 if (lru_is_expireable()) {
298 ceph_assert(data
.empty());
299 ceph_assert(waitfor_commit
.empty());
306 * Check buffers and waiters for consistency
307 * - no overlapping buffers
308 * - index in map matches BH
309 * - waiters fall within BH
311 void audit_buffers();
314 * find first buffer that includes or follows an offset
316 * @param offset object byte offset
317 * @return iterator pointing to buffer, or data.end()
319 std::map
<loff_t
,BufferHead
*>::const_iterator
data_lower_bound(loff_t offset
) const {
320 auto p
= data
.lower_bound(offset
);
321 if (p
!= data
.begin() &&
322 (p
== data
.end() || p
->first
> offset
)) {
323 --p
; // might overlap!
324 if (p
->first
+ p
->second
->length() <= offset
)
325 ++p
; // doesn't overlap.
332 void add_bh(BufferHead
*bh
) {
335 ceph_assert(data
.count(bh
->start()) == 0);
336 data
[bh
->start()] = bh
;
338 void remove_bh(BufferHead
*bh
) {
339 ceph_assert(data
.count(bh
->start()));
340 data
.erase(bh
->start());
345 bool is_empty() const { return data
.empty(); }
348 BufferHead
*split(BufferHead
*bh
, loff_t off
);
349 void merge_left(BufferHead
*left
, BufferHead
*right
);
350 bool can_merge_bh(BufferHead
*left
, BufferHead
*right
);
351 void try_merge_bh(BufferHead
*bh
);
352 void maybe_rebuild_buffer(BufferHead
*bh
);
354 bool is_cached(loff_t off
, loff_t len
) const;
355 bool include_all_cached_data(loff_t off
, loff_t len
);
356 int map_read(ObjectExtent
&ex
,
357 std::map
<loff_t
, BufferHead
*>& hits
,
358 std::map
<loff_t
, BufferHead
*>& missing
,
359 std::map
<loff_t
, BufferHead
*>& rx
,
360 std::map
<loff_t
, BufferHead
*>& errors
);
361 BufferHead
*map_write(ObjectExtent
&ex
, ceph_tid_t tid
);
363 void replace_journal_tid(BufferHead
*bh
, ceph_tid_t tid
);
364 void truncate(loff_t s
);
365 void discard(loff_t off
, loff_t len
, C_GatherBuilder
* commit_gather
);
367 // reference counting
369 ceph_assert(ref
>= 0);
370 if (ref
== 0) lru_pin();
374 ceph_assert(ref
> 0);
375 if (ref
== 1) lru_unpin();
386 uint64_t truncate_seq
, truncate_size
;
389 xlist
<Object
*> objects
;
// A new ObjectSet starts with no dirty/tx bytes, no truncation history,
// and return_enoent disabled.  @p is the opaque parent pointer stored
// for the owner's use.
ObjectSet(void *p, int64_t _poolid, inodeno_t i)
  : parent(p), ino(i), truncate_seq(0),
    truncate_size(0), poolid(_poolid), dirty_or_tx(0),
    return_enoent(false) {}
402 // ******* ObjectCacher *********
403 // ObjectCacher fields
405 WritebackHandler
& writeback_handler
;
406 bool scattered_write
;
411 uint64_t max_dirty
, target_dirty
, max_size
, max_objects
;
412 ceph::timespan max_dirty_age
;
413 bool block_writes_upfront
;
415 ZTracer::Endpoint trace_endpoint
;
417 flush_set_callback_t flush_set_callback
;
418 void *flush_set_callback_arg
;
420 // indexed by pool_id
421 std::vector
<ceph::unordered_map
<sobject_t
, Object
*> > objects
;
423 std::list
<Context
*> waitfor_read
;
425 ceph_tid_t last_read_tid
;
427 std::set
<BufferHead
*, BufferHead::ptr_lt
> dirty_or_tx_bh
;
428 LRU bh_lru_dirty
, bh_lru_rest
;
431 ceph::condition_variable flusher_cond
;
433 void flusher_entry();
434 class FlusherThread
: public Thread
{
// Bind the flusher thread to its owning cacher.
explicit FlusherThread(ObjectCacher *o) : oc(o) {}
438 void *entry() override
{
447 Object
*get_object_maybe(sobject_t oid
, object_locator_t
&l
) {
449 if (((uint32_t)l
.pool
< objects
.size()) &&
450 (objects
[l
.pool
].count(oid
)))
451 return objects
[l
.pool
][oid
];
455 Object
*get_object(sobject_t oid
, uint64_t object_no
, ObjectSet
*oset
,
456 object_locator_t
&l
, uint64_t truncate_size
,
457 uint64_t truncate_seq
);
458 void close_object(Object
*ob
);
461 ceph::condition_variable stat_cond
;
470 loff_t stat_dirty_waiting
; // bytes that writers are waiting on to write
472 size_t stat_nr_dirty_waiters
;
474 void verify_stats() const;
476 void bh_stat_add(BufferHead
*bh
);
477 void bh_stat_sub(BufferHead
*bh
);
478 loff_t
get_stat_tx() const { return stat_tx
; }
479 loff_t
get_stat_rx() const { return stat_rx
; }
480 loff_t
get_stat_dirty() const { return stat_dirty
; }
481 loff_t
get_stat_clean() const { return stat_clean
; }
482 loff_t
get_stat_zero() const { return stat_zero
; }
483 loff_t
get_stat_dirty_waiting() const { return stat_dirty_waiting
; }
484 size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters
; }
486 void touch_bh(BufferHead
*bh
) {
488 bh_lru_dirty
.lru_touch(bh
);
490 bh_lru_rest
.lru_touch(bh
);
492 bh
->set_dontneed(false);
493 bh
->set_nocache(false);
496 void touch_ob(Object
*ob
) {
497 ob_lru
.lru_touch(ob
);
499 void bottouch_ob(Object
*ob
) {
500 ob_lru
.lru_bottouch(ob
);
504 void bh_set_state(BufferHead
*bh
, int s
);
505 void copy_bh_state(BufferHead
*bh1
, BufferHead
*bh2
) {
506 bh_set_state(bh2
, bh1
->get_state());
509 void mark_missing(BufferHead
*bh
) {
510 bh_set_state(bh
,BufferHead::STATE_MISSING
);
512 void mark_clean(BufferHead
*bh
) {
513 bh_set_state(bh
, BufferHead::STATE_CLEAN
);
515 void mark_zero(BufferHead
*bh
) {
516 bh_set_state(bh
, BufferHead::STATE_ZERO
);
518 void mark_rx(BufferHead
*bh
) {
519 bh_set_state(bh
, BufferHead::STATE_RX
);
521 void mark_tx(BufferHead
*bh
) {
522 bh_set_state(bh
, BufferHead::STATE_TX
); }
523 void mark_error(BufferHead
*bh
) {
524 bh_set_state(bh
, BufferHead::STATE_ERROR
);
526 void mark_dirty(BufferHead
*bh
) {
527 bh_set_state(bh
, BufferHead::STATE_DIRTY
);
528 bh_lru_dirty
.lru_touch(bh
);
529 //bh->set_dirty_stamp(ceph_clock_now());
532 void bh_add(Object
*ob
, BufferHead
*bh
);
533 void bh_remove(Object
*ob
, BufferHead
*bh
);
536 void bh_read(BufferHead
*bh
, int op_flags
,
537 const ZTracer::Trace
&parent_trace
);
538 void bh_write(BufferHead
*bh
, const ZTracer::Trace
&parent_trace
);
539 void bh_write_scattered(std::list
<BufferHead
*>& blist
);
540 void bh_write_adjacencies(BufferHead
*bh
, ceph::real_time cutoff
,
541 int64_t *amount
, int *max_count
);
544 void flush(ZTracer::Trace
*trace
, loff_t amount
=0);
547 * flush a range of buffers
549 * Flush any buffers that intersect the specified extent. If len==0,
550 * flush *all* buffers for the object.
553 * @param off start offset
554 * @param len extent length, or 0 for entire object
555 * @return true if object was already clean/flushed.
557 bool flush(Object
*o
, loff_t off
, loff_t len
,
558 ZTracer::Trace
*trace
);
559 loff_t
release(Object
*o
);
560 void purge(Object
*o
);
562 int64_t reads_outstanding
;
563 ceph::condition_variable read_cond
;
565 int _readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
566 bool external_call
, ZTracer::Trace
*trace
);
567 void retry_waiting_reads();
570 void bh_read_finish(int64_t poolid
, sobject_t oid
, ceph_tid_t tid
,
571 loff_t offset
, uint64_t length
,
572 ceph::buffer::list
&bl
, int r
,
574 void bh_write_commit(int64_t poolid
, sobject_t oid
,
575 std::vector
<std::pair
<loff_t
, uint64_t> >& ranges
,
576 ceph_tid_t t
, int r
);
579 class C_WaitForWrite
;
586 ObjectCacher(CephContext
*cct_
, std::string name
, WritebackHandler
& wb
, ceph::mutex
& l
,
587 flush_set_callback_t flush_callback
,
588 void *flush_callback_arg
,
589 uint64_t max_bytes
, uint64_t max_objects
,
590 uint64_t max_dirty
, uint64_t target_dirty
, double max_age
,
591 bool block_writes_upfront
);
595 flusher_thread
.create("flusher");
598 ceph_assert(flusher_thread
.is_started());
599 lock
.lock(); // hmm.. watch out for deadlock!
601 flusher_cond
.notify_all();
603 flusher_thread
.join();
610 // non-blocking. async.
613 * @note total read size must be <= INT_MAX, since
614 * the return value is total bytes read
616 int readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
617 ZTracer::Trace
*parent_trace
= nullptr);
618 int writex(OSDWrite
*wr
, ObjectSet
*oset
, Context
*onfreespace
,
619 ZTracer::Trace
*parent_trace
= nullptr);
620 bool is_cached(ObjectSet
*oset
, std::vector
<ObjectExtent
>& extents
,
625 int _wait_for_write(OSDWrite
*wr
, uint64_t len
, ObjectSet
*oset
,
626 ZTracer::Trace
*trace
, Context
*onfreespace
);
627 void _maybe_wait_for_writeback(uint64_t len
, ZTracer::Trace
*trace
);
628 bool _flush_set_finish(C_GatherBuilder
*gather
, Context
*onfinish
);
630 void _discard(ObjectSet
*oset
, const std::vector
<ObjectExtent
>& exls
,
631 C_GatherBuilder
* gather
);
632 void _discard_finish(ObjectSet
*oset
, bool was_dirty
, Context
* on_finish
);
635 bool set_is_empty(ObjectSet
*oset
);
636 bool set_is_cached(ObjectSet
*oset
);
637 bool set_is_dirty_or_committing(ObjectSet
*oset
);
639 bool flush_set(ObjectSet
*oset
, Context
*onfinish
=0);
640 bool flush_set(ObjectSet
*oset
, std::vector
<ObjectExtent
>& ex
,
641 ZTracer::Trace
*trace
, Context
*onfinish
= 0);
642 bool flush_all(Context
*onfinish
= 0);
644 void purge_set(ObjectSet
*oset
);
646 // returns # of bytes not released (ie non-clean)
647 loff_t
release_set(ObjectSet
*oset
);
648 uint64_t release_all();
650 void discard_set(ObjectSet
*oset
, const std::vector
<ObjectExtent
>& ex
);
651 void discard_writeback(ObjectSet
*oset
, const std::vector
<ObjectExtent
>& ex
,
655 * Retry any in-flight reads that get -ENOENT instead of marking
656 * them zero, and get rid of any cached -ENOENTs.
657 * After this is called and the cache's lock is unlocked,
658 * any new requests will treat -ENOENT normally.
660 void clear_nonexistence(ObjectSet
*oset
);
664 void set_max_dirty(uint64_t v
) {
667 void set_target_dirty(int64_t v
) {
670 void set_max_size(int64_t v
) {
673 void set_max_dirty_age(double a
) {
674 max_dirty_age
= ceph::make_timespan(a
);
676 void set_max_objects(int64_t v
) {
683 /*** async+caching (non-blocking) file interface ***/
684 int file_is_cached(ObjectSet
*oset
, file_layout_t
*layout
,
685 snapid_t snapid
, loff_t offset
, uint64_t len
) {
686 std::vector
<ObjectExtent
> extents
;
687 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
688 oset
->truncate_size
, extents
);
689 return is_cached(oset
, extents
, snapid
);
692 int file_read(ObjectSet
*oset
, file_layout_t
*layout
, snapid_t snapid
,
693 loff_t offset
, uint64_t len
, ceph::buffer::list
*bl
, int flags
,
695 OSDRead
*rd
= prepare_read(snapid
, bl
, flags
);
696 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
697 oset
->truncate_size
, rd
->extents
);
698 return readx(rd
, oset
, onfinish
);
701 int file_write(ObjectSet
*oset
, file_layout_t
*layout
,
702 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
703 ceph::buffer::list
& bl
, ceph::real_time mtime
, int flags
) {
704 OSDWrite
*wr
= prepare_write(snapc
, bl
, mtime
, flags
, 0);
705 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
706 oset
->truncate_size
, wr
->extents
);
707 return writex(wr
, oset
, nullptr);
710 bool file_flush(ObjectSet
*oset
, file_layout_t
*layout
,
711 const SnapContext
& snapc
, loff_t offset
, uint64_t len
,
713 std::vector
<ObjectExtent
> extents
;
714 Striper::file_to_extents(cct
, oset
->ino
, layout
, offset
, len
,
715 oset
->truncate_size
, extents
);
716 ZTracer::Trace trace
;
717 return flush_set(oset
, extents
, &trace
, onfinish
);
722 inline std::ostream
& operator<<(std::ostream
&out
,
723 const ObjectCacher::BufferHead
&bh
)
725 out
<< "bh[ " << &bh
<< " "
726 << bh
.start() << "~" << bh
.length()
728 << " (" << bh
.bl
.length() << ")"
729 << " v " << bh
.last_write_tid
;
730 if (bh
.get_journal_tid() != 0) {
731 out
<< " j " << bh
.get_journal_tid();
733 if (bh
.is_tx()) out
<< " tx";
734 if (bh
.is_rx()) out
<< " rx";
735 if (bh
.is_dirty()) out
<< " dirty";
736 if (bh
.is_clean()) out
<< " clean";
737 if (bh
.is_zero()) out
<< " zero";
738 if (bh
.is_missing()) out
<< " missing";
739 if (bh
.bl
.length() > 0) out
<< " firstbyte=" << (int)bh
.bl
[0];
740 if (bh
.error
) out
<< " error=" << bh
.error
;
742 out
<< " waiters = {";
743 for (auto it
= bh
.waitfor_read
.begin(); it
!= bh
.waitfor_read
.end(); ++it
) {
744 out
<< " " << it
->first
<< "->[";
745 for (auto lit
= it
->second
.begin();
746 lit
!= it
->second
.end(); ++lit
) {
755 inline std::ostream
& operator<<(std::ostream
&out
,
756 const ObjectCacher::ObjectSet
&os
)
758 return out
<< "objectset[" << os
.ino
759 << " ts " << os
.truncate_seq
<< "/" << os
.truncate_size
760 << " objects " << os
.objects
.size()
761 << " dirty_or_tx " << os
.dirty_or_tx
765 inline std::ostream
& operator<<(std::ostream
&out
,
766 const ObjectCacher::Object
&ob
)
769 << ob
.get_soid() << " oset " << ob
.oset
<< std::dec
770 << " wr " << ob
.last_write_tid
<< "/" << ob
.last_commit_tid
;