// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"

#include "include/assert.h"
#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
                                 ///< while holding the lock

using std::chrono::seconds;
/*** ObjectCacher::BufferHead ***/

/*** ObjectCacher::Object ***/

#define dout_subsys ceph_subsys_objectcacher
#undef dout_prefix
#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
class ObjectCacher::C_ReadFinish : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  loff_t start;
  uint64_t length;
  xlist<C_ReadFinish*>::item set_item;
  bool trust_enoent;
  ceph_tid_t tid;
  ZTracer::Trace trace;

public:
  bufferlist bl;
  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
               uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
    set_item(this), trust_enoent(true),
    tid(t), trace(trace) {
    ob->reads.push_back(&set_item);
  }

  void finish(int r) override {
    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
    trace.event("finish");

    // object destructor clears the list
    if (set_item.is_on_list())
      set_item.remove_myself();
  }

  void distrust_enoent() {
    trust_enoent = false;
  }
};
class ObjectCacher::C_RetryRead : public Context {
  ObjectCacher *oc;
  OSDRead *rd;
  ObjectSet *oset;
  Context *onfinish;
  ZTracer::Trace trace;
public:
  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
              const ZTracer::Trace &trace)
    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
  }
  void finish(int r) override {
    r = oc->_readx(rd, oset, onfinish, false, &trace);
    if (r == 0) {
      // read is still in-progress
      return;
    }
    if (onfinish) {
      trace.event("finish");
      onfinish->complete(r);
    }
  }
};
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
                                                      loff_t off)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;

  // split off right
  ObjectCacher::BufferHead *right = new BufferHead(this);

  //inherit and if later access, this auto clean.
  right->set_dontneed(left->get_dontneed());
  right->set_nocache(left->get_nocache());

  right->last_write_tid = left->last_write_tid;
  right->last_read_tid = left->last_read_tid;
  right->set_state(left->get_state());
  right->snapc = left->snapc;
  right->set_journal_tid(left->journal_tid);

  loff_t newleftlen = off - left->start();
  right->set_start(off);
  right->set_length(left->length() - newleftlen);

  // shorten left
  oc->bh_stat_sub(left);
  left->set_length(newleftlen);
  oc->bh_stat_add(left);

  // add right
  oc->bh_add(this, right);

  // split buffers too
  bufferlist bl;
  bl.claim(left->bl);
  if (bl.length()) {
    assert(bl.length() == (left->length() + right->length()));
    right->bl.substr_of(bl, left->length(), right->length());
    left->bl.substr_of(bl, 0, left->length());
  }

  // move read waiters
  if (!left->waitfor_read.empty()) {
    map<loff_t, list<Context*> >::iterator start_remove
      = left->waitfor_read.begin();
    while (start_remove != left->waitfor_read.end() &&
           start_remove->first < right->start())
      ++start_remove;
    for (map<loff_t, list<Context*> >::iterator p = start_remove;
         p != left->waitfor_read.end(); ++p) {
      ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
                         << " to right bh" << dendl;
      right->waitfor_read[p->first].swap(p->second);
      assert(p->second.empty());
    }
    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
  }

  ldout(oc->cct, 20) << "split left is " << *left << dendl;
  ldout(oc->cct, 20) << "split right is " << *right << dendl;

  return right;
}
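
// For illustration: splitting a bh that spans [0,100) at off=40 leaves
// "left" as start=0/len=40 with bl[0,40), and creates "right" as
// start=40/len=60 with bl[40,100), inheriting state, snapc and tids;
// read waiters registered at byte >= 40 migrate to the right bh.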
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
  assert(oc->lock.is_locked());
  assert(left->end() == right->start());
  assert(left->get_state() == right->get_state());
  assert(left->can_merge_journal(right));

  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
  if (left->get_journal_tid() == 0) {
    left->set_journal_tid(right->get_journal_tid());
  }
  right->set_journal_tid(0);

  oc->bh_remove(this, right);
  oc->bh_stat_sub(left);
  left->set_length(left->length() + right->length());
  oc->bh_stat_add(left);

  // data
  left->bl.claim_append(right->bl);

  // version
  // note: this is sorta busted, but should only be used for dirty buffers
  left->last_write_tid = MAX(left->last_write_tid, right->last_write_tid);
  left->last_write = MAX(left->last_write, right->last_write);

  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);

  // waiters
  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
       p != right->waitfor_read.end();
       ++p)
    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
                                        p->second);

  // hose right
  delete right;

  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
}
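
// merge_left() assumes the caller verified adjacency and compatibility:
// e.g. bhs [0,40) and [40,60) in the same state with mergeable journal
// tids collapse into a single [0,60) bh whose bufferlist is the
// concatenation of the two, and right's read waiters are spliced over.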
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;

  // do not merge rx buffers; last_read_tid may not match
  if (bh->is_rx())
    return;

  // to the left?
  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
  assert(p->second == bh);
  if (p != data.begin()) {
    --p;
    if (p->second->end() == bh->start() &&
        p->second->get_state() == bh->get_state() &&
        p->second->can_merge_journal(bh)) {
      merge_left(p->second, bh);
      bh = p->second;
    } else {
      ++p;
    }
  }
  // to the right?
  assert(p->second == bh);
  ++p;
  if (p != data.end() &&
      p->second->start() == bh->end() &&
      p->second->get_state() == bh->get_state() &&
      p->second->can_merge_journal(bh))
    merge_left(bh, p->second);
}
/*
 * count bytes we have cached in given range
 */
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
  assert(oc->lock.is_locked());
  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
  while (left > 0) {
    if (p == data.end())
      return false;

    if (p->first <= cur) {
      // have part of it
      loff_t lenfromcur = MIN(p->second->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else if (p->first > cur) {
      // gap
      return false;
    } else
      ceph_abort();
  }

  return true;
}
/*
 * all cached data in this range [off, off+len]
 */
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
  assert(oc->lock.is_locked());
  if (data.empty())
    return true;
  map<loff_t, BufferHead*>::iterator first = data.begin();
  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
  if (first->second->start() >= off && last->second->end() <= (off + len))
    return true;
  else
    return false;
}
/*
 * map a range of bytes into buffer_heads.
 * - create missing buffer_heads as necessary.
 */
int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& hits,
                                   map<loff_t, BufferHead*>& missing,
                                   map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
                     << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    // at end?
    if (p == data.end()) {
      // rest is a miss.
      BufferHead *n = new BufferHead(this);
      n->set_start(cur);
      n->set_length(left);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read miss+complete+zero " << left
                           << " left, " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n
                           << dendl;
      }
      cur += left;
      assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
      break;  // no more.
    }

    if (p->first <= cur) {
      // have it (or part of it)
      BufferHead *e = p->second;

      if (e->is_clean() ||
          e->is_dirty() ||
          e->is_tx() ||
          e->is_zero()) {
        hits[cur] = e;     // readable!
        ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
      } else if (e->is_rx()) {
        rx[cur] = e;       // missing, not readable.
        ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
      } else if (e->is_error()) {
        errors[cur] = e;
        ldout(oc->cct, 20) << "map_read error " << *e << dendl;
      } else {
        ceph_abort();
      }

      loff_t lenfromcur = MIN(e->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;  // more?
    } else if (p->first > cur) {
      // gap.. miss
      loff_t next = p->first;
      BufferHead *n = new BufferHead(this);
      loff_t len = MIN(next - cur, left);
      n->set_start(cur);
      n->set_length(len);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
      }
      cur += MIN(left, n->length());
      left -= MIN(left, n->length());
      continue;    // more?
    } else {
      ceph_abort();
    }
  }
  return 0;
}
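
// For illustration: map_read of extent 0~30 against a cache holding a
// clean bh [0,10) and an rx bh [20,30) yields hits[0] (readable now),
// missing[10] (a new bh for the [10,20) gap, to be filled by bh_read) and
// rx[20] (already in flight, wait on it).  On a complete object, gaps
// become zeroed bhs and land in hits instead.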
void ObjectCacher::Object::audit_buffers()
{
  loff_t offset = 0;
  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
       it != data.end(); ++it) {
    if (it->first != it->second->start()) {
      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
                     << " does not match bh start position: "
                     << *it->second << dendl;
      assert(it->first == it->second->start());
    }
    if (it->first < offset) {
      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
                     << " overlaps with previous bh " << *((--it)->second)
                     << dendl;
      assert(it->first >= offset);
    }
    BufferHead *bh = it->second;
    map<loff_t, list<Context*> >::const_iterator w_it;
    for (w_it = bh->waitfor_read.begin();
         w_it != bh->waitfor_read.end(); ++w_it) {
      if (w_it->first < bh->start() ||
          w_it->first >= bh->start() + bh->length()) {
        lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
                       << " is not within bh " << *bh << dendl;
        assert(w_it->first >= bh->start());
        assert(w_it->first < bh->start() + bh->length());
      }
    }
    offset = it->first + it->second->length();
  }
}
/*
 * map a range of extents on an object's buffer cache.
 * - combine any bh's we're writing into one
 * - break up bufferheads that don't fall completely within the range
 * //no! - return a bh that includes the write.  may also include
 * other dirty data to left and/or right.
 */
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
                                                          ceph_tid_t tid)
{
  assert(oc->lock.is_locked());
  BufferHead *final = 0;

  ldout(oc->cct, 10) << "map_write oex " << ex.oid
                     << " " << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    loff_t max = left;

    // at end ?
    if (p == data.end()) {
      if (final == NULL) {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( max );
        oc->bh_add(this, final);
        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final
                           << dendl;
      } else {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + max);
        oc->bh_stat_add(final);
      }
      left -= max;
      cur += max;
      continue;
    }

    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
    //oc->verify_stats();

    if (p->first <= cur) {
      BufferHead *bh = p->second;
      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;

      if (p->first < cur) {
        assert(final == 0);
        if (cur + max >= bh->end()) {
          // we want right bit (one splice)
          final = split(bh, cur);   // just split it, take right half.
          replace_journal_tid(final, tid);
          ++p;
          assert(p->second == final);
        } else {
          // we want middle bit (two splices)
          final = split(bh, cur);
          ++p;
          assert(p->second == final);
          split(final, cur+max);
          replace_journal_tid(final, tid);
        }
      } else {
        assert(p->first == cur);
        if (bh->length() <= max) {
          // whole bufferhead, piece of cake.
        } else {
          // we want left bit (one splice)
          split(bh, cur + max);        // just split
        }
        if (final) {
          oc->mark_dirty(bh);
          oc->mark_dirty(final);
          --p;  // move iterator back to final
          assert(p->second == final);
          replace_journal_tid(bh, tid);
          merge_left(final, bh);
        } else {
          final = bh;
          replace_journal_tid(final, tid);
        }
      }

      // keep going.
      loff_t lenfromcur = final->end() - cur;
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else {
      // gap!
      loff_t next = p->first;
      loff_t glen = MIN(next - cur, max);
      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
      if (final) {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + glen);
        oc->bh_stat_add(final);
      } else {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( glen );
        oc->bh_add(this, final);
      }

      cur += glen;
      left -= glen;
      continue;    // more?
    }
  }

  // set version
  assert(final);
  assert(final->get_journal_tid() == tid);
  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;

  return final;
}
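
// For illustration: a map_write of 10~20 over an existing bh [0,40)
// splits at 10 (keeping [0,10)) and again at 30 (keeping [30,40)), so
// "final" is exactly [10,30) and carries the write's journal tid; a
// write past the last bh instead grows or creates a trailing bh.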
void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
                                               ceph_tid_t tid) {
  ceph_tid_t bh_tid = bh->get_journal_tid();

  assert(tid == 0 || bh_tid <= tid);
  if (bh_tid != 0 && bh_tid != tid) {
    // inform journal that it should not expect a writeback from this extent
    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
                                           bh->length(), bh_tid, tid);
  }
  bh->set_journal_tid(tid);
}
void ObjectCacher::Object::truncate(loff_t s)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;

  while (!data.empty()) {
    BufferHead *bh = data.rbegin()->second;
    if (bh->end() <= s)
      break;

    // split bh at truncation point?
    if (bh->start() < s) {
      split(bh, s);
      continue;
    }

    // remove bh entirely
    assert(bh->start() >= s);
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}
void ObjectCacher::Object::discard(loff_t off, loff_t len)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
                     << dendl;

  if (!exists) {
    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
    exists = true;
  }
  if (complete) {
    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
    complete = false;
  }

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
  while (p != data.end()) {
    BufferHead *bh = p->second;
    if (bh->start() >= off + len)
      break;

    // split bh at truncation point?
    if (bh->start() < off) {
      split(bh, off);
      ++p;
      continue;
    }

    assert(bh->start() >= off);
    if (bh->end() > off + len) {
      split(bh, off + len);
    }

    ++p;
    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}
/*** ObjectCacher ***/

#undef dout_prefix
#define dout_prefix *_dout << "objectcacher "

ObjectCacher::ObjectCacher(CephContext *cct_, string name,
                           WritebackHandler& wb, Mutex& l,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg, uint64_t max_bytes,
                           uint64_t max_objects, uint64_t max_dirty,
                           uint64_t target_dirty, double max_dirty_age,
                           bool block_writes_upfront)
  : cct(cct_), writeback_handler(wb), name(name), lock(l),
    max_dirty(max_dirty), target_dirty(target_dirty),
    max_size(max_bytes), max_objects(max_objects),
    max_dirty_age(ceph::make_timespan(max_dirty_age)),
    block_writes_upfront(block_writes_upfront),
    trace_endpoint("ObjectCacher"),
    flush_set_callback(flush_callback),
    flush_set_callback_arg(flush_callback_arg),
    last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
    stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
{
  perf_start();
  finisher.start();
  scattered_write = writeback_handler.can_scattered_write();
}
ObjectCacher::~ObjectCacher()
{
  finisher.stop();
  perf_stop();
  // we should be empty.
  for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
         = objects.begin();
       i != objects.end();
       ++i)
    assert(i->empty());
  assert(bh_lru_rest.lru_get_size() == 0);
  assert(bh_lru_dirty.lru_get_size() == 0);
  assert(ob_lru.lru_get_size() == 0);
  assert(dirty_or_tx_bh.empty());
}
void ObjectCacher::perf_start()
{
  string n = "objectcacher-" + name;
  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);

  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
                      "cache_ops_hit", "Hit operations");
  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
                      "cache_ops_miss", "Miss operations");
  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
                      "cache_bytes_hit", "Hit data");
  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
                      "cache_bytes_miss", "Miss data");
  plb.add_u64_counter(l_objectcacher_data_read,
                      "data_read", "Read data");
  plb.add_u64_counter(l_objectcacher_data_written,
                      "data_written", "Data written to cache");
  plb.add_u64_counter(l_objectcacher_data_flushed,
                      "data_flushed", "Data flushed");
  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
                      "data_overwritten_while_flushing",
                      "Data overwritten while flushing");
  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
                      "Write operations, delayed due to dirty limits");
  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
                      "write_bytes_blocked",
                      "Write data blocked on dirty limit");
  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
               "Time spent blocking a write due to dirty limits");

  perfcounter = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perfcounter);
}
void ObjectCacher::perf_stop()
{
  assert(perfcounter);
  cct->get_perfcounters_collection()->remove(perfcounter);
  delete perfcounter;
}
ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
                                               uint64_t object_no,
                                               ObjectSet *oset,
                                               object_locator_t &l,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
{
  // XXX: Add handling of nspace in object_locator_t in cache
  assert(lock.is_locked());
  // have it?
  if ((uint32_t)l.pool < objects.size()) {
    if (objects[l.pool].count(oid)) {
      Object *o = objects[l.pool][oid];
      o->object_no = object_no;
      o->truncate_size = truncate_size;
      o->truncate_seq = truncate_seq;
      return o;
    }
  } else {
    objects.resize(l.pool+1);
  }

  // create it.
  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
                         truncate_seq);
  objects[l.pool][oid] = o;
  ob_lru.lru_insert_top(o);
  return o;
}
void ObjectCacher::close_object(Object *ob)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "close_object " << *ob << dendl;
  assert(ob->can_close());

  // ok!
  ob_lru.lru_remove(ob);
  objects[ob->oloc.pool].erase(ob->get_soid());
  ob->set_item.remove_myself();
  delete ob;
}
void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
                           const ZTracer::Trace &parent_trace)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_read " + bh->ob->get_oid().name);
    trace.event("start");
  }

  mark_rx(bh);
  bh->last_read_tid = ++last_read_tid;

  // finisher
  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
                                            bh->start(), bh->length(), trace);
  // go
  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
                         bh->ob->get_oloc(), bh->start(), bh->length(),
                         bh->ob->get_snap(), &onfinish->bl,
                         bh->ob->truncate_size, bh->ob->truncate_seq,
                         op_flags, trace, onfinish);

  ++reads_outstanding;
}
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
                                  ceph_tid_t tid, loff_t start,
                                  uint64_t length, bufferlist &bl, int r,
                                  bool trust_enoent)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read_finish "
                << oid
                << " tid " << tid
                << " " << start << "~" << length
                << " (bl is " << bl.length() << ")"
                << " returned " << r
                << " outstanding reads " << reads_outstanding
                << dendl;

  if (r >= 0 && bl.length() < length) {
    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
                  << length << " with " << length - bl.length()
                  << " bytes of zeroes" << dendl;
    bl.append_zero(length - bl.length());
  }

  list<Context*> ls;
  int err = 0;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
  } else {
    Object *ob = objects[poolid][oid];

    if (r == -ENOENT && !ob->complete) {
      // wake up *all* rx waiters, or else we risk reordering
      // identical reads. e.g.
      //   read 1~1
      //   reply to unrelated 3~1 -> !exists
      //   read 1~1 -> immediate ENOENT
      //   reply to first 1~1 -> ooo ENOENT
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
           p != ob->data.end(); ++p) {
        BufferHead *bh = p->second;
        for (map<loff_t, list<Context*> >::iterator p
               = bh->waitfor_read.begin();
             p != bh->waitfor_read.end();
             ++p)
          ls.splice(ls.end(), p->second);
        bh->waitfor_read.clear();
        if (!bh->is_zero() && !bh->is_rx())
          allzero = false;
      }

      // just pass through and retry all waiters if we don't trust
      // -ENOENT for this read
      if (trust_enoent) {
        ldout(cct, 7)
          << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
          << dendl;
        ob->complete = true;
        ob->exists = false;

        /* If all the bhs are effectively zero, get rid of them.  All
         * the waiters will be retried and get -ENOENT immediately, so
         * it's safe to clean up the unneeded bh's now. Since we know
         * it's safe to remove them now, do so, so they aren't hanging
         * around waiting for more -ENOENTs from rados while the cache
         * is being shut down.
         *
         * Only do this when all the bhs are rx or clean, to match the
         * condition in _readx(). If there are any non-rx or non-clean
         * bhs, _readx() will wait for the final result instead of
         * returning -ENOENT immediately.
         */
        if (allzero) {
          ldout(cct, 10)
            << "bh_read_finish ENOENT and allzero, getting rid of "
            << "bhs for " << *ob << dendl;
          map<loff_t, BufferHead*>::iterator p = ob->data.begin();
          while (p != ob->data.end()) {
            BufferHead *bh = p->second;
            // current iterator will be invalidated by bh_remove()
            ++p;
            bh_remove(ob, bh);
            delete bh;
          }
        }
      }
    }

    // apply to bh's!
    loff_t opos = start;
    while (true) {
      map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
      if (p == ob->data.end())
        break;
      if (opos >= start+(loff_t)length) {
        ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
                       << start << "+" << length << "=" << start+(loff_t)length
                       << dendl;
        break;
      }

      BufferHead *bh = p->second;
      ldout(cct, 20) << "checking bh " << *bh << dendl;

      // finishers?
      for (map<loff_t, list<Context*> >::iterator it
             = bh->waitfor_read.begin();
           it != bh->waitfor_read.end();
           ++it)
        ls.splice(ls.end(), it->second);
      bh->waitfor_read.clear();

      if (bh->start() > opos) {
        ldout(cct, 1) << "bh_read_finish skipping gap "
                      << opos << "~" << bh->start() - opos
                      << dendl;
        opos = bh->start();
        continue;
      }

      if (!bh->is_rx()) {
        ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
        opos = bh->end();
        continue;
      }

      if (bh->last_read_tid != tid) {
        ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
                       << bh->last_read_tid << " != tid " << tid
                       << ", skipping" << dendl;
        opos = bh->end();
        continue;
      }

      assert(opos >= bh->start());
      assert(bh->start() == opos);   // we don't merge rx bh's... yet!
      assert(bh->length() <= start+(loff_t)length-opos);

      if (bh->error < 0)
        err = bh->error;

      opos = bh->end();

      if (r == -ENOENT) {
        if (trust_enoent) {
          ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
          bh_remove(ob, bh);
          delete bh;
        } else {
          ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
                         << *bh << dendl;
        }
        continue;
      }

      if (r < 0) {
        bh->error = r;
        mark_error(bh);
      } else {
        bh->bl.substr_of(bl,
                         bh->start() - start,
                         bh->length());
        mark_clean(bh);
      }

      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;

      ob->try_merge_bh(bh);
    }
  }

  // called with lock held.
  ldout(cct, 20) << "finishing waiters " << ls << dendl;

  finish_contexts(cct, ls, err);
  retry_waiting_reads();

  --reads_outstanding;
  read_cond.Signal();
}
void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
                                        int64_t *max_amount, int *max_count)
{
  list<BufferHead*> blist;

  int count = 0;
  int64_t total_len = 0;
  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
  assert(it != dirty_or_tx_bh.end());
  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
       p != dirty_or_tx_bh.end();
       ++p) {
    BufferHead *obh = *p;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_back(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }

  while (it != dirty_or_tx_bh.begin()) {
    --it;
    BufferHead *obh = *it;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_front(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }
  if (max_count)
    *max_count -= count;
  if (max_amount)
    *max_amount -= total_len;

  bh_write_scattered(blist);
}
class ObjectCacher::C_WriteCommit : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  vector<pair<loff_t, uint64_t> > ranges;
  ZTracer::Trace trace;
public:
  ceph_tid_t tid = 0;
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
                uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(_poolid), oid(o), trace(trace) {
    ranges.push_back(make_pair(s, l));
  }
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
                vector<pair<loff_t, uint64_t> >& _ranges) :
    oc(c), poolid(_poolid), oid(o), tid(0) {
    ranges.swap(_ranges);
  }
  void finish(int r) override {
    oc->bh_write_commit(poolid, oid, ranges, tid, r);
    trace.event("finish");
  }
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
  assert(lock.is_locked());

  Object *ob = blist.front()->ob;

  ceph::real_time last_write;
  SnapContext snapc;
  vector<pair<loff_t, uint64_t> > ranges;
  vector<pair<uint64_t, bufferlist> > io_vec;

  ranges.reserve(blist.size());
  io_vec.reserve(blist.size());

  uint64_t total_len = 0;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
    assert(bh->ob == ob);
    assert(bh->bl.length() == bh->length());
    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));

    int n = io_vec.size();
    io_vec.resize(n + 1);
    io_vec[n].first = bh->start();
    io_vec[n].second = bh->bl;

    total_len += bh->length();
    if (bh->snapc.seq > snapc.seq)
      snapc = bh->snapc;
    if (bh->last_write > last_write)
      last_write = bh->last_write;
  }

  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool,
                                              ob->get_soid(), ranges);

  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
                                           io_vec, snapc, last_write,
                                           ob->truncate_size, ob->truncate_seq,
                                           oncommit);
  oncommit->tid = tid;
  ob->last_write_tid = tid;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    bh->last_write_tid = tid;
    mark_tx(bh);
  }

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write " << *bh << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_write " + bh->ob->get_oid().name);
    trace.event("start");
  }

  // finishers
  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                              bh->ob->get_soid(), bh->start(),
                                              bh->length(), trace);
  // go
  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
                                           bh->ob->get_oloc(),
                                           bh->start(), bh->length(),
                                           bh->snapc, bh->bl, bh->last_write,
                                           bh->ob->truncate_size,
                                           bh->ob->truncate_seq,
                                           bh->journal_tid, trace, oncommit);
  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;

  // set bh last_write_tid
  oncommit->tid = tid;
  bh->ob->last_write_tid = tid;
  bh->last_write_tid = tid;

  mark_tx(bh);

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
}
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
                                   vector<pair<loff_t, uint64_t> >& ranges,
                                   ceph_tid_t tid, int r)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
                << " ranges " << ranges << " returned " << r << dendl;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
    return;
  }

  Object *ob = objects[poolid][oid];
  int was_dirty_or_tx = ob->oset->dirty_or_tx;

  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
       p != ranges.end();
       ++p) {
    loff_t start = p->first;
    uint64_t length = p->second;
    if (!ob->exists) {
      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
      ob->exists = true;

      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
                                              ob->get_snap())) {
        ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
          "complete on " << *ob << dendl;
        ob->complete = false;
      }
    }

    vector<pair<loff_t, BufferHead*>> hit;
    // apply to bh's!
    for (map<loff_t, BufferHead*>::const_iterator p
           = ob->data_lower_bound(start);
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;

      if (bh->start() > start+(loff_t)length)
        break;

      // make sure bh is within the range
      if (bh->start() < start &&
          bh->end() > start+(loff_t)length) {
        ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
        continue;
      }

      // make sure bh is tx
      if (!bh->is_tx()) {
        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
        continue;
      }

      // make sure bh tid matches
      if (bh->last_write_tid != tid) {
        assert(bh->last_write_tid > tid);
        ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
        continue;
      }

      if (r >= 0) {
        // ok!  mark bh clean and error-free
        mark_clean(bh);
        bh->set_journal_tid(0);
        if (bh->get_nocache())
          bh_lru_rest.lru_bottouch(bh);
        hit.push_back(make_pair(bh->start(), bh));
        ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
      } else {
        mark_dirty(bh);
        ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
                       << *bh << " r = " << r << " " << cpp_strerror(-r)
                       << dendl;
      }
    }

    for (auto& p : hit) {
      //p.second maybe merged and deleted in merge_left
      if (ob->data.count(p.first))
        ob->try_merge_bh(p.second);
    }
  }

  // update last_commit.
  assert(ob->last_commit_tid < tid);
  ob->last_commit_tid = tid;

  // waiters?
  list<Context*> ls;
  if (ob->waitfor_commit.count(tid)) {
    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
    ob->waitfor_commit.erase(tid);
  }

  // is the entire object set now clean and fully committed?
  ObjectSet *oset = ob->oset;

  if (flush_set_callback &&
      was_dirty_or_tx > 0 &&
      oset->dirty_or_tx == 0) {        // nothing dirty/tx
    flush_set_callback(flush_set_callback_arg, oset);
  }

  finish_contexts(cct, ls, r);
}
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  ceph::real_time cutoff = ceph::real_clock::now();

  ldout(cct, 10) << "flush " << amount << dendl;

  /*
   * NOTE: we aren't actually pulling things off the LRU here, just
   * looking at the tail item.  Then we call bh_write, which moves it
   * to the other LRU, so that we can call
   * lru_dirty.lru_get_next_expire() again.
   */
  int64_t left = amount;
  while (amount == 0 || left > 0) {
    BufferHead *bh = static_cast<BufferHead*>(
      bh_lru_dirty.lru_get_next_expire());
    if (!bh) break;
    if (bh->last_write > cutoff) break;

    if (scattered_write) {
      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
    } else {
      left -= bh->length();
      bh_write(bh, *trace);
    }
  }
}
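
// Note: amount == 0 means "flush every bh dirtied before the cutoff";
// a positive amount stops once roughly that many bytes have been queued
// for writeback.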
void ObjectCacher::trim()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;

  while (get_stat_clean() > 0 && (uint64_t) get_stat_clean() > max_size) {
    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
    if (!bh)
      break;

    ldout(cct, 10) << "trim trimming " << *bh << dendl;
    assert(bh->is_clean() || bh->is_zero() || bh->is_error());

    Object *ob = bh->ob;
    bh_remove(ob, bh);
    delete bh;

    if (ob->complete) {
      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
      ob->complete = false;
    }
  }

  while (ob_lru.lru_get_size() > max_objects) {
    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
    if (!ob)
      break;

    ldout(cct, 10) << "trim trimming " << *ob << dendl;
    close_object(ob);
  }

  ldout(cct, 10) << "trim finish:  max " << max_size << "  clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
}
/* public */

bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
                             snapid_t snapid)
{
  assert(lock.is_locked());
  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
       ex_it != extents.end();
       ++ex_it) {
    ldout(cct, 10) << "is_cached " << *ex_it << dendl;

    // get Object cache
    sobject_t soid(ex_it->oid, snapid);
    Object *o = get_object_maybe(soid, ex_it->oloc);
    if (!o)
      return false;
    if (!o->is_cached(ex_it->offset, ex_it->length))
      return false;
  }
  return true;
}
/*
 * returns # bytes read (if in cache).  onfinish is untouched (caller
 *  must delete it)
 * returns 0 if doing async read
 */
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                        ZTracer::Trace *parent_trace)
{
  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("read", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  int r =_readx(rd, oset, onfinish, true, &trace);
  if (r < 0) {
    trace.event("finish");
  }
  return r;
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                         bool external_call, ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  bool success = true;
  int error = 0;
  uint64_t bytes_in_cache = 0;
  uint64_t bytes_not_in_cache = 0;
  uint64_t total_bytes_read = 0;
  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  /*
   * WARNING: we can only meaningfully return ENOENT if the read request
   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
   * zeroed buffers needs to feed single extents into readx().
   */
  assert(!oset->return_enoent || rd->extents.size() == 1);

  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
       ex_it != rd->extents.end();
       ++ex_it) {
    ldout(cct, 10) << "readx " << *ex_it << dendl;

    total_bytes_read += ex_it->length;

    // get Object cache
    sobject_t soid(ex_it->oid, rd->snap);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);
    if (external_call)
      touch_ob(o);

    // does not exist and no hits?
    if (oset->return_enoent && !o->exists) {
      ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;

      // should we worry about COW underneath us?
      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
                                              ex_it->length, soid.snap)) {
        ldout(cct, 20) << "readx  may copy on write" << dendl;
        bool wait = false;
        list<BufferHead*> blist;
        for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
             bh_it != o->data.end();
             ++bh_it) {
          BufferHead *bh = bh_it->second;
          if (bh->is_dirty() || bh->is_tx()) {
            ldout(cct, 10) << "readx  flushing " << *bh << dendl;
            wait = true;
            if (bh->is_dirty()) {
              if (scattered_write)
                blist.push_back(bh);
              else
                bh_write(bh, *trace);
            }
          }
        }
        if (scattered_write && !blist.empty())
          bh_write_scattered(blist);
        if (wait) {
          ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
          o->waitfor_commit[o->last_write_tid].push_back(
            new C_RetryRead(this,rd, oset, onfinish, *trace));
          // FIXME: perfcounter!
          return 0;
        }
      }

      // can we return ENOENT?
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
           bh_it != o->data.end();
           ++bh_it) {
        ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
        if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
          allzero = false;
          break;
        }
      }
      if (allzero) {
        ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
                       << dendl;
        delete rd;
        if (dontneed)
          bottouch_ob(o);
        return -ENOENT;
      }
    }

    // map extent into bufferheads
    map<loff_t, BufferHead*> hits, missing, rx, errors;
    o->map_read(*ex_it, hits, missing, rx, errors);
    if (external_call) {
      // retry reading error buffers
      missing.insert(errors.begin(), errors.end());
    } else {
      // some reads had errors, fail later so completions
      // are cleaned up properly
      // TODO: make read path not call _readx for every completion
      hits.insert(errors.begin(), errors.end());
    }

    if (!missing.empty() || !rx.empty()) {
      // read missing
      map<loff_t, BufferHead*>::iterator last = missing.end();
      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
           bh_it != missing.end();
           ++bh_it) {
        uint64_t rx_bytes = static_cast<uint64_t>(
          stat_rx + bh_it->second->length());
        bytes_not_in_cache += bh_it->second->length();
        if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
          // cache is full with concurrent reads -- wait for rx's to complete
          // to constrain memory growth (especially during copy-ups)
          if (success) {
            ldout(cct, 10) << "readx missed, waiting on cache to complete "
                           << waitfor_read.size() << " blocked reads, "
                           << (MAX(rx_bytes, max_size) - max_size)
                           << " read bytes" << dendl;
            waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
                                                   *trace));
          }

          bh_remove(o, bh_it->second);
          delete bh_it->second;
        } else {
          bh_it->second->set_nocache(nocache);
          bh_read(bh_it->second, rd->fadvise_flags, *trace);
          if ((success && onfinish) || last != missing.end())
            last = bh_it;
        }
        success = false;
      }

      //add wait in last bh avoid wakeup early. Because read is order
      if (last != missing.end()) {
        ldout(cct, 10) << "readx missed, waiting on " << *last->second
                       << " off " << last->first << dendl;
        last->second->waitfor_read[last->first].push_back(
          new C_RetryRead(this, rd, oset, onfinish, *trace) );
      }

      // bump rx
      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
           bh_it != rx.end();
           ++bh_it) {
        touch_bh(bh_it->second); // bump in lru, so we don't lose it.
        if (success && onfinish) {
          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
                         << " off " << bh_it->first << dendl;
          bh_it->second->waitfor_read[bh_it->first].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace) );
        }
        bytes_not_in_cache += bh_it->second->length();
        success = false;
      }

      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end();  ++bh_it)
        //bump in lru, so we don't lose it when later read
        touch_bh(bh_it->second);
    }

    if (success) {
      assert(!hits.empty());

      // make a plain list
      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end();
           ++bh_it) {
        BufferHead *bh = bh_it->second;
        ldout(cct, 10) << "readx hit bh " << *bh << dendl;
        if (bh->is_error() && bh->error)
          error = bh->error;
        bytes_in_cache += bh->length();

        if (bh->get_nocache() && bh->is_clean())
          bh_lru_rest.lru_bottouch(bh);
        else
          touch_bh(bh);
        //must be after touch_bh because touch_bh set dontneed false
        if (dontneed &&
            ((loff_t)ex_it->offset <= bh->start() &&
             (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
          bh->set_dontneed(true); //if dirty
          if (bh->is_clean())
            bh_lru_rest.lru_bottouch(bh);
        }
      }

      // create reverse map of buffer offset -> object for the
      // eventual result.  this is over a single ObjectExtent, so we
      // know that
      //  - the bh's are contiguous
      //  - the buffer frags need not be (and almost certainly aren't)
      loff_t opos = ex_it->offset;
      map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
      assert(bh_it->second->start() <= opos);
      uint64_t bhoff = opos - bh_it->second->start();
      vector<pair<uint64_t,uint64_t> >::iterator f_it
        = ex_it->buffer_extents.begin();
      uint64_t foff = 0;
      while (1) {
        BufferHead *bh = bh_it->second;
        assert(opos == (loff_t)(bh->start() + bhoff));

        uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
        ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
                       << bhoff << " frag " << f_it->first << "~"
                       << f_it->second << " +" << foff << "~" << len
                       << dendl;

        bufferlist bit;
        // put substr here first, since substr_of clobbers, and we
        // may get multiple bh's at this stripe_map position
        if (bh->is_zero()) {
          stripe_map[f_it->first].append_zero(len);
        } else {
          bit.substr_of(bh->bl,
                        opos - bh->start(),
                        len);
          stripe_map[f_it->first].claim_append(bit);
        }

        opos += len;
        bhoff += len;
        foff += len;
        if (opos == bh->end()) {
          ++bh_it;
          bhoff = 0;
        }
        if (foff == f_it->second) {
          ++f_it;
          foff = 0;
        }
        if (bh_it == hits.end()) break;
        if (f_it == ex_it->buffer_extents.end())
          break;
      }
      assert(f_it == ex_it->buffer_extents.end());
      assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
    }

    if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
      bottouch_ob(o);
  }

  if (!success) {
    if (perfcounter && external_call) {
      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
      perfcounter->inc(l_objectcacher_cache_ops_miss);
    }
    if (onfinish) {
      ldout(cct, 20) << "readx defer " << rd << dendl;
    } else {
      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
                     << dendl;
      delete rd;
    }
    return 0;  // wait!
  }

  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_hit);
  }

  // no misses... success!  do the read.
  ldout(cct, 10) << "readx has all buffers" << dendl;

  // ok, assemble into result buffer.
  uint64_t pos = 0;
  if (rd->bl && !error) {
    rd->bl->clear();
    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
         i != stripe_map.end();
         ++i) {
      assert(pos == i->first);
      ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
                     << " at " << pos << dendl;
      pos += i->second.length();
      rd->bl->claim_append(i->second);
      assert(rd->bl->length() == pos);
    }
    ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
  } else if (!error) {
    ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
    pos = i->first + i->second.length();
  }

  // done with read.
  int ret = error ? error : pos;
  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
  assert(pos <= (uint64_t) INT_MAX);

  delete rd;

  trim();

  return ret;
}
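
// For illustration of the rmap assembly above: an extent whose
// buffer_extents are {0~4096, 8192~4096} fills stripe_map[0] and
// stripe_map[8192] from the contiguous bhs, and the final loop
// claim_appends the substrings in key order so rd->bl comes out in
// file-offset order.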
void ObjectCacher::retry_waiting_reads()
{
  list<Context *> ls;
  ls.swap(waitfor_read);

  while (!ls.empty() && waitfor_read.empty()) {
    Context *ctx = ls.front();
    ls.pop_front();
    ctx->complete(0);
  }
  waitfor_read.splice(waitfor_read.end(), ls);
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
                         ZTracer::Trace *parent_trace)
{
  assert(lock.is_locked());
  ceph::real_time now = ceph::real_clock::now();
  uint64_t bytes_written = 0;
  uint64_t bytes_written_in_flush = 0;
  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("write", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
       ex_it != wr->extents.end();
       ++ex_it) {
    // get object cache
    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);

    // map it all into a single bufferhead.
    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
    bool missing = bh->is_missing();
    bh->snapc = wr->snapc;

    bytes_written += ex_it->length;
    if (bh->is_tx()) {
      bytes_written_in_flush += ex_it->length;
    }

    // adjust buffer pointers (ie "copy" data into my cache)
    // this is over a single ObjectExtent, so we know that
    //  - there is one contiguous bh
    //  - the buffer frags need not be (and almost certainly aren't)
    // note: i assume striping is monotonic... no jumps backwards, ever!
    loff_t opos = ex_it->offset;
    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
           = ex_it->buffer_extents.begin();
         f_it != ex_it->buffer_extents.end();
         ++f_it) {
      ldout(cct, 10) << "writex writing " << f_it->first << "~"
                     << f_it->second << " into " << *bh << " at " << opos
                     << dendl;
      uint64_t bhoff = opos - bh->start();
      assert(f_it->second <= bh->length() - bhoff);

      // get the frag we're mapping in
      bufferlist frag;
      frag.substr_of(wr->bl,
                     f_it->first, f_it->second);

      // keep anything left of bhoff
      if (!bhoff)
        bh->bl.swap(frag);
      else {
        bufferlist newbl;
        newbl.substr_of(bh->bl, 0, bhoff);
        newbl.claim_append(frag);
        bh->bl.swap(newbl);
      }

      opos += f_it->second;
    }

    // ok, now bh is dirty.
    mark_dirty(bh);
    if (dontneed)
      bh->set_dontneed(true);
    else if (nocache && missing)
      bh->set_nocache(true);
    else
      touch_bh(bh);

    bh->last_write = now;

    o->try_merge_bh(bh);
  }

  if (perfcounter) {
    perfcounter->inc(l_objectcacher_data_written, bytes_written);
    if (bytes_written_in_flush) {
      perfcounter->inc(l_objectcacher_overwritten_in_flush,
                       bytes_written_in_flush);
    }
  }

  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
  delete wr;

  //verify_stats();
  trim();
  return r;
}
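
// Typical call path (sketch): the striping layer turns a file write into
// ObjectExtents and calls writex(); each extent is absorbed into one
// dirty bh under the cache lock, and _wait_for_write() either throttles
// the caller (block_writes_upfront) or queues onfreespace on the
// finisher once the dirty limits allow.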
class ObjectCacher::C_WaitForWrite : public Context {
public:
  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
                 const ZTracer::Trace &trace, Context *onfinish) :
    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
  void finish(int r) override;
private:
  ObjectCacher *m_oc;
  uint64_t m_len;
  ZTracer::Trace m_trace;
  Context *m_onfinish;
};
void ObjectCacher::C_WaitForWrite::finish(int r)
{
  Mutex::Locker l(m_oc->lock);
  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
  m_onfinish->complete(r);
}
void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
                                            ZTracer::Trace *trace)
{
  assert(lock.is_locked());
  ceph::mono_time start = ceph::mono_clock::now();
  int blocked = 0;
  // wait for writeback?
  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
  //  - do not wait for bytes other waiters are waiting on.  this means that
  //    threads do not wait for each other.  this effectively allows the cache
  //    size to balloon proportional to the data that is in flight.
  while (get_stat_dirty() + get_stat_tx() > 0 &&
         (uint64_t) (get_stat_dirty() + get_stat_tx()) >=
         max_dirty + get_stat_dirty_waiting()) {
    if (blocked == 0) {
      trace->event("start wait for writeback");
    }
    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
    flusher_cond.Signal();
    stat_dirty_waiting += len;
    stat_cond.Wait(lock);
    stat_dirty_waiting -= len;
    ++blocked;
    ldout(cct, 10) << __func__ << " woke up" << dendl;
  }
  if (blocked > 0) {
    trace->event("finish wait for writeback");
  }
  if (blocked && perfcounter) {
    perfcounter->inc(l_objectcacher_write_ops_blocked);
    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
    ceph::timespan blocked = ceph::mono_clock::now() - start;
    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
  }
}
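
// Throttle arithmetic, for illustration: with max_dirty = 64 MB a writer
// blocks while dirty+tx >= 64 MB + stat_dirty_waiting; each waiter adds
// its own len to stat_dirty_waiting before sleeping, so concurrent
// writers don't also wait on bytes that existing waiters already cover.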
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                                  ZTracer::Trace *trace, Context *onfreespace)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  int ret = 0;

  if (max_dirty > 0) {
    if (block_writes_upfront) {
      maybe_wait_for_writeback(len, trace);
      if (onfreespace)
        onfreespace->complete(0);
    } else {
      assert(onfreespace);
      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
    }
  } else {
    // write-thru!  flush what we just wrote.
    Cond cond;
    bool done = false;
    Context *fin = block_writes_upfront ?
      new C_Cond(&cond, &done, &ret) : onfreespace;
    assert(fin);
    bool flushed = flush_set(oset, wr->extents, trace, fin);
    assert(!flushed);   // we just dirtied it, and didn't drop our lock!
    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
    if (block_writes_upfront) {
      while (!done)
        cond.Wait(lock);
      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
      if (onfreespace)
        onfreespace->complete(ret);
    }
  }

  // start writeback anyway?
  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
                   << target_dirty << ", nudging flusher" << dendl;
    flusher_cond.Signal();
  }
  return ret;
}
void ObjectCacher::flusher_entry()
{
  ldout(cct, 10) << "flusher start" << dendl;
  lock.Lock();
  while (!flusher_stop) {
    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
      get_stat_dirty();
    ldout(cct, 11) << "flusher "
                   << all << " / " << max_size << ":  "
                   << get_stat_tx() << " tx, "
                   << get_stat_rx() << " rx, "
                   << get_stat_clean() << " clean, "
                   << get_stat_dirty() << " dirty ("
                   << target_dirty << " target, "
                   << max_dirty << " max)"
                   << dendl;
    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();

    ZTracer::Trace trace;
    if (cct->_conf->osdc_blkin_trace_all) {
      trace.init("flusher", &trace_endpoint);
      trace.event("start");
    }

    if (actual > 0 && (uint64_t) actual > target_dirty) {
      // flush some dirty pages
      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
      flush(&trace, actual - target_dirty);
    } else {
      // check tail of lru for old dirty items
      ceph::real_time cutoff = ceph::real_clock::now();
      cutoff -= max_dirty_age;
      BufferHead *bh = 0;
      int max = MAX_FLUSH_UNDER_LOCK;
      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
                                            lru_get_next_expire())) != 0 &&
             bh->last_write <= cutoff &&
             max > 0) {
        ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
        if (scattered_write) {
          bh_write_adjacencies(bh, cutoff, NULL, &max);
        } else {
          bh_write(bh, trace);
          --max;
        }
      }
      if (!max) {
        // back off the lock to avoid starving other threads
        trace.event("backoff");
        lock.Unlock();
        lock.Lock();
        continue;
      }
    }

    trace.event("finish");
    if (flusher_stop)
      break;

    flusher_cond.WaitInterval(lock, seconds(1));
  }

  /* Wait for reads to finish. This is only possible if handling
   * -ENOENT made some read completions finish before their rados read
   * came back. If we don't wait for them, and destroy the cache, when
   * the rados reads do come back their callback will try to access the
   * no-longer-valid ObjectCacher.
   */
  while (reads_outstanding > 0) {
    ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
                   << reads_outstanding << dendl;
    read_cond.Wait(lock);
  }

  lock.Unlock();
  ldout(cct, 10) << "flusher finish" << dendl;
}
// -------------------------------------------------

bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return true;

  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
    if (!(*p)->is_empty())
      return false;

  return true;
}

bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
         q != ob->data.end();
         ++q) {
      BufferHead *bh = q->second;
      if (!bh->is_dirty() && !bh->is_tx())
        return true;
    }
  }

  return false;
}

bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;

    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;
      if (bh->is_dirty() || bh->is_tx())
        return true;
    }
  }

  return false;
}
2000 void ObjectCacher::purge(Object
*ob
)
2002 assert(lock
.is_locked());
2003 ldout(cct
, 10) << "purge " << *ob
<< dendl
;
// flush.  non-blocking.  no callback.
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
                         ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  list<BufferHead*> blist;
  bool clean = true;
  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
  for (map<loff_t,BufferHead*>::const_iterator p
         = ob->data_lower_bound(offset);
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    ldout(cct, 20) << "flush  " << *bh << dendl;
    if (length && bh->start() > offset+length) {
      break;
    }
    if (bh->is_tx()) {
      clean = false;
      continue;
    }
    if (!bh->is_dirty()) {
      continue;
    }

    if (scattered_write)
      blist.push_back(bh);
    else
      bh_write(bh, *trace);
    clean = false;
  }
  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  return clean;
}
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
                                     Context *onfinish)
{
  assert(lock.is_locked());
  if (gather->has_subs()) {
    gather->set_finisher(onfinish);
    gather->activate();
    return false;
  }

  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
  onfinish->complete(0);
  return true;
}
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;

  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
  // order. But items in oset->objects are not sorted. So the iterator can
  // point to any buffer head in the ObjectSet
  BufferHead key(*oset->objects.begin());
  it = dirty_or_tx_bh.lower_bound(&key);
  p = q = it;

  bool backwards = true;
  if (it != dirty_or_tx_bh.begin())
    --it;
  else
    backwards = false;

  for (; p != dirty_or_tx_bh.end(); p = q) {
    ++q;
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
      break;
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }
  }

  if (backwards) {
    for(p = q = it; true; p = q) {
      if (q != dirty_or_tx_bh.begin())
        --q;
      else
        backwards = false;
      BufferHead *bh = *p;
      if (bh->ob->oset != oset)
        break;
      waitfor_commit.insert(bh->ob);
      if (bh->is_dirty()) {
        if (scattered_write) {
          if (last_ob != bh->ob) {
            if (!blist.empty()) {
              bh_write_scattered(blist);
              blist.clear();
            }
            last_ob = bh->ob;
          }
          blist.push_front(bh);
        } else {
          bh_write(bh, {});
        }
      }
      if (!backwards)
        break;
    }
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end(); ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
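
// The two scans above exist because dirty_or_tx_bh is ordered by
// BufferHead::ptr_lt (ObjectSet, then Object, then offset) while
// lower_bound(&key) can land on any bh of this oset; we therefore walk
// forward from the hit and then backward from just before it until the
// bhs stop belonging to this oset.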
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
                             ZTracer::Trace *trace, Context *onfinish)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
                 << " ObjectExtents" << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);

  for (vector<ObjectExtent>::iterator p = exv.begin();
       p != exv.end();
       ++p) {
    ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
                   << " " << ob << dendl;

    if (!flush(ob, ex.offset, ex.length, trace)) {
      // we'll need to gather...
      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                     << ob->last_write_tid << " on " << *ob << dendl;
      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
    }
  }

  return _flush_set_finish(&gather, onfinish);
}
// flush all dirty data.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);

  ldout(cct, 10) << "flush_all " << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
  next = it = dirty_or_tx_bh.begin();
  while (it != dirty_or_tx_bh.end()) {
    ++next;
    BufferHead *bh = *it;
    waitfor_commit.insert(bh->ob);

    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }

    it = next;
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end();
       ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_all will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
void ObjectCacher::purge_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "purge_set " << oset << dendl;
  const bool were_dirty = oset->dirty_or_tx > 0;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;
    purge(ob);
  }

  // Although we have purged rather than flushed, the caller should still
  // drop any resources associated with dirty data.
  assert(oset->dirty_or_tx == 0);
  if (flush_set_callback && were_dirty) {
    flush_set_callback(flush_set_callback_arg, oset);
  }
}
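// Usage sketch (illustrative): purge_set() drops cached data *without*
// writing it back, so it is only safe when the dirty contents are being
// deliberately invalidated; the flush_set_callback still fires so the
// upper layer can release whatever it pinned for the dirty data:
//
//   oc->lock.Lock();
//   oc->purge_set(oset);  // discards dirty buffers without writeback
//   oc->lock.Unlock();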
loff_t ObjectCacher::release(Object *ob)
{
  assert(lock.is_locked());
  list<BufferHead*> clean;
  loff_t o_unclean = 0;

  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    if (bh->is_clean() || bh->is_zero() || bh->is_error())
      clean.push_back(bh);
    else
      o_unclean += bh->length();
  }

  for (list<BufferHead*>::iterator p = clean.begin();
       p != clean.end();
       ++p) {
    bh_remove(ob, *p);
    delete *p;
  }

  if (ob->can_close()) {
    ldout(cct, 10) << "release trimming " << *ob << dendl;
    close_object(ob);
    assert(o_unclean == 0);
    return 0;
  }

  if (ob->complete) {
    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
    ob->complete = false;
  }

  if (!ob->exists) {
    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
    ob->exists = true;
  }

  return o_unclean;
}
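// Added note (illustrative, internal use): release() frees every
// clean/zero/error buffer and reports how many bytes it could NOT release;
// a zero return may mean the object itself was just closed:
//
//   loff_t left = release(ob);  // with 'lock' held
//   if (left == 0) {
//     // all buffers were releasable; 'ob' may have been deleted by
//     // close_object() and must not be dereferenced here
//   }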
loff_t ObjectCacher::release_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  // return # bytes not clean (and thus not released).
  loff_t unclean = 0;

  if (oset->objects.empty()) {
    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
    return 0;
  }

  ldout(cct, 10) << "release_set " << oset << dendl;

  xlist<Object*>::iterator q;
  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ) {
    q = p;
    ++q;

    Object *ob = *p;

    loff_t o_unclean = release(ob);
    unclean += o_unclean;

    if (o_unclean)
      ldout(cct, 10) << "release_set " << oset << " " << *ob
                     << " has " << o_unclean << " bytes left"
                     << dendl;
    p = q;
  }

  if (unclean) {
    ldout(cct, 10) << "release_set " << oset
                   << ", " << unclean << " bytes left" << dendl;
  }

  return unclean;
}
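// Hypothetical trim sketch (illustrative): release_set() is the
// non-destructive counterpart to purge_set() -- it only frees clean
// buffers and reports how much dirty/in-flight data is still pinned:
//
//   oc->lock.Lock();
//   loff_t unclean = oc->release_set(oset);
//   oc->lock.Unlock();
//   if (unclean > 0) {
//     // some buffers are still dirty or in tx; flush_set() first,
//     // then release again once the commits land
//   }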
uint64_t ObjectCacher::release_all()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "release_all" << dendl;
  uint64_t unclean = 0;

  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
    = objects.begin();
  while (i != objects.end()) {
    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
    while (p != i->end()) {
      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
      ++n;

      Object *ob = p->second;

      loff_t o_unclean = release(ob);
      unclean += o_unclean;

      if (o_unclean)
        ldout(cct, 10) << "release_all " << *ob
                       << " has " << o_unclean << " bytes left"
                       << dendl;
      p = n;
    }
    ++i;
  }

  if (unclean) {
    ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
                   << dendl;
  }

  return unclean;
}
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    if (!ob->exists) {
      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
      ob->exists = true;
      ob->complete = false;
    }
    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
         !q.end(); ++q) {
      C_ReadFinish *comp = *q;
      comp->distrust_enoent();
    }
  }
}
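// Added note: an in-flight read completion (C_ReadFinish) that races with
// this call may still carry an ENOENT from the OSD; distrust_enoent()
// clears its trust_enoent flag so the stale error cannot re-mark the
// object as nonexistent after we just asserted it exists. Hypothetical
// call site (illustrative only):
//
//   oc->lock.Lock();
//   oc->clear_nonexistence(oset);  // e.g. after learning the objects
//   oc->lock.Unlock();             // may now exist behind our back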
/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset.
 */
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "discard_set " << oset << dendl;

  bool were_dirty = oset->dirty_or_tx > 0;

  for (vector<ObjectExtent>::const_iterator p = exls.begin();
       p != exls.end();
       ++p) {
    ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
    const ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;

    Object *ob = objects[oset->poolid][soid];

    ob->discard(ex.offset, ex.length);
  }

  // did we truncate off dirty data?
  if (flush_set_callback &&
      were_dirty && oset->dirty_or_tx == 0)
    flush_set_callback(flush_set_callback_arg, oset);
}
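// Hypothetical usage sketch (the five-argument ObjectExtent constructor is
// an assumption here): dropping one logical range maps to one extent per
// backing object; discard_set() then truncates the cached buffers:
//
//   vector<ObjectExtent> exls;
//   exls.push_back(ObjectExtent(oid, 0 /* objectno */, off, len,
//                               0 /* truncate_size */));
//   oc->lock.Lock();
//   oc->discard_set(oset, exls);
//   oc->lock.Unlock();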
void ObjectCacher::verify_stats() const
{
  assert(lock.is_locked());
  ldout(cct, 10) << "verify_stats" << dendl;

  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
    error = 0;
  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
         = objects.begin();
       i != objects.end();
       ++i) {
    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
           = i->begin();
         p != i->end();
         ++p) {
      Object *ob = p->second;
      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
           q != ob->data.end();
           ++q) {
        BufferHead *bh = q->second;
        switch (bh->get_state()) {
        case BufferHead::STATE_MISSING:
          missing += bh->length();
          break;
        case BufferHead::STATE_CLEAN:
          clean += bh->length();
          break;
        case BufferHead::STATE_ZERO:
          zero += bh->length();
          break;
        case BufferHead::STATE_DIRTY:
          dirty += bh->length();
          break;
        case BufferHead::STATE_TX:
          tx += bh->length();
          break;
        case BufferHead::STATE_RX:
          rx += bh->length();
          break;
        case BufferHead::STATE_ERROR:
          error += bh->length();
          break;
        default:
          ceph_abort();
        }
      }
    }
  }

  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
                 << " dirty " << dirty << " missing " << missing
                 << " error " << error << dendl;
  assert(clean == stat_clean);
  assert(rx == stat_rx);
  assert(tx == stat_tx);
  assert(dirty == stat_dirty);
  assert(missing == stat_missing);
  assert(zero == stat_zero);
  assert(error == stat_error);
}
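// Added note: verify_stats() recomputes from scratch the per-state byte
// totals that bh_stat_add()/bh_stat_sub() (below) maintain incrementally,
// and asserts the two bookkeeping paths agree. The invariant, informally:
//
//   for each state S:
//     stat_S == sum of bh->length() over all cached buffer heads in state S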
void ObjectCacher::bh_stat_add(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing += bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean += bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero += bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx += bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error += bh->length();
    break;
  default:
    assert(0 == "bh_stat_add: invalid bufferhead state");
  }
  if (get_stat_dirty_waiting() > 0)
    stat_cond.Signal();
}
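// Added note: DIRTY and TX are the two states that contribute to the
// dirty_or_tx accounting, and they are tracked at three levels at once --
// the global stat_dirty/stat_tx, the owning Object, and its ObjectSet.
// The ObjectSet-level counter is what purge_set() asserts on and what
// discard_set() tests to decide whether dirty data disappeared:
//
//   oset->dirty_or_tx == sum over bh in oset of
//     ((bh->is_dirty() || bh->is_tx()) ? bh->length() : 0)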
void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing -= bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean -= bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero -= bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx -= bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error -= bh->length();
    break;
  default:
    assert(0 == "bh_stat_sub: invalid bufferhead state");
  }
}
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
  assert(lock.is_locked());
  int state = bh->get_state();
  // move between lru lists?
  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
    bh_lru_rest.lru_remove(bh);
    bh_lru_dirty.lru_insert_top(bh);
  } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
    bh_lru_dirty.lru_remove(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if ((s == BufferHead::STATE_TX ||
       s == BufferHead::STATE_DIRTY) &&
      state != BufferHead::STATE_TX &&
      state != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.insert(bh);
  } else if ((state == BufferHead::STATE_TX ||
              state == BufferHead::STATE_DIRTY) &&
             s != BufferHead::STATE_TX &&
             s != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.erase(bh);
  }

  if (s != BufferHead::STATE_ERROR &&
      state == BufferHead::STATE_ERROR) {
    bh->error = 0;
  }

  // set state
  bh_stat_sub(bh);
  bh->set_state(s);
  bh_stat_add(bh);
}
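// Added note: every state change funnels through bh_set_state() so that
// three structures stay consistent: the clean/dirty LRU lists, the
// dirty_or_tx_bh index used by the flush paths, and the stat_* byte
// counters (adjusted by the bh_stat_sub()/bh_stat_add() pair around
// set_state()). E.g. marking a buffer dirty (illustrative):
//
//   bh_set_state(bh, BufferHead::STATE_DIRTY);
//   // moves bh from bh_lru_rest to bh_lru_dirty, inserts it into
//   // dirty_or_tx_bh, and shifts its bytes from stat_clean to stat_dirty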
void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
  ob->add_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_insert_top(bh);
    dirty_or_tx_bh.insert(bh);
  } else {
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.insert(bh);
  }
  bh_stat_add(bh);
}
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  assert(bh->get_journal_tid() == 0);
  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
  ob->remove_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_remove(bh);
    dirty_or_tx_bh.erase(bh);
  } else {
    bh_lru_rest.lru_remove(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.erase(bh);
  }
  bh_stat_sub(bh);
  if (get_stat_dirty_waiting() > 0)
    stat_cond.Signal();
}