// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"

#include "include/assert.h"

#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
                                 /// while holding the lock
#define BUFFER_MEMORY_WEIGHT 12  // memory usage of BufferHead, count in (1<<n)

using std::chrono::seconds;
/*** ObjectCacher::BufferHead ***/


/*** ObjectCacher::Object ***/

#define dout_subsys ceph_subsys_objectcacher
#undef dout_prefix
#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "


class ObjectCacher::C_ReadFinish : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  loff_t start;
  uint64_t length;
  xlist<C_ReadFinish*>::item set_item;
  bool trust_enoent;
  ceph_tid_t tid;
  ZTracer::Trace trace;

public:
  bufferlist bl;
  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
               uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
    set_item(this), trust_enoent(true),
    tid(t), trace(trace) {
    ob->reads.push_back(&set_item);
  }

  void finish(int r) override {
    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
    trace.event("finish");

    // object destructor clears the list
    if (set_item.is_on_list())
      set_item.remove_myself();
  }

  void distrust_enoent() {
    trust_enoent = false;
  }
};

class ObjectCacher::C_RetryRead : public Context {
  ObjectCacher *oc;
  OSDRead *rd;
  ObjectSet *oset;
  Context *onfinish;
  ZTracer::Trace trace;

public:
  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
              const ZTracer::Trace &trace)
    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
  }
  void finish(int r) override {
    if (r >= 0) {
      r = oc->_readx(rd, oset, onfinish, false, &trace);
    }
    if (r == 0) {
      // read is still in-progress
      return;
    }
    trace.event("finish");
    if (onfinish) {
      onfinish->complete(r);
    }
  }
};

ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
                                                      loff_t off)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;

  // split off right
  ObjectCacher::BufferHead *right = new BufferHead(this);

  // inherit dontneed/nocache; if the buffer is accessed later, these
  // flags are cleared again automatically
  right->set_dontneed(left->get_dontneed());
  right->set_nocache(left->get_nocache());

  right->last_write_tid = left->last_write_tid;
  right->last_read_tid = left->last_read_tid;
  right->set_state(left->get_state());
  right->snapc = left->snapc;
  right->set_journal_tid(left->journal_tid);

  loff_t newleftlen = off - left->start();
  right->set_start(off);
  right->set_length(left->length() - newleftlen);

  // shorten left
  oc->bh_stat_sub(left);
  left->set_length(newleftlen);
  oc->bh_stat_add(left);

  oc->bh_add(this, right);

  // split buffers too
  bufferlist bl;
  bl.claim(left->bl);
  if (bl.length()) {
    assert(bl.length() == (left->length() + right->length()));
    right->bl.substr_of(bl, left->length(), right->length());
    left->bl.substr_of(bl, 0, left->length());
  }

  // move read waiters
  if (!left->waitfor_read.empty()) {
    map<loff_t, list<Context*> >::iterator start_remove
      = left->waitfor_read.begin();
    while (start_remove != left->waitfor_read.end() &&
           start_remove->first < right->start())
      ++start_remove;
    for (map<loff_t, list<Context*> >::iterator p = start_remove;
         p != left->waitfor_read.end(); ++p) {
      ldout(oc->cct, 20) << "split  moving waiters at byte " << p->first
                         << " to right bh" << dendl;
      right->waitfor_read[p->first].swap(p->second);
      assert(p->second.empty());
    }
    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
  }

  ldout(oc->cct, 20) << "split    left is " << *left << dendl;
  ldout(oc->cct, 20) << "split   right is " << *right << dendl;
  return right;
}
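
#if 0
// Illustrative sketch only, not part of the original file; the bh and the
// offsets are made up.  Splitting a BufferHead that covers [4,12) at
// offset 6:
BufferHead *left = some_bh;                // hypothetical: start 4, length 8
BufferHead *right = split(left, 6);
// left  now covers [4,6):  length 2, keeps bl bytes [0,2)
// right now covers [6,12): length 6, keeps bl bytes [2,8)
// state, snapc and last_{read,write}_tid are copied to right; read waiters
// registered at offsets >= 6 migrate to right->waitfor_read.
#endif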
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
  assert(oc->lock.is_locked());
  assert(left->end() == right->start());
  assert(left->get_state() == right->get_state());
  assert(left->can_merge_journal(right));

  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
  if (left->get_journal_tid() == 0) {
    left->set_journal_tid(right->get_journal_tid());
  }
  right->set_journal_tid(0);

  oc->bh_remove(this, right);
  oc->bh_stat_sub(left);
  left->set_length(left->length() + right->length());
  oc->bh_stat_add(left);

  // data
  left->bl.claim_append(right->bl);

  // version
  // note: this is sorta busted, but should only be used for dirty buffers
  left->last_write_tid = MAX(left->last_write_tid, right->last_write_tid);
  left->last_write = MAX(left->last_write, right->last_write);

  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);

  // waiters
  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
       p != right->waitfor_read.end();
       ++p)
    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
                                        p->second);

  // hose right
  delete right;

  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
}

void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;

  // do not merge rx buffers; last_read_tid may not match
  if (bh->is_rx())
    return;

  // to the left?
  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
  assert(p->second == bh);
  if (p != data.begin()) {
    --p;
    if (p->second->end() == bh->start() &&
        p->second->get_state() == bh->get_state() &&
        p->second->can_merge_journal(bh)) {
      merge_left(p->second, bh);
      bh = p->second;
    } else {
      ++p;
    }
  }
  // to the right?
  assert(p->second == bh);
  ++p;
  if (p != data.end() &&
      p->second->start() == bh->end() &&
      p->second->get_state() == bh->get_state() &&
      p->second->can_merge_journal(bh))
    merge_left(bh, p->second);
}
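
#if 0
// Illustrative sketch only (not original code): merge_left() is only legal
// under the preconditions asserted above.  With hypothetical neighbours
// a = [0,4) and b = [4,8):
if (a->end() == b->start() &&             // physically adjacent
    a->get_state() == b->get_state() &&   // same state, e.g. both dirty
    a->can_merge_journal(b)) {            // journal tids compatible
  merge_left(a, b);                       // a grows to [0,8); b is deleted
}
// rx buffers are deliberately never merged: their last_read_tid may differ.
#endif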
/*
 * count bytes we have cached in given range
 */
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
  assert(oc->lock.is_locked());
  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
  while (left > 0) {
    if (p == data.end())
      return false;

    if (p->first <= cur) {
      // have part of it
      loff_t lenfromcur = MIN(p->second->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else if (p->first > cur) {
      // gap
      return false;
    } else
      ceph_abort();
  }
  return true;
}

/*
 * all cached data in this range [off, off+len]
 */
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
  assert(oc->lock.is_locked());
  if (data.empty())
    return true;

  map<loff_t, BufferHead*>::iterator first = data.begin();
  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
  if (first->second->start() >= off && last->second->end() <= (off + len))
    return true;
  else
    return false;
}

/*
 * map a range of bytes into buffer_heads.
 * - create missing buffer_heads as necessary.
 */
int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& hits,
                                   map<loff_t, BufferHead*>& missing,
                                   map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
                     << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    // at end?
    if (p == data.end()) {
      // rest is a miss.
      BufferHead *n = new BufferHead(this);
      n->set_start(cur);
      n->set_length(left);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read miss+complete+zero " << left
                           << " left, " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n
                           << dendl;
      }
      cur += left;
      assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
      break;  // no more.
    }

    if (p->first <= cur) {
      // have it (or part of it)
      BufferHead *e = p->second;

      if (e->is_clean() ||
          e->is_dirty() ||
          e->is_tx() ||
          e->is_zero()) {
        hits[cur] = e;     // readable!
        ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
      } else if (e->is_rx()) {
        rx[cur] = e;       // missing, not readable.
        ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
      } else if (e->is_error()) {
        errors[cur] = e;
        ldout(oc->cct, 20) << "map_read error " << *e << dendl;
      } else {
        ceph_abort();
      }

      loff_t lenfromcur = MIN(e->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;  // more?
    } else if (p->first > cur) {
      // gap.. miss
      loff_t next = p->first;
      BufferHead *n = new BufferHead(this);
      loff_t len = MIN(next - cur, left);
      n->set_start(cur);
      n->set_length(len);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
      }
      cur += MIN(left, n->length());
      left -= MIN(left, n->length());
      continue;    // more?
    } else {
      ceph_abort();
    }
  }
  return 0;
}
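
#if 0
// Illustrative sketch only (not original code): how a caller consumes
// map_read()'s four buckets for one extent.  `object` and `extent` are
// hypothetical.
map<loff_t, BufferHead*> hits, missing, rx, errors;
object->map_read(extent, hits, missing, rx, errors);
// hits:    data already buffered and readable right now
// missing: freshly created empty bh's -- start a bh_read() for each
// rx:      reads already in flight -- queue a waiter on bh->waitfor_read
// errors:  a previous read failed -- retry (external call) or report
#endif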
void ObjectCacher::Object::audit_buffers()
{
  loff_t offset = 0;
  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
       it != data.end(); ++it) {
    if (it->first != it->second->start()) {
      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
                     << " does not match bh start position: "
                     << *it->second << dendl;
      assert(it->first == it->second->start());
    }
    if (it->first < offset) {
      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
                     << " overlaps with previous bh " << *((--it)->second)
                     << dendl;
      assert(it->first >= offset);
    }
    BufferHead *bh = it->second;
    map<loff_t, list<Context*> >::const_iterator w_it;
    for (w_it = bh->waitfor_read.begin();
         w_it != bh->waitfor_read.end(); ++w_it) {
      if (w_it->first < bh->start() ||
          w_it->first >= bh->start() + bh->length()) {
        lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
                       << " is not within bh " << *bh << dendl;
        assert(w_it->first >= bh->start());
        assert(w_it->first < bh->start() + bh->length());
      }
    }
    offset = it->first + it->second->length();
  }
}
/*
 * map a range of extents on an object's buffer cache.
 * - combine any bh's we're writing into one
 * - break up bufferheads that don't fall completely within the range
 * //no! - return a bh that includes the write.  may also include
 * other dirty data to left and/or right.
 */
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
                                                          ceph_tid_t tid)
{
  assert(oc->lock.is_locked());
  BufferHead *final = 0;

  ldout(oc->cct, 10) << "map_write oex " << ex.oid
                     << " " << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    loff_t max = left;

    // at end ?
    if (p == data.end()) {
      if (final == NULL) {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( max );
        oc->bh_add(this, final);
        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final
                           << dendl;
      } else {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + max);
        oc->bh_stat_add(final);
      }
      left -= max;
      cur += max;
      continue;
    }

    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
    //oc->verify_stats();

    if (p->first <= cur) {
      BufferHead *bh = p->second;
      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;

      if (p->first < cur) {
        assert(final == 0);
        if (cur + max >= bh->end()) {
          // we want right bit (one splice)
          final = split(bh, cur);   // just split it, take right half.
          replace_journal_tid(final, tid);
          ++p;
          assert(p->second == final);
        } else {
          // we want middle bit (two splices)
          final = split(bh, cur);
          ++p;
          assert(p->second == final);
          split(final, cur+max);
          replace_journal_tid(final, tid);
        }
      } else {
        assert(p->first == cur);
        if (bh->length() <= max) {
          // whole bufferhead, piece of cake.
        } else {
          // we want left bit (one splice)
          split(bh, cur + max);        // just split
        }
        if (final) {
          oc->mark_dirty(final);
          oc->mark_dirty(bh);
          --p;  // move iterator back to final
          assert(p->second == final);
          replace_journal_tid(bh, tid);
          merge_left(final, bh);
        } else {
          final = bh;
          replace_journal_tid(final, tid);
        }
      }

      // keep going.
      loff_t lenfromcur = final->end() - cur;
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else {
      // gap!
      loff_t next = p->first;
      loff_t glen = MIN(next - cur, max);
      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
      if (final) {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + glen);
        oc->bh_stat_add(final);
      } else {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( glen );
        oc->bh_add(this, final);
      }

      cur += glen;
      left -= glen;
      continue;    // more?
    }
  }

  // set version
  assert(final);
  assert(final->get_journal_tid() == tid);
  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;

  return final;
}
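
// Illustrative note (not original code): the three intersection cases above,
// for a write of [cur, cur+max) against an existing bh (made-up offsets):
//
//   bh = [0,10), write [4,10)  -> split at 4, keep the right half
//   bh = [0,10), write [4,8)   -> split at 4 and at 8, keep the middle
//   bh = [4,10), write [4,8)   -> split at 8, keep the left half
//
// Either way `final` ends up covering exactly the bytes being written, and
// replace_journal_tid(final, tid) re-tags it for the current journal entry.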
void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
                                               ceph_tid_t tid)
{
  ceph_tid_t bh_tid = bh->get_journal_tid();

  assert(tid == 0 || bh_tid <= tid);
  if (bh_tid != 0 && bh_tid != tid) {
    // inform journal that it should not expect a writeback from this extent
    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
                                           bh->length(), bh_tid, tid);
  }
  bh->set_journal_tid(tid);
}

void ObjectCacher::Object::truncate(loff_t s)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;

  while (!data.empty()) {
    BufferHead *bh = data.rbegin()->second;
    if (bh->end() <= s)
      break;

    // split bh at truncation point?
    if (bh->start() < s) {
      split(bh, s);
      continue;
    }

    // remove bh entirely
    assert(bh->start() >= s);
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}

void ObjectCacher::Object::discard(loff_t off, loff_t len)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
                     << dendl;

  if (!exists) {
    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
    exists = true;
  }
  if (complete) {
    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
    complete = false;
  }

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
  while (p != data.end()) {
    BufferHead *bh = p->second;
    if (bh->start() >= off + len)
      break;

    // split bh at truncation point?
    if (bh->start() < off) {
      split(bh, off);
      ++p;
      continue;
    }

    assert(bh->start() >= off);
    if (bh->end() > off + len) {
      split(bh, off + len);
    }

    ++p;
    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}
/*** ObjectCacher ***/

#undef dout_prefix
#define dout_prefix *_dout << "objectcacher "


ObjectCacher::ObjectCacher(CephContext *cct_, string name,
                           WritebackHandler& wb, Mutex& l,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg, uint64_t max_bytes,
                           uint64_t max_objects, uint64_t max_dirty,
                           uint64_t target_dirty, double max_dirty_age,
                           bool block_writes_upfront)
  : perfcounter(NULL),
    cct(cct_), writeback_handler(wb), name(name), lock(l),
    max_dirty(max_dirty), target_dirty(target_dirty),
    max_size(max_bytes), max_objects(max_objects),
    max_dirty_age(ceph::make_timespan(max_dirty_age)),
    block_writes_upfront(block_writes_upfront),
    trace_endpoint("ObjectCacher"),
    flush_set_callback(flush_callback),
    flush_set_callback_arg(flush_callback_arg),
    last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
    stat_nr_dirty_waiters(0), reads_outstanding(0)
{
  perf_start();
  finisher.start();
  scattered_write = writeback_handler.can_scattered_write();
}

ObjectCacher::~ObjectCacher()
{
  finisher.stop();
  perf_stop();
  // we should be empty.
  for (vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
         = objects.begin();
       i != objects.end();
       ++i)
    assert(i->empty());
  assert(bh_lru_rest.lru_get_size() == 0);
  assert(bh_lru_dirty.lru_get_size() == 0);
  assert(ob_lru.lru_get_size() == 0);
  assert(dirty_or_tx_bh.empty());
}
void ObjectCacher::perf_start()
{
  string n = "objectcacher-" + name;
  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);

  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
                      "cache_ops_hit", "Hit operations");
  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
                      "cache_ops_miss", "Miss operations");
  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
                      "cache_bytes_hit", "Hit data");
  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
                      "cache_bytes_miss", "Miss data");
  plb.add_u64_counter(l_objectcacher_data_read,
                      "data_read", "Read data");
  plb.add_u64_counter(l_objectcacher_data_written,
                      "data_written", "Data written to cache");
  plb.add_u64_counter(l_objectcacher_data_flushed,
                      "data_flushed", "Data flushed");
  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
                      "data_overwritten_while_flushing",
                      "Data overwritten while flushing");
  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
                      "Write operations, delayed due to dirty limits");
  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
                      "write_bytes_blocked",
                      "Write data blocked on dirty limit");
  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
               "Time spent blocking a write due to dirty limits");

  perfcounter = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perfcounter);
}

void ObjectCacher::perf_stop()
{
  assert(perfcounter);
  cct->get_perfcounters_collection()->remove(perfcounter);
  delete perfcounter;
}
ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
                                               uint64_t object_no,
                                               ObjectSet *oset,
                                               object_locator_t &l,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
{
  // XXX: Add handling of nspace in object_locator_t in cache
  assert(lock.is_locked());
  // have it?
  if ((uint32_t)l.pool < objects.size()) {
    if (objects[l.pool].count(oid)) {
      Object *o = objects[l.pool][oid];
      o->object_no = object_no;
      o->truncate_size = truncate_size;
      o->truncate_seq = truncate_seq;
      return o;
    }
  } else {
    objects.resize(l.pool+1);
  }

  // create it.
  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
                         truncate_seq);
  objects[l.pool][oid] = o;
  ob_lru.lru_insert_top(o);
  return o;
}

void ObjectCacher::close_object(Object *ob)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "close_object " << *ob << dendl;
  assert(ob->can_close());

  // ok!
  ob_lru.lru_remove(ob);
  objects[ob->oloc.pool].erase(ob->get_soid());
  ob->set_item.remove_myself();
  delete ob;
}

void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
                           const ZTracer::Trace &parent_trace)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_read " + bh->ob->get_oid().name);
    trace.event("start");
  }

  mark_rx(bh);
  bh->last_read_tid = ++last_read_tid;

  // finisher
  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
                                            bh->start(), bh->length(), trace);
  // go
  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
                         bh->ob->get_oloc(), bh->start(), bh->length(),
                         bh->ob->get_snap(), &onfinish->bl,
                         bh->ob->truncate_size, bh->ob->truncate_seq,
                         op_flags, trace, onfinish);

  ++reads_outstanding;
}
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
                                  ceph_tid_t tid, loff_t start,
                                  uint64_t length, bufferlist &bl, int r,
                                  bool trust_enoent)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read_finish "
                << oid
                << " tid " << tid
                << " " << start << "~" << length
                << " (bl is " << bl.length() << ")"
                << " returned " << r
                << " outstanding reads " << reads_outstanding
                << dendl;

  if (r >= 0 && bl.length() < length) {
    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
                  << length << " with " << length - bl.length()
                  << " bytes of zeroes" << dendl;
    bl.append_zero(length - bl.length());
  }

  list<Context*> ls;
  int err = 0;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
  } else {
    Object *ob = objects[poolid][oid];

    if (r == -ENOENT && !ob->complete) {
      // wake up *all* rx waiters, or else we risk reordering
      // identical reads. e.g.
      //   reply to unrelated 3~1 -> !exists
      //   read 1~1 -> immediate ENOENT
      //   reply to first 1~1 -> ooo ENOENT
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
           p != ob->data.end(); ++p) {
        BufferHead *bh = p->second;
        for (map<loff_t, list<Context*> >::iterator p
               = bh->waitfor_read.begin();
             p != bh->waitfor_read.end();
             ++p)
          ls.splice(ls.end(), p->second);
        bh->waitfor_read.clear();
        if (!bh->is_zero() && !bh->is_rx())
          allzero = false;
      }

      // just pass through and retry all waiters if we don't trust
      // -ENOENT for this read
      if (trust_enoent) {
        ldout(cct, 7)
          << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
          << dendl;
        ob->complete = true;
        ob->exists = false;

        /* If all the bhs are effectively zero, get rid of them.  All
         * the waiters will be retried and get -ENOENT immediately, so
         * it's safe to clean up the unneeded bh's now. Since we know
         * it's safe to remove them now, do so, so they aren't hanging
         * around waiting for more -ENOENTs from rados while the cache
         * is being shut down.
         *
         * Only do this when all the bhs are rx or clean, to match the
         * condition in _readx(). If there are any non-rx or non-clean
         * bhs, _readx() will wait for the final result instead of
         * returning -ENOENT immediately.
         */
        if (allzero) {
          ldout(cct, 10)
            << "bh_read_finish ENOENT and allzero, getting rid of "
            << "bhs for " << *ob << dendl;
          map<loff_t, BufferHead*>::iterator p = ob->data.begin();
          while (p != ob->data.end()) {
            BufferHead *bh = p->second;
            // current iterator will be invalidated by bh_remove()
            ++p;
            bh_remove(ob, bh);
            delete bh;
          }
        }
      }
    }

    // apply to bh's!
    loff_t opos = start;
    while (true) {
      map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
      if (p == ob->data.end())
        break;
      if (opos >= start+(loff_t)length) {
        ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
                       << start << "+" << length << "=" << start+(loff_t)length
                       << dendl;
        break;
      }

      BufferHead *bh = p->second;
      ldout(cct, 20) << "checking bh " << *bh << dendl;

      // finishers?
      for (map<loff_t, list<Context*> >::iterator it
             = bh->waitfor_read.begin();
           it != bh->waitfor_read.end();
           ++it)
        ls.splice(ls.end(), it->second);
      bh->waitfor_read.clear();

      if (bh->start() > opos) {
        ldout(cct, 1) << "bh_read_finish skipping gap "
                      << opos << "~" << bh->start() - opos
                      << dendl;
        opos = bh->start();
        continue;
      }

      if (!bh->is_rx()) {
        ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
        opos = bh->end();
        continue;
      }

      if (bh->last_read_tid != tid) {
        ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
                       << bh->last_read_tid << " != tid " << tid
                       << ", skipping" << dendl;
        opos = bh->end();
        continue;
      }

      assert(opos >= bh->start());
      assert(bh->start() == opos);   // we don't merge rx bh's... yet!
      assert(bh->length() <= start+(loff_t)length-opos);

      if (bh->error < 0)
        err = bh->error;

      opos = bh->end();

      if (r == -ENOENT) {
        if (trust_enoent) {
          ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
          bh_remove(ob, bh);
          delete bh;
        } else {
          ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
                         << *bh << dendl;
        }
        continue;
      }

      if (r < 0) {
        bh->error = r;
        mark_error(bh);
      } else {
        bh->bl.substr_of(bl,
                         bh->start() - start,
                         bh->length());
        mark_clean(bh);
      }

      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;

      ob->try_merge_bh(bh);
    }
  }

  // called with lock held.
  ldout(cct, 20) << "finishing waiters " << ls << dendl;

  finish_contexts(cct, ls, err);
  retry_waiting_reads();

  --reads_outstanding;
  read_cond.Signal();
}
void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
                                        int64_t *max_amount, int *max_count)
{
  list<BufferHead*> blist;

  int count = 0;
  int64_t total_len = 0;
  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
  assert(it != dirty_or_tx_bh.end());
  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
       p != dirty_or_tx_bh.end();
       ++p) {
    BufferHead *obh = *p;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_back(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }

  while (it != dirty_or_tx_bh.begin()) {
    --it;
    BufferHead *obh = *it;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_front(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }
  if (max_count)
    *max_count -= count;
  if (max_amount)
    *max_amount -= total_len;

  bh_write_scattered(blist);
}
class ObjectCacher::C_WriteCommit : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  vector<pair<loff_t, uint64_t> > ranges;
  ZTracer::Trace trace;
public:
  ceph_tid_t tid = 0;
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
                uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(_poolid), oid(o), trace(trace) {
    ranges.push_back(make_pair(s, l));
  }
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
                vector<pair<loff_t, uint64_t> >& _ranges) :
    oc(c), poolid(_poolid), oid(o), tid(0) {
    ranges.swap(_ranges);
  }
  void finish(int r) override {
    oc->bh_write_commit(poolid, oid, ranges, tid, r);
    trace.event("finish");
  }
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
  assert(lock.is_locked());

  Object *ob = blist.front()->ob;
  ob->get();

  ceph::real_time last_write;
  SnapContext snapc;
  vector<pair<loff_t, uint64_t> > ranges;
  vector<pair<uint64_t, bufferlist> > io_vec;

  ranges.reserve(blist.size());
  io_vec.reserve(blist.size());

  uint64_t total_len = 0;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
    assert(bh->ob == ob);
    assert(bh->bl.length() == bh->length());
    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));

    int n = io_vec.size();
    io_vec.resize(n + 1);
    io_vec[n].first = bh->start();
    io_vec[n].second = bh->bl;

    total_len += bh->length();
    if (bh->snapc.seq > snapc.seq)
      snapc = bh->snapc;
    if (bh->last_write > last_write)
      last_write = bh->last_write;
  }

  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool,
                                              ob->get_soid(), ranges);

  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
                                           io_vec, snapc, last_write,
                                           ob->truncate_size, ob->truncate_seq,
                                           oncommit);
  oncommit->tid = tid;
  ob->last_write_tid = tid;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    bh->last_write_tid = tid;
    mark_tx(bh);
  }

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write " << *bh << dendl;

  bh->ob->get();

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_write " + bh->ob->get_oid().name);
    trace.event("start");
  }

  // finishers
  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                              bh->ob->get_soid(), bh->start(),
                                              bh->length(), trace);
  // go
  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
                                           bh->ob->get_oloc(),
                                           bh->start(), bh->length(),
                                           bh->snapc, bh->bl, bh->last_write,
                                           bh->ob->truncate_size,
                                           bh->ob->truncate_seq,
                                           bh->journal_tid, trace, oncommit);
  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;

  // set bh last_write_tid
  oncommit->tid = tid;
  bh->ob->last_write_tid = tid;
  bh->last_write_tid = tid;

  mark_tx(bh);

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
}
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
                                   vector<pair<loff_t, uint64_t> >& ranges,
                                   ceph_tid_t tid, int r)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
                << " ranges " << ranges << " returned " << r << dendl;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
    return;
  }

  Object *ob = objects[poolid][oid];
  int was_dirty_or_tx = ob->oset->dirty_or_tx;

  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
       p != ranges.end();
       ++p) {
    loff_t start = p->first;
    uint64_t length = p->second;
    if (!ob->exists) {
      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
      ob->exists = true;

      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
                                              ob->get_snap())) {
        ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
          "complete on " << *ob << dendl;
        ob->complete = false;
      }
    }

    vector<pair<loff_t, BufferHead*>> hit;
    // apply to bh's!
    for (map<loff_t, BufferHead*>::const_iterator p
           = ob->data_lower_bound(start);
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;

      if (bh->start() > start+(loff_t)length)
        break;

      if (bh->start() < start &&
          bh->end() > start+(loff_t)length) {
        ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
        continue;
      }

      // make sure bh is tx
      if (!bh->is_tx()) {
        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
        continue;
      }

      // make sure bh tid matches
      if (bh->last_write_tid != tid) {
        assert(bh->last_write_tid > tid);
        ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
        continue;
      }

      if (r >= 0) {
        // ok!  mark bh clean and error-free
        mark_clean(bh);
        bh->set_journal_tid(0);
        if (bh->get_nocache())
          bh_lru_rest.lru_bottouch(bh);
        hit.push_back(make_pair(bh->start(), bh));
        ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
      } else {
        mark_dirty(bh);
        ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
                       << *bh << " r = " << r << " " << cpp_strerror(-r)
                       << dendl;
      }
    }

    for (auto& p : hit) {
      // p.second may have been merged and deleted in merge_left
      if (ob->data.count(p.first))
        ob->try_merge_bh(p.second);
    }
  }

  // update last_commit.
  assert(ob->last_commit_tid < tid);
  ob->last_commit_tid = tid;

  // waiters?
  list<Context*> ls;
  if (ob->waitfor_commit.count(tid)) {
    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
    ob->waitfor_commit.erase(tid);
  }

  // is the entire object set now clean and fully committed?
  ObjectSet *oset = ob->oset;
  ob->put();

  if (flush_set_callback &&
      was_dirty_or_tx > 0 &&
      oset->dirty_or_tx == 0) {        // nothing dirty/tx
    flush_set_callback(flush_set_callback_arg, oset);
  }

  if (!ls.empty())
    finish_contexts(cct, ls, r);
}
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  ceph::real_time cutoff = ceph::real_clock::now();

  ldout(cct, 10) << "flush " << amount << dendl;

  /*
   * NOTE: we aren't actually pulling things off the LRU here, just
   * looking at the tail item.  Then we call bh_write, which moves it
   * to the other LRU, so that we can call
   * lru_dirty.lru_get_next_expire() again.
   */
  int64_t left = amount;
  while (amount == 0 || left > 0) {
    BufferHead *bh = static_cast<BufferHead*>(
      bh_lru_dirty.lru_get_next_expire());
    if (!bh)
      break;
    if (bh->last_write > cutoff)
      break;

    if (scattered_write) {
      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
    } else {
      left -= bh->length();
      bh_write(bh, *trace);
    }
  }
}
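
#if 0
// Illustrative sketch only (not original code): the two ways flush() is
// driven.  amount == 0 means "write back every aged dirty bh"; a positive
// amount stops once roughly that many bytes of writeback have been started.
flush(&trace, 0);                       // drain all aged dirty buffers
flush(&trace, actual - target_dirty);   // just enough to get back to target
#endif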
void ObjectCacher::trim()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "trim  start: bytes: max " << max_size << " clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;

  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() -
    bh_lru_rest.lru_get_num_pinned();
  while (get_stat_clean() > 0 &&
         ((uint64_t)get_stat_clean() > max_size ||
          nr_clean_bh > max_clean_bh)) {
    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
    if (!bh)
      break;

    ldout(cct, 10) << "trim trimming " << *bh << dendl;
    assert(bh->is_clean() || bh->is_zero() || bh->is_error());

    Object *ob = bh->ob;
    bh_remove(ob, bh);
    delete bh;

    --nr_clean_bh;

    if (ob->complete) {
      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
      ob->complete = false;
    }
  }

  while (ob_lru.lru_get_size() > max_objects) {
    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
    if (!ob)
      break;

    ldout(cct, 10) << "trim trimming " << *ob << dendl;
    close_object(ob);
  }

  ldout(cct, 10) << "trim finish:  max " << max_size << " clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
}
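
// Illustrative note (not original code): the clean-buffer cap above charges
// each BufferHead 1 << BUFFER_MEMORY_WEIGHT bytes of presumed overhead.
// With the weight of 12 defined at the top of this file, a 64 MiB cache
// allows max_clean_bh = (64 << 20) >> 12 = 16384 clean buffer heads before
// trim() starts expiring them from bh_lru_rest.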
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
                             snapid_t snapid)
{
  assert(lock.is_locked());
  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
       ex_it != extents.end();
       ++ex_it) {
    ldout(cct, 10) << "is_cached " << *ex_it << dendl;

    // get Object cache
    sobject_t soid(ex_it->oid, snapid);
    Object *o = get_object_maybe(soid, ex_it->oloc);
    if (!o)
      return false;
    if (!o->is_cached(ex_it->offset, ex_it->length))
      return false;
  }
  return true;
}
/*
 * returns # bytes read (if in cache).  onfinish is untouched (caller
 *  must delete it)
 * returns 0 if doing async read
 */
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                        ZTracer::Trace *parent_trace)
{
  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("read", &trace_endpoint, parent_trace);
    trace.event("start");
  }
  int r = _readx(rd, oset, onfinish, true, &trace);
  if (r < 0) {
    trace.event("finish");
  }
  return r;
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                         bool external_call, ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  bool success = true;
  int error = 0;
  uint64_t bytes_in_cache = 0;
  uint64_t bytes_not_in_cache = 0;
  uint64_t total_bytes_read = 0;
  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  /*
   * WARNING: we can only meaningfully return ENOENT if the read request
   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
   * zeroed buffers needs to feed single extents into readx().
   */
  assert(!oset->return_enoent || rd->extents.size() == 1);

  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
       ex_it != rd->extents.end();
       ++ex_it) {
    ldout(cct, 10) << "readx " << *ex_it << dendl;

    total_bytes_read += ex_it->length;

    // get Object cache
    sobject_t soid(ex_it->oid, rd->snap);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);
    if (external_call)
      touch_ob(o);

    // does not exist and no hits?
    if (oset->return_enoent && !o->exists) {
      ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;

      // should we worry about COW underneath us?
      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
                                              ex_it->length, soid.snap)) {
        ldout(cct, 20) << "readx  may copy on write" << dendl;
        bool wait = false;
        list<BufferHead*> blist;
        for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
             bh_it != o->data.end();
             ++bh_it) {
          BufferHead *bh = bh_it->second;
          if (bh->is_dirty() || bh->is_tx()) {
            ldout(cct, 10) << "readx  flushing " << *bh << dendl;
            wait = true;
            if (bh->is_dirty()) {
              if (scattered_write)
                blist.push_back(bh);
              else
                bh_write(bh, *trace);
            }
          }
        }
        if (scattered_write && !blist.empty())
          bh_write_scattered(blist);
        if (wait) {
          ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
          o->waitfor_commit[o->last_write_tid].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace));
          // FIXME: perfcounter!
          return 0;
        }
      }

      // can we return ENOENT?
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
           bh_it != o->data.end();
           ++bh_it) {
        ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
        if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
          allzero = false;
          break;
        }
      }
      if (allzero) {
        ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
                       << dendl;
        delete rd;
        if (dontneed)
          bottouch_ob(o);
        return -ENOENT;
      }
    }

    // map extent into bufferheads
    map<loff_t, BufferHead*> hits, missing, rx, errors;
    o->map_read(*ex_it, hits, missing, rx, errors);
    if (external_call) {
      // retry reading error buffers
      missing.insert(errors.begin(), errors.end());
    } else {
      // some reads had errors, fail later so completions
      // are cleaned up properly
      // TODO: make read path not call _readx for every completion
      hits.insert(errors.begin(), errors.end());
    }

    if (!missing.empty() || !rx.empty()) {
      // read missing
      map<loff_t, BufferHead*>::iterator last = missing.end();
      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
           bh_it != missing.end();
           ++bh_it) {
        uint64_t rx_bytes = static_cast<uint64_t>(
          stat_rx + bh_it->second->length());
        bytes_not_in_cache += bh_it->second->length();
        if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
          // cache is full with concurrent reads -- wait for rx's to complete
          // to constrain memory growth (especially during copy-ups)
          if (success) {
            ldout(cct, 10) << "readx missed, waiting on cache to complete "
                           << waitfor_read.size() << " blocked reads, "
                           << (MAX(rx_bytes, max_size) - max_size)
                           << " read bytes" << dendl;
            waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
                                                   *trace));
          }

          bh_remove(o, bh_it->second);
          delete bh_it->second;
        } else {
          bh_it->second->set_nocache(nocache);
          bh_read(bh_it->second, rd->fadvise_flags, *trace);
          if ((success && onfinish) || last != missing.end())
            last = bh_it;
        }
        success = false;
      }

      // add the waiter to the last bh so we don't wake up early;
      // reads complete in offset order
      if (last != missing.end()) {
        ldout(cct, 10) << "readx missed, waiting on " << *last->second
                       << " off " << last->first << dendl;
        last->second->waitfor_read[last->first].push_back(
          new C_RetryRead(this, rd, oset, onfinish, *trace) );
      }

      // bump rx
      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
           bh_it != rx.end();
           ++bh_it) {
        touch_bh(bh_it->second); // bump in lru, so we don't lose it.
        if (success && onfinish) {
          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
                         << " off " << bh_it->first << dendl;
          bh_it->second->waitfor_read[bh_it->first].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace) );
        }
        bytes_not_in_cache += bh_it->second->length();
        success = false;
      }

      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end(); ++bh_it)
        // bump in lru, so we don't lose it on a later read
        touch_bh(bh_it->second);

    } else {
      assert(!hits.empty());

      // make a plain list
      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end();
           ++bh_it) {
        BufferHead *bh = bh_it->second;
        ldout(cct, 10) << "readx hit bh " << *bh << dendl;
        if (bh->is_error() && bh->error)
          error = bh->error;
        bytes_in_cache += bh->length();

        if (bh->get_nocache() && bh->is_clean())
          bh_lru_rest.lru_bottouch(bh);
        else
          touch_bh(bh);
        // must be after touch_bh because touch_bh sets dontneed false
        if (dontneed &&
            ((loff_t)ex_it->offset <= bh->start() &&
             (bh->end() <= (loff_t)(ex_it->offset + ex_it->length)))) {
          bh->set_dontneed(true); //if dirty
          if (bh->is_clean())
            bh_lru_rest.lru_bottouch(bh);
        }
      }

      if (!error) {
        // create reverse map of buffer offset -> object for the
        // eventual result.  this is over a single ObjectExtent, so we
        // know that
        //  - the bh's are contiguous
        //  - the buffer frags need not be (and almost certainly aren't)
        loff_t opos = ex_it->offset;
        map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
        assert(bh_it->second->start() <= opos);
        uint64_t bhoff = opos - bh_it->second->start();
        vector<pair<uint64_t,uint64_t> >::iterator f_it
          = ex_it->buffer_extents.begin();
        uint64_t foff = 0;
        while (1) {
          BufferHead *bh = bh_it->second;
          assert(opos == (loff_t)(bh->start() + bhoff));

          uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
          ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
                         << bhoff << " frag " << f_it->first << "~"
                         << f_it->second << " +" << foff << "~" << len
                         << dendl;

          bufferlist bit;
          // put substr here first, since substr_of clobbers, and we
          // may get multiple bh's at this stripe_map position
          if (bh->is_zero()) {
            stripe_map[f_it->first].append_zero(len);
          } else {
            bit.substr_of(bh->bl,
                          opos - bh->start(),
                          len);
            stripe_map[f_it->first].claim_append(bit);
          }

          opos += len;
          bhoff += len;
          foff += len;
          if (opos == bh->end()) {
            ++bh_it;
            bhoff = 0;
          }
          if (foff == f_it->second) {
            ++f_it;
            foff = 0;
          }
          if (bh_it == hits.end()) break;
          if (f_it == ex_it->buffer_extents.end())
            break;
        }
        assert(f_it == ex_it->buffer_extents.end());
        assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
      }

      if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
        bottouch_ob(o);
    }
  }

  if (!success) {
    if (perfcounter && external_call) {
      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
      perfcounter->inc(l_objectcacher_cache_ops_miss);
    }
    if (onfinish) {
      ldout(cct, 20) << "readx defer " << rd << dendl;
    } else {
      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
                     << dendl;
      delete rd;
    }
    return 0;  // wait!
  }
  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_hit);
  }

  // no misses... success!  do the read.
  ldout(cct, 10) << "readx has all buffers" << dendl;

  // ok, assemble into result buffer.
  uint64_t pos = 0;
  if (rd->bl && !error) {
    rd->bl->clear();
    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
         i != stripe_map.end();
         ++i) {
      assert(pos == i->first);
      ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
                     << " at " << pos << dendl;
      pos += i->second.length();
      rd->bl->claim_append(i->second);
      assert(rd->bl->length() == pos);
    }
    ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
  } else if (!error) {
    ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
    pos = i->first + i->second.length();
  }

  // done with read.
  int ret = error ? error : pos;
  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
  assert(pos <= (uint64_t) INT_MAX);

  delete rd;

  trim();

  return ret;
}
void ObjectCacher::retry_waiting_reads()
{
  list<Context *> ls;
  ls.swap(waitfor_read);

  while (!ls.empty() && waitfor_read.empty()) {
    Context *ctx = ls.front();
    ls.pop_front();
    ctx->complete(0);
  }
  waitfor_read.splice(waitfor_read.end(), ls);
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
                         ZTracer::Trace *parent_trace)
{
  assert(lock.is_locked());
  ceph::real_time now = ceph::real_clock::now();
  uint64_t bytes_written = 0;
  uint64_t bytes_written_in_flush = 0;
  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("write", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
       ex_it != wr->extents.end();
       ++ex_it) {
    // get object cache
    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);

    // map it all into a single bufferhead.
    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
    bool missing = bh->is_missing();
    bh->snapc = wr->snapc;

    bytes_written += ex_it->length;
    if (bh->is_tx()) {
      bytes_written_in_flush += ex_it->length;
    }

    // adjust buffer pointers (ie "copy" data into my cache)
    // this is over a single ObjectExtent, so we know that
    //  - there is one contiguous bh
    //  - the buffer frags need not be (and almost certainly aren't)
    // note: i assume striping is monotonic... no jumps backwards, ever!
    loff_t opos = ex_it->offset;
    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
           = ex_it->buffer_extents.begin();
         f_it != ex_it->buffer_extents.end();
         ++f_it) {
      ldout(cct, 10) << "writex writing " << f_it->first << "~"
                     << f_it->second << " into " << *bh << " at " << opos
                     << dendl;
      uint64_t bhoff = opos - bh->start();
      assert(f_it->second <= bh->length() - bhoff);

      // get the frag we're mapping in
      bufferlist frag;
      frag.substr_of(wr->bl, f_it->first, f_it->second);

      // keep anything left of bhoff
      bufferlist newbl;
      if (bhoff)
        newbl.substr_of(bh->bl, 0, bhoff);
      newbl.claim_append(frag);
      bh->bl.swap(newbl);

      opos += f_it->second;
    }

    // ok, now bh is dirty.
    mark_dirty(bh);
    if (dontneed)
      bh->set_dontneed(true);
    else if (nocache && missing)
      bh->set_nocache(true);
    else
      touch_bh(bh);

    bh->last_write = now;

    o->try_merge_bh(bh);
  }

  if (perfcounter) {
    perfcounter->inc(l_objectcacher_data_written, bytes_written);
    if (bytes_written_in_flush) {
      perfcounter->inc(l_objectcacher_overwritten_in_flush,
                       bytes_written_in_flush);
    }
  }

  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
  delete wr;

  //verify_stats();
  trim();
  return r;
}
class ObjectCacher::C_WaitForWrite : public Context {
public:
  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
                 const ZTracer::Trace &trace, Context *onfinish) :
    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
  void finish(int r) override;
private:
  ObjectCacher *m_oc;
  uint64_t m_len;
  ZTracer::Trace m_trace;
  Context *m_onfinish;
};

void ObjectCacher::C_WaitForWrite::finish(int r)
{
  Mutex::Locker l(m_oc->lock);
  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
  m_onfinish->complete(r);
}
void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
                                            ZTracer::Trace *trace)
{
  assert(lock.is_locked());
  ceph::mono_time start = ceph::mono_clock::now();
  int blocked = 0;
  // wait for writeback?
  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
  //  - do not wait for bytes other waiters are waiting on.  this means that
  //    threads do not wait for each other.  this effectively allows the cache
  //    size to balloon proportional to the data that is in flight.

  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
  while (get_stat_dirty() + get_stat_tx() > 0 &&
         (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
           max_dirty + get_stat_dirty_waiting()) ||
          (dirty_or_tx_bh.size() >=
           max_dirty_bh + get_stat_nr_dirty_waiters()))) {

    if (blocked == 0) {
      trace->event("start wait for writeback");
    }
    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
    flusher_cond.Signal();
    stat_dirty_waiting += len;
    ++stat_nr_dirty_waiters;
    stat_cond.Wait(lock);
    stat_dirty_waiting -= len;
    --stat_nr_dirty_waiters;
    ++blocked;
    ldout(cct, 10) << __func__ << " woke up" << dendl;
  }
  if (blocked > 0) {
    trace->event("finish wait for writeback");
  }
  if (blocked && perfcounter) {
    perfcounter->inc(l_objectcacher_write_ops_blocked);
    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
    ceph::timespan blocked = ceph::mono_clock::now() - start;
    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
  }
}
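
// Illustrative note (not original code), with made-up numbers: if
// max_dirty = 24 MiB and one writer is already blocked waiting on 8 MiB,
// a second writer only blocks while dirty+tx >= 24 MiB + 8 MiB
// (stat_dirty_waiting).  Each waiter thus ignores bytes other waiters
// already account for, so threads never wait on each other and the cache
// can balloon by roughly the amount of data in flight, as described above.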
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                                  ZTracer::Trace *trace, Context *onfreespace)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  int ret = 0;

  if (max_dirty > 0) {
    if (block_writes_upfront) {
      maybe_wait_for_writeback(len, trace);
      if (onfreespace)
        onfreespace->complete(0);
    } else {
      assert(onfreespace);
      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
    }
  } else {
    // write-thru!  flush what we just wrote.
    Cond cond;
    bool done = false;
    Context *fin = block_writes_upfront ?
      new C_Cond(&cond, &done, &ret) : onfreespace;
    assert(fin);
    bool flushed = flush_set(oset, wr->extents, trace, fin);
    assert(!flushed);   // we just dirtied it, and didn't drop our lock!
    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
    if (block_writes_upfront) {
      while (!done)
        cond.Wait(lock);
      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
      if (onfreespace)
        onfreespace->complete(ret);
    }
  }

  // start writeback anyway?
  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
                   << target_dirty << ", nudging flusher" << dendl;
    flusher_cond.Signal();
  }
  return ret;
}
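
// Illustrative note (not original code): _wait_for_write() has two regimes.
// With max_dirty > 0 (writeback) it either blocks in
// maybe_wait_for_writeback() or queues a C_WaitForWrite on the finisher,
// completing onfreespace once under the dirty limit.  With max_dirty == 0
// (write-thru) it flush_set()s exactly what was just written and completes
// onfreespace only when that write commits.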
void ObjectCacher::flusher_entry()
{
  ldout(cct, 10) << "flusher start" << dendl;
  lock.Lock();
  while (!flusher_stop) {
    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
      get_stat_dirty();
    ldout(cct, 11) << "flusher "
                   << all << " / " << max_size << ":  "
                   << get_stat_tx() << " tx, "
                   << get_stat_rx() << " rx, "
                   << get_stat_clean() << " clean, "
                   << get_stat_dirty() << " dirty ("
                   << target_dirty << " target, "
                   << max_dirty << " max)"
                   << dendl;
    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();

    ZTracer::Trace trace;
    if (cct->_conf->osdc_blkin_trace_all) {
      trace.init("flusher", &trace_endpoint);
      trace.event("start");
    }

    if (actual > 0 && (uint64_t) actual > target_dirty) {
      // flush some dirty pages
      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
      flush(&trace, actual - target_dirty);
    } else {
      // check tail of lru for old dirty items
      ceph::real_time cutoff = ceph::real_clock::now();
      cutoff -= max_dirty_age;
      BufferHead *bh = 0;
      int max = MAX_FLUSH_UNDER_LOCK;
      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
                                            lru_get_next_expire())) != 0 &&
             bh->last_write <= cutoff &&
             max > 0) {
        ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
        if (scattered_write) {
          bh_write_adjacencies(bh, cutoff, NULL, &max);
        } else {
          bh_write(bh, trace);
          --max;
        }
      }
      if (!max) {
        // back off the lock to avoid starving other threads
        trace.event("backoff");
        lock.Unlock();
        lock.Lock();
        continue;
      }
    }

    trace.event("finish");
    if (flusher_stop)
      break;

    flusher_cond.WaitInterval(lock, seconds(1));
  }

  /* Wait for reads to finish. This is only possible if handling
   * -ENOENT made some read completions finish before their rados read
   * came back. If we don't wait for them, and destroy the cache, when
   * the rados reads do come back their callback will try to access the
   * no-longer-valid ObjectCacher.
   */
  while (reads_outstanding > 0) {
    ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
                   << reads_outstanding << dendl;
    read_cond.Wait(lock);
  }

  lock.Unlock();
  ldout(cct, 10) << "flusher finish" << dendl;
}
// -------------------------------------------------


bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return true;

  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
    if (!(*p)->is_empty())
      return false;

  return true;
}

bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
         q != ob->data.end();
         ++q) {
      BufferHead *bh = q->second;
      if (!bh->is_dirty() && !bh->is_tx())
        return true;
    }
  }

  return false;
}

bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;

    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;
      if (bh->is_dirty() || bh->is_tx())
        return true;
    }
  }

  return false;
}
// purge.  non-blocking.  violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "purge " << *ob << dendl;

  ob->truncate(0);
}


// flush.  non-blocking.  no callback.
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
                         ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  list<BufferHead*> blist;
  bool clean = true;
  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
  for (map<loff_t,BufferHead*>::const_iterator p
         = ob->data_lower_bound(offset);
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    ldout(cct, 20) << "flush  " << *bh << dendl;
    if (length && bh->start() > offset+length) {
      break;
    }
    if (bh->is_tx()) {
      clean = false;
      continue;
    }
    if (!bh->is_dirty()) {
      continue;
    }

    if (scattered_write)
      blist.push_back(bh);
    else
      bh_write(bh, *trace);
    clean = false;
  }
  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  return clean;
}
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
                                     Context *onfinish)
{
  assert(lock.is_locked());
  if (gather->has_subs()) {
    gather->set_finisher(onfinish);
    gather->activate();
    return false;
  }

  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
  onfinish->complete(0);
  return true;
}

// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;

  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
  // order. But items in oset->objects are not sorted. So the iterator can
  // point to any buffer head in the ObjectSet
  BufferHead key(*oset->objects.begin());
  it = dirty_or_tx_bh.lower_bound(&key);
  p = q = it;

  bool backwards = true;
  if (it != dirty_or_tx_bh.begin())
    --it;
  else
    backwards = false;

  for (; p != dirty_or_tx_bh.end(); p = q) {
    ++q;
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
      break;
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }
  }

  if (backwards) {
    for(p = q = it; true; p = q) {
      if (q != dirty_or_tx_bh.begin())
        --q;
      else
        backwards = false;
      BufferHead *bh = *p;
      if (bh->ob->oset != oset)
        break;
      waitfor_commit.insert(bh->ob);
      if (bh->is_dirty()) {
        if (scattered_write) {
          if (last_ob != bh->ob) {
            if (!blist.empty()) {
              bh_write_scattered(blist);
              blist.clear();
            }
            last_ob = bh->ob;
          }
          blist.push_front(bh);
        } else {
          bh_write(bh, {});
        }
      }
      if (!backwards)
        break;
    }
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end(); ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
                             ZTracer::Trace *trace, Context *onfinish)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
                 << " ObjectExtents" << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);

  for (vector<ObjectExtent>::iterator p = exv.begin();
       p != exv.end();
       ++p) {
    ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
                   << " " << ob << dendl;

    if (!flush(ob, ex.offset, ex.length, trace)) {
      // we'll need to gather...
      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                     << ob->last_write_tid << " on " << *ob << dendl;
      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
    }
  }

  return _flush_set_finish(&gather, onfinish);
}

// flush all dirty data.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);

  ldout(cct, 10) << "flush_all " << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
  next = it = dirty_or_tx_bh.begin();
  while (it != dirty_or_tx_bh.end()) {
    ++next;
    BufferHead *bh = *it;
    waitfor_commit.insert(bh->ob);

    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }

    it = next;
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end();
       ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_all will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
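
// purge_set: drop every buffer head in the set *without* writing it
// back; dirty data is discarded.  The flush callback still runs if the
// set had dirty data, so the owner can release resources tied to it.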

void ObjectCacher::purge_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "purge_set " << oset << dendl;
  const bool were_dirty = oset->dirty_or_tx > 0;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;
    purge(ob);
  }

  // Although we have purged rather than flushed, the caller should
  // still drop any resources associated with the dirty data.
  assert(oset->dirty_or_tx == 0);
  if (flush_set_callback && were_dirty) {
    flush_set_callback(flush_set_callback_arg, oset);
  }
}
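
// release: trim an object's clean (or zero/error) buffer heads and
// report how many bytes remain unclean, i.e. could not be released.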

loff_t ObjectCacher::release(Object *ob)
{
  assert(lock.is_locked());
  list<BufferHead*> clean;
  loff_t o_unclean = 0;

  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    if (bh->is_clean() || bh->is_zero() || bh->is_error())
      clean.push_back(bh);
    else
      o_unclean += bh->length();
  }

  for (list<BufferHead*>::iterator p = clean.begin();
       p != clean.end();
       ++p) {
    bh_remove(ob, *p);
    delete *p;
  }

  if (ob->can_close()) {
    ldout(cct, 10) << "release trimming " << *ob << dendl;
    close_object(ob);
    assert(o_unclean == 0);
    return 0;
  }

  if (ob->complete) {
    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
    ob->complete = false;
  }

  if (!ob->exists) {
    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
    ob->exists = true;
  }

  return o_unclean;
}
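
// release_set: release() every object in the set; returns the number
// of bytes still dirty or in flight, which therefore stay cached.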

loff_t ObjectCacher::release_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  // return # bytes not clean (and thus not released).
  loff_t unclean = 0;

  if (oset->objects.empty()) {
    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
    return 0;
  }

  ldout(cct, 10) << "release_set " << oset << dendl;

  xlist<Object*>::iterator q;
  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ) {
    q = p;
    ++q;

    Object *ob = *p;

    loff_t o_unclean = release(ob);
    unclean += o_unclean;

    if (o_unclean)
      ldout(cct, 10) << "release_set " << oset << " " << *ob
                     << " has " << o_unclean << " bytes left"
                     << dendl;
    p = q;
  }

  if (unclean) {
    ldout(cct, 10) << "release_set " << oset
                   << ", " << unclean << " bytes left" << dendl;
  }

  return unclean;
}
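
// release_all: like release_set(), but over every object in every
// pool's object map.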

uint64_t ObjectCacher::release_all()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "release_all" << dendl;
  uint64_t unclean = 0;

  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
    = objects.begin();
  while (i != objects.end()) {
    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
    while (p != i->end()) {
      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
      ++n;

      Object *ob = p->second;

      loff_t o_unclean = release(ob);
      unclean += o_unclean;

      if (o_unclean)
        ldout(cct, 10) << "release_all " << *ob
                       << " has " << o_unclean << " bytes left"
                       << dendl;
      p = n;
    }
    ++i;
  }

  if (unclean) {
    ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
                   << dendl;
  }

  return unclean;
}

void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    if (!ob->exists) {
      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
      ob->exists = true;
      ob->complete = false;
    }
    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
         !q.end(); ++q) {
      C_ReadFinish *comp = *q;
      comp->distrust_enoent();
    }
  }
}

/**
 * discard object extents from an ObjectSet by discarding the extents
 * in exls from the in-memory oset.
 */
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "discard_set " << oset << dendl;

  bool were_dirty = oset->dirty_or_tx > 0;

  for (vector<ObjectExtent>::const_iterator p = exls.begin();
       p != exls.end();
       ++p) {
    ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
    const ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ob->discard(ex.offset, ex.length);
  }

  // did we truncate off dirty data?
  if (flush_set_callback &&
      were_dirty && oset->dirty_or_tx == 0)
    flush_set_callback(flush_set_callback_arg, oset);
}
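
// verify_stats: debug consistency check; recount every buffer head in
// the cache and assert that the totals match the incremental stat_*
// counters maintained by bh_stat_add()/bh_stat_sub().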

void ObjectCacher::verify_stats() const
{
  assert(lock.is_locked());
  ldout(cct, 10) << "verify_stats" << dendl;

  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
    error = 0;
  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
         = objects.begin();
       i != objects.end();
       ++i) {
    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
           = i->begin();
         p != i->end();
         ++p) {
      Object *ob = p->second;
      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
           q != ob->data.end();
           ++q) {
        BufferHead *bh = q->second;
        switch (bh->get_state()) {
        case BufferHead::STATE_MISSING:
          missing += bh->length();
          break;
        case BufferHead::STATE_CLEAN:
          clean += bh->length();
          break;
        case BufferHead::STATE_ZERO:
          zero += bh->length();
          break;
        case BufferHead::STATE_DIRTY:
          dirty += bh->length();
          break;
        case BufferHead::STATE_TX:
          tx += bh->length();
          break;
        case BufferHead::STATE_RX:
          rx += bh->length();
          break;
        case BufferHead::STATE_ERROR:
          error += bh->length();
          break;
        }
      }
    }
  }

  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
                 << " dirty " << dirty << " missing " << missing
                 << " error " << error << dendl;
  assert(clean == stat_clean);
  assert(rx == stat_rx);
  assert(tx == stat_tx);
  assert(dirty == stat_dirty);
  assert(missing == stat_missing);
  assert(zero == stat_zero);
  assert(error == stat_error);
}
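
// bh_stat_add/bh_stat_sub: account a buffer head's bytes into (or out
// of) the per-state stat_* totals.  DIRTY and TX also feed the
// per-object and per-ObjectSet dirty_or_tx counters that the flush and
// release paths above consult.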

void ObjectCacher::bh_stat_add(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing += bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean += bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero += bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx += bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error += bh->length();
    break;
  default:
    assert(0 == "bh_stat_add: invalid bufferhead state");
  }
  if (get_stat_dirty_waiting() > 0)
    stat_cond.Signal();
}

void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing -= bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean -= bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero -= bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx -= bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error -= bh->length();
    break;
  default:
    assert(0 == "bh_stat_sub: invalid bufferhead state");
  }
}
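
// bh_set_state: the central state-change hook.  It moves the bh
// between the dirty and rest LRUs, keeps the dirty_or_tx_bh index in
// sync, and re-runs the stat accounting around the actual state flip.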

void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
  assert(lock.is_locked());
  int state = bh->get_state();
  // move between lru lists?
  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
    bh_lru_rest.lru_remove(bh);
    bh_lru_dirty.lru_insert_top(bh);
  } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
    bh_lru_dirty.lru_remove(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if ((s == BufferHead::STATE_TX ||
       s == BufferHead::STATE_DIRTY) &&
      state != BufferHead::STATE_TX &&
      state != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.insert(bh);
  } else if ((state == BufferHead::STATE_TX ||
              state == BufferHead::STATE_DIRTY) &&
             s != BufferHead::STATE_TX &&
             s != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.erase(bh);
  }

  if (s != BufferHead::STATE_ERROR &&
      state == BufferHead::STATE_ERROR) {
    bh->error = 0;
  }

  // set state
  bh_stat_sub(bh);
  bh->set_state(s);
  bh_stat_add(bh);
}
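
// bh_add/bh_remove: attach or detach a buffer head, keeping the
// owning Object's map, the cache-wide LRUs, the dirty_or_tx_bh index,
// and the stat counters consistent with one another.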

void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
  ob->add_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_insert_top(bh);
    dirty_or_tx_bh.insert(bh);
  } else {
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.insert(bh);
  }
  bh_stat_add(bh);
}
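
// (bh_remove asserts the journal tid is zero: any journal reference
// must be detached before a buffer head may be removed.)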

void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  assert(bh->get_journal_tid() == 0);
  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
  ob->remove_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_remove(bh);
    dirty_or_tx_bh.erase(bh);
  } else {
    bh_lru_rest.lru_remove(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.erase(bh);
  }
  bh_stat_sub(bh);
  if (get_stat_dirty_waiting() > 0)