// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"

#include "include/ceph_assert.h"

#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
                                 /// while holding the lock
#define BUFFER_MEMORY_WEIGHT CEPH_PAGE_SHIFT  // memory usage of BufferHead, count in (1<<n)
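// Note on the two tunables above: MAX_FLUSH_UNDER_LOCK bounds how many
// bufferheads the flusher starts writeback on in one pass while holding
// the cache lock, and BUFFER_MEMORY_WEIGHT is used as a shift to charge
// each BufferHead roughly one page of overhead.  For example (assuming
// CEPH_PAGE_SHIFT == 12, i.e. 4 KiB pages), trim() derives its clean-bh
// limit as max_size >> 12, and _maybe_wait_for_writeback() derives its
// dirty-bh limit as max_dirty >> 12.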
using std::chrono::seconds;
using std::list;
using std::map;
using std::make_pair;
using std::pair;
using std::set;
using std::string;
using std::vector;

using ceph::bufferlist;

using namespace std::literals;
/*** ObjectCacher::BufferHead ***/


/*** ObjectCacher::Object ***/

#define dout_subsys ceph_subsys_objectcacher
#undef dout_prefix
#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
class ObjectCacher::C_ReadFinish : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  loff_t start;
  uint64_t length;
  xlist<C_ReadFinish*>::item set_item;
  bool trust_enoent;
  ceph_tid_t tid;
  ZTracer::Trace trace;

public:
  bufferlist bl;
  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
               uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
    set_item(this), trust_enoent(true),
    tid(t), trace(trace) {
    ob->reads.push_back(&set_item);
  }

  void finish(int r) override {
    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
    trace.event("finish");

    // object destructor clears the list
    if (set_item.is_on_list())
      set_item.remove_myself();
  }

  void distrust_enoent() {
    trust_enoent = false;
  }
};
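// A C_ReadFinish registers itself on ob->reads so that the Object
// destructor can clear contexts for reads that are still in flight.
// finish() forwards trust_enoent to bh_read_finish(), which will only
// believe an -ENOENT reply (marking the object complete and !exists)
// while the flag is set; distrust_enoent() is the hook for invalidating
// a pending read's -ENOENT when the cache state changes underneath it.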
class ObjectCacher::C_RetryRead : public Context {
  ObjectCacher *oc;
  OSDRead *rd;
  ObjectSet *oset;
  Context *onfinish;
  ZTracer::Trace trace;

public:
  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
              const ZTracer::Trace &trace)
    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
  }

  void finish(int r) override {
    if (r >= 0) {
      r = oc->_readx(rd, oset, onfinish, false, &trace);
    }
    if (r == 0) {
      // read is still in-progress
      return;
    }
    trace.event("finish");
    if (onfinish) {
      onfinish->complete(r);
    }
  }
};
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
                                                      loff_t off)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;

  // split off right
  ObjectCacher::BufferHead *right = new BufferHead(this);

  // inherit flags; a later access will clear them automatically.
  right->set_dontneed(left->get_dontneed());
  right->set_nocache(left->get_nocache());

  right->last_write_tid = left->last_write_tid;
  right->last_read_tid = left->last_read_tid;
  right->set_state(left->get_state());
  right->set_error(left->error);
  right->snapc = left->snapc;
  right->set_journal_tid(left->journal_tid);

  loff_t newleftlen = off - left->start();
  right->set_start(off);
  right->set_length(left->length() - newleftlen);

  // shorten left
  oc->bh_stat_sub(left);
  left->set_length(newleftlen);
  oc->bh_stat_add(left);

  // add right
  oc->bh_add(this, right);

  // split buffers too
  bufferlist bl;
  bl = std::move(left->bl);
  if (bl.length()) {
    ceph_assert(bl.length() == (left->length() + right->length()));
    right->bl.substr_of(bl, left->length(), right->length());
    left->bl.substr_of(bl, 0, left->length());
  }

  // move read waiters
  if (!left->waitfor_read.empty()) {
    auto start_remove = left->waitfor_read.begin();
    while (start_remove != left->waitfor_read.end() &&
           start_remove->first < right->start())
      ++start_remove;
    for (auto p = start_remove; p != left->waitfor_read.end(); ++p) {
      ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
                         << " to right bh" << dendl;
      right->waitfor_read[p->first].swap(p->second);
      ceph_assert(p->second.empty());
    }
    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
  }

  ldout(oc->cct, 20) << "split left is " << *left << dendl;
  ldout(oc->cct, 20) << "split right is " << *right << dendl;
  return right;
}
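// Sketch of split(): given a bufferhead covering [0, 100) and off = 40,
// the left bh is shortened to [0, 40) and a new right bh covering
// [40, 100) is returned, inheriting state, tids, snapc and error from
// the left.  Cached data and any waitfor_read contexts at offsets >= 40
// move to the right bh, so waiters are completed against the piece that
// actually covers their offset.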
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
  if (left->get_journal_tid() == 0) {
    left->set_journal_tid(right->get_journal_tid());
  }
  right->set_journal_tid(0);

  oc->bh_remove(this, right);
  oc->bh_stat_sub(left);
  left->set_length(left->length() + right->length());
  oc->bh_stat_add(left);

  // data
  left->bl.claim_append(right->bl);

  // version
  // note: this is sorta busted, but should only be used for dirty buffers
  left->last_write_tid = std::max(left->last_write_tid, right->last_write_tid);
  left->last_write = std::max(left->last_write, right->last_write);

  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);

  // waiters
  for (auto p = right->waitfor_read.begin();
       p != right->waitfor_read.end();
       ++p)
    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
                                        p->second);

  // hose right
  delete right;

  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
}
bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
{
  if (left->end() != right->start() ||
      left->get_state() != right->get_state() ||
      !left->can_merge_journal(right))
    return false;
  if (left->is_tx() && left->last_write_tid != right->last_write_tid)
    return false;
  return true;
}
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;

  // do not merge rx buffers; last_read_tid may not match
  if (bh->is_rx())
    return;

  // to the left?
  auto p = data.find(bh->start());
  ceph_assert(p->second == bh);
  if (p != data.begin()) {
    --p;
    if (can_merge_bh(p->second, bh)) {
      merge_left(p->second, bh);
      bh = p->second;
    } else {
      ++p;
    }
  }
  // to the right?
  ceph_assert(p->second == bh);
  ++p;
  if (p != data.end() && can_merge_bh(bh, p->second))
    merge_left(bh, p->second);
  else
    maybe_rebuild_buffer(bh);
}
void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
{
  auto& bl = bh->bl;
  if (bl.get_num_buffers() <= 1)
    return;

  auto wasted = bl.get_wasted_space();
  if (wasted * 2 > bl.length() &&
      wasted > (1U << BUFFER_MEMORY_WEIGHT))
    bl.rebuild();
}
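// Rebuild is deliberately conservative: a bufferlist fragmented by
// repeated splits/merges is only recompacted when more than half of its
// underlying memory is wasted *and* the waste exceeds one page.  E.g.
// (assuming CEPH_PAGE_SHIFT == 12) a 6 KiB bh wasting 4.5 KiB across
// several raw buffers is rebuilt; one wasting 2 KiB is left alone.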
/*
 * count bytes we have cached in given range
 */
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  auto p = data_lower_bound(cur);
  while (left > 0) {
    if (p == data.end())
      return false;

    if (p->first <= cur) {
      // have part of it
      loff_t lenfromcur = std::min(p->second->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else if (p->first > cur) {
      // gap
      return false;
    } else
      ceph_abort();
  }

  return true;
}
/*
 * all cached data in this range [off, off+len]
 */
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  if (data.empty())
    return true;

  auto first = data.begin();
  auto last = data.rbegin();
  if (first->second->start() >= off && last->second->end() <= (off + len))
    return true;
  else
    return false;
}
/*
 * map a range of bytes into buffer_heads.
 * - create missing buffer_heads as necessary.
 */
int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& hits,
                                   map<loff_t, BufferHead*>& missing,
                                   map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
                     << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  auto p = data_lower_bound(ex.offset);
  while (left > 0) {
    // at end?
    if (p == data.end()) {
      // rest is a miss.
      BufferHead *n = new BufferHead(this);
      n->set_start(cur);
      n->set_length(left);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read miss+complete+zero " << left
                           << " left, " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n
                           << dendl;
      }
      cur += left;
      ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
      break;  // no more.
    }

    if (p->first <= cur) {
      // have it (or part of it)
      BufferHead *e = p->second;

      if (e->is_clean() ||
          e->is_dirty() ||
          e->is_tx() ||
          e->is_zero()) {
        hits[cur] = e;     // readable!
        ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
      } else if (e->is_rx()) {
        rx[cur] = e;       // missing, not readable.
        ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
      } else if (e->is_error()) {
        errors[cur] = e;
        ldout(oc->cct, 20) << "map_read error " << *e << dendl;
      } else {
        ceph_abort();
      }

      loff_t lenfromcur = std::min(e->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;  // more?
    } else if (p->first > cur) {
      // gap.. miss
      loff_t next = p->first;
      BufferHead *n = new BufferHead(this);
      loff_t len = std::min(next - cur, left);
      n->set_start(cur);
      n->set_length(len);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
      }
      cur += std::min(left, n->length());
      left -= std::min(left, n->length());
      continue;  // more?
    } else {
      ceph_abort();
    }
  }
  return 0;
}
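// map_read() buckets the requested range into four maps keyed by offset:
// hits (readable now: clean/dirty/tx/zero), missing (newly created bhs
// that need an OSD read), rx (reads already in flight), and errors
// (previous reads failed).  When the object is known complete, holes are
// materialized as zero bhs and counted as hits, so a fully "complete"
// object never issues new reads for gaps.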
void ObjectCacher::Object::audit_buffers()
{
  loff_t offset = 0;
  for (auto it = data.begin(); it != data.end(); ++it) {
    if (it->first != it->second->start()) {
      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
                     << " does not match bh start position: "
                     << *it->second << dendl;
      ceph_assert(it->first == it->second->start());
    }
    if (it->first < offset) {
      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
                     << " overlaps with previous bh " << *((--it)->second)
                     << dendl;
      ceph_assert(it->first >= offset);
    }
    BufferHead *bh = it->second;
    for (auto w_it = bh->waitfor_read.begin();
         w_it != bh->waitfor_read.end(); ++w_it) {
      if (w_it->first < bh->start() ||
          w_it->first >= bh->start() + bh->length()) {
        lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
                       << " is not within bh " << *bh << dendl;
        ceph_assert(w_it->first >= bh->start());
        ceph_assert(w_it->first < bh->start() + bh->length());
      }
    }
    offset = it->first + it->second->length();
  }
}
/*
 * map a range of extents on an object's buffer cache.
 * - combine any bh's we're writing into one
 * - break up bufferheads that don't fall completely within the range
 * //no! - return a bh that includes the write.  may also include
 * other dirty data to left and/or right.
 */
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
                                                          ceph_tid_t tid)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  BufferHead *final = 0;

  ldout(oc->cct, 10) << "map_write oex " << ex.oid
                     << " " << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  auto p = data_lower_bound(ex.offset);
  while (left > 0) {
    loff_t max = left;

    // at end?
    if (p == data.end()) {
      if (final == NULL) {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( max );
        oc->bh_add(this, final);
        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final
                           << dendl;
      } else {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + max);
        oc->bh_stat_add(final);
      }
      left -= max;
      cur += max;
      continue;
    }

    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
    //oc->verify_stats();

    if (p->first <= cur) {
      BufferHead *bh = p->second;
      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;

      if (p->first < cur) {
        ceph_assert(final == 0);
        if (cur + max >= bh->end()) {
          // we want right bit (one splice)
          final = split(bh, cur);   // just split it, take right half.
          maybe_rebuild_buffer(bh);
          replace_journal_tid(final, tid);
          ++p;
          ceph_assert(p->second == final);
        } else {
          // we want middle bit (two splices)
          final = split(bh, cur);
          maybe_rebuild_buffer(bh);
          ++p;
          ceph_assert(p->second == final);
          auto right = split(final, cur+max);
          maybe_rebuild_buffer(right);
          replace_journal_tid(final, tid);
        }
      } else {
        ceph_assert(p->first == cur);
        if (bh->length() <= max) {
          // whole bufferhead, piece of cake.
        } else {
          // we want left bit (one splice)
          auto right = split(bh, cur + max);        // just split
          maybe_rebuild_buffer(right);
        }
        if (final) {
          oc->mark_dirty(bh);
          oc->mark_dirty(final);
          --p;  // move iterator back to final
          ceph_assert(p->second == final);
          replace_journal_tid(bh, tid);
          merge_left(final, bh);
        } else {
          final = bh;
          replace_journal_tid(final, tid);
        }
      }

      // keep going.
      loff_t lenfromcur = final->end() - cur;
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else {
      // gap!
      loff_t next = p->first;
      loff_t glen = std::min(next - cur, max);
      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
      if (final) {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + glen);
        oc->bh_stat_add(final);
      } else {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( glen );
        oc->bh_add(this, final);
      }

      cur += glen;
      left -= glen;
      continue;    // more?
    }
  }

  // set version
  ceph_assert(final);
  ceph_assert(final->get_journal_tid() == tid);
  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;

  return final;
}
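// Worked example of map_write(): writing 4~12 over an existing bh
// covering 0~16 takes the p->first < cur branch.  Because cur + max
// (4 + 12 = 16) reaches bh->end(), one split at offset 4 suffices: the
// right half [4, 16) becomes `final`, is retagged with the write's
// journal tid, and is returned to writex() to receive the new data.
// A middle overwrite (say 4~8 of 0~16) would instead split twice,
// yielding [0,4) [4,12) [12,16) with the middle piece as `final`.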
void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
                                               ceph_tid_t tid)
{
  ceph_tid_t bh_tid = bh->get_journal_tid();

  ceph_assert(tid == 0 || bh_tid <= tid);
  if (bh_tid != 0 && bh_tid != tid) {
    // inform journal that it should not expect a writeback from this extent
    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
                                           bh->length(), bh_tid, tid);
  }
  bh->set_journal_tid(tid);
}
void ObjectCacher::Object::truncate(loff_t s)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;

  std::list<Context*> waiting_for_read;
  while (!data.empty()) {
    BufferHead *bh = data.rbegin()->second;
    if (bh->end() <= s)
      break;

    // split bh at truncation point?
    if (bh->start() < s) {
      split(bh, s);
      maybe_rebuild_buffer(bh);
      continue;
    }

    // remove bh entirely
    ceph_assert(bh->start() >= s);
    for ([[maybe_unused]] auto& [off, ctxs] : bh->waitfor_read) {
      waiting_for_read.splice(waiting_for_read.end(), ctxs);
    }
    bh->waitfor_read.clear();
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
  if (!waiting_for_read.empty()) {
    ldout(oc->cct, 10) << "restarting reads post-truncate" << dendl;
  }
  finish_contexts(oc->cct, waiting_for_read, 0);
}
void ObjectCacher::Object::discard(loff_t off, loff_t len,
                                   C_GatherBuilder* commit_gather)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
                     << dendl;

  if (!exists) {
    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
    exists = true;
  }
  if (complete) {
    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
    complete = false;
  }

  std::list<Context*> waiting_for_read;
  auto p = data_lower_bound(off);
  while (p != data.end()) {
    BufferHead *bh = p->second;
    if (bh->start() >= off + len)
      break;

    // split bh at truncation point?
    if (bh->start() < off) {
      split(bh, off);
      maybe_rebuild_buffer(bh);
      ++p;
      continue;
    }

    ceph_assert(bh->start() >= off);
    if (bh->end() > off + len) {
      auto right = split(bh, off + len);
      maybe_rebuild_buffer(right);
    }

    ++p;
    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
    replace_journal_tid(bh, 0);

    if (bh->is_tx() && commit_gather != nullptr) {
      // wait for the writeback to commit
      waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
    } else if (bh->is_rx()) {
      // cannot remove bh with in-flight read, but we can ensure the
      // read won't overwrite the discard
      bh->last_read_tid = ++oc->last_read_tid;
      bh->bl.clear();
      bh->set_nocache(true);
      oc->mark_zero(bh);
      // we should mark all Rx bh to zero
      continue;
    } else {
      for ([[maybe_unused]] auto& [off, ctxs] : bh->waitfor_read) {
        waiting_for_read.splice(waiting_for_read.end(), ctxs);
      }
      bh->waitfor_read.clear();
    }

    oc->bh_remove(this, bh);
    delete bh;
  }
  if (!waiting_for_read.empty()) {
    ldout(oc->cct, 10) << "restarting reads post-discard" << dendl;
  }
  finish_contexts(oc->cct, waiting_for_read, 0); /* restart reads */
}
/*** ObjectCacher ***/

#undef dout_prefix
#define dout_prefix *_dout << "objectcacher "


ObjectCacher::ObjectCacher(CephContext *cct_, string name,
                           WritebackHandler& wb, ceph::mutex& l,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg, uint64_t max_bytes,
                           uint64_t max_objects, uint64_t max_dirty,
                           uint64_t target_dirty, double max_dirty_age,
                           bool block_writes_upfront)
  : cct(cct_), writeback_handler(wb), name(name), lock(l),
    max_dirty(max_dirty), target_dirty(target_dirty),
    max_size(max_bytes), max_objects(max_objects),
    max_dirty_age(ceph::make_timespan(max_dirty_age)),
    block_writes_upfront(block_writes_upfront),
    trace_endpoint("ObjectCacher"),
    flush_set_callback(flush_callback),
    flush_set_callback_arg(flush_callback_arg),
    last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
    stat_nr_dirty_waiters(0), reads_outstanding(0)
{
  perf_start();
  finisher.start();
  scattered_write = writeback_handler.can_scattered_write();
}
ObjectCacher::~ObjectCacher()
{
  finisher.stop();
  perf_stop();
  // we should be empty.
  for (auto i = objects.begin(); i != objects.end(); ++i)
    ceph_assert(i->empty());
  ceph_assert(bh_lru_rest.lru_get_size() == 0);
  ceph_assert(bh_lru_dirty.lru_get_size() == 0);
  ceph_assert(ob_lru.lru_get_size() == 0);
  ceph_assert(dirty_or_tx_bh.empty());
}
void ObjectCacher::perf_start()
{
  string n = "objectcacher-" + name;
  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);

  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
                      "cache_ops_hit", "Hit operations");
  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
                      "cache_ops_miss", "Miss operations");
  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
                      "cache_bytes_hit", "Hit data", NULL, 0,
                      unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
                      "cache_bytes_miss", "Miss data", NULL, 0,
                      unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_objectcacher_data_read,
                      "data_read", "Read data");
  plb.add_u64_counter(l_objectcacher_data_written,
                      "data_written", "Data written to cache");
  plb.add_u64_counter(l_objectcacher_data_flushed,
                      "data_flushed", "Data flushed");
  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
                      "data_overwritten_while_flushing",
                      "Data overwritten while flushing");
  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
                      "Write operations, delayed due to dirty limits");
  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
                      "write_bytes_blocked",
                      "Write data blocked on dirty limit", NULL, 0,
                      unit_t(UNIT_BYTES));
  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
               "Time spent blocking a write due to dirty limits");

  perfcounter = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perfcounter);
}
void ObjectCacher::perf_stop()
{
  ceph_assert(perfcounter);
  cct->get_perfcounters_collection()->remove(perfcounter);
  delete perfcounter;
}
ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
                                               uint64_t object_no,
                                               ObjectSet *oset,
                                               object_locator_t &l,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
{
  // XXX: Add handling of nspace in object_locator_t in cache
  ceph_assert(ceph_mutex_is_locked(lock));

  // have it?
  if ((uint32_t)l.pool < objects.size()) {
    if (objects[l.pool].count(oid)) {
      Object *o = objects[l.pool][oid];
      o->object_no = object_no;
      o->truncate_size = truncate_size;
      o->truncate_seq = truncate_seq;
      return o;
    }
  } else {
    objects.resize(l.pool+1);
  }

  // create it.
  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
                         truncate_seq);
  objects[l.pool][oid] = o;
  ob_lru.lru_insert_top(o);
  return o;
}
void ObjectCacher::close_object(Object *ob)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "close_object " << *ob << dendl;
  ceph_assert(ob->can_close());

  // ok!
  ob_lru.lru_remove(ob);
  objects[ob->oloc.pool].erase(ob->get_soid());
  ob->set_item.remove_myself();
  delete ob;
}
void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
                           const ZTracer::Trace &parent_trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_read " + bh->ob->get_oid().name);
    trace.event("start");
  }

  mark_rx(bh);
  bh->last_read_tid = ++last_read_tid;

  // finisher
  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
                                            bh->start(), bh->length(), trace);
  // go
  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
                         bh->ob->get_oloc(), bh->start(), bh->length(),
                         bh->ob->get_snap(), &onfinish->bl,
                         bh->ob->truncate_size, bh->ob->truncate_seq,
                         op_flags, trace, onfinish);

  ++reads_outstanding;
}
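// Read lifecycle: bh_read() stamps the bufferhead with a fresh
// last_read_tid before issuing the OSD read, and bh_read_finish()
// ignores any bh whose last_read_tid no longer matches the completing
// tid.  That is what lets discard() "detach" an rx bh from its pending
// read simply by bumping last_read_tid, instead of cancelling the I/O.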
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
                                  ceph_tid_t tid, loff_t start,
                                  uint64_t length, bufferlist &bl, int r,
                                  bool trust_enoent)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_read_finish "
                << oid
                << " tid " << tid
                << " " << start << "~" << length
                << " (bl is " << bl.length() << ")"
                << " returned " << r
                << " outstanding reads " << reads_outstanding
                << dendl;

  if (r >= 0 && bl.length() < length) {
    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
                  << length << " with " << length - bl.length()
                  << " bytes of zeroes" << dendl;
    bl.append_zero(length - bl.length());
  }

  list<Context*> ls;
  int err = 0;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
  } else {
    Object *ob = objects[poolid][oid];

    if (r == -ENOENT && !ob->complete) {
      // wake up *all* rx waiters, or else we risk reordering
      // identical reads. e.g.
      //   read 1~1
      //   reply to unrelated 3~1 -> !exists
      //   read 1~1 -> immediate ENOENT
      //   reply to first 1~1 -> ooo ENOENT
      bool allzero = true;
      for (auto p = ob->data.begin(); p != ob->data.end(); ++p) {
        BufferHead *bh = p->second;
        for (auto p = bh->waitfor_read.begin();
             p != bh->waitfor_read.end();
             ++p)
          ls.splice(ls.end(), p->second);
        bh->waitfor_read.clear();
        if (!bh->is_zero() && !bh->is_rx())
          allzero = false;
      }

      // just pass through and retry all waiters if we don't trust
      // -ENOENT for this read
      if (trust_enoent) {
        ldout(cct, 7)
          << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
          << dendl;
        ob->complete = true;
        ob->exists = false;

        /* If all the bhs are effectively zero, get rid of them.  All
         * the waiters will be retried and get -ENOENT immediately, so
         * it's safe to clean up the unneeded bh's now. Since we know
         * it's safe to remove them now, do so, so they aren't hanging
         * around waiting for more -ENOENTs from rados while the cache
         * is being shut down.
         *
         * Only do this when all the bhs are rx or clean, to match the
         * condition in _readx(). If there are any non-rx or non-clean
         * bhs, _readx() will wait for the final result instead of
         * returning -ENOENT immediately.
         */
        if (allzero) {
          ldout(cct, 10)
            << "bh_read_finish ENOENT and allzero, getting rid of "
            << "bhs for " << *ob << dendl;
          auto p = ob->data.begin();
          while (p != ob->data.end()) {
            BufferHead *bh = p->second;
            // current iterator will be invalidated by bh_remove()
            ++p;
            bh_remove(ob, bh);
            delete bh;
          }
        }
      }
    }

    // apply to bh's!
    loff_t opos = start;
    while (true) {
      auto p = ob->data_lower_bound(opos);
      if (p == ob->data.end())
        break;
      if (opos >= start+(loff_t)length) {
        ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
                       << start << "+" << length << "=" << start+(loff_t)length
                       << dendl;
        break;
      }

      BufferHead *bh = p->second;
      ldout(cct, 20) << "checking bh " << *bh << dendl;

      // finishers?
      for (auto it = bh->waitfor_read.begin();
           it != bh->waitfor_read.end();
           ++it)
        ls.splice(ls.end(), it->second);
      bh->waitfor_read.clear();

      if (bh->start() > opos) {
        ldout(cct, 1) << "bh_read_finish skipping gap "
                      << opos << "~" << bh->start() - opos
                      << dendl;
        opos = bh->start();
        continue;
      }

      if (!bh->is_rx()) {
        ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
        opos = bh->end();
        continue;
      }

      if (bh->last_read_tid != tid) {
        ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
                       << bh->last_read_tid << " != tid " << tid
                       << ", skipping" << dendl;
        opos = bh->end();
        continue;
      }

      ceph_assert(opos >= bh->start());
      ceph_assert(bh->start() == opos);   // we don't merge rx bh's... yet!
      ceph_assert(bh->length() <= start+(loff_t)length-opos);

      if (bh->error < 0)
        err = bh->error;

      opos = bh->end();

      if (r == -ENOENT) {
        if (trust_enoent) {
          ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
          bh_remove(ob, bh);
          delete bh;
        } else {
          ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
                         << *bh << dendl;
        }
        continue;
      }

      if (r < 0) {
        bh->error = r;
        mark_error(bh);
      } else {
        bh->bl.substr_of(bl,
                         bh->start() - start,
                         bh->length());
        mark_clean(bh);
      }

      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;

      ob->try_merge_bh(bh);
    }
  }

  // called with lock held.
  ldout(cct, 20) << "finishing waiters " << ls << dendl;

  finish_contexts(cct, ls, err);
  retry_waiting_reads();

  --reads_outstanding;
  read_cond.notify_all();
}
void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
                                        int64_t *max_amount, int *max_count)
{
  list<BufferHead*> blist;

  int count = 0;
  int64_t total_len = 0;
  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
  ceph_assert(it != dirty_or_tx_bh.end());
  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
       p != dirty_or_tx_bh.end();
       ++p) {
    BufferHead *obh = *p;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_back(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }

  while (it != dirty_or_tx_bh.begin()) {
    --it;
    BufferHead *obh = *it;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_front(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }
  if (max_count)
    *max_count -= count;
  if (max_amount)
    *max_amount -= total_len;

  bh_write_scattered(blist);
}
class ObjectCacher::C_WriteCommit : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  vector<pair<loff_t, uint64_t> > ranges;
  ZTracer::Trace trace;
public:
  ceph_tid_t tid = 0;
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
                uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(_poolid), oid(o), trace(trace) {
    ranges.push_back(make_pair(s, l));
  }
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
                vector<pair<loff_t, uint64_t> >& _ranges) :
    oc(c), poolid(_poolid), oid(o), tid(0) {
    ranges.swap(_ranges);
  }
  void finish(int r) override {
    oc->bh_write_commit(poolid, oid, ranges, tid, r);
    trace.event("finish");
  }
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
  ceph_assert(ceph_mutex_is_locked(lock));

  Object *ob = blist.front()->ob;
  ob->get();

  ceph::real_time last_write;
  SnapContext snapc;
  vector<pair<loff_t, uint64_t> > ranges;
  vector<pair<uint64_t, bufferlist> > io_vec;

  ranges.reserve(blist.size());
  io_vec.reserve(blist.size());

  uint64_t total_len = 0;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
    ceph_assert(bh->ob == ob);
    ceph_assert(bh->bl.length() == bh->length());
    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));

    int n = io_vec.size();
    io_vec.resize(n + 1);
    io_vec[n].first = bh->start();
    io_vec[n].second = bh->bl;

    total_len += bh->length();
    if (bh->snapc.seq > snapc.seq)
      snapc = bh->snapc;
    if (bh->last_write > last_write)
      last_write = bh->last_write;
  }

  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool,
                                              ob->get_soid(), ranges);

  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
                                           io_vec, snapc, last_write,
                                           ob->truncate_size, ob->truncate_seq,
                                           oncommit);
  oncommit->tid = tid;
  ob->last_write_tid = tid;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    bh->last_write_tid = tid;
    mark_tx(bh);
  }

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
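// Scattered writeback batches every dirty bufferhead of one object into
// a single WritebackHandler::write() carrying an io_vec of
// (offset, bufferlist) pairs, using the newest snap context and
// last_write among them.  Whether it is used at all is decided once in
// the constructor via writeback_handler.can_scattered_write(); callers
// below consistently branch on scattered_write.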
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_write " << *bh << dendl;

  bh->ob->get();

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_write " + bh->ob->get_oid().name);
    trace.event("start");
  }

  // finishers
  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                              bh->ob->get_soid(), bh->start(),
                                              bh->length(), trace);
  // go
  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
                                           bh->ob->get_oloc(),
                                           bh->start(), bh->length(),
                                           bh->snapc, bh->bl, bh->last_write,
                                           bh->ob->truncate_size,
                                           bh->ob->truncate_seq,
                                           bh->journal_tid, trace, oncommit);
  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;

  // set bh last_write_tid
  oncommit->tid = tid;
  bh->ob->last_write_tid = tid;
  bh->last_write_tid = tid;

  mark_tx(bh);

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
}
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
                                   vector<pair<loff_t, uint64_t> >& ranges,
                                   ceph_tid_t tid, int r)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
                << " ranges " << ranges << " returned " << r << dendl;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
    return;
  }

  Object *ob = objects[poolid][oid];
  int was_dirty_or_tx = ob->oset->dirty_or_tx;

  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
       p != ranges.end();
       ++p) {
    loff_t start = p->first;
    uint64_t length = p->second;
    if (!ob->exists) {
      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
      ob->exists = true;

      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
                                              ob->get_snap())) {
        ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
          "complete on " << *ob << dendl;
        ob->complete = false;
      }
    }

    vector<pair<loff_t, BufferHead*>> hit;
    // apply to bh's!
    for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;

      if (bh->start() >= start+(loff_t)length)
        break;

      // make sure bh is tx
      if (!bh->is_tx()) {
        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
        continue;
      }

      // make sure bh tid matches
      if (bh->last_write_tid != tid) {
        ceph_assert(bh->last_write_tid > tid);
        ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
        continue;
      }

      // we don't merge tx buffers. tx buffer should be within the range
      ceph_assert(bh->start() >= start);
      ceph_assert(bh->end() <= start+(loff_t)length);

      if (r >= 0) {
        // ok!  mark bh clean and error-free
        mark_clean(bh);
        bh->set_journal_tid(0);
        if (bh->get_nocache())
          bh_lru_rest.lru_bottouch(bh);
        hit.push_back(make_pair(bh->start(), bh));
        ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
      } else {
        mark_dirty(bh);
        ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
                       << *bh << " r = " << r << " " << cpp_strerror(-r)
                       << dendl;
      }
    }

    for (auto& p : hit) {
      // p.second may have been merged and deleted in merge_left
      if (ob->data.count(p.first))
        ob->try_merge_bh(p.second);
    }
  }

  // update last_commit.
  ceph_assert(ob->last_commit_tid < tid);
  ob->last_commit_tid = tid;

  // waiters?
  list<Context*> ls;
  if (ob->waitfor_commit.count(tid)) {
    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
    ob->waitfor_commit.erase(tid);
  }

  // is the entire object set now clean and fully committed?
  ObjectSet *oset = ob->oset;
  ob->put();

  if (flush_set_callback &&
      was_dirty_or_tx > 0 &&
      oset->dirty_or_tx == 0) {        // nothing dirty/tx
    flush_set_callback(flush_set_callback_arg, oset);
  }

  if (!ls.empty())
    finish_contexts(cct, ls, r);
}
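// Commit handling is idempotence-by-tid: a bufferhead is only cleaned if
// it is still tx *and* its last_write_tid matches the committing tid; a
// newer overwrite (greater tid) leaves it tx for the later commit.  On
// error the bh is simply marked dirty again and will be retried by the
// flusher.  Once the owning ObjectSet has no dirty/tx data left, the
// flush_set_callback notifies the cache's client (e.g. librbd or the
// CephFS client).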
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
  ceph_assert(trace != nullptr);
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph::real_time cutoff = ceph::real_clock::now();

  ldout(cct, 10) << "flush " << amount << dendl;

  /*
   * NOTE: we aren't actually pulling things off the LRU here, just
   * looking at the tail item.  Then we call bh_write, which moves it
   * to the other LRU, so that we can call
   * lru_dirty.lru_get_next_expire() again.
   */
  int64_t left = amount;
  while (amount == 0 || left > 0) {
    BufferHead *bh = static_cast<BufferHead*>(
      bh_lru_dirty.lru_get_next_expire());
    if (!bh) break;
    if (bh->last_write > cutoff) break;

    if (scattered_write) {
      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
    } else {
      left -= bh->length();
      bh_write(bh, *trace);
    }
  }
}
void ObjectCacher::trim()
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;

  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() -
                         bh_lru_rest.lru_get_num_pinned();
  while (get_stat_clean() > 0 &&
         ((uint64_t)get_stat_clean() > max_size ||
          nr_clean_bh > max_clean_bh)) {
    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
    if (!bh)
      break;

    ldout(cct, 10) << "trim trimming " << *bh << dendl;
    ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());

    Object *ob = bh->ob;
    bh_remove(ob, bh);
    delete bh;

    --nr_clean_bh;

    if (ob->complete) {
      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
      ob->complete = false;
    }
  }

  while (ob_lru.lru_get_size() > max_objects) {
    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
    if (!ob)
      break;

    ldout(cct, 10) << "trim trimming " << *ob << dendl;
    close_object(ob);
  }

  ldout(cct, 10) << "trim finish:  max " << max_size << "  clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
}
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
                             snapid_t snapid)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
       ex_it != extents.end();
       ++ex_it) {
    ldout(cct, 10) << "is_cached " << *ex_it << dendl;

    // get Object cache
    sobject_t soid(ex_it->oid, snapid);
    Object *o = get_object_maybe(soid, ex_it->oloc);
    if (!o)
      return false;
    if (!o->is_cached(ex_it->offset, ex_it->length))
      return false;
  }
  return true;
}
/*
 * returns # bytes read (if in cache).  onfinish is untouched (caller
 * must delete it)
 * returns 0 if doing async read
 */
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                        ZTracer::Trace *parent_trace)
{
  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("read", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  int r = _readx(rd, oset, onfinish, true, &trace);
  if (r < 0) {
    trace.event("finish");
  }
  return r;
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                         bool external_call, ZTracer::Trace *trace)
{
  ceph_assert(trace != nullptr);
  ceph_assert(ceph_mutex_is_locked(lock));
  bool success = true;
  int error = 0;
  uint64_t bytes_in_cache = 0;
  uint64_t bytes_not_in_cache = 0;
  uint64_t total_bytes_read = 0;
  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  /*
   * WARNING: we can only meaningfully return ENOENT if the read request
   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
   * zeroed buffers needs to feed single extents into readx().
   */
  ceph_assert(!oset->return_enoent || rd->extents.size() == 1);

  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
       ex_it != rd->extents.end();
       ++ex_it) {
    ldout(cct, 10) << "readx " << *ex_it << dendl;

    total_bytes_read += ex_it->length;

    // get Object cache
    sobject_t soid(ex_it->oid, rd->snap);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);
    if (external_call)
      touch_ob(o);

    // does not exist and no hits?
    if (oset->return_enoent && !o->exists) {
      ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;

      // should we worry about COW underneath us?
      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
                                              ex_it->length, soid.snap)) {
        ldout(cct, 20) << "readx  may copy on write" << dendl;
        bool wait = false;
        list<BufferHead*> blist;
        for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
             bh_it != o->data.end();
             ++bh_it) {
          BufferHead *bh = bh_it->second;
          if (bh->is_dirty() || bh->is_tx()) {
            ldout(cct, 10) << "readx  flushing " << *bh << dendl;
            wait = true;
            if (bh->is_dirty()) {
              if (scattered_write)
                blist.push_back(bh);
              else
                bh_write(bh, *trace);
            }
          }
        }
        if (scattered_write && !blist.empty())
          bh_write_scattered(blist);
        if (wait) {
          ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
          o->waitfor_commit[o->last_write_tid].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace));
          // FIXME: perfcounter!
          return 0;
        }
      }

      // can we return ENOENT?
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
           bh_it != o->data.end();
           ++bh_it) {
        ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
        if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
          allzero = false;
          break;
        }
      }
      if (allzero) {
        ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
                       << dendl;
        delete rd;
        if (dontneed)
          bottouch_ob(o);
        return -ENOENT;
      }
    }

    // map extent into bufferheads
    map<loff_t, BufferHead*> hits, missing, rx, errors;
    o->map_read(*ex_it, hits, missing, rx, errors);
    if (external_call) {
      // retry reading error buffers
      missing.insert(errors.begin(), errors.end());
    } else {
      // some reads had errors, fail later so completions
      // are cleaned up properly
      // TODO: make read path not call _readx for every completion
      hits.insert(errors.begin(), errors.end());
    }

    if (!missing.empty() || !rx.empty()) {
      // read missing
      map<loff_t, BufferHead*>::iterator last = missing.end();
      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
           bh_it != missing.end();
           ++bh_it) {
        uint64_t rx_bytes = static_cast<uint64_t>(
          stat_rx + bh_it->second->length());
        bytes_not_in_cache += bh_it->second->length();
        if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
          // cache is full with concurrent reads -- wait for rx's to complete
          // to constrain memory growth (especially during copy-ups)
          if (success) {
            ldout(cct, 10) << "readx missed, waiting on cache to complete "
                           << waitfor_read.size() << " blocked reads, "
                           << (std::max(rx_bytes, max_size) - max_size)
                           << " read bytes" << dendl;
            waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
                                                   *trace));
          }

          bh_remove(o, bh_it->second);
          delete bh_it->second;
        } else {
          bh_it->second->set_nocache(nocache);
          bh_read(bh_it->second, rd->fadvise_flags, *trace);
          if ((success && onfinish) || last != missing.end())
            last = bh_it;
        }
        success = false;
      }

      // add the wait to the last bh to avoid waking up early, since reads
      // complete in order
      if (last != missing.end()) {
        ldout(cct, 10) << "readx missed, waiting on " << *last->second
                       << " off " << last->first << dendl;
        last->second->waitfor_read[last->first].push_back(
          new C_RetryRead(this, rd, oset, onfinish, *trace) );
      }

      // bump rx
      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
           bh_it != rx.end();
           ++bh_it) {
        touch_bh(bh_it->second); // bump in lru, so we don't lose it.
        if (success && onfinish) {
          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
                         << " off " << bh_it->first << dendl;
          bh_it->second->waitfor_read[bh_it->first].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace) );
        }
        bytes_not_in_cache += bh_it->second->length();
        success = false;
      }

      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end(); ++bh_it)
        // bump in lru, so we don't lose it on a later read
        touch_bh(bh_it->second);

    } else {
      ceph_assert(!hits.empty());

      // make a plain list
      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end();
           ++bh_it) {
        BufferHead *bh = bh_it->second;
        ldout(cct, 10) << "readx hit bh " << *bh << dendl;
        if (bh->is_error() && bh->error)
          error = bh->error;
        bytes_in_cache += bh->length();

        if (bh->get_nocache() && bh->is_clean())
          bh_lru_rest.lru_bottouch(bh);
        else
          touch_bh(bh);
        // must come after touch_bh, because touch_bh resets dontneed
        if (dontneed &&
            ((loff_t)ex_it->offset <= bh->start() &&
             (bh->end() <= (loff_t)(ex_it->offset + ex_it->length)))) {
          bh->set_dontneed(true); //if dirty
          if (bh->is_clean())
            bh_lru_rest.lru_bottouch(bh);
        }
      }

      // create reverse map of buffer offset -> object for the
      // eventual result.  this is over a single ObjectExtent, so we
      // know that
      //  - the bh's are contiguous
      //  - the buffer frags need not be (and almost certainly aren't)
      loff_t opos = ex_it->offset;
      map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
      ceph_assert(bh_it->second->start() <= opos);
      uint64_t bhoff = opos - bh_it->second->start();
      vector<pair<uint64_t,uint64_t> >::iterator f_it
        = ex_it->buffer_extents.begin();
      uint64_t foff = 0;
      while (1) {
        BufferHead *bh = bh_it->second;
        ceph_assert(opos == (loff_t)(bh->start() + bhoff));

        uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
        ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
                       << bhoff << " frag " << f_it->first << "~"
                       << f_it->second << " +" << foff << "~" << len
                       << dendl;

        bufferlist bit;
        // put substr here first, since substr_of clobbers, and we
        // may get multiple bh's at this stripe_map position
        if (bh->is_zero()) {
          stripe_map[f_it->first].append_zero(len);
        } else {
          bit.substr_of(bh->bl,
                        opos - bh->start(),
                        len);
          stripe_map[f_it->first].claim_append(bit);
        }

        opos += len;
        bhoff += len;
        foff += len;
        if (opos == bh->end()) {
          ++bh_it;
          bhoff = 0;
        }
        if (foff == f_it->second) {
          ++f_it;
          foff = 0;
        }
        if (bh_it == hits.end()) break;
        if (f_it == ex_it->buffer_extents.end())
          break;
      }
      ceph_assert(f_it == ex_it->buffer_extents.end());
      ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
    }

    if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
      bottouch_ob(o);
  }

  if (!success) {
    if (perfcounter && external_call) {
      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
      perfcounter->inc(l_objectcacher_cache_ops_miss);
    }
    if (onfinish) {
      ldout(cct, 20) << "readx defer " << rd << dendl;
    } else {
      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
                     << dendl;
      delete rd;
    }
    return 0;  // wait!
  }
  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_hit);
  }

  // no misses... success!  do the read.
  ldout(cct, 10) << "readx has all buffers" << dendl;

  // ok, assemble into result buffer.
  uint64_t pos = 0;
  if (rd->bl && !error) {
    rd->bl->clear();
    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
         i != stripe_map.end();
         ++i) {
      ceph_assert(pos == i->first);
      ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
                     << " at " << pos << dendl;
      pos += i->second.length();
      rd->bl->claim_append(i->second);
      ceph_assert(rd->bl->length() == pos);
    }
    ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
  } else if (!error) {
    ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
    pos = i->first + i->second.length();
  }

  // done with read.
  int ret = error ? error : pos;
  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
  ceph_assert(pos <= (uint64_t) INT_MAX);

  delete rd;

  trim();

  return ret;
}
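// _readx() return contract: a positive value is the number of bytes
// already assembled from cache (the synchronous hit path, where rd is
// consumed), a negative value is an error such as -ENOENT, and 0 means
// the read went asynchronous -- a C_RetryRead now owns rd and will call
// back into _readx() once the blocking bufferhead completes.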
void ObjectCacher::retry_waiting_reads()
{
  list<Context *> ls;
  ls.swap(waitfor_read);

  while (!ls.empty() && waitfor_read.empty()) {
    Context *ctx = ls.front();
    ls.pop_front();
    ctx->complete(0);
  }
  waitfor_read.splice(waitfor_read.end(), ls);
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
                         ZTracer::Trace *parent_trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph::real_time now = ceph::real_clock::now();
  uint64_t bytes_written = 0;
  uint64_t bytes_written_in_flush = 0;
  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("write", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  list<Context*> wait_for_reads;
  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
       ex_it != wr->extents.end();
       ++ex_it) {
    // get object cache
    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);

    // map it all into a single bufferhead.
    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
    bool missing = bh->is_missing();
    bh->snapc = wr->snapc;

    // readers that need to be woken up due to an overwrite
    for (auto& [_, wait_for_read] : bh->waitfor_read) {
      wait_for_reads.splice(wait_for_reads.end(), wait_for_read);
    }
    bh->waitfor_read.clear();

    bytes_written += ex_it->length;
    if (bh->is_tx()) {
      bytes_written_in_flush += ex_it->length;
    }

    // adjust buffer pointers (ie "copy" data into my cache)
    // this is over a single ObjectExtent, so we know that
    //  - there is one contiguous bh
    //  - the buffer frags need not be (and almost certainly aren't)
    // note: i assume striping is monotonic... no jumps backwards, ever!
    loff_t opos = ex_it->offset;
    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
           = ex_it->buffer_extents.begin();
         f_it != ex_it->buffer_extents.end();
         ++f_it) {
      ldout(cct, 10) << "writex writing " << f_it->first << "~"
                     << f_it->second << " into " << *bh << " at " << opos
                     << dendl;
      uint64_t bhoff = opos - bh->start();
      ceph_assert(f_it->second <= bh->length() - bhoff);

      // get the frag we're mapping in
      bufferlist frag;
      frag.substr_of(wr->bl, f_it->first, f_it->second);

      // keep anything left of bhoff
      if (!bhoff)
        bh->bl.swap(frag);
      else
        bh->bl.claim_append(frag);

      opos += f_it->second;
    }

    // ok, now bh is dirty.
    mark_dirty(bh);
    if (dontneed)
      bh->set_dontneed(true);
    else if (nocache && missing)
      bh->set_nocache(true);
    else
      touch_bh(bh);

    bh->last_write = now;

    o->try_merge_bh(bh);
  }

  if (perfcounter) {
    perfcounter->inc(l_objectcacher_data_written, bytes_written);
    if (bytes_written_in_flush) {
      perfcounter->inc(l_objectcacher_overwritten_in_flush,
                       bytes_written_in_flush);
    }
  }

  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
  delete wr;

  finish_contexts(cct, wait_for_reads, 0);

  //verify_stats();
  trim();
  return r;
}
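// Two throttling modes for writers: with block_writes_upfront the caller
// blocks inside _wait_for_write() until dirty+tx drops below max_dirty,
// and onfreespace is completed synchronously; otherwise the wait is
// deferred to the finisher thread via C_WaitForWrite, so writex()
// returns immediately and onfreespace fires once space is available.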
class ObjectCacher::C_WaitForWrite : public Context {
public:
  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
                 const ZTracer::Trace &trace, Context *onfinish) :
    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
  void finish(int r) override;
private:
  ObjectCacher *m_oc;
  uint64_t m_len;
  ZTracer::Trace m_trace;
  Context *m_onfinish;
};
void ObjectCacher::C_WaitForWrite::finish(int r)
{
  std::lock_guard l(m_oc->lock);
  m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
  m_onfinish->complete(r);
}
void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
                                             ZTracer::Trace *trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph::mono_time start = ceph::mono_clock::now();
  int blocked = 0;
  // wait for writeback?
  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
  //  - do not wait for bytes other waiters are waiting on.  this means that
  //    threads do not wait for each other.  this effectively allows the cache
  //    size to balloon proportional to the data that is in flight.

  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
  while (get_stat_dirty() + get_stat_tx() > 0 &&
         (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
           max_dirty + get_stat_dirty_waiting()) ||
          (dirty_or_tx_bh.size() >=
           max_dirty_bh + get_stat_nr_dirty_waiters()))) {

    if (blocked == 0) {
      trace->event("start wait for writeback");
    }
    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
    flusher_cond.notify_all();
    stat_dirty_waiting += len;
    ++stat_nr_dirty_waiters;
    std::unique_lock l{lock, std::adopt_lock};
    stat_cond.wait(l);
    l.release();
    stat_dirty_waiting -= len;
    --stat_nr_dirty_waiters;
    ++blocked;
    ldout(cct, 10) << __func__ << " woke up" << dendl;
  }
  if (blocked > 0) {
    trace->event("finish wait for writeback");
  }
  if (blocked && perfcounter) {
    perfcounter->inc(l_objectcacher_write_ops_blocked);
    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
    ceph::timespan blocked = ceph::mono_clock::now() - start;
    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
  }
}
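// Example of the throttle arithmetic: with max_dirty = 64 MiB and two
// writers already blocked waiting on 8 MiB each, a third writer only
// blocks once dirty+tx reaches 64 MiB + 16 MiB (stat_dirty_waiting), so
// waiters never stall on bytes other waiters are already accounting for.
// The parallel bufferhead-count bound (max_dirty >> BUFFER_MEMORY_WEIGHT
// plus one per waiter) keeps huge numbers of tiny dirty bhs from evading
// the byte limit.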
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                                  ZTracer::Trace *trace, Context *onfreespace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(trace != nullptr);
  int ret = 0;

  if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
    if (block_writes_upfront) {
      _maybe_wait_for_writeback(len, trace);
      if (onfreespace)
        onfreespace->complete(0);
    } else {
      ceph_assert(onfreespace);
      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
    }
  } else {
    // write-thru!  flush what we just wrote.
    ceph::condition_variable cond;
    bool done = false;
    Context *fin = block_writes_upfront ?
      new C_Cond(cond, &done, &ret) : onfreespace;
    ceph_assert(fin);
    bool flushed = flush_set(oset, wr->extents, trace, fin);
    ceph_assert(!flushed);   // we just dirtied it, and didn't drop our lock!
    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
    if (block_writes_upfront) {
      std::unique_lock l{lock, std::adopt_lock};
      cond.wait(l, [&done] { return done; });
      l.release();
      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
      if (onfreespace)
        onfreespace->complete(ret);
    }
  }

  // start writeback anyway?
  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
                   << target_dirty << ", nudging flusher" << dendl;
    flusher_cond.notify_all();
  }
  return ret;
}
void ObjectCacher::flusher_entry()
{
  ldout(cct, 10) << "flusher start" << dendl;
  std::unique_lock l{lock};
  while (!flusher_stop) {
    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
                 get_stat_dirty();
    ldout(cct, 11) << "flusher "
                   << all << " / " << max_size << ":  "
                   << get_stat_tx() << " tx, "
                   << get_stat_rx() << " rx, "
                   << get_stat_clean() << " clean, "
                   << get_stat_dirty() << " dirty ("
                   << target_dirty << " target, "
                   << max_dirty << " max)"
                   << dendl;
    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();

    ZTracer::Trace trace;
    if (cct->_conf->osdc_blkin_trace_all) {
      trace.init("flusher", &trace_endpoint);
      trace.event("start");
    }

    if (actual > 0 && (uint64_t) actual > target_dirty) {
      // flush some dirty pages
      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
      flush(&trace, actual - target_dirty);
    } else {
      // check tail of lru for old dirty items
      ceph::real_time cutoff = ceph::real_clock::now();
      cutoff -= max_dirty_age;
      BufferHead *bh = 0;
      int max = MAX_FLUSH_UNDER_LOCK;
      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
                                            lru_get_next_expire())) != 0 &&
             bh->last_write <= cutoff &&
             max > 0) {
        ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
        if (scattered_write) {
          bh_write_adjacencies(bh, cutoff, NULL, &max);
        } else {
          bh_write(bh, trace);
          --max;
        }
      }
      if (!max) {
        // back off the lock to avoid starving other threads
        trace.event("backoff");
        l.unlock();
        l.lock();
        continue;
      }
    }

    trace.event("finish");
    if (flusher_stop)
      break;

    flusher_cond.wait_for(l, 1s);
  }

  /* Wait for reads to finish. This is only possible if handling
   * -ENOENT made some read completions finish before their rados read
   * came back. If we don't wait for them, and destroy the cache, when
   * the rados reads do come back their callback will try to access the
   * no-longer-valid ObjectCacher.
   */
  read_cond.wait(l, [this] {
    if (reads_outstanding > 0) {
      ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
                     << reads_outstanding << dendl;
      return false;
    } else {
      return true;
    }
  });

  ldout(cct, 10) << "flusher finish" << dendl;
}
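// The flusher wakes at least once per second (flusher_cond.wait_for(l, 1s))
// and is also nudged explicitly by writers.  It flushes the excess over
// target_dirty when the dirty total is high, and otherwise writes back up
// to MAX_FLUSH_UNDER_LOCK aged bufferheads (older than max_dirty_age) per
// pass, briefly dropping the lock between passes to avoid starving other
// threads.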
// -------------------------------------------------


bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty())
    return true;

  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
    if (!(*p)->is_empty())
      return false;

  return true;
}
bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
         q != ob->data.end();
         ++q) {
      BufferHead *bh = q->second;
      if (!bh->is_dirty() && !bh->is_tx())
        return true;
    }
  }

  return false;
}
bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;

    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;
      if (bh->is_dirty() || bh->is_tx())
        return true;
    }
  }

  return false;
}
// purge.  non-blocking.  violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "purge " << *ob << dendl;

  ob->truncate(0);
}
// flush.  non-blocking.  no callback.
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
                         ZTracer::Trace *trace)
{
  ceph_assert(trace != nullptr);
  ceph_assert(ceph_mutex_is_locked(lock));
  list<BufferHead*> blist;
  bool clean = true;
  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
  for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    ldout(cct, 20) << "flush  " << *bh << dendl;
    if (length && bh->start() > offset+length) {
      break;
    }
    if (bh->is_tx()) {
      clean = false;
      continue;
    }
    if (!bh->is_dirty()) {
      continue;
    }

    if (scattered_write)
      blist.push_back(bh);
    else
      bh_write(bh, *trace);
    clean = false;
  }
  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  return clean;
}
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
                                     Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (gather->has_subs()) {
    gather->set_finisher(onfinish);
    gather->activate();
    return false;
  }

  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
  onfinish->complete(0);
  return true;
}
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;

  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
  // order. But items in oset->objects are not sorted. So the iterator can
  // point to any buffer head in the ObjectSet
  BufferHead key(*oset->objects.begin());
  it = dirty_or_tx_bh.lower_bound(&key);
  p = q = it;

  bool backwards = true;
  if (it != dirty_or_tx_bh.begin())
    --it;
  else
    backwards = false;

  for (; p != dirty_or_tx_bh.end(); p = q) {
    ++q;
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
      break;
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }
  }

  if (backwards) {
    for (p = q = it; true; p = q) {
      if (q != dirty_or_tx_bh.begin())
        --q;
      else
        backwards = false;
      BufferHead *bh = *p;
      if (bh->ob->oset != oset)
        break;
      waitfor_commit.insert(bh->ob);
      if (bh->is_dirty()) {
        if (scattered_write) {
          if (last_ob != bh->ob) {
            if (!blist.empty()) {
              bh_write_scattered(blist);
              blist.clear();
            }
            last_ob = bh->ob;
          }
          blist.push_front(bh);
        } else {
          bh_write(bh, {});
        }
      }
      if (!backwards)
        break;
    }
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end(); ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}

// flush. non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
                             ZTracer::Trace *trace, Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(trace != nullptr);
  ceph_assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
                 << " ObjectExtents" << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);

  for (vector<ObjectExtent>::iterator p = exv.begin();
       p != exv.end();
       ++p) {
    ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
                   << " " << ob << dendl;

    if (!flush(ob, ex.offset, ex.length, trace)) {
      // we'll need to gather...
      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                     << ob->last_write_tid << " on " << *ob << dendl;
      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
    }
  }

  return _flush_set_finish(&gather, onfinish);
}

// flush all dirty data. non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(onfinish != NULL);

  ldout(cct, 10) << "flush_all " << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
  next = it = dirty_or_tx_bh.begin();
  while (it != dirty_or_tx_bh.end()) {
    ++next;
    BufferHead *bh = *it;
    waitfor_commit.insert(bh->ob);

    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }

    it = next;
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end();
       ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_all will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
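
// purge_set. non-blocking. violently removes all buffers for every
// object in the set. still fires the flush callback if dirty data was
// dropped, so the caller can release resources tied to it.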
void ObjectCacher::purge_set(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty()) {
    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "purge_set " << oset << dendl;
  const bool were_dirty = oset->dirty_or_tx > 0;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;
    purge(ob);
  }

  // Although we have purged rather than flushed, caller should still
  // drop any resources associate with dirty data.
  ceph_assert(oset->dirty_or_tx == 0);
  if (flush_set_callback && were_dirty) {
    flush_set_callback(flush_set_callback_arg, oset);
  }
}
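
// release. drop all clean/zero/error buffers on the object, and close
// it entirely if nothing remains. returns the number of bytes that
// could not be released because they are not yet clean.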
loff_t ObjectCacher::release(Object *ob)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  list<BufferHead*> clean;
  loff_t o_unclean = 0;

  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    if (bh->is_clean() || bh->is_zero() || bh->is_error())
      clean.push_back(bh);
    else
      o_unclean += bh->length();
  }

  for (list<BufferHead*>::iterator p = clean.begin();
       p != clean.end();
       ++p) {
    bh_remove(ob, *p);
    delete *p;
  }

  if (ob->can_close()) {
    ldout(cct, 10) << "release trimming " << *ob << dendl;
    close_object(ob);
    ceph_assert(o_unclean == 0);
    return 0;
  }

  if (ob->complete) {
    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
    ob->complete = false;
  }
  if (!ob->exists) {
    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
    ob->exists = true;
  }

  return o_unclean;
}

loff_t ObjectCacher::release_set(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  // return # bytes not clean (and thus not released).
  loff_t unclean = 0;

  if (oset->objects.empty()) {
    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
    return 0;
  }

  ldout(cct, 10) << "release_set " << oset << dendl;

  xlist<Object*>::iterator q;
  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ) {
    q = p;
    ++q;

    Object *ob = *p;

    loff_t o_unclean = release(ob);
    unclean += o_unclean;

    if (o_unclean)
      ldout(cct, 10) << "release_set " << oset << " " << *ob
                     << " has " << o_unclean << " bytes left"
                     << dendl;
    p = q;
  }

  if (unclean) {
    ldout(cct, 10) << "release_set " << oset
                   << ", " << unclean << " bytes left" << dendl;
  }

  return unclean;
}
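
// release_all. release() every cached object in every pool; returns
// the total number of bytes left unreleased.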
uint64_t ObjectCacher::release_all()
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "release_all" << dendl;
  uint64_t unclean = 0;

  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
    = objects.begin();
  while (i != objects.end()) {
    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
    while (p != i->end()) {
      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
      ++n;

      Object *ob = p->second;

      loff_t o_unclean = release(ob);
      unclean += o_unclean;

      if (o_unclean)
        ldout(cct, 10) << "release_all " << *ob
                       << " has " << o_unclean << " bytes left"
                       << dendl;
      p = n;
    }
    ++i;
  }

  if (unclean) {
    ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
                   << dendl;
  }

  return unclean;
}
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    if (!ob->exists) {
      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
      ob->exists = true;
      ob->complete = false;
    }
    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
         !q.end(); ++q) {
      C_ReadFinish *comp = *q;
      comp->distrust_enoent();
    }
  }
}

/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset.
 */
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  bool was_dirty = oset->dirty_or_tx > 0;

  _discard(oset, exls, nullptr);
  _discard_finish(oset, was_dirty, nullptr);
}

/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset. If the bh is in TX state, the discard
 * will wait for the write to commit prior to invoking on_finish.
 */
void ObjectCacher::discard_writeback(ObjectSet *oset,
                                     const vector<ObjectExtent>& exls,
                                     Context* on_finish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  bool was_dirty = oset->dirty_or_tx > 0;

  C_GatherBuilder gather(cct);
  _discard(oset, exls, &gather);

  if (gather.has_subs()) {
    bool flushed = was_dirty && oset->dirty_or_tx == 0;
    gather.set_finisher(new LambdaContext(
      [this, oset, flushed, on_finish](int) {
        ceph_assert(ceph_mutex_is_locked(lock));
        if (flushed && flush_set_callback)
          flush_set_callback(flush_set_callback_arg, oset);
        on_finish->complete(0);
      }));
    gather.activate();
    return;
  }

  _discard_finish(oset, was_dirty, on_finish);
}
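
// internal helper for discard_set() and discard_writeback(): drop the
// cached extents in exls from oset. when a gather is supplied, buffers
// still in TX add subs so the caller can wait for writeback to commit.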
void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
                            C_GatherBuilder* gather)
{
  if (oset->objects.empty()) {
    ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << __func__ << " " << oset << dendl;

  for (auto& ex : exls) {
    ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;

    Object *ob = objects[oset->poolid][soid];

    ob->discard(ex.offset, ex.length, gather);
  }
}
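
// completion half of the discard paths: fire the flush callback if the
// discard dropped the set's last dirty|tx data, then complete on_finish
// (if any).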
void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
                                   Context* on_finish)
{
  ceph_assert(ceph_mutex_is_locked(lock));

  // did we truncate off dirty data?
  if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
    flush_set_callback(flush_set_callback_arg, oset);
  }

  // notify that in-flight writeback has completed
  if (on_finish != nullptr) {
    on_finish->complete(0);
  }
}
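
// debugging aid: recount bytes per buffer state across all cached
// objects and assert the totals match the incrementally maintained
// stat_* counters.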
void ObjectCacher::verify_stats() const
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "verify_stats" << dendl;

  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
    error = 0;
  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
         = objects.begin();
       i != objects.end();
       ++i) {
    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
           = i->begin();
         p != i->end();
         ++p) {
      Object *ob = p->second;
      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
           q != ob->data.end();
           ++q) {
        BufferHead *bh = q->second;
        switch (bh->get_state()) {
        case BufferHead::STATE_MISSING:
          missing += bh->length();
          break;
        case BufferHead::STATE_CLEAN:
          clean += bh->length();
          break;
        case BufferHead::STATE_ZERO:
          zero += bh->length();
          break;
        case BufferHead::STATE_DIRTY:
          dirty += bh->length();
          break;
        case BufferHead::STATE_TX:
          tx += bh->length();
          break;
        case BufferHead::STATE_RX:
          rx += bh->length();
          break;
        case BufferHead::STATE_ERROR:
          error += bh->length();
          break;
        default:
          ceph_abort();
        }
      }
    }
  }

  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
                 << " dirty " << dirty << " missing " << missing
                 << " error " << error << dendl;
  ceph_assert(clean == stat_clean);
  ceph_assert(rx == stat_rx);
  ceph_assert(tx == stat_tx);
  ceph_assert(dirty == stat_dirty);
  ceph_assert(missing == stat_missing);
  ceph_assert(zero == stat_zero);
  ceph_assert(error == stat_error);
}
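
// account a buffer head's bytes under its current state. dirty and tx
// bytes are also charged to the owning Object and ObjectSet so the
// flush/discard paths can tell when a set has drained.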
void ObjectCacher::bh_stat_add(BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing += bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean += bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero += bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx += bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error += bh->length();
    break;
  default:
    ceph_abort_msg("bh_stat_add: invalid bufferhead state");
  }
  if (get_stat_dirty_waiting() > 0)
    stat_cond.notify_all();
}
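
// inverse of bh_stat_add: un-account a buffer head's bytes before it is
// resized, re-stated, or removed.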
void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing -= bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean -= bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero -= bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx -= bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error -= bh->length();
    break;
  default:
    ceph_abort_msg("bh_stat_sub: invalid bufferhead state");
  }
}
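
// move a buffer head to state s, keeping the dirty/rest LRUs, the
// dirty_or_tx_bh index, and the stat counters consistent.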
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  int state = bh->get_state();
  // move between lru lists?
  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
    bh_lru_rest.lru_remove(bh);
    bh_lru_dirty.lru_insert_top(bh);
  } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
    bh_lru_dirty.lru_remove(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if ((s == BufferHead::STATE_TX ||
       s == BufferHead::STATE_DIRTY) &&
      state != BufferHead::STATE_TX &&
      state != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.insert(bh);
  } else if ((state == BufferHead::STATE_TX ||
              state == BufferHead::STATE_DIRTY) &&
             s != BufferHead::STATE_TX &&
             s != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.erase(bh);
  }

  if (s != BufferHead::STATE_ERROR &&
      state == BufferHead::STATE_ERROR) {
    bh->error = 0;
  }

  // set state
  bh_stat_sub(bh);
  bh->set_state(s);
  bh_stat_add(bh);
}
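
// index a newly created buffer head: attach it to its object, put it on
// the appropriate LRU, and add it to the dirty|tx index and stats.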
void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
  ob->add_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_insert_top(bh);
    dirty_or_tx_bh.insert(bh);
  } else {
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.insert(bh);
  }
  bh_stat_add(bh);
}
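
// remove a buffer head from all indexes (object map, LRUs, dirty|tx
// set, stats). the caller is responsible for deleting it.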
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(bh->get_journal_tid() == 0);
  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
  ob->remove_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_remove(bh);
    dirty_or_tx_bh.erase(bh);
  } else {
    bh_lru_rest.lru_remove(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.erase(bh);
  }
  bh_stat_sub(bh);
  if (get_stat_dirty_waiting() > 0)
    stat_cond.notify_all();
}