// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"

#include "include/ceph_assert.h"

#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
                                 ///< while holding the lock
#define BUFFER_MEMORY_WEIGHT CEPH_PAGE_SHIFT  // memory usage of BufferHead, count in (1<<n)
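// For example, with a 4 KiB page size (CEPH_PAGE_SHIFT == 12, the common
// case), each BufferHead is charged at least 1<<12 = 4096 bytes whenever the
// cache converts byte limits into BufferHead-count limits (see trim() and
// _maybe_wait_for_writeback() below). The exact shift is platform-dependent.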
using std::chrono::seconds;
/*** ObjectCacher::BufferHead ***/


/*** ObjectCacher::Object ***/

#define dout_subsys ceph_subsys_objectcacher
#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
class ObjectCacher::C_ReadFinish : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  loff_t start;
  uint64_t length;
  xlist<C_ReadFinish*>::item set_item;
  bool trust_enoent;
  ceph_tid_t tid;
  ZTracer::Trace trace;

public:
  bufferlist bl;
  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
               uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
    set_item(this), trust_enoent(true),
    tid(t), trace(trace) {
    ob->reads.push_back(&set_item);
  }

  void finish(int r) override {
    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
    trace.event("finish");

    // object destructor clears the list
    if (set_item.is_on_list())
      set_item.remove_myself();
  }

  void distrust_enoent() {
    trust_enoent = false;
  }
};
class ObjectCacher::C_RetryRead : public Context {
  ObjectCacher *oc;
  OSDRead *rd;
  ObjectSet *oset;
  Context *onfinish;
  ZTracer::Trace trace;
public:
  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
              const ZTracer::Trace &trace)
    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
  }
  void finish(int r) override {
    if (r >= 0) {
      // this was an overall success!
      r = oc->_readx(rd, oset, onfinish, false, &trace);
    }

    if (r == 0) {
      // read is still in-progress
      return;
    }

    trace.event("finish");
    if (onfinish) {
      onfinish->complete(r);
    }
  }
};
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
                                                      loff_t off)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;

  // split off right
  ObjectCacher::BufferHead *right = new BufferHead(this);

  // inherit the dontneed/nocache hints; if accessed later, they are
  // cleared automatically.
  right->set_dontneed(left->get_dontneed());
  right->set_nocache(left->get_nocache());

  right->last_write_tid = left->last_write_tid;
  right->last_read_tid = left->last_read_tid;
  right->set_state(left->get_state());
  right->snapc = left->snapc;
  right->set_journal_tid(left->journal_tid);

  loff_t newleftlen = off - left->start();
  right->set_start(off);
  right->set_length(left->length() - newleftlen);

  // shorten left
  oc->bh_stat_sub(left);
  left->set_length(newleftlen);
  oc->bh_stat_add(left);

  // add right
  oc->bh_add(this, right);

  // split buffers too
  bufferlist bl;
  bl = std::move(left->bl);
  if (bl.length()) {
    ceph_assert(bl.length() == (left->length() + right->length()));
    right->bl.substr_of(bl, left->length(), right->length());
    left->bl.substr_of(bl, 0, left->length());
  }

  // move read waiters
  if (!left->waitfor_read.empty()) {
    map<loff_t, list<Context*> >::iterator start_remove
      = left->waitfor_read.begin();
    while (start_remove != left->waitfor_read.end() &&
           start_remove->first < right->start())
      ++start_remove;
    for (map<loff_t, list<Context*> >::iterator p = start_remove;
         p != left->waitfor_read.end(); ++p) {
      ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
                         << " to right bh" << dendl;
      right->waitfor_read[p->first].swap(p->second);
      ceph_assert(p->second.empty());
    }
    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
  }

  ldout(oc->cct, 20) << "split left is " << *left << dendl;
  ldout(oc->cct, 20) << "split right is " << *right << dendl;

  return right;
}
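// Example: splitting a bh spanning 0~8192 at off=4096 leaves `left` as
// 0~4096 and returns a new `right` bh covering 4096~4096 that inherits the
// state, snapc, and tids; any read waiters at offsets >= 4096 move to
// `right`.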
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));

  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
  if (left->get_journal_tid() == 0) {
    left->set_journal_tid(right->get_journal_tid());
  }
  right->set_journal_tid(0);

  oc->bh_remove(this, right);
  oc->bh_stat_sub(left);
  left->set_length(left->length() + right->length());
  oc->bh_stat_add(left);

  // data
  left->bl.claim_append(right->bl);

  // version
  // note: this is sorta busted, but should only be used for dirty buffers
  left->last_write_tid = std::max(left->last_write_tid, right->last_write_tid);
  left->last_write = std::max(left->last_write, right->last_write);

  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);

  // waiters
  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
       p != right->waitfor_read.end();
       ++p)
    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
                                        p->second);

  // hose right
  delete right;

  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
}
bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
{
  if (left->end() != right->start() ||
      left->get_state() != right->get_state() ||
      !left->can_merge_journal(right))
    return false;
  if (left->is_tx() && left->last_write_tid != right->last_write_tid)
    return false;
  return true;
}
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;

  // do not merge rx buffers; last_read_tid may not match
  if (bh->is_rx())
    return;

  // to the left?
  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
  ceph_assert(p->second == bh);
  if (p != data.begin()) {
    --p;
    if (can_merge_bh(p->second, bh)) {
      merge_left(p->second, bh);
      bh = p->second;
    } else {
      ++p;
    }
  }
  // to the right?
  ceph_assert(p->second == bh);
  ++p;
  if (p != data.end() && can_merge_bh(bh, p->second))
    merge_left(bh, p->second);

  maybe_rebuild_buffer(bh);
}
void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
{
  auto& bl = bh->bl;
  if (bl.get_num_buffers() <= 1)
    return;

  auto wasted = bl.get_wasted_space();
  if (wasted * 2 > bl.length() &&
      wasted > (1U << BUFFER_MEMORY_WEIGHT))
    bl.rebuild();
}
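// Example: a BufferHead whose bufferlist holds 4 KiB of live data spread
// over fragments carrying more than 4 KiB of dead space (wasted * 2 >
// length, and wasted > one page) gets rebuilt into a single contiguous
// buffer. Figures assume CEPH_PAGE_SHIFT == 12; the threshold scales with it.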
/*
 * count bytes we have cached in given range
 */
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
  while (left > 0) {
    if (p == data.end())
      return false;

    if (p->first <= cur) {
      // have part of it
      loff_t lenfromcur = std::min(p->second->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else if (p->first > cur) {
      // gap
      return false;
    } else
      ceph_abort();
  }

  return true;
}
/*
 * all cached data in this range [off, off+len]
 */
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  if (data.empty())
    return true;

  map<loff_t, BufferHead*>::iterator first = data.begin();
  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
  if (first->second->start() >= off && last->second->end() <= (off + len))
    return true;
  else
    return false;
}
/*
 * map a range of bytes into buffer_heads.
 * - create missing buffer_heads as necessary.
 */
int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& hits,
                                   map<loff_t, BufferHead*>& missing,
                                   map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
                     << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    // at end?
    if (p == data.end()) {
      // rest is a miss.
      BufferHead *n = new BufferHead(this);
      n->set_start(cur);
      n->set_length(left);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read miss+complete+zero " << left
                           << " left, " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n
                           << dendl;
      }
      cur += left;
      ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
      break;  // no more.
    }

    if (p->first <= cur) {
      // have it (or part of it)
      BufferHead *e = p->second;

      if (e->is_clean() ||
          e->is_dirty() ||
          e->is_tx() ||
          e->is_zero()) {
        hits[cur] = e;     // readable!
        ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
      } else if (e->is_rx()) {
        rx[cur] = e;       // missing, not readable.
        ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
      } else if (e->is_error()) {
        errors[cur] = e;
        ldout(oc->cct, 20) << "map_read error " << *e << dendl;
      } else {
        ceph_abort();
      }

      loff_t lenfromcur = std::min(e->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;  // more?
    } else if (p->first > cur) {
      // gap.. miss
      loff_t next = p->first;
      BufferHead *n = new BufferHead(this);
      loff_t len = std::min(next - cur, left);
      n->set_start(cur);
      n->set_length(len);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
      }
      cur += std::min(left, n->length());
      left -= std::min(left, n->length());
      continue;    // more?
    } else {
      ceph_abort();
    }
  }
  return 0;
}
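// For instance, reading extent 0~4096 over a cache that holds a clean bh at
// 0~1024 and an in-flight rx bh at 1024~1024 yields hits[0], rx[1024], and a
// newly allocated bh covering 2048~2048 recorded in `missing` (or in `hits`
// as a zero buffer, if the object is known to be complete).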
void ObjectCacher::Object::audit_buffers()
{
  loff_t offset = 0;
  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
       it != data.end(); ++it) {
    if (it->first != it->second->start()) {
      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
                     << " does not match bh start position: "
                     << *it->second << dendl;
      ceph_assert(it->first == it->second->start());
    }
    if (it->first < offset) {
      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
                     << " overlaps with previous bh " << *((--it)->second)
                     << dendl;
      ceph_assert(it->first >= offset);
    }
    BufferHead *bh = it->second;
    map<loff_t, list<Context*> >::const_iterator w_it;
    for (w_it = bh->waitfor_read.begin();
         w_it != bh->waitfor_read.end(); ++w_it) {
      if (w_it->first < bh->start() ||
          w_it->first >= bh->start() + bh->length()) {
        lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
                       << " is not within bh " << *bh << dendl;
        ceph_assert(w_it->first >= bh->start());
        ceph_assert(w_it->first < bh->start() + bh->length());
      }
    }
    offset = it->first + it->second->length();
  }
}
/*
 * map a range of extents on an object's buffer cache.
 * - combine any bh's we're writing into one
 * - break up bufferheads that don't fall completely within the range
 * //no! - return a bh that includes the write.  may also include
 * other dirty data to left and/or right.
 */
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
                                                          ceph_tid_t tid)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  BufferHead *final = 0;

  ldout(oc->cct, 10) << "map_write oex " << ex.oid
                     << " " << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    loff_t max = left;

    // at end ?
    if (p == data.end()) {
      if (final == NULL) {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( max );
        oc->bh_add(this, final);
        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
      } else {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + max);
        oc->bh_stat_add(final);
      }
      left -= max;
      cur += max;
      continue;
    }

    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
    //oc->verify_stats();

    if (p->first <= cur) {
      BufferHead *bh = p->second;
      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;

      if (p->first < cur) {
        ceph_assert(final == 0);
        if (cur + max >= bh->end()) {
          // we want right bit (one splice)
          final = split(bh, cur);   // just split it, take right half.
          maybe_rebuild_buffer(bh);
          replace_journal_tid(final, tid);
          ++p;
          ceph_assert(p->second == final);
        } else {
          // we want middle bit (two splices)
          final = split(bh, cur);
          maybe_rebuild_buffer(bh);
          ++p;
          ceph_assert(p->second == final);
          auto right = split(final, cur+max);
          maybe_rebuild_buffer(right);
          replace_journal_tid(final, tid);
        }
      } else {
        ceph_assert(p->first == cur);
        if (bh->length() <= max) {
          // whole bufferhead, piece of cake.
        } else {
          // we want left bit (one splice)
          auto right = split(bh, cur + max);        // just split
          maybe_rebuild_buffer(right);
        }
        if (final) {
          oc->mark_dirty(bh);
          oc->mark_dirty(final);
          --p;  // move iterator back to final
          ceph_assert(p->second == final);
          replace_journal_tid(bh, tid);
          merge_left(final, bh);
        } else {
          final = bh;
          replace_journal_tid(final, tid);
        }
      }

      // keep going.
      loff_t lenfromcur = final->end() - cur;
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else {
      // gap!
      loff_t next = p->first;
      loff_t glen = std::min(next - cur, max);
      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
      if (final) {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + glen);
        oc->bh_stat_add(final);
      } else {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( glen );
        oc->bh_add(this, final);
      }

      cur += glen;
      left -= glen;
      continue;    // more?
    }
  }

  // set version
  ceph_assert(final);
  ceph_assert(final->get_journal_tid() == tid);
  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;

  return final;
}
void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
                                               ceph_tid_t tid)
{
  ceph_tid_t bh_tid = bh->get_journal_tid();

  ceph_assert(tid == 0 || bh_tid <= tid);
  if (bh_tid != 0 && bh_tid != tid) {
    // inform journal that it should not expect a writeback from this extent
    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
                                           bh->length(), bh_tid, tid);
  }
  bh->set_journal_tid(tid);
}
void ObjectCacher::Object::truncate(loff_t s)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;

  while (!data.empty()) {
    BufferHead *bh = data.rbegin()->second;
    if (bh->end() <= s)
      break;

    // split bh at truncation point?
    if (bh->start() < s) {
      split(bh, s);
      maybe_rebuild_buffer(bh);
      continue;
    }

    // remove bh entirely
    ceph_assert(bh->start() >= s);
    ceph_assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}
void ObjectCacher::Object::discard(loff_t off, loff_t len,
                                   C_GatherBuilder* commit_gather)
{
  ceph_assert(ceph_mutex_is_locked(oc->lock));
  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
                     << dendl;

  if (!exists) {
    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
    exists = true;
  }
  if (complete) {
    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
    complete = false;
  }

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
  while (p != data.end()) {
    BufferHead *bh = p->second;
    if (bh->start() >= off + len)
      break;

    // split bh at truncation point?
    if (bh->start() < off) {
      split(bh, off);
      maybe_rebuild_buffer(bh);
      ++p;
      continue;
    }

    ceph_assert(bh->start() >= off);
    if (bh->end() > off + len) {
      auto right = split(bh, off + len);
      maybe_rebuild_buffer(right);
    }

    ++p;
    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
    replace_journal_tid(bh, 0);

    if (bh->is_tx() && commit_gather != nullptr) {
      // wait for the writeback to commit
      waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
    } else if (bh->is_rx()) {
      // cannot remove bh with in-flight read, but we can ensure the
      // read won't overwrite the discard
      bh->last_read_tid = ++oc->last_read_tid;
      bh->bl.clear();
      bh->set_nocache(true);
      oc->mark_zero(bh);
      // we should mark all Rx bh to zero
      continue;
    } else {
      ceph_assert(bh->waitfor_read.empty());
    }

    oc->bh_remove(this, bh);
    delete bh;
  }
}
/*** ObjectCacher ***/

#undef dout_prefix
#define dout_prefix *_dout << "objectcacher "


ObjectCacher::ObjectCacher(CephContext *cct_, string name,
                           WritebackHandler& wb, ceph::mutex& l,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg, uint64_t max_bytes,
                           uint64_t max_objects, uint64_t max_dirty,
                           uint64_t target_dirty, double max_dirty_age,
                           bool block_writes_upfront)
  : cct(cct_), writeback_handler(wb), name(name), lock(l),
    max_dirty(max_dirty), target_dirty(target_dirty),
    max_size(max_bytes), max_objects(max_objects),
    max_dirty_age(ceph::make_timespan(max_dirty_age)),
    block_writes_upfront(block_writes_upfront),
    trace_endpoint("ObjectCacher"),
    flush_set_callback(flush_callback),
    flush_set_callback_arg(flush_callback_arg),
    last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
    stat_nr_dirty_waiters(0), reads_outstanding(0)
{
  perf_start();
  finisher.start();
  scattered_write = writeback_handler.can_scattered_write();
}
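// A client such as librbd typically constructs one ObjectCacher per image,
// roughly as sketched below, then starts the flusher thread. The numeric
// values are illustrative only, not defaults:
//
//   ObjectCacher *oc = new ObjectCacher(cct, "myimage", wb_handler, lock,
//                                       flush_cb, flush_cb_arg,
//                                       32 << 20,  /* max_bytes */
//                                       64,        /* max_objects */
//                                       24 << 20,  /* max_dirty */
//                                       16 << 20,  /* target_dirty */
//                                       1.0,       /* max_dirty_age, seconds */
//                                       true       /* block_writes_upfront */);
//   oc->start();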
ObjectCacher::~ObjectCacher()
{
  finisher.stop();
  perf_stop();
  // we should be empty.
  for (vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
         = objects.begin();
       i != objects.end();
       ++i)
    ceph_assert(i->empty());
  ceph_assert(bh_lru_rest.lru_get_size() == 0);
  ceph_assert(bh_lru_dirty.lru_get_size() == 0);
  ceph_assert(ob_lru.lru_get_size() == 0);
  ceph_assert(dirty_or_tx_bh.empty());
}
void ObjectCacher::perf_start()
{
  string n = "objectcacher-" + name;
  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);

  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
                      "cache_ops_hit", "Hit operations");
  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
                      "cache_ops_miss", "Miss operations");
  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
                      "cache_bytes_hit", "Hit data", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
                      "cache_bytes_miss", "Miss data", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_objectcacher_data_read,
                      "data_read", "Read data");
  plb.add_u64_counter(l_objectcacher_data_written,
                      "data_written", "Data written to cache");
  plb.add_u64_counter(l_objectcacher_data_flushed,
                      "data_flushed", "Data flushed");
  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
                      "data_overwritten_while_flushing",
                      "Data overwritten while flushing");
  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
                      "Write operations, delayed due to dirty limits");
  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
                      "write_bytes_blocked",
                      "Write data blocked on dirty limit", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
               "Time spent blocking a write due to dirty limits");

  perfcounter = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perfcounter);
}

void ObjectCacher::perf_stop()
{
  ceph_assert(perfcounter);
  cct->get_perfcounters_collection()->remove(perfcounter);
  delete perfcounter;
}
ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
                                               uint64_t object_no,
                                               ObjectSet *oset,
                                               object_locator_t &l,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
{
  // XXX: Add handling of nspace in object_locator_t in cache
  ceph_assert(ceph_mutex_is_locked(lock));
  // have it?
  if ((uint32_t)l.pool < objects.size()) {
    if (objects[l.pool].count(oid)) {
      Object *o = objects[l.pool][oid];
      o->object_no = object_no;
      o->truncate_size = truncate_size;
      o->truncate_seq = truncate_seq;
      return o;
    }
  } else {
    objects.resize(l.pool+1);
  }

  // create it.
  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
                         truncate_seq);
  objects[l.pool][oid] = o;
  ob_lru.lru_insert_top(o);
  return o;
}
void ObjectCacher::close_object(Object *ob)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "close_object " << *ob << dendl;
  ceph_assert(ob->can_close());

  // ok!
  ob_lru.lru_remove(ob);
  objects[ob->oloc.pool].erase(ob->get_soid());
  ob->set_item.remove_myself();
  delete ob;
}
void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
                           const ZTracer::Trace &parent_trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_read " + bh->ob->get_oid().name);
    trace.event("start");
  }

  mark_rx(bh);
  bh->last_read_tid = ++last_read_tid;

  // finisher
  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
                                            bh->start(), bh->length(), trace);
  // go
  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
                         bh->ob->get_oloc(), bh->start(), bh->length(),
                         bh->ob->get_snap(), &onfinish->bl,
                         bh->ob->truncate_size, bh->ob->truncate_seq,
                         op_flags, trace, onfinish);

  ++reads_outstanding;
}
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
                                  ceph_tid_t tid, loff_t start,
                                  uint64_t length, bufferlist &bl, int r,
                                  bool trust_enoent)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_read_finish "
                << oid
                << " tid " << tid
                << " " << start << "~" << length
                << " (bl is " << bl.length() << ")"
                << " returned " << r
                << " outstanding reads " << reads_outstanding
                << dendl;

  if (r >= 0 && bl.length() < length) {
    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
                  << length << " with " << length - bl.length()
                  << " bytes of zeroes" << dendl;
    bl.append_zero(length - bl.length());
  }
  list<Context*> ls;
  int err = 0;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
  } else {
    Object *ob = objects[poolid][oid];

    if (r == -ENOENT && !ob->complete) {
      // wake up *all* rx waiters, or else we risk reordering
      // identical reads. e.g.
      //   read 1~1
      //   reply to unrelated 3~1 -> !exists
      //   read 1~1 -> immediate ENOENT
      //   reply to first 1~1 -> ooo ENOENT
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
           p != ob->data.end(); ++p) {
        BufferHead *bh = p->second;
        for (map<loff_t, list<Context*> >::iterator p
               = bh->waitfor_read.begin();
             p != bh->waitfor_read.end();
             ++p)
          ls.splice(ls.end(), p->second);
        bh->waitfor_read.clear();
        if (!bh->is_zero() && !bh->is_rx())
          allzero = false;
      }

      // just pass through and retry all waiters if we don't trust
      // -ENOENT for this read
      if (trust_enoent) {
        ldout(cct, 7)
          << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
          << dendl;
        ob->complete = true;
        ob->exists = false;

        /* If all the bhs are effectively zero, get rid of them.  All
         * the waiters will be retried and get -ENOENT immediately, so
         * it's safe to clean up the unneeded bh's now. Since we know
         * it's safe to remove them now, do so, so they aren't hanging
         * around waiting for more -ENOENTs from rados while the cache
         * is being shut down.
         *
         * Only do this when all the bhs are rx or clean, to match the
         * condition in _readx(). If there are any non-rx or non-clean
         * bhs, _readx() will wait for the final result instead of
         * returning -ENOENT immediately.
         */
        if (allzero) {
          ldout(cct, 10)
            << "bh_read_finish ENOENT and allzero, getting rid of "
            << "bhs for " << *ob << dendl;
          map<loff_t, BufferHead*>::iterator p = ob->data.begin();
          while (p != ob->data.end()) {
            BufferHead *bh = p->second;
            // current iterator will be invalidated by bh_remove()
            ++p;
            bh_remove(ob, bh);
            delete bh;
          }
        }
      }
    }
    // apply to bh's!
    loff_t opos = start;
    while (true) {
      map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
      if (p == ob->data.end())
        break;
      if (opos >= start+(loff_t)length) {
        ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
                       << start << "+" << length << "=" << start+(loff_t)length
                       << dendl;
        break;
      }

      BufferHead *bh = p->second;
      ldout(cct, 20) << "checking bh " << *bh << dendl;

      // finishers?
      for (map<loff_t, list<Context*> >::iterator it
             = bh->waitfor_read.begin();
           it != bh->waitfor_read.end();
           ++it)
        ls.splice(ls.end(), it->second);
      bh->waitfor_read.clear();

      if (bh->start() > opos) {
        ldout(cct, 1) << "bh_read_finish skipping gap "
                      << opos << "~" << bh->start() - opos << dendl;
        opos = bh->start();
        continue;
      }

      if (!bh->is_rx()) {
        ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
        opos = bh->end();
        continue;
      }

      if (bh->last_read_tid != tid) {
        ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
                       << bh->last_read_tid << " != tid " << tid
                       << ", skipping" << dendl;
        opos = bh->end();
        continue;
      }

      ceph_assert(opos >= bh->start());
      ceph_assert(bh->start() == opos);   // we don't merge rx bh's... yet!
      ceph_assert(bh->length() <= start+(loff_t)length-opos);

      if (bh->error < 0)
        err = bh->error;

      opos = bh->end();

      if (r == -ENOENT) {
        if (trust_enoent) {
          ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
          bh_remove(ob, bh);
          delete bh;
        } else {
          ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
                         << *bh << dendl;
        }
        continue;
      }

      if (r < 0) {
        // indicates error in read range, different from a zero-length read
        mark_error(bh);
      } else {
        bh->bl.substr_of(bl,
                         bh->start() - start,
                         bh->length());
        mark_clean(bh);
      }

      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;

      ob->try_merge_bh(bh);
    }
  }

  // called with lock held.
  ldout(cct, 20) << "finishing waiters " << ls << dendl;

  finish_contexts(cct, ls, err);
  retry_waiting_reads();

  --reads_outstanding;
  read_cond.notify_all();
}
void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
                                        int64_t *max_amount, int *max_count)
{
  list<BufferHead*> blist;

  int count = 0;
  int64_t total_len = 0;
  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
  ceph_assert(it != dirty_or_tx_bh.end());
  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
       p != dirty_or_tx_bh.end();
       ++p) {
    BufferHead *obh = *p;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_back(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }

  while (it != dirty_or_tx_bh.begin()) {
    --it;
    BufferHead *obh = *it;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_front(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }
  if (max_count)
    *max_count -= count;
  if (max_amount)
    *max_amount -= total_len;

  bh_write_scattered(blist);
}
class ObjectCacher::C_WriteCommit : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  vector<pair<loff_t, uint64_t> > ranges;
  ZTracer::Trace trace;
public:
  ceph_tid_t tid = 0;
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
                uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(_poolid), oid(o), trace(trace) {
    ranges.push_back(make_pair(s, l));
  }
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
                vector<pair<loff_t, uint64_t> >& _ranges) :
    oc(c), poolid(_poolid), oid(o), tid(0) {
    ranges.swap(_ranges);
  }
  void finish(int r) override {
    oc->bh_write_commit(poolid, oid, ranges, tid, r);
    trace.event("finish");
  }
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
  ceph_assert(ceph_mutex_is_locked(lock));

  Object *ob = blist.front()->ob;
  ob->get();

  ceph::real_time last_write;
  SnapContext snapc;
  vector<pair<loff_t, uint64_t> > ranges;
  vector<pair<uint64_t, bufferlist> > io_vec;

  ranges.reserve(blist.size());
  io_vec.reserve(blist.size());

  uint64_t total_len = 0;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
    ceph_assert(bh->ob == ob);
    ceph_assert(bh->bl.length() == bh->length());
    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));

    int n = io_vec.size();
    io_vec.resize(n + 1);
    io_vec[n].first = bh->start();
    io_vec[n].second = bh->bl;

    total_len += bh->length();
    if (bh->snapc.seq > snapc.seq)
      snapc = bh->snapc;
    if (bh->last_write > last_write)
      last_write = bh->last_write;
  }

  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);

  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
                                           io_vec, snapc, last_write,
                                           ob->truncate_size, ob->truncate_seq,
                                           ZTracer::Trace(), oncommit);
  oncommit->tid = tid;
  ob->last_write_tid = tid;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    bh->last_write_tid = tid;
    mark_tx(bh);
  }

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_write " << *bh << dendl;

  bh->ob->get();

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_write " + bh->ob->get_oid().name);
    trace.event("start");
  }

  // finishers
  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                              bh->ob->get_soid(), bh->start(),
                                              bh->length(), trace);
  // go
  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
                                           bh->ob->get_oloc(),
                                           bh->start(), bh->length(),
                                           bh->snapc, bh->bl, bh->last_write,
                                           bh->ob->truncate_size,
                                           bh->ob->truncate_seq,
                                           bh->journal_tid, trace, oncommit);
  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;

  // set bh last_write_tid
  oncommit->tid = tid;
  bh->ob->last_write_tid = tid;
  bh->last_write_tid = tid;

  mark_tx(bh);

  if (perfcounter) {
    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
  }
}
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
                                   vector<pair<loff_t, uint64_t> >& ranges,
                                   ceph_tid_t tid, int r)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
                << " ranges " << ranges << " returned " << r << dendl;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
    return;
  }

  Object *ob = objects[poolid][oid];
  int was_dirty_or_tx = ob->oset->dirty_or_tx;

  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
       p != ranges.end();
       ++p) {
    loff_t start = p->first;
    uint64_t length = p->second;
    if (!ob->exists) {
      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
      ob->exists = true;

      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
                                              ob->get_snap())) {
        ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
          "complete on " << *ob << dendl;
        ob->complete = false;
      }
    }

    vector<pair<loff_t, BufferHead*>> hit;
    // apply to bh's!
    for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;

      if (bh->start() >= start+(loff_t)length)
        break;

      // make sure bh is tx
      if (!bh->is_tx()) {
        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
        continue;
      }

      // make sure bh tid matches
      if (bh->last_write_tid != tid) {
        ceph_assert(bh->last_write_tid > tid);
        ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
        continue;
      }

      // we don't merge tx buffers. tx buffer should be within the range
      ceph_assert(bh->start() >= start);
      ceph_assert(bh->end() <= start+(loff_t)length);

      if (r >= 0) {
        // ok! mark bh clean and error-free
        mark_clean(bh);
        bh->set_journal_tid(0);
        if (bh->get_nocache())
          bh_lru_rest.lru_bottouch(bh);
        hit.push_back(make_pair(bh->start(), bh));
        ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
      } else {
        mark_dirty(bh);
        ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
                       << *bh << " r = " << r << " " << cpp_strerror(-r)
                       << dendl;
      }
    }

    for (auto& p : hit) {
      // p.second may have been merged and deleted in merge_left
      if (ob->data.count(p.first))
        ob->try_merge_bh(p.second);
    }
  }

  // update last_commit.
  ceph_assert(ob->last_commit_tid < tid);
  ob->last_commit_tid = tid;

  // waiters?
  list<Context*> ls;
  if (ob->waitfor_commit.count(tid)) {
    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
    ob->waitfor_commit.erase(tid);
  }

  // is the entire object set now clean and fully committed?
  ObjectSet *oset = ob->oset;
  ob->put();

  if (flush_set_callback &&
      was_dirty_or_tx > 0 &&
      oset->dirty_or_tx == 0) {        // nothing dirty/tx
    flush_set_callback(flush_set_callback_arg, oset);
  }

  if (!ls.empty())
    finish_contexts(cct, ls, r);
}
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
  ceph_assert(trace != nullptr);
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph::real_time cutoff = ceph::real_clock::now();

  ldout(cct, 10) << "flush " << amount << dendl;

  /*
   * NOTE: we aren't actually pulling things off the LRU here, just
   * looking at the tail item.  Then we call bh_write, which moves it
   * to the other LRU, so that we can call
   * lru_dirty.lru_get_next_expire() again.
   */
  int64_t left = amount;
  while (amount == 0 || left > 0) {
    BufferHead *bh = static_cast<BufferHead*>(
      bh_lru_dirty.lru_get_next_expire());
    if (!bh) break;
    if (bh->last_write > cutoff) break;

    if (scattered_write) {
      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
    } else {
      left -= bh->length();
      bh_write(bh, *trace);
    }
  }
}
void ObjectCacher::trim()
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;

  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
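  // Example: with max_size = 64 MiB and BUFFER_MEMORY_WEIGHT = 12 (4 KiB
  // pages), max_clean_bh = 64 MiB >> 12 = 16384, i.e. trimming also kicks in
  // once the clean LRU holds more than 16384 unpinned BufferHeads, however
  // small each one is. The shift value is platform-dependent.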
  while (get_stat_clean() > 0 &&
         ((uint64_t)get_stat_clean() > max_size ||
          nr_clean_bh > max_clean_bh)) {
    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
    if (!bh)
      break;

    ldout(cct, 10) << "trim trimming " << *bh << dendl;
    ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());

    Object *ob = bh->ob;
    bh_remove(ob, bh);
    delete bh;

    --nr_clean_bh;

    if (ob->complete) {
      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
      ob->complete = false;
    }
  }

  while (ob_lru.lru_get_size() > max_objects) {
    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
    if (!ob)
      break;

    ldout(cct, 10) << "trim trimming " << *ob << dendl;
    close_object(ob);
  }

  ldout(cct, 10) << "trim finish: max " << max_size << " clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
}
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
                             snapid_t snapid)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
       ex_it != extents.end();
       ++ex_it) {
    ldout(cct, 10) << "is_cached " << *ex_it << dendl;

    // get Object cache
    sobject_t soid(ex_it->oid, snapid);
    Object *o = get_object_maybe(soid, ex_it->oloc);
    if (!o)
      return false;
    if (!o->is_cached(ex_it->offset, ex_it->length))
      return false;
  }
  return true;
}
/*
 * returns # bytes read (if in cache). onfinish is untouched (caller
 *  must delete it)
 * returns 0 if doing async read
 */
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                        ZTracer::Trace *parent_trace)
{
  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("read", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  int r = _readx(rd, oset, onfinish, true, &trace);
  if (r < 0) {
    trace.event("finish");
  }
  return r;
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                         bool external_call, ZTracer::Trace *trace)
{
  ceph_assert(trace != nullptr);
  ceph_assert(ceph_mutex_is_locked(lock));
  bool success = true;
  int error = 0;
  uint64_t bytes_in_cache = 0;
  uint64_t bytes_not_in_cache = 0;
  uint64_t total_bytes_read = 0;
  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  /*
   * WARNING: we can only meaningfully return ENOENT if the read request
   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
   * zeroed buffers needs to feed single extents into readx().
   */
  ceph_assert(!oset->return_enoent || rd->extents.size() == 1);
  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
       ex_it != rd->extents.end();
       ++ex_it) {
    ldout(cct, 10) << "readx " << *ex_it << dendl;

    total_bytes_read += ex_it->length;

    // get Object cache
    sobject_t soid(ex_it->oid, rd->snap);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);
    if (external_call)
      touch_ob(o);

    // does not exist and no hits?
    if (oset->return_enoent && !o->exists) {
      ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;

      // should we worry about COW underneath us?
      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
                                              ex_it->length, soid.snap)) {
        ldout(cct, 20) << "readx may copy on write" << dendl;
        bool wait = false;
        list<BufferHead*> blist;
        for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
             bh_it != o->data.end();
             ++bh_it) {
          BufferHead *bh = bh_it->second;
          if (bh->is_dirty() || bh->is_tx()) {
            ldout(cct, 10) << "readx flushing " << *bh << dendl;
            wait = true;
            if (bh->is_dirty()) {
              if (scattered_write)
                blist.push_back(bh);
              else
                bh_write(bh, *trace);
            }
          }
        }
        if (scattered_write && !blist.empty())
          bh_write_scattered(blist);
        if (wait) {
          ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
          o->waitfor_commit[o->last_write_tid].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace));
          // FIXME: perfcounter!
          return 0;
        }
      }

      // can we return ENOENT?
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
           bh_it != o->data.end();
           ++bh_it) {
        ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
        if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
          allzero = false;
          break;
        }
      }
      if (allzero) {
        ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
                       << dendl;
        delete rd;
        if (dontneed)
          bottouch_ob(o);
        return -ENOENT;
      }
    }
    // map extent into bufferheads
    map<loff_t, BufferHead*> hits, missing, rx, errors;
    o->map_read(*ex_it, hits, missing, rx, errors);
    if (external_call) {
      // retry reading error buffers
      missing.insert(errors.begin(), errors.end());
    } else {
      // some reads had errors, fail later so completions
      // are cleaned up properly
      // TODO: make read path not call _readx for every completion
      hits.insert(errors.begin(), errors.end());
    }

    if (!missing.empty() || !rx.empty()) {
      // read missing
      map<loff_t, BufferHead*>::iterator last = missing.end();
      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
           bh_it != missing.end();
           ++bh_it) {
        uint64_t rx_bytes = static_cast<uint64_t>(
          stat_rx + bh_it->second->length());
        bytes_not_in_cache += bh_it->second->length();
        if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
          // cache is full with concurrent reads -- wait for rx's to complete
          // to constrain memory growth (especially during copy-ups)
          if (success) {
            ldout(cct, 10) << "readx missed, waiting on cache to complete "
                           << waitfor_read.size() << " blocked reads, "
                           << (std::max(rx_bytes, max_size) - max_size)
                           << " read bytes" << dendl;
            waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
                                                   *trace));
          }

          bh_remove(o, bh_it->second);
          delete bh_it->second;
          bh_it->second = nullptr;
          success = false;
          continue;
        }

        bh_it->second->set_nocache(nocache);
        bh_read(bh_it->second, rd->fadvise_flags, *trace);
        if ((success && onfinish) || last != missing.end())
          last = bh_it;
        success = false;
      }

      // add the waiter to the last missing bh to avoid waking up too
      // early, since reads complete in order
      if (last != missing.end()) {
        ldout(cct, 10) << "readx missed, waiting on " << *last->second
                       << " off " << last->first << dendl;
        last->second->waitfor_read[last->first].push_back(
          new C_RetryRead(this, rd, oset, onfinish, *trace) );
      }

      // bump rx
      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
           bh_it != rx.end();
           ++bh_it) {
        touch_bh(bh_it->second); // bump in lru, so we don't lose it.
        if (success && onfinish) {
          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
                         << " off " << bh_it->first << dendl;
          bh_it->second->waitfor_read[bh_it->first].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace) );
        }
        bytes_not_in_cache += bh_it->second->length();
        success = false;
      }

      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end(); ++bh_it)
        // bump in lru, so we don't lose it on a later read
        touch_bh(bh_it->second);

    } else {
      ceph_assert(!hits.empty());
      // make a plain list
      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end();
           ++bh_it) {
        BufferHead *bh = bh_it->second;
        ldout(cct, 10) << "readx hit bh " << *bh << dendl;
        if (bh->is_error() && bh->error)
          error = bh->error;
        bytes_in_cache += bh->length();

        if (bh->get_nocache() && bh->is_clean())
          bh_lru_rest.lru_bottouch(bh);
        else
          touch_bh(bh);
        // must be after touch_bh, because touch_bh resets dontneed to false
        if (dontneed &&
            ((loff_t)ex_it->offset <= bh->start() &&
             (bh->end() <= (loff_t)(ex_it->offset + ex_it->length)))) {
          bh->set_dontneed(true); // if dirty
          if (bh->is_clean())
            bh_lru_rest.lru_bottouch(bh);
        }
      }

      // create reverse map of buffer offset -> object for the
      // eventual result.  this is over a single ObjectExtent, so we
      // know that
      //  - the bh's are contiguous
      //  - the buffer frags need not be (and almost certainly aren't)
      loff_t opos = ex_it->offset;
      map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
      ceph_assert(bh_it->second->start() <= opos);
      uint64_t bhoff = opos - bh_it->second->start();
      vector<pair<uint64_t,uint64_t> >::iterator f_it
        = ex_it->buffer_extents.begin();
      uint64_t foff = 0;
      while (1) {
        BufferHead *bh = bh_it->second;
        ceph_assert(opos == (loff_t)(bh->start() + bhoff));

        uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
        ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
                       << bhoff << " frag " << f_it->first << "~"
                       << f_it->second << " +" << foff << "~" << len
                       << dendl;

        bufferlist bit;
        // put substr here first, since substr_of clobbers, and we
        // may get multiple bh's at this stripe_map position
        if (bh->is_zero()) {
          stripe_map[f_it->first].append_zero(len);
        } else {
          bit.substr_of(bh->bl,
                        opos - bh->start(),
                        len);
          stripe_map[f_it->first].claim_append(bit);
        }

        opos += len;
        bhoff += len;
        foff += len;
        if (opos == bh->end()) {
          ++bh_it;
          bhoff = 0;
        }
        if (foff == f_it->second) {
          ++f_it;
          foff = 0;
        }
        if (bh_it == hits.end()) break;
        if (f_it == ex_it->buffer_extents.end())
          break;
      }
      ceph_assert(f_it == ex_it->buffer_extents.end());
      ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
    }

    if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
      bottouch_ob(o);
  }
  if (!success) {
    if (perfcounter && external_call) {
      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
      perfcounter->inc(l_objectcacher_cache_ops_miss);
    }
    if (onfinish) {
      ldout(cct, 20) << "readx defer " << rd << dendl;
    } else {
      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
                     << dendl;
      delete rd;
    }
    return 0;  // wait!
  }
  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_hit);
  }

  // no misses... success!  do the read.
  ldout(cct, 10) << "readx has all buffers" << dendl;

  // ok, assemble into result buffer.
  uint64_t pos = 0;
  if (rd->bl && !error) {
    rd->bl->clear();
    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
         i != stripe_map.end();
         ++i) {
      ceph_assert(pos == i->first);
      ldout(cct, 10) << "readx adding buffer len " << i->second.length()
                     << " at " << pos << dendl;
      pos += i->second.length();
      rd->bl->claim_append(i->second);
      ceph_assert(rd->bl->length() == pos);
    }
    ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
  } else if (!error) {
    ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
    pos = i->first + i->second.length();
  }

  // done with read.
  int ret = error ? error : pos;
  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
  ceph_assert(pos <= (uint64_t) INT_MAX);

  delete rd;

  trim();

  return ret;
}
void ObjectCacher::retry_waiting_reads()
{
  list<Context *> ls;
  ls.swap(waitfor_read);

  while (!ls.empty() && waitfor_read.empty()) {
    Context *ctx = ls.front();
    ls.pop_front();
    ctx->complete(0);
  }
  waitfor_read.splice(waitfor_read.end(), ls);
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
                         ZTracer::Trace *parent_trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph::real_time now = ceph::real_clock::now();
  uint64_t bytes_written = 0;
  uint64_t bytes_written_in_flush = 0;
  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("write", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  list<Context*> wait_for_reads;
  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
       ex_it != wr->extents.end();
       ++ex_it) {
    // get object cache
    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);

    // map it all into a single bufferhead.
    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
    bool missing = bh->is_missing();
    bh->snapc = wr->snapc;

    // readers that need to be woken up due to an overwrite
    for (auto& [_, wait_for_read] : bh->waitfor_read) {
      wait_for_reads.splice(wait_for_reads.end(), wait_for_read);
    }
    bh->waitfor_read.clear();

    bytes_written += ex_it->length;
    if (bh->is_tx()) {
      bytes_written_in_flush += ex_it->length;
    }

    // adjust buffer pointers (ie "copy" data into my cache)
    // this is over a single ObjectExtent, so we know that
    //  - there is one contiguous bh
    //  - the buffer frags need not be (and almost certainly aren't)
    // note: i assume striping is monotonic... no jumps backwards, ever!
    loff_t opos = ex_it->offset;
    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
           = ex_it->buffer_extents.begin();
         f_it != ex_it->buffer_extents.end();
         ++f_it) {
      ldout(cct, 10) << "writex writing " << f_it->first << "~"
                     << f_it->second << " into " << *bh << " at " << opos
                     << dendl;
      uint64_t bhoff = opos - bh->start();
      ceph_assert(f_it->second <= bh->length() - bhoff);

      // get the frag we're mapping in
      bufferlist frag;
      frag.substr_of(wr->bl, f_it->first, f_it->second);

      // keep anything left of bhoff
      if (!bhoff)
        bh->bl.swap(frag);
      else
        bh->bl.claim_append(frag);

      opos += f_it->second;
    }

    // ok, now bh is dirty.
    mark_dirty(bh);
    if (dontneed)
      bh->set_dontneed(true);
    else if (nocache && missing)
      bh->set_nocache(true);
    else
      touch_bh(bh);

    bh->last_write = now;

    o->try_merge_bh(bh);
  }

  if (perfcounter) {
    perfcounter->inc(l_objectcacher_data_written, bytes_written);
    if (bytes_written_in_flush) {
      perfcounter->inc(l_objectcacher_overwritten_in_flush,
                       bytes_written_in_flush);
    }
  }

  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
  delete wr;

  finish_contexts(cct, wait_for_reads, 0);

  //verify_stats();
  trim();
  return r;
}
class ObjectCacher::C_WaitForWrite : public Context {
public:
  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
                 const ZTracer::Trace &trace, Context *onfinish) :
    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
  void finish(int r) override;
private:
  ObjectCacher *m_oc;
  uint64_t m_len;
  ZTracer::Trace m_trace;
  Context *m_onfinish;
};

void ObjectCacher::C_WaitForWrite::finish(int r)
{
  std::lock_guard l(m_oc->lock);
  m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
  m_onfinish->complete(r);
}
void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
                                             ZTracer::Trace *trace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph::mono_time start = ceph::mono_clock::now();
  int blocked = 0;
  // wait for writeback?
  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
  //  - do not wait for bytes other waiters are waiting on.  this means that
  //    threads do not wait for each other.  this effectively allows the cache
  //    size to balloon proportional to the data that is in flight.
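  //
  // Example: with max_dirty = 24 MiB, a writer only blocks here once
  // dirty+tx bytes reach 24 MiB *plus* whatever other blocked writers are
  // already waiting on (get_stat_dirty_waiting()), so each waiter reserves
  // its own headroom rather than serializing on a single global threshold.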
  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
  while (get_stat_dirty() + get_stat_tx() > 0 &&
         (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
           max_dirty + get_stat_dirty_waiting()) ||
          (dirty_or_tx_bh.size() >=
           max_dirty_bh + get_stat_nr_dirty_waiters()))) {

    if (blocked == 0) {
      trace->event("start wait for writeback");
    }

    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
    flusher_cond.notify_all();
    stat_dirty_waiting += len;
    ++stat_nr_dirty_waiters;
    std::unique_lock l{lock, std::adopt_lock};
    stat_cond.wait(l);
    l.release();
    stat_dirty_waiting -= len;
    --stat_nr_dirty_waiters;
    ++blocked;
    ldout(cct, 10) << __func__ << " woke up" << dendl;
  }

  if (blocked > 0) {
    trace->event("finish wait for writeback");
  }
  if (blocked && perfcounter) {
    perfcounter->inc(l_objectcacher_write_ops_blocked);
    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
    ceph::timespan blocked = ceph::mono_clock::now() - start;
    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
  }
}
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                                  ZTracer::Trace *trace, Context *onfreespace)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(trace != nullptr);
  int ret = 0;

  if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
    if (block_writes_upfront) {
      _maybe_wait_for_writeback(len, trace);
      if (onfreespace)
        onfreespace->complete(0);
    } else {
      ceph_assert(onfreespace);
      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
    }
  } else {
    // write-thru!  flush what we just wrote.
    ceph::condition_variable cond;
    bool done = false;
    Context *fin = block_writes_upfront ?
      new C_Cond(cond, &done, &ret) : onfreespace;
    ceph_assert(fin);
    bool flushed = flush_set(oset, wr->extents, trace, fin);
    ceph_assert(!flushed);   // we just dirtied it, and didn't drop our lock!
    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
    if (block_writes_upfront) {
      std::unique_lock l{lock, std::adopt_lock};
      cond.wait(l, [&done] { return done; });
      l.release();
      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
      if (onfreespace)
        onfreespace->complete(ret);
    }
  }

  // start writeback anyway?
  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
                   << target_dirty << ", nudging flusher" << dendl;
    flusher_cond.notify_all();
  }
  return ret;
}
void ObjectCacher::flusher_entry()
{
  ldout(cct, 10) << "flusher start" << dendl;
  std::unique_lock l{lock};
  while (!flusher_stop) {
    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
      get_stat_dirty();
    ldout(cct, 11) << "flusher "
                   << all << " / " << max_size << ": "
                   << get_stat_tx() << " tx, "
                   << get_stat_rx() << " rx, "
                   << get_stat_clean() << " clean, "
                   << get_stat_dirty() << " dirty ("
                   << target_dirty << " target, "
                   << max_dirty << " max)"
                   << dendl;
    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();

    ZTracer::Trace trace;
    if (cct->_conf->osdc_blkin_trace_all) {
      trace.init("flusher", &trace_endpoint);
      trace.event("start");
    }

    if (actual > 0 && (uint64_t) actual > target_dirty) {
      // flush some dirty pages
      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
      flush(&trace, actual - target_dirty);
    } else {
      // check tail of lru for old dirty items
      ceph::real_time cutoff = ceph::real_clock::now();
      cutoff -= max_dirty_age;
      BufferHead *bh = 0;
      int max = MAX_FLUSH_UNDER_LOCK;
      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
                                            lru_get_next_expire())) != 0 &&
             bh->last_write <= cutoff &&
             max > 0) {
        ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
        if (scattered_write) {
          bh_write_adjacencies(bh, cutoff, NULL, &max);
        } else {
          bh_write(bh, trace);
          --max;
        }
      }
      if (!max) {
        // back off the lock to avoid starving other threads
        trace.event("backoff");
        l.unlock();
        l.lock();
        continue;
      }
    }

    trace.event("finish");
    if (flusher_stop)
      break;

    flusher_cond.wait_for(l, 1s);
  }

  /* Wait for reads to finish. This is only possible if handling
   * -ENOENT made some read completions finish before their rados read
   * came back. If we don't wait for them, and destroy the cache, when
   * the rados reads do come back their callback will try to access the
   * no-longer-valid ObjectCacher.
   */
  read_cond.wait(l, [this] {
    if (reads_outstanding > 0) {
      ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
                     << reads_outstanding << dendl;
      return false;
    } else {
      return true;
    }
  });

  ldout(cct, 10) << "flusher finish" << dendl;
}
// -------------------------------------------------


bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty())
    return true;

  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
    if (!(*p)->is_empty())
      return false;

  return true;
}

bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
         q != ob->data.end();
         ++q) {
      BufferHead *bh = q->second;
      if (!bh->is_dirty() && !bh->is_tx())
        return true;
    }
  }

  return false;
}

bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;

    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;
      if (bh->is_dirty() || bh->is_tx())
        return true;
    }
  }

  return false;
}
// purge.  non-blocking.  violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "purge " << *ob << dendl;

  ob->truncate(0);
}
// flush.  non-blocking.  no callback.
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
                         ZTracer::Trace *trace)
{
  ceph_assert(trace != nullptr);
  ceph_assert(ceph_mutex_is_locked(lock));
  list<BufferHead*> blist;
  bool clean = true;
  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
  for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    ldout(cct, 20) << "flush " << *bh << dendl;
    if (length && bh->start() > offset+length) {
      break;
    }
    if (bh->is_tx()) {
      clean = false;
      continue;
    }
    if (!bh->is_dirty()) {
      continue;
    }

    if (scattered_write)
      blist.push_back(bh);
    else
      bh_write(bh, *trace);
    clean = false;
  }
  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  return clean;
}
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
                                     Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (gather->has_subs()) {
    gather->set_finisher(onfinish);
    gather->activate();
    return false;
  }

  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
  onfinish->complete(0);
  return true;
}
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;

  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
  // order. But items in oset->objects are not sorted. So the iterator can
  // point to any buffer head in the ObjectSet
  BufferHead key(*oset->objects.begin());
  it = dirty_or_tx_bh.lower_bound(&key);
  p = q = it;

  bool backwards = true;
  if (it != dirty_or_tx_bh.begin())
    --it;
  else
    backwards = false;

  for (; p != dirty_or_tx_bh.end(); p = q) {
    ++q;
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
      break;
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
	if (last_ob != bh->ob) {
	  if (!blist.empty()) {
	    bh_write_scattered(blist);
	    blist.clear();
	  }
	  last_ob = bh->ob;
	}
	blist.push_back(bh);
      } else {
	bh_write(bh, {});
      }
    }
  }

  if (backwards) {
    for(p = q = it; true; p = q) {
      if (q != dirty_or_tx_bh.begin())
	--q;
      else
	backwards = false;
      BufferHead *bh = *p;
      if (bh->ob->oset != oset)
	break;
      waitfor_commit.insert(bh->ob);
      if (bh->is_dirty()) {
	if (scattered_write) {
	  if (last_ob != bh->ob) {
	    if (!blist.empty()) {
	      bh_write_scattered(blist);
	      blist.clear();
	    }
	    last_ob = bh->ob;
	  }
	  blist.push_front(bh);
	} else {
	  bh_write(bh, {});
	}
      }
      if (!backwards)
	break;
    }
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end(); ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
		   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
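// The two loops above scan dirty_or_tx_bh outward from lower_bound(&key):
// forward until a bh belonging to another ObjectSet appears, then backward
// from the saved iterator.  This works because ptr_lt orders buffer heads by
// ObjectSet/Object/offset, so this oset's bhs form one contiguous run even
// though oset->objects itself is unsorted; push_back vs push_front keeps
// blist in offset order for the scattered path.
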
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
			     ZTracer::Trace *trace, Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(trace != nullptr);
  ceph_assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
		 << " ObjectExtents" << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);

  for (vector<ObjectExtent>::iterator p = exv.begin();
       p != exv.end();
       ++p) {
    ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
		   << " " << ob << dendl;

    if (!flush(ob, ex.offset, ex.length, trace)) {
      // we'll need to gather...
      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
		     << ob->last_write_tid << " on " << *ob << dendl;
      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
    }
  }

  return _flush_set_finish(&gather, onfinish);
}
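// Hypothetical caller sketch (not from this file): flush the extents touched
// by one logical write while holding lock, using C_SaferCond as the waiter.
//
//   C_SaferCond cond;
//   ZTracer::Trace trace;
//   bool clean = oc->flush_set(oset, extents, &trace, &cond);
//   // if clean, cond was already completed with 0; otherwise it fires once
//   // every touched object's outstanding write commits.
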
// flush all dirty data.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(onfinish != NULL);

  ldout(cct, 10) << "flush_all " << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
  next = it = dirty_or_tx_bh.begin();
  while (it != dirty_or_tx_bh.end()) {
    ++next;
    BufferHead *bh = *it;
    waitfor_commit.insert(bh->ob);

    if (bh->is_dirty()) {
      if (scattered_write) {
	if (last_ob != bh->ob) {
	  if (!blist.empty()) {
	    bh_write_scattered(blist);
	    blist.clear();
	  }
	  last_ob = bh->ob;
	}
	blist.push_back(bh);
      } else {
	bh_write(bh, {});
      }
    }

    it = next;
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end();
       ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_all will wait for ack tid "
		   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
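// flush_all() differs from flush_set() above only in iteration: it walks all
// of dirty_or_tx_bh front to back, so no lower_bound/backwards dance is
// needed, and every ObjectSet with pending data lands in waitfor_commit.
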
void ObjectCacher::purge_set(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (oset->objects.empty()) {
    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "purge_set " << oset << dendl;
  const bool were_dirty = oset->dirty_or_tx > 0;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;
    purge(ob);
  }

  // Although we have purged rather than flushed, caller should still
  // drop any resources associated with dirty data.
  ceph_assert(oset->dirty_or_tx == 0);
  if (flush_set_callback && were_dirty) {
    flush_set_callback(flush_set_callback_arg, oset);
  }
}
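// Unlike the flush paths, purge_set() discards buffers instead of writing
// them back, so there is nothing to wait for; the flush callback is still
// invoked when data was dirty so upper layers can drop whatever resources
// they tie to dirty bytes (see the comment above the assert).
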
loff_t ObjectCacher::release(Object *ob)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  list<BufferHead*> clean;
  loff_t o_unclean = 0;

  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    if (bh->is_clean() || bh->is_zero() || bh->is_error())
      clean.push_back(bh);
    else
      o_unclean += bh->length();
  }

  for (list<BufferHead*>::iterator p = clean.begin();
       p != clean.end();
       ++p) {
    bh_remove(ob, *p);
    delete *p;
  }

  if (ob->can_close()) {
    ldout(cct, 10) << "release trimming " << *ob << dendl;
    close_object(ob);
    ceph_assert(o_unclean == 0);
    return 0;
  }

  if (ob->complete) {
    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
    ob->complete = false;
  }
  if (!ob->exists) {
    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
    ob->exists = true;
  }

  return o_unclean;
}
loff_t ObjectCacher::release_set(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  // return # bytes not clean (and thus not released).
  loff_t unclean = 0;

  if (oset->objects.empty()) {
    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
    return 0;
  }

  ldout(cct, 10) << "release_set " << oset << dendl;

  xlist<Object*>::iterator q;
  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ) {
    q = p;
    ++q;

    Object *ob = *p;

    loff_t o_unclean = release(ob);
    unclean += o_unclean;

    if (o_unclean)
      ldout(cct, 10) << "release_set " << oset << " " << *ob
		     << " has " << o_unclean << " bytes left"
		     << dendl;
    p = q;
  }

  ldout(cct, 10) << "release_set " << oset
		 << ", " << unclean << " bytes left" << dendl;

  return unclean;
}
uint64_t ObjectCacher::release_all()
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "release_all" << dendl;
  uint64_t unclean = 0;

  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
    = objects.begin();
  while (i != objects.end()) {
    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
    while (p != i->end()) {
      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
      ++n;

      Object *ob = p->second;

      loff_t o_unclean = release(ob);
      unclean += o_unclean;

      if (o_unclean)
	ldout(cct, 10) << "release_all " << *ob
		       << " has " << o_unclean << " bytes left"
		       << dendl;
      p = n;
    }
    ++i;
  }

  ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
		 << dendl;

  return unclean;
}
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    if (!ob->exists) {
      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
      ob->exists = true;
      ob->complete = false;
    }
    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
	 !q.end(); ++q) {
      C_ReadFinish *comp = *q;
      comp->distrust_enoent();
    }
  }
}
/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset.
 */
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  bool was_dirty = oset->dirty_or_tx > 0;

  _discard(oset, exls, nullptr);
  _discard_finish(oset, was_dirty, nullptr);
}
/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset. If the bh is in TX state, the discard
 * will wait for the write to commit prior to invoking on_finish.
 */
void ObjectCacher::discard_writeback(ObjectSet *oset,
				     const vector<ObjectExtent>& exls,
				     Context* on_finish)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  bool was_dirty = oset->dirty_or_tx > 0;

  C_GatherBuilder gather(cct);
  _discard(oset, exls, &gather);

  if (gather.has_subs()) {
    bool flushed = was_dirty && oset->dirty_or_tx == 0;
    gather.set_finisher(new LambdaContext(
      [this, oset, flushed, on_finish](int) {
	ceph_assert(ceph_mutex_is_locked(lock));
	if (flushed && flush_set_callback)
	  flush_set_callback(flush_set_callback_arg, oset);
	if (on_finish)
	  on_finish->complete(0);
      }));
    gather.activate();
    return;
  }

  _discard_finish(oset, was_dirty, on_finish);
}
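// discard_set() vs discard_writeback(): both drop the given extents from the
// cache, but only this variant threads a C_GatherBuilder through
// Object::discard(), so bhs caught in TX state defer on_finish until their
// in-flight writes commit; the lambda finisher then fires the flush callback
// first (when the discard emptied the dirty set) before completing on_finish.
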
void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
			    C_GatherBuilder* gather)
{
  if (oset->objects.empty()) {
    ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << __func__ << " " << oset << dendl;

  for (auto& ex : exls) {
    ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;

    Object *ob = objects[oset->poolid][soid];

    ob->discard(ex.offset, ex.length, gather);
  }
}
void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
				   Context* on_finish)
{
  ceph_assert(ceph_mutex_is_locked(lock));

  // did we truncate off dirty data?
  if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
    flush_set_callback(flush_set_callback_arg, oset);
  }

  // notify that in-flight writeback has completed
  if (on_finish != nullptr) {
    on_finish->complete(0);
  }
}
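// The split into _discard()/_discard_finish() lets both entry points share
// one code path; discard_set() above is exactly the no-waiting composition:
//
//   bool was_dirty = oset->dirty_or_tx > 0;
//   _discard(oset, exls, nullptr);              // nothing to wait on
//   _discard_finish(oset, was_dirty, nullptr);  // maybe fire flush callback
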
void ObjectCacher::verify_stats() const
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 10) << "verify_stats" << dendl;

  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
    error = 0;
  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
	 = objects.begin();
       i != objects.end();
       ++i) {
    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
	   = i->begin();
	 p != i->end();
	 ++p) {
      Object *ob = p->second;
      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
	   q != ob->data.end();
	   ++q) {
	BufferHead *bh = q->second;
	switch (bh->get_state()) {
	case BufferHead::STATE_MISSING:
	  missing += bh->length();
	  break;
	case BufferHead::STATE_CLEAN:
	  clean += bh->length();
	  break;
	case BufferHead::STATE_ZERO:
	  zero += bh->length();
	  break;
	case BufferHead::STATE_DIRTY:
	  dirty += bh->length();
	  break;
	case BufferHead::STATE_TX:
	  tx += bh->length();
	  break;
	case BufferHead::STATE_RX:
	  rx += bh->length();
	  break;
	case BufferHead::STATE_ERROR:
	  error += bh->length();
	  break;
	default:
	  ceph_abort();
	}
      }
    }
  }

  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
		 << " dirty " << dirty << " missing " << missing
		 << " error " << error << dendl;
  ceph_assert(clean == stat_clean);
  ceph_assert(rx == stat_rx);
  ceph_assert(tx == stat_tx);
  ceph_assert(dirty == stat_dirty);
  ceph_assert(missing == stat_missing);
  ceph_assert(zero == stat_zero);
  ceph_assert(error == stat_error);
}
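// verify_stats() recomputes the per-state byte totals from scratch by walking
// every cached buffer head, then asserts they match the incrementally
// maintained stat_* counters updated by bh_stat_add()/bh_stat_sub() below; it
// is purely a debugging cross-check.
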
void ObjectCacher::bh_stat_add(BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing += bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean += bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero += bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx += bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error += bh->length();
    break;
  default:
    ceph_abort_msg("bh_stat_add: invalid bufferhead state");
  }
  if (get_stat_dirty_waiting() > 0)
    stat_cond.notify_all();
}
void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing -= bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean -= bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero -= bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx -= bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error -= bh->length();
    break;
  default:
    ceph_abort_msg("bh_stat_sub: invalid bufferhead state");
  }
}
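// bh_stat_sub()/bh_stat_add() run as a matched pair around every state change
// (see bh_set_state() below), so dirty_or_tx on the Object and its ObjectSet
// always equals the sum of DIRTY and TX bytes.  E.g. a 4096-byte bh moving
// DIRTY -> TX: sub removes 4096 from stat_dirty and both dirty_or_tx
// counters, add restores 4096 to both and credits stat_tx; the dirty_or_tx
// counters are unchanged, as expected.
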
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  int state = bh->get_state();
  // move between lru lists?
  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
    bh_lru_rest.lru_remove(bh);
    bh_lru_dirty.lru_insert_top(bh);
  } else if (s != BufferHead::STATE_DIRTY &&
	     state == BufferHead::STATE_DIRTY) {
    bh_lru_dirty.lru_remove(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if ((s == BufferHead::STATE_TX ||
       s == BufferHead::STATE_DIRTY) &&
      state != BufferHead::STATE_TX &&
      state != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.insert(bh);
  } else if ((state == BufferHead::STATE_TX ||
	      state == BufferHead::STATE_DIRTY) &&
	     s != BufferHead::STATE_TX &&
	     s != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.erase(bh);
  }

  if (s != BufferHead::STATE_ERROR &&
      state == BufferHead::STATE_ERROR) {
    bh->error = 0;
  }

  // set state
  bh_stat_sub(bh);
  bh->set_state(s);
  bh_stat_add(bh);
}
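// bh state transitions funnel through bh_set_state(): the LRU move, the
// dirty_or_tx_bh membership and the stat counters each key off the
// (old state, new state) pair, and the closing bh_stat_sub()/set_state()/
// bh_stat_add() triple keeps the counters consistent with the transition.
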
void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
  ob->add_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_insert_top(bh);
    dirty_or_tx_bh.insert(bh);
  } else {
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.insert(bh);
  }
  bh_stat_add(bh);
}
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  ceph_assert(bh->get_journal_tid() == 0);
  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
  ob->remove_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_remove(bh);
    dirty_or_tx_bh.erase(bh);
  } else {
    bh_lru_rest.lru_remove(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.erase(bh);
  }
  bh_stat_sub(bh);
  if (get_stat_dirty_waiting() > 0)
    stat_cond.notify_all();
}