1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "msg/Messenger.h"
7 #include "ObjectCacher.h"
8 #include "WritebackHandler.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
12 #include "include/assert.h"
14 #define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
16 using std::chrono::seconds
;
17 /// while holding the lock
19 /*** ObjectCacher::BufferHead ***/
22 /*** ObjectCacher::Object ***/
24 #define dout_subsys ceph_subsys_objectcacher
26 #define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
30 class ObjectCacher::C_ReadFinish
: public Context
{
36 xlist
<C_ReadFinish
*>::item set_item
;
42 C_ReadFinish(ObjectCacher
*c
, Object
*ob
, ceph_tid_t t
, loff_t s
,
44 oc(c
), poolid(ob
->oloc
.pool
), oid(ob
->get_soid()), start(s
), length(l
),
45 set_item(this), trust_enoent(true),
47 ob
->reads
.push_back(&set_item
);
50 void finish(int r
) override
{
51 oc
->bh_read_finish(poolid
, oid
, tid
, start
, length
, bl
, r
, trust_enoent
);
53 // object destructor clears the list
54 if (set_item
.is_on_list())
55 set_item
.remove_myself();
58 void distrust_enoent() {
63 class ObjectCacher::C_RetryRead
: public Context
{
69 C_RetryRead(ObjectCacher
*_oc
, OSDRead
*r
, ObjectSet
*os
, Context
*c
)
70 : oc(_oc
), rd(r
), oset(os
), onfinish(c
) {}
71 void finish(int r
) override
{
74 onfinish
->complete(r
);
77 int ret
= oc
->_readx(rd
, oset
, onfinish
, false);
78 if (ret
!= 0 && onfinish
) {
79 onfinish
->complete(ret
);
84 ObjectCacher::BufferHead
*ObjectCacher::Object::split(BufferHead
*left
,
87 assert(oc
->lock
.is_locked());
88 ldout(oc
->cct
, 20) << "split " << *left
<< " at " << off
<< dendl
;
91 ObjectCacher::BufferHead
*right
= new BufferHead(this);
93 //inherit and if later access, this auto clean.
94 right
->set_dontneed(left
->get_dontneed());
95 right
->set_nocache(left
->get_nocache());
97 right
->last_write_tid
= left
->last_write_tid
;
98 right
->last_read_tid
= left
->last_read_tid
;
99 right
->set_state(left
->get_state());
100 right
->snapc
= left
->snapc
;
101 right
->set_journal_tid(left
->journal_tid
);
103 loff_t newleftlen
= off
- left
->start();
104 right
->set_start(off
);
105 right
->set_length(left
->length() - newleftlen
);
108 oc
->bh_stat_sub(left
);
109 left
->set_length(newleftlen
);
110 oc
->bh_stat_add(left
);
113 oc
->bh_add(this, right
);
119 assert(bl
.length() == (left
->length() + right
->length()));
120 right
->bl
.substr_of(bl
, left
->length(), right
->length());
121 left
->bl
.substr_of(bl
, 0, left
->length());
125 if (!left
->waitfor_read
.empty()) {
126 map
<loff_t
, list
<Context
*> >::iterator start_remove
127 = left
->waitfor_read
.begin();
128 while (start_remove
!= left
->waitfor_read
.end() &&
129 start_remove
->first
< right
->start())
131 for (map
<loff_t
, list
<Context
*> >::iterator p
= start_remove
;
132 p
!= left
->waitfor_read
.end(); ++p
) {
133 ldout(oc
->cct
, 20) << "split moving waiters at byte " << p
->first
134 << " to right bh" << dendl
;
135 right
->waitfor_read
[p
->first
].swap( p
->second
);
136 assert(p
->second
.empty());
138 left
->waitfor_read
.erase(start_remove
, left
->waitfor_read
.end());
141 ldout(oc
->cct
, 20) << "split left is " << *left
<< dendl
;
142 ldout(oc
->cct
, 20) << "split right is " << *right
<< dendl
;
147 void ObjectCacher::Object::merge_left(BufferHead
*left
, BufferHead
*right
)
149 assert(oc
->lock
.is_locked());
150 assert(left
->end() == right
->start());
151 assert(left
->get_state() == right
->get_state());
152 assert(left
->can_merge_journal(right
));
154 ldout(oc
->cct
, 10) << "merge_left " << *left
<< " + " << *right
<< dendl
;
155 if (left
->get_journal_tid() == 0) {
156 left
->set_journal_tid(right
->get_journal_tid());
158 right
->set_journal_tid(0);
160 oc
->bh_remove(this, right
);
161 oc
->bh_stat_sub(left
);
162 left
->set_length(left
->length() + right
->length());
163 oc
->bh_stat_add(left
);
166 left
->bl
.claim_append(right
->bl
);
169 // note: this is sorta busted, but should only be used for dirty buffers
170 left
->last_write_tid
= MAX( left
->last_write_tid
, right
->last_write_tid
);
171 left
->last_write
= MAX( left
->last_write
, right
->last_write
);
173 left
->set_dontneed(right
->get_dontneed() ? left
->get_dontneed() : false);
174 left
->set_nocache(right
->get_nocache() ? left
->get_nocache() : false);
177 for (map
<loff_t
, list
<Context
*> >::iterator p
= right
->waitfor_read
.begin();
178 p
!= right
->waitfor_read
.end();
180 left
->waitfor_read
[p
->first
].splice(left
->waitfor_read
[p
->first
].begin(),
186 ldout(oc
->cct
, 10) << "merge_left result " << *left
<< dendl
;
// Try to coalesce bh with its immediate neighbours in the object's data map.
// A neighbour is merged only when it is byte-adjacent, in the same state,
// and carries a compatible journal tid (can_merge_journal).  Merging is
// delegated to merge_left().
// NOTE(review): extraction gap — original lines 190, 193, 195–198, 202,
// 207–212, 214, 220–221 (braces, rx-buffer early-out, iterator stepping)
// are not visible in this view.
189 void ObjectCacher::Object::try_merge_bh(BufferHead
*bh
)
191 assert(oc
->lock
.is_locked());
192 ldout(oc
->cct
, 10) << "try_merge_bh " << *bh
<< dendl
;
194 // do not merge rx buffers; last_read_tid may not match
// locate bh in the offset-ordered map; it must be present at its start()
199 map
<loff_t
,BufferHead
*>::iterator p
= data
.find(bh
->start());
200 assert(p
->second
== bh
);
// try left neighbour (presumably p was decremented in a missing line)
201 if (p
!= data
.begin()) {
203 if (p
->second
->end() == bh
->start() &&
204 p
->second
->get_state() == bh
->get_state() &&
205 p
->second
->can_merge_journal(bh
)) {
// absorb bh into its left neighbour
206 merge_left(p
->second
, bh
);
213 assert(p
->second
== bh
);
// try right neighbour: bh absorbs the bh that starts at bh->end()
215 if (p
!= data
.end() &&
216 p
->second
->start() == bh
->end() &&
217 p
->second
->get_state() == bh
->get_state() &&
218 p
->second
->can_merge_journal(bh
))
219 merge_left(bh
, p
->second
);
223 * count bytes we have cached in given range
// Report whether the byte range [cur, cur+left) is fully present in cache.
// Walks buffer heads from data_lower_bound(cur), advancing cur by the
// overlap with each bh; a gap means the range is not fully cached.
// NOTE(review): extraction gap — original lines 226, 229–232, 234, 236–239,
// 241–248 (loop structure, cur/left updates, return statements) are not
// visible in this view.
225 bool ObjectCacher::Object::is_cached(loff_t cur
, loff_t left
) const
227 assert(oc
->lock
.is_locked());
// first bh whose range could cover cur
228 map
<loff_t
, BufferHead
*>::const_iterator p
= data_lower_bound(cur
);
233 if (p
->first
<= cur
) {
// bh overlaps cur: the covered span is bounded by both bh end and 'left'
235 loff_t lenfromcur
= MIN(p
->second
->end() - cur
, left
);
240 } else if (p
->first
> cur
) {
251 * all cached data in this range[off, off+len]
// Return true when every cached buffer head of this object lies entirely
// inside [off, off+len], i.e. the given range covers all cached data.
// Relies on 'data' being an offset-ordered map, so the first element has
// the lowest start and rbegin() has the highest end.
// NOTE(review): extraction gap — original lines 254, 256–257, 261–264
// (empty-map early return, return statements, braces) are not visible here.
253 bool ObjectCacher::Object::include_all_cached_data(loff_t off
, loff_t len
)
255 assert(oc
->lock
.is_locked());
// lowest-offset bh
258 map
<loff_t
, BufferHead
*>::iterator first
= data
.begin();
// highest-offset bh
259 map
<loff_t
, BufferHead
*>::reverse_iterator last
= data
.rbegin();
// covered iff the extreme bhs both fall within the requested range
260 if (first
->second
->start() >= off
&& last
->second
->end() <= (off
+ len
))
267 * map a range of bytes into buffer_heads.
268 * - create missing buffer_heads as necessary.
270 int ObjectCacher::Object::map_read(ObjectExtent
&ex
,
271 map
<loff_t
, BufferHead
*>& hits
,
272 map
<loff_t
, BufferHead
*>& missing
,
273 map
<loff_t
, BufferHead
*>& rx
,
274 map
<loff_t
, BufferHead
*>& errors
)
276 assert(oc
->lock
.is_locked());
277 ldout(oc
->cct
, 10) << "map_read " << ex
.oid
278 << " " << ex
.offset
<< "~" << ex
.length
281 loff_t cur
= ex
.offset
;
282 loff_t left
= ex
.length
;
284 map
<loff_t
, BufferHead
*>::const_iterator p
= data_lower_bound(ex
.offset
);
287 if (p
== data
.end()) {
289 BufferHead
*n
= new BufferHead(this);
296 ldout(oc
->cct
, 20) << "map_read miss+complete+zero " << left
<< " left, " << *n
<< dendl
;
299 ldout(oc
->cct
, 20) << "map_read miss " << left
<< " left, " << *n
<< dendl
;
302 assert(cur
== (loff_t
)ex
.offset
+ (loff_t
)ex
.length
);
306 if (p
->first
<= cur
) {
307 // have it (or part of it)
308 BufferHead
*e
= p
->second
;
314 hits
[cur
] = e
; // readable!
315 ldout(oc
->cct
, 20) << "map_read hit " << *e
<< dendl
;
316 } else if (e
->is_rx()) {
317 rx
[cur
] = e
; // missing, not readable.
318 ldout(oc
->cct
, 20) << "map_read rx " << *e
<< dendl
;
319 } else if (e
->is_error()) {
321 ldout(oc
->cct
, 20) << "map_read error " << *e
<< dendl
;
326 loff_t lenfromcur
= MIN(e
->end() - cur
, left
);
332 } else if (p
->first
> cur
) {
334 loff_t next
= p
->first
;
335 BufferHead
*n
= new BufferHead(this);
336 loff_t len
= MIN(next
- cur
, left
);
343 ldout(oc
->cct
, 20) << "map_read gap+complete+zero " << *n
<< dendl
;
346 ldout(oc
->cct
, 20) << "map_read gap " << *n
<< dendl
;
348 cur
+= MIN(left
, n
->length());
349 left
-= MIN(left
, n
->length());
358 void ObjectCacher::Object::audit_buffers()
361 for (map
<loff_t
, BufferHead
*>::const_iterator it
= data
.begin();
362 it
!= data
.end(); ++it
) {
363 if (it
->first
!= it
->second
->start()) {
364 lderr(oc
->cct
) << "AUDIT FAILURE: map position " << it
->first
365 << " does not match bh start position: "
366 << *it
->second
<< dendl
;
367 assert(it
->first
== it
->second
->start());
369 if (it
->first
< offset
) {
370 lderr(oc
->cct
) << "AUDIT FAILURE: " << it
->first
<< " " << *it
->second
371 << " overlaps with previous bh " << *((--it
)->second
)
373 assert(it
->first
>= offset
);
375 BufferHead
*bh
= it
->second
;
376 map
<loff_t
, list
<Context
*> >::const_iterator w_it
;
377 for (w_it
= bh
->waitfor_read
.begin();
378 w_it
!= bh
->waitfor_read
.end(); ++w_it
) {
379 if (w_it
->first
< bh
->start() ||
380 w_it
->first
>= bh
->start() + bh
->length()) {
381 lderr(oc
->cct
) << "AUDIT FAILURE: waiter at " << w_it
->first
382 << " is not within bh " << *bh
<< dendl
;
383 assert(w_it
->first
>= bh
->start());
384 assert(w_it
->first
< bh
->start() + bh
->length());
387 offset
= it
->first
+ it
->second
->length();
392 * map a range of extents on an object's buffer cache.
393 * - combine any bh's we're writing into one
394 * - break up bufferheads that don't fall completely within the range
395 * //no! - return a bh that includes the write. may also include
396 * other dirty data to left and/or right.
398 ObjectCacher::BufferHead
*ObjectCacher::Object::map_write(ObjectExtent
&ex
,
401 assert(oc
->lock
.is_locked());
402 BufferHead
*final
= 0;
404 ldout(oc
->cct
, 10) << "map_write oex " << ex
.oid
405 << " " << ex
.offset
<< "~" << ex
.length
<< dendl
;
407 loff_t cur
= ex
.offset
;
408 loff_t left
= ex
.length
;
410 map
<loff_t
, BufferHead
*>::const_iterator p
= data_lower_bound(ex
.offset
);
415 if (p
== data
.end()) {
417 final
= new BufferHead(this);
418 replace_journal_tid(final
, tid
);
419 final
->set_start( cur
);
420 final
->set_length( max
);
421 oc
->bh_add(this, final
);
422 ldout(oc
->cct
, 10) << "map_write adding trailing bh " << *final
<< dendl
;
424 oc
->bh_stat_sub(final
);
425 final
->set_length(final
->length() + max
);
426 oc
->bh_stat_add(final
);
433 ldout(oc
->cct
, 10) << "cur is " << cur
<< ", p is " << *p
->second
<< dendl
;
434 //oc->verify_stats();
436 if (p
->first
<= cur
) {
437 BufferHead
*bh
= p
->second
;
438 ldout(oc
->cct
, 10) << "map_write bh " << *bh
<< " intersected" << dendl
;
440 if (p
->first
< cur
) {
442 if (cur
+ max
>= bh
->end()) {
443 // we want right bit (one splice)
444 final
= split(bh
, cur
); // just split it, take right half.
445 replace_journal_tid(final
, tid
);
447 assert(p
->second
== final
);
449 // we want middle bit (two splices)
450 final
= split(bh
, cur
);
452 assert(p
->second
== final
);
453 split(final
, cur
+max
);
454 replace_journal_tid(final
, tid
);
457 assert(p
->first
== cur
);
458 if (bh
->length() <= max
) {
459 // whole bufferhead, piece of cake.
461 // we want left bit (one splice)
462 split(bh
, cur
+ max
); // just split
466 oc
->mark_dirty(final
);
467 --p
; // move iterator back to final
468 assert(p
->second
== final
);
469 replace_journal_tid(bh
, tid
);
470 merge_left(final
, bh
);
473 replace_journal_tid(final
, tid
);
478 loff_t lenfromcur
= final
->end() - cur
;
485 loff_t next
= p
->first
;
486 loff_t glen
= MIN(next
- cur
, max
);
487 ldout(oc
->cct
, 10) << "map_write gap " << cur
<< "~" << glen
<< dendl
;
489 oc
->bh_stat_sub(final
);
490 final
->set_length(final
->length() + glen
);
491 oc
->bh_stat_add(final
);
493 final
= new BufferHead(this);
494 replace_journal_tid(final
, tid
);
495 final
->set_start( cur
);
496 final
->set_length( glen
);
497 oc
->bh_add(this, final
);
508 assert(final
->get_journal_tid() == tid
);
509 ldout(oc
->cct
, 10) << "map_write final is " << *final
<< dendl
;
// Re-associate a buffer head with a (possibly newer) journal transaction id.
// If the bh already carried a different non-zero journal tid, notify the
// writeback handler's journal that the old extent was overwritten and no
// writeback for it should be expected.
// NOTE(review): extraction gap — original lines 515, 517, 523, 525
// (second parameter 'tid', braces) are not visible in this view.
514 void ObjectCacher::Object::replace_journal_tid(BufferHead
*bh
,
// journal tid currently recorded on the bh
516 ceph_tid_t bh_tid
= bh
->get_journal_tid();
// tid may only move forward; tid == 0 means "detach from the journal"
518 assert(tid
== 0 || bh_tid
<= tid
);
519 if (bh_tid
!= 0 && bh_tid
!= tid
) {
520 // inform journal that it should not expect a writeback from this extent
521 oc
->writeback_handler
.overwrite_extent(get_oid(), bh
->start(),
522 bh
->length(), bh_tid
, tid
);
// record the new journal tid on the bh
524 bh
->set_journal_tid(tid
);
// Drop all cached data at or beyond offset s.  Iterates from the
// highest-offset buffer head downward; a bh straddling s is presumably
// split at s first (split-branch body missing from this view), then the
// fully-beyond-s bh is detached from the journal and removed.
// NOTE(review): extraction gap — original lines 528, 531, 534–536,
// 539–542, 548–550 (braces, truncation-point split body, loop exit) are
// not visible in this view.
527 void ObjectCacher::Object::truncate(loff_t s
)
529 assert(oc
->lock
.is_locked());
530 ldout(oc
->cct
, 10) << "truncate " << *this << " to " << s
<< dendl
;
// process buffer heads from the end of the object backwards
532 while (!data
.empty()) {
533 BufferHead
*bh
= data
.rbegin()->second
;
537 // split bh at truncation point?
538 if (bh
->start() < s
) {
543 // remove bh entirely
544 assert(bh
->start() >= s
);
545 assert(bh
->waitfor_read
.empty());
// tell the journal this extent will never be written back
546 replace_journal_tid(bh
, 0);
547 oc
->bh_remove(this, bh
);
552 void ObjectCacher::Object::discard(loff_t off
, loff_t len
)
554 assert(oc
->lock
.is_locked());
555 ldout(oc
->cct
, 10) << "discard " << *this << " " << off
<< "~" << len
559 ldout(oc
->cct
, 10) << " setting exists on " << *this << dendl
;
563 ldout(oc
->cct
, 10) << " clearing complete on " << *this << dendl
;
567 map
<loff_t
, BufferHead
*>::const_iterator p
= data_lower_bound(off
);
568 while (p
!= data
.end()) {
569 BufferHead
*bh
= p
->second
;
570 if (bh
->start() >= off
+ len
)
573 // split bh at truncation point?
574 if (bh
->start() < off
) {
580 assert(bh
->start() >= off
);
581 if (bh
->end() > off
+ len
) {
582 split(bh
, off
+ len
);
586 ldout(oc
->cct
, 10) << "discard " << *this << " bh " << *bh
<< dendl
;
587 assert(bh
->waitfor_read
.empty());
588 replace_journal_tid(bh
, 0);
589 oc
->bh_remove(this, bh
);
596 /*** ObjectCacher ***/
599 #define dout_prefix *_dout << "objectcacher "
602 ObjectCacher::ObjectCacher(CephContext
*cct_
, string name
,
603 WritebackHandler
& wb
, Mutex
& l
,
604 flush_set_callback_t flush_callback
,
605 void *flush_callback_arg
, uint64_t max_bytes
,
606 uint64_t max_objects
, uint64_t max_dirty
,
607 uint64_t target_dirty
, double max_dirty_age
,
608 bool block_writes_upfront
)
610 cct(cct_
), writeback_handler(wb
), name(name
), lock(l
),
611 max_dirty(max_dirty
), target_dirty(target_dirty
),
612 max_size(max_bytes
), max_objects(max_objects
),
613 max_dirty_age(ceph::make_timespan(max_dirty_age
)),
614 block_writes_upfront(block_writes_upfront
),
615 flush_set_callback(flush_callback
),
616 flush_set_callback_arg(flush_callback_arg
),
617 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct
),
618 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
619 stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
623 scattered_write
= writeback_handler
.can_scattered_write();
626 ObjectCacher::~ObjectCacher()
630 // we should be empty.
631 for (vector
<ceph::unordered_map
<sobject_t
, Object
*> >::iterator i
636 assert(bh_lru_rest
.lru_get_size() == 0);
637 assert(bh_lru_dirty
.lru_get_size() == 0);
638 assert(ob_lru
.lru_get_size() == 0);
639 assert(dirty_or_tx_bh
.empty());
642 void ObjectCacher::perf_start()
644 string n
= "objectcacher-" + name
;
645 PerfCountersBuilder
plb(cct
, n
, l_objectcacher_first
, l_objectcacher_last
);
647 plb
.add_u64_counter(l_objectcacher_cache_ops_hit
,
648 "cache_ops_hit", "Hit operations");
649 plb
.add_u64_counter(l_objectcacher_cache_ops_miss
,
650 "cache_ops_miss", "Miss operations");
651 plb
.add_u64_counter(l_objectcacher_cache_bytes_hit
,
652 "cache_bytes_hit", "Hit data");
653 plb
.add_u64_counter(l_objectcacher_cache_bytes_miss
,
654 "cache_bytes_miss", "Miss data");
655 plb
.add_u64_counter(l_objectcacher_data_read
,
656 "data_read", "Read data");
657 plb
.add_u64_counter(l_objectcacher_data_written
,
658 "data_written", "Data written to cache");
659 plb
.add_u64_counter(l_objectcacher_data_flushed
,
660 "data_flushed", "Data flushed");
661 plb
.add_u64_counter(l_objectcacher_overwritten_in_flush
,
662 "data_overwritten_while_flushing",
663 "Data overwritten while flushing");
664 plb
.add_u64_counter(l_objectcacher_write_ops_blocked
, "write_ops_blocked",
665 "Write operations, delayed due to dirty limits");
666 plb
.add_u64_counter(l_objectcacher_write_bytes_blocked
,
667 "write_bytes_blocked",
668 "Write data blocked on dirty limit");
669 plb
.add_time(l_objectcacher_write_time_blocked
, "write_time_blocked",
670 "Time spent blocking a write due to dirty limits");
672 perfcounter
= plb
.create_perf_counters();
673 cct
->get_perfcounters_collection()->add(perfcounter
);
// Unregister this cache's perf counters from the global collection.
// NOTE(review): extraction gap — original lines 677–678, 680–681
// (braces, presumably a null check / delete of 'perfcounter') are not
// visible in this view.
676 void ObjectCacher::perf_stop()
679 cct
->get_perfcounters_collection()->remove(perfcounter
);
684 ObjectCacher::Object
*ObjectCacher::get_object(sobject_t oid
,
688 uint64_t truncate_size
,
689 uint64_t truncate_seq
)
691 // XXX: Add handling of nspace in object_locator_t in cache
692 assert(lock
.is_locked());
694 if ((uint32_t)l
.pool
< objects
.size()) {
695 if (objects
[l
.pool
].count(oid
)) {
696 Object
*o
= objects
[l
.pool
][oid
];
697 o
->object_no
= object_no
;
698 o
->truncate_size
= truncate_size
;
699 o
->truncate_seq
= truncate_seq
;
703 objects
.resize(l
.pool
+1);
707 Object
*o
= new Object(this, oid
, object_no
, oset
, l
, truncate_size
,
709 objects
[l
.pool
][oid
] = o
;
710 ob_lru
.lru_insert_top(o
);
// Tear down a fully-clean Object: remove it from the object LRU, erase it
// from the per-pool hash map, and unlink it from its ObjectSet's xlist.
// Caller must hold the cache lock; can_close() guards against dropping an
// object that still has buffers or waiters.
// NOTE(review): extraction gap — original lines 715, 719–720, 724–725
// (braces, presumably 'delete ob') are not visible in this view.
714 void ObjectCacher::close_object(Object
*ob
)
716 assert(lock
.is_locked());
717 ldout(cct
, 10) << "close_object " << *ob
<< dendl
;
718 assert(ob
->can_close());
// drop from LRU and from the pool -> (oid -> Object*) lookup map
721 ob_lru
.lru_remove(ob
);
722 objects
[ob
->oloc
.pool
].erase(ob
->get_soid());
// detach from the ObjectSet's intrusive list
723 ob
->set_item
.remove_myself();
// Issue an asynchronous read for one buffer head via the writeback
// handler.  Tags the bh with a fresh last_read_tid so stale completions
// can be detected in bh_read_finish(), and hands a C_ReadFinish completion
// (which owns the reply bufferlist) to the read call.
// NOTE(review): extraction gap — original lines 728, 732–733, 735–736,
// 739, 744–747 (braces, state change to rx, trailing read() arguments such
// as op_flags and the completion) are not visible in this view.
727 void ObjectCacher::bh_read(BufferHead
*bh
, int op_flags
)
729 assert(lock
.is_locked());
730 ldout(cct
, 7) << "bh_read on " << *bh
<< " outstanding reads "
731 << reads_outstanding
<< dendl
;
// monotonically increasing per-cache read tid; lets the completion path
// ignore replies that were superseded by a newer read of the same bh
734 bh
->last_read_tid
= ++last_read_tid
;
// completion: delivers poolid/oid/tid/range + data to bh_read_finish()
737 C_ReadFinish
*onfinish
= new C_ReadFinish(this, bh
->ob
, bh
->last_read_tid
,
738 bh
->start(), bh
->length());
// fire the actual read; data lands in onfinish->bl
740 writeback_handler
.read(bh
->ob
->get_oid(), bh
->ob
->get_object_number(),
741 bh
->ob
->get_oloc(), bh
->start(), bh
->length(),
742 bh
->ob
->get_snap(), &onfinish
->bl
,
743 bh
->ob
->truncate_size
, bh
->ob
->truncate_seq
,
749 void ObjectCacher::bh_read_finish(int64_t poolid
, sobject_t oid
,
750 ceph_tid_t tid
, loff_t start
,
751 uint64_t length
, bufferlist
&bl
, int r
,
754 assert(lock
.is_locked());
755 ldout(cct
, 7) << "bh_read_finish "
758 << " " << start
<< "~" << length
759 << " (bl is " << bl
.length() << ")"
761 << " outstanding reads " << reads_outstanding
764 if (r
>= 0 && bl
.length() < length
) {
765 ldout(cct
, 7) << "bh_read_finish " << oid
<< " padding " << start
<< "~"
766 << length
<< " with " << length
- bl
.length() << " bytes of zeroes"
768 bl
.append_zero(length
- bl
.length());
774 if (objects
[poolid
].count(oid
) == 0) {
775 ldout(cct
, 7) << "bh_read_finish no object cache" << dendl
;
777 Object
*ob
= objects
[poolid
][oid
];
779 if (r
== -ENOENT
&& !ob
->complete
) {
780 // wake up *all* rx waiters, or else we risk reordering
781 // identical reads. e.g.
783 // reply to unrelated 3~1 -> !exists
784 // read 1~1 -> immediate ENOENT
785 // reply to first 1~1 -> ooo ENOENT
787 for (map
<loff_t
, BufferHead
*>::iterator p
= ob
->data
.begin();
788 p
!= ob
->data
.end(); ++p
) {
789 BufferHead
*bh
= p
->second
;
790 for (map
<loff_t
, list
<Context
*> >::iterator p
791 = bh
->waitfor_read
.begin();
792 p
!= bh
->waitfor_read
.end();
794 ls
.splice(ls
.end(), p
->second
);
795 bh
->waitfor_read
.clear();
796 if (!bh
->is_zero() && !bh
->is_rx())
800 // just pass through and retry all waiters if we don't trust
801 // -ENOENT for this read
804 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
809 /* If all the bhs are effectively zero, get rid of them. All
810 * the waiters will be retried and get -ENOENT immediately, so
811 * it's safe to clean up the unneeded bh's now. Since we know
812 * it's safe to remove them now, do so, so they aren't hanging
813 *around waiting for more -ENOENTs from rados while the cache
814 * is being shut down.
816 * Only do this when all the bhs are rx or clean, to match the
817 * condition in _readx(). If there are any non-rx or non-clean
818 * bhs, _readx() will wait for the final result instead of
819 * returning -ENOENT immediately.
823 << "bh_read_finish ENOENT and allzero, getting rid of "
824 << "bhs for " << *ob
<< dendl
;
825 map
<loff_t
, BufferHead
*>::iterator p
= ob
->data
.begin();
826 while (p
!= ob
->data
.end()) {
827 BufferHead
*bh
= p
->second
;
828 // current iterator will be invalidated by bh_remove()
840 map
<loff_t
, BufferHead
*>::const_iterator p
= ob
->data_lower_bound(opos
);
841 if (p
== ob
->data
.end())
843 if (opos
>= start
+(loff_t
)length
) {
844 ldout(cct
, 20) << "break due to opos " << opos
<< " >= start+length "
845 << start
<< "+" << length
<< "=" << start
+(loff_t
)length
850 BufferHead
*bh
= p
->second
;
851 ldout(cct
, 20) << "checking bh " << *bh
<< dendl
;
854 for (map
<loff_t
, list
<Context
*> >::iterator it
855 = bh
->waitfor_read
.begin();
856 it
!= bh
->waitfor_read
.end();
858 ls
.splice(ls
.end(), it
->second
);
859 bh
->waitfor_read
.clear();
861 if (bh
->start() > opos
) {
862 ldout(cct
, 1) << "bh_read_finish skipping gap "
863 << opos
<< "~" << bh
->start() - opos
870 ldout(cct
, 10) << "bh_read_finish skipping non-rx " << *bh
<< dendl
;
875 if (bh
->last_read_tid
!= tid
) {
876 ldout(cct
, 10) << "bh_read_finish bh->last_read_tid "
877 << bh
->last_read_tid
<< " != tid " << tid
878 << ", skipping" << dendl
;
883 assert(opos
>= bh
->start());
884 assert(bh
->start() == opos
); // we don't merge rx bh's... yet!
885 assert(bh
->length() <= start
+(loff_t
)length
-opos
);
894 ldout(cct
, 10) << "bh_read_finish removing " << *bh
<< dendl
;
898 ldout(cct
, 10) << "skipping unstrusted -ENOENT and will retry for "
914 ldout(cct
, 10) << "bh_read_finish read " << *bh
<< dendl
;
916 ob
->try_merge_bh(bh
);
920 // called with lock held.
921 ldout(cct
, 20) << "finishing waiters " << ls
<< dendl
;
923 finish_contexts(cct
, ls
, err
);
924 retry_waiting_reads();
930 void ObjectCacher::bh_write_adjacencies(BufferHead
*bh
, ceph::real_time cutoff
,
931 int64_t *max_amount
, int *max_count
)
933 list
<BufferHead
*> blist
;
936 int64_t total_len
= 0;
937 set
<BufferHead
*, BufferHead::ptr_lt
>::iterator it
= dirty_or_tx_bh
.find(bh
);
938 assert(it
!= dirty_or_tx_bh
.end());
939 for (set
<BufferHead
*, BufferHead::ptr_lt
>::iterator p
= it
;
940 p
!= dirty_or_tx_bh
.end();
942 BufferHead
*obh
= *p
;
943 if (obh
->ob
!= bh
->ob
)
945 if (obh
->is_dirty() && obh
->last_write
<= cutoff
) {
946 blist
.push_back(obh
);
948 total_len
+= obh
->length();
949 if ((max_count
&& count
> *max_count
) ||
950 (max_amount
&& total_len
> *max_amount
))
955 while (it
!= dirty_or_tx_bh
.begin()) {
957 BufferHead
*obh
= *it
;
958 if (obh
->ob
!= bh
->ob
)
960 if (obh
->is_dirty() && obh
->last_write
<= cutoff
) {
961 blist
.push_front(obh
);
963 total_len
+= obh
->length();
964 if ((max_count
&& count
> *max_count
) ||
965 (max_amount
&& total_len
> *max_amount
))
972 *max_amount
-= total_len
;
974 bh_write_scattered(blist
);
977 class ObjectCacher::C_WriteCommit
: public Context
{
981 vector
<pair
<loff_t
, uint64_t> > ranges
;
984 C_WriteCommit(ObjectCacher
*c
, int64_t _poolid
, sobject_t o
, loff_t s
,
986 oc(c
), poolid(_poolid
), oid(o
), tid(0) {
987 ranges
.push_back(make_pair(s
, l
));
989 C_WriteCommit(ObjectCacher
*c
, int64_t _poolid
, sobject_t o
,
990 vector
<pair
<loff_t
, uint64_t> >& _ranges
) :
991 oc(c
), poolid(_poolid
), oid(o
), tid(0) {
992 ranges
.swap(_ranges
);
994 void finish(int r
) override
{
995 oc
->bh_write_commit(poolid
, oid
, ranges
, tid
, r
);
998 void ObjectCacher::bh_write_scattered(list
<BufferHead
*>& blist
)
1000 assert(lock
.is_locked());
1002 Object
*ob
= blist
.front()->ob
;
1005 ceph::real_time last_write
;
1007 vector
<pair
<loff_t
, uint64_t> > ranges
;
1008 vector
<pair
<uint64_t, bufferlist
> > io_vec
;
1010 ranges
.reserve(blist
.size());
1011 io_vec
.reserve(blist
.size());
1013 uint64_t total_len
= 0;
1014 for (list
<BufferHead
*>::iterator p
= blist
.begin(); p
!= blist
.end(); ++p
) {
1015 BufferHead
*bh
= *p
;
1016 ldout(cct
, 7) << "bh_write_scattered " << *bh
<< dendl
;
1017 assert(bh
->ob
== ob
);
1018 assert(bh
->bl
.length() == bh
->length());
1019 ranges
.push_back(pair
<loff_t
, uint64_t>(bh
->start(), bh
->length()));
1021 int n
= io_vec
.size();
1022 io_vec
.resize(n
+ 1);
1023 io_vec
[n
].first
= bh
->start();
1024 io_vec
[n
].second
= bh
->bl
;
1026 total_len
+= bh
->length();
1027 if (bh
->snapc
.seq
> snapc
.seq
)
1029 if (bh
->last_write
> last_write
)
1030 last_write
= bh
->last_write
;
1033 C_WriteCommit
*oncommit
= new C_WriteCommit(this, ob
->oloc
.pool
, ob
->get_soid(), ranges
);
1035 ceph_tid_t tid
= writeback_handler
.write(ob
->get_oid(), ob
->get_oloc(),
1036 io_vec
, snapc
, last_write
,
1037 ob
->truncate_size
, ob
->truncate_seq
,
1039 oncommit
->tid
= tid
;
1040 ob
->last_write_tid
= tid
;
1041 for (list
<BufferHead
*>::iterator p
= blist
.begin(); p
!= blist
.end(); ++p
) {
1042 BufferHead
*bh
= *p
;
1043 bh
->last_write_tid
= tid
;
1048 perfcounter
->inc(l_objectcacher_data_flushed
, total_len
);
1051 void ObjectCacher::bh_write(BufferHead
*bh
)
1053 assert(lock
.is_locked());
1054 ldout(cct
, 7) << "bh_write " << *bh
<< dendl
;
1059 C_WriteCommit
*oncommit
= new C_WriteCommit(this, bh
->ob
->oloc
.pool
,
1060 bh
->ob
->get_soid(), bh
->start(),
1063 ceph_tid_t tid
= writeback_handler
.write(bh
->ob
->get_oid(),
1065 bh
->start(), bh
->length(),
1066 bh
->snapc
, bh
->bl
, bh
->last_write
,
1067 bh
->ob
->truncate_size
,
1068 bh
->ob
->truncate_seq
,
1069 bh
->journal_tid
, oncommit
);
1070 ldout(cct
, 20) << " tid " << tid
<< " on " << bh
->ob
->get_oid() << dendl
;
1072 // set bh last_write_tid
1073 oncommit
->tid
= tid
;
1074 bh
->ob
->last_write_tid
= tid
;
1075 bh
->last_write_tid
= tid
;
1078 perfcounter
->inc(l_objectcacher_data_flushed
, bh
->length());
1084 void ObjectCacher::bh_write_commit(int64_t poolid
, sobject_t oid
,
1085 vector
<pair
<loff_t
, uint64_t> >& ranges
,
1086 ceph_tid_t tid
, int r
)
1088 assert(lock
.is_locked());
1089 ldout(cct
, 7) << "bh_write_commit " << oid
<< " tid " << tid
1090 << " ranges " << ranges
<< " returned " << r
<< dendl
;
1092 if (objects
[poolid
].count(oid
) == 0) {
1093 ldout(cct
, 7) << "bh_write_commit no object cache" << dendl
;
1097 Object
*ob
= objects
[poolid
][oid
];
1098 int was_dirty_or_tx
= ob
->oset
->dirty_or_tx
;
1100 for (vector
<pair
<loff_t
, uint64_t> >::iterator p
= ranges
.begin();
1103 loff_t start
= p
->first
;
1104 uint64_t length
= p
->second
;
1106 ldout(cct
, 10) << "bh_write_commit marking exists on " << *ob
<< dendl
;
1109 if (writeback_handler
.may_copy_on_write(ob
->get_oid(), start
, length
,
1111 ldout(cct
, 10) << "bh_write_commit may copy on write, clearing "
1112 "complete on " << *ob
<< dendl
;
1113 ob
->complete
= false;
1117 vector
<pair
<loff_t
, BufferHead
*>> hit
;
1119 for (map
<loff_t
, BufferHead
*>::const_iterator p
= ob
->data_lower_bound(start
);
1120 p
!= ob
->data
.end();
1122 BufferHead
*bh
= p
->second
;
1124 if (bh
->start() > start
+(loff_t
)length
)
1127 if (bh
->start() < start
&&
1128 bh
->end() > start
+(loff_t
)length
) {
1129 ldout(cct
, 20) << "bh_write_commit skipping " << *bh
<< dendl
;
1133 // make sure bh is tx
1135 ldout(cct
, 10) << "bh_write_commit skipping non-tx " << *bh
<< dendl
;
1139 // make sure bh tid matches
1140 if (bh
->last_write_tid
!= tid
) {
1141 assert(bh
->last_write_tid
> tid
);
1142 ldout(cct
, 10) << "bh_write_commit newer tid on " << *bh
<< dendl
;
1147 // ok! mark bh clean and error-free
1149 bh
->set_journal_tid(0);
1150 if (bh
->get_nocache())
1151 bh_lru_rest
.lru_bottouch(bh
);
1152 hit
.push_back(make_pair(bh
->start(), bh
));
1153 ldout(cct
, 10) << "bh_write_commit clean " << *bh
<< dendl
;
1156 ldout(cct
, 10) << "bh_write_commit marking dirty again due to error "
1157 << *bh
<< " r = " << r
<< " " << cpp_strerror(-r
)
1162 for (auto& p
: hit
) {
1163 //p.second maybe merged and deleted in merge_left
1164 if (ob
->data
.count(p
.first
))
1165 ob
->try_merge_bh(p
.second
);
1169 // update last_commit.
1170 assert(ob
->last_commit_tid
< tid
);
1171 ob
->last_commit_tid
= tid
;
1175 if (ob
->waitfor_commit
.count(tid
)) {
1176 ls
.splice(ls
.begin(), ob
->waitfor_commit
[tid
]);
1177 ob
->waitfor_commit
.erase(tid
);
1180 // is the entire object set now clean and fully committed?
1181 ObjectSet
*oset
= ob
->oset
;
1184 if (flush_set_callback
&&
1185 was_dirty_or_tx
> 0 &&
1186 oset
->dirty_or_tx
== 0) { // nothing dirty/tx
1187 flush_set_callback(flush_set_callback_arg
, oset
);
1191 finish_contexts(cct
, ls
, r
);
1194 void ObjectCacher::flush(loff_t amount
)
1196 assert(lock
.is_locked());
1197 ceph::real_time cutoff
= ceph::real_clock::now();
1199 ldout(cct
, 10) << "flush " << amount
<< dendl
;
1202 * NOTE: we aren't actually pulling things off the LRU here, just
1203 * looking at the tail item. Then we call bh_write, which moves it
1204 * to the other LRU, so that we can call
1205 * lru_dirty.lru_get_next_expire() again.
1207 int64_t left
= amount
;
1208 while (amount
== 0 || left
> 0) {
1209 BufferHead
*bh
= static_cast<BufferHead
*>(
1210 bh_lru_dirty
.lru_get_next_expire());
1212 if (bh
->last_write
> cutoff
) break;
1214 if (scattered_write
) {
1215 bh_write_adjacencies(bh
, cutoff
, amount
> 0 ? &left
: NULL
, NULL
);
1217 left
-= bh
->length();
1224 void ObjectCacher::trim()
1226 assert(lock
.is_locked());
1227 ldout(cct
, 10) << "trim start: bytes: max " << max_size
<< " clean "
1228 << get_stat_clean() << ", objects: max " << max_objects
1229 << " current " << ob_lru
.lru_get_size() << dendl
;
1231 while (get_stat_clean() > 0 && (uint64_t) get_stat_clean() > max_size
) {
1232 BufferHead
*bh
= static_cast<BufferHead
*>(bh_lru_rest
.lru_expire());
1236 ldout(cct
, 10) << "trim trimming " << *bh
<< dendl
;
1237 assert(bh
->is_clean() || bh
->is_zero() || bh
->is_error());
1239 Object
*ob
= bh
->ob
;
1244 ldout(cct
, 10) << "trim clearing complete on " << *ob
<< dendl
;
1245 ob
->complete
= false;
1249 while (ob_lru
.lru_get_size() > max_objects
) {
1250 Object
*ob
= static_cast<Object
*>(ob_lru
.lru_expire());
1254 ldout(cct
, 10) << "trim trimming " << *ob
<< dendl
;
1258 ldout(cct
, 10) << "trim finish: max " << max_size
<< " clean "
1259 << get_stat_clean() << ", objects: max " << max_objects
1260 << " current " << ob_lru
.lru_get_size() << dendl
;
1267 bool ObjectCacher::is_cached(ObjectSet
*oset
, vector
<ObjectExtent
>& extents
,
1270 assert(lock
.is_locked());
1271 for (vector
<ObjectExtent
>::iterator ex_it
= extents
.begin();
1272 ex_it
!= extents
.end();
1274 ldout(cct
, 10) << "is_cached " << *ex_it
<< dendl
;
1277 sobject_t
soid(ex_it
->oid
, snapid
);
1278 Object
*o
= get_object_maybe(soid
, ex_it
->oloc
);
1281 if (!o
->is_cached(ex_it
->offset
, ex_it
->length
))
1289 * returns # bytes read (if in cache). onfinish is untouched (caller
1291 * returns 0 if doing async read
1293 int ObjectCacher::readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
)
1295 return _readx(rd
, oset
, onfinish
, true);
1298 int ObjectCacher::_readx(OSDRead
*rd
, ObjectSet
*oset
, Context
*onfinish
,
1301 assert(lock
.is_locked());
1302 bool success
= true;
1304 uint64_t bytes_in_cache
= 0;
1305 uint64_t bytes_not_in_cache
= 0;
1306 uint64_t total_bytes_read
= 0;
1307 map
<uint64_t, bufferlist
> stripe_map
; // final buffer offset -> substring
1308 bool dontneed
= rd
->fadvise_flags
& LIBRADOS_OP_FLAG_FADVISE_DONTNEED
;
1309 bool nocache
= rd
->fadvise_flags
& LIBRADOS_OP_FLAG_FADVISE_NOCACHE
;
1312 * WARNING: we can only meaningfully return ENOENT if the read request
1313 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1314 * zeroed buffers needs to feed single extents into readx().
1316 assert(!oset
->return_enoent
|| rd
->extents
.size() == 1);
1318 for (vector
<ObjectExtent
>::iterator ex_it
= rd
->extents
.begin();
1319 ex_it
!= rd
->extents
.end();
1321 ldout(cct
, 10) << "readx " << *ex_it
<< dendl
;
1323 total_bytes_read
+= ex_it
->length
;
1326 sobject_t
soid(ex_it
->oid
, rd
->snap
);
1327 Object
*o
= get_object(soid
, ex_it
->objectno
, oset
, ex_it
->oloc
,
1328 ex_it
->truncate_size
, oset
->truncate_seq
);
1332 // does not exist and no hits?
1333 if (oset
->return_enoent
&& !o
->exists
) {
1334 ldout(cct
, 10) << "readx object !exists, 1 extent..." << dendl
;
1336 // should we worry about COW underneath us?
1337 if (writeback_handler
.may_copy_on_write(soid
.oid
, ex_it
->offset
,
1338 ex_it
->length
, soid
.snap
)) {
1339 ldout(cct
, 20) << "readx may copy on write" << dendl
;
1341 list
<BufferHead
*> blist
;
1342 for (map
<loff_t
, BufferHead
*>::iterator bh_it
= o
->data
.begin();
1343 bh_it
!= o
->data
.end();
1345 BufferHead
*bh
= bh_it
->second
;
1346 if (bh
->is_dirty() || bh
->is_tx()) {
1347 ldout(cct
, 10) << "readx flushing " << *bh
<< dendl
;
1349 if (bh
->is_dirty()) {
1350 if (scattered_write
)
1351 blist
.push_back(bh
);
1357 if (scattered_write
&& !blist
.empty())
1358 bh_write_scattered(blist
);
1360 ldout(cct
, 10) << "readx waiting on tid " << o
->last_write_tid
1361 << " on " << *o
<< dendl
;
1362 o
->waitfor_commit
[o
->last_write_tid
].push_back(
1363 new C_RetryRead(this,rd
, oset
, onfinish
));
1364 // FIXME: perfcounter!
1369 // can we return ENOENT?
1370 bool allzero
= true;
1371 for (map
<loff_t
, BufferHead
*>::iterator bh_it
= o
->data
.begin();
1372 bh_it
!= o
->data
.end();
1374 ldout(cct
, 20) << "readx ob has bh " << *bh_it
->second
<< dendl
;
1375 if (!bh_it
->second
->is_zero() && !bh_it
->second
->is_rx()) {
1381 ldout(cct
, 10) << "readx ob has all zero|rx, returning ENOENT"
1390 // map extent into bufferheads
1391 map
<loff_t
, BufferHead
*> hits
, missing
, rx
, errors
;
1392 o
->map_read(*ex_it
, hits
, missing
, rx
, errors
);
1393 if (external_call
) {
1394 // retry reading error buffers
1395 missing
.insert(errors
.begin(), errors
.end());
1397 // some reads had errors, fail later so completions
1398 // are cleaned up properly
1399 // TODO: make read path not call _readx for every completion
1400 hits
.insert(errors
.begin(), errors
.end());
1403 if (!missing
.empty() || !rx
.empty()) {
1405 map
<loff_t
, BufferHead
*>::iterator last
= missing
.end();
1406 for (map
<loff_t
, BufferHead
*>::iterator bh_it
= missing
.begin();
1407 bh_it
!= missing
.end();
1409 uint64_t rx_bytes
= static_cast<uint64_t>(
1410 stat_rx
+ bh_it
->second
->length());
1411 bytes_not_in_cache
+= bh_it
->second
->length();
1412 if (!waitfor_read
.empty() || (stat_rx
> 0 && rx_bytes
> max_size
)) {
1413 // cache is full with concurrent reads -- wait for rx's to complete
1414 // to constrain memory growth (especially during copy-ups)
1416 ldout(cct
, 10) << "readx missed, waiting on cache to complete "
1417 << waitfor_read
.size() << " blocked reads, "
1418 << (MAX(rx_bytes
, max_size
) - max_size
)
1419 << " read bytes" << dendl
;
1420 waitfor_read
.push_back(new C_RetryRead(this, rd
, oset
, onfinish
));
1423 bh_remove(o
, bh_it
->second
);
1424 delete bh_it
->second
;
1426 bh_it
->second
->set_nocache(nocache
);
1427 bh_read(bh_it
->second
, rd
->fadvise_flags
);
1428 if ((success
&& onfinish
) || last
!= missing
.end())
1434 //add wait in last bh avoid wakeup early. Because read is order
1435 if (last
!= missing
.end()) {
1436 ldout(cct
, 10) << "readx missed, waiting on " << *last
->second
1437 << " off " << last
->first
<< dendl
;
1438 last
->second
->waitfor_read
[last
->first
].push_back(
1439 new C_RetryRead(this, rd
, oset
, onfinish
) );
1444 for (map
<loff_t
, BufferHead
*>::iterator bh_it
= rx
.begin();
1447 touch_bh(bh_it
->second
); // bump in lru, so we don't lose it.
1448 if (success
&& onfinish
) {
1449 ldout(cct
, 10) << "readx missed, waiting on " << *bh_it
->second
1450 << " off " << bh_it
->first
<< dendl
;
1451 bh_it
->second
->waitfor_read
[bh_it
->first
].push_back(
1452 new C_RetryRead(this, rd
, oset
, onfinish
) );
1454 bytes_not_in_cache
+= bh_it
->second
->length();
1458 for (map
<loff_t
, BufferHead
*>::iterator bh_it
= hits
.begin();
1459 bh_it
!= hits
.end(); ++bh_it
)
1460 //bump in lru, so we don't lose it when later read
1461 touch_bh(bh_it
->second
);
1464 assert(!hits
.empty());
1466 // make a plain list
1467 for (map
<loff_t
, BufferHead
*>::iterator bh_it
= hits
.begin();
1468 bh_it
!= hits
.end();
1470 BufferHead
*bh
= bh_it
->second
;
1471 ldout(cct
, 10) << "readx hit bh " << *bh
<< dendl
;
1472 if (bh
->is_error() && bh
->error
)
1474 bytes_in_cache
+= bh
->length();
1476 if (bh
->get_nocache() && bh
->is_clean())
1477 bh_lru_rest
.lru_bottouch(bh
);
1480 //must be after touch_bh because touch_bh set dontneed false
1482 ((loff_t
)ex_it
->offset
<= bh
->start() &&
1483 (bh
->end() <=(loff_t
)(ex_it
->offset
+ ex_it
->length
)))) {
1484 bh
->set_dontneed(true); //if dirty
1486 bh_lru_rest
.lru_bottouch(bh
);
1491 // create reverse map of buffer offset -> object for the
1492 // eventual result. this is over a single ObjectExtent, so we
1494 // - the bh's are contiguous
1495 // - the buffer frags need not be (and almost certainly aren't)
1496 loff_t opos
= ex_it
->offset
;
1497 map
<loff_t
, BufferHead
*>::iterator bh_it
= hits
.begin();
1498 assert(bh_it
->second
->start() <= opos
);
1499 uint64_t bhoff
= opos
- bh_it
->second
->start();
1500 vector
<pair
<uint64_t,uint64_t> >::iterator f_it
1501 = ex_it
->buffer_extents
.begin();
1504 BufferHead
*bh
= bh_it
->second
;
1505 assert(opos
== (loff_t
)(bh
->start() + bhoff
));
1507 uint64_t len
= MIN(f_it
->second
- foff
, bh
->length() - bhoff
);
1508 ldout(cct
, 10) << "readx rmap opos " << opos
<< ": " << *bh
<< " +"
1509 << bhoff
<< " frag " << f_it
->first
<< "~"
1510 << f_it
->second
<< " +" << foff
<< "~" << len
1514 // put substr here first, since substr_of clobbers, and we
1515 // may get multiple bh's at this stripe_map position
1516 if (bh
->is_zero()) {
1517 stripe_map
[f_it
->first
].append_zero(len
);
1519 bit
.substr_of(bh
->bl
,
1522 stripe_map
[f_it
->first
].claim_append(bit
);
1528 if (opos
== bh
->end()) {
1532 if (foff
== f_it
->second
) {
1536 if (bh_it
== hits
.end()) break;
1537 if (f_it
== ex_it
->buffer_extents
.end())
1540 assert(f_it
== ex_it
->buffer_extents
.end());
1541 assert(opos
== (loff_t
)ex_it
->offset
+ (loff_t
)ex_it
->length
);
1544 if (dontneed
&& o
->include_all_cached_data(ex_it
->offset
, ex_it
->length
))
1550 if (perfcounter
&& external_call
) {
1551 perfcounter
->inc(l_objectcacher_data_read
, total_bytes_read
);
1552 perfcounter
->inc(l_objectcacher_cache_bytes_miss
, bytes_not_in_cache
);
1553 perfcounter
->inc(l_objectcacher_cache_ops_miss
);
1556 ldout(cct
, 20) << "readx defer " << rd
<< dendl
;
1558 ldout(cct
, 20) << "readx drop " << rd
<< " (no complete, but no waiter)"
1564 if (perfcounter
&& external_call
) {
1565 perfcounter
->inc(l_objectcacher_data_read
, total_bytes_read
);
1566 perfcounter
->inc(l_objectcacher_cache_bytes_hit
, bytes_in_cache
);
1567 perfcounter
->inc(l_objectcacher_cache_ops_hit
);
1570 // no misses... success! do the read.
1571 ldout(cct
, 10) << "readx has all buffers" << dendl
;
1573 // ok, assemble into result buffer.
1575 if (rd
->bl
&& !error
) {
1577 for (map
<uint64_t,bufferlist
>::iterator i
= stripe_map
.begin();
1578 i
!= stripe_map
.end();
1580 assert(pos
== i
->first
);
1581 ldout(cct
, 10) << "readx adding buffer len " << i
->second
.length()
1582 << " at " << pos
<< dendl
;
1583 pos
+= i
->second
.length();
1584 rd
->bl
->claim_append(i
->second
);
1585 assert(rd
->bl
->length() == pos
);
1587 ldout(cct
, 10) << "readx result is " << rd
->bl
->length() << dendl
;
1588 } else if (!error
) {
1589 ldout(cct
, 10) << "readx no bufferlist ptr (readahead?), done." << dendl
;
1590 map
<uint64_t,bufferlist
>::reverse_iterator i
= stripe_map
.rbegin();
1591 pos
= i
->first
+ i
->second
.length();
1595 int ret
= error
? error
: pos
;
1596 ldout(cct
, 20) << "readx done " << rd
<< " " << ret
<< dendl
;
1597 assert(pos
<= (uint64_t) INT_MAX
);
1606 void ObjectCacher::retry_waiting_reads()
1609 ls
.swap(waitfor_read
);
1611 while (!ls
.empty() && waitfor_read
.empty()) {
1612 Context
*ctx
= ls
.front();
1616 waitfor_read
.splice(waitfor_read
.end(), ls
);
1619 int ObjectCacher::writex(OSDWrite
*wr
, ObjectSet
*oset
, Context
*onfreespace
)
1621 assert(lock
.is_locked());
1622 ceph::real_time now
= ceph::real_clock::now();
1623 uint64_t bytes_written
= 0;
1624 uint64_t bytes_written_in_flush
= 0;
1625 bool dontneed
= wr
->fadvise_flags
& LIBRADOS_OP_FLAG_FADVISE_DONTNEED
;
1626 bool nocache
= wr
->fadvise_flags
& LIBRADOS_OP_FLAG_FADVISE_NOCACHE
;
1628 for (vector
<ObjectExtent
>::iterator ex_it
= wr
->extents
.begin();
1629 ex_it
!= wr
->extents
.end();
1632 sobject_t
soid(ex_it
->oid
, CEPH_NOSNAP
);
1633 Object
*o
= get_object(soid
, ex_it
->objectno
, oset
, ex_it
->oloc
,
1634 ex_it
->truncate_size
, oset
->truncate_seq
);
1636 // map it all into a single bufferhead.
1637 BufferHead
*bh
= o
->map_write(*ex_it
, wr
->journal_tid
);
1638 bool missing
= bh
->is_missing();
1639 bh
->snapc
= wr
->snapc
;
1641 bytes_written
+= ex_it
->length
;
1643 bytes_written_in_flush
+= ex_it
->length
;
1646 // adjust buffer pointers (ie "copy" data into my cache)
1647 // this is over a single ObjectExtent, so we know that
1648 // - there is one contiguous bh
1649 // - the buffer frags need not be (and almost certainly aren't)
1650 // note: i assume striping is monotonic... no jumps backwards, ever!
1651 loff_t opos
= ex_it
->offset
;
1652 for (vector
<pair
<uint64_t, uint64_t> >::iterator f_it
1653 = ex_it
->buffer_extents
.begin();
1654 f_it
!= ex_it
->buffer_extents
.end();
1656 ldout(cct
, 10) << "writex writing " << f_it
->first
<< "~"
1657 << f_it
->second
<< " into " << *bh
<< " at " << opos
1659 uint64_t bhoff
= bh
->start() - opos
;
1660 assert(f_it
->second
<= bh
->length() - bhoff
);
1662 // get the frag we're mapping in
1664 frag
.substr_of(wr
->bl
,
1665 f_it
->first
, f_it
->second
);
1667 // keep anything left of bhoff
1670 newbl
.substr_of(bh
->bl
, 0, bhoff
);
1671 newbl
.claim_append(frag
);
1674 opos
+= f_it
->second
;
1677 // ok, now bh is dirty.
1680 bh
->set_dontneed(true);
1681 else if (nocache
&& missing
)
1682 bh
->set_nocache(true);
1686 bh
->last_write
= now
;
1688 o
->try_merge_bh(bh
);
1692 perfcounter
->inc(l_objectcacher_data_written
, bytes_written
);
1693 if (bytes_written_in_flush
) {
1694 perfcounter
->inc(l_objectcacher_overwritten_in_flush
,
1695 bytes_written_in_flush
);
1699 int r
= _wait_for_write(wr
, bytes_written
, oset
, onfreespace
);
1707 class ObjectCacher::C_WaitForWrite
: public Context
{
1709 C_WaitForWrite(ObjectCacher
*oc
, uint64_t len
, Context
*onfinish
) :
1710 m_oc(oc
), m_len(len
), m_onfinish(onfinish
) {}
1711 void finish(int r
) override
;
1715 Context
*m_onfinish
;
1718 void ObjectCacher::C_WaitForWrite::finish(int r
)
1720 Mutex::Locker
l(m_oc
->lock
);
1721 m_oc
->maybe_wait_for_writeback(m_len
);
1722 m_onfinish
->complete(r
);
1725 void ObjectCacher::maybe_wait_for_writeback(uint64_t len
)
1727 assert(lock
.is_locked());
1728 ceph::mono_time start
= ceph::mono_clock::now();
1730 // wait for writeback?
1731 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1732 // - do not wait for bytes other waiters are waiting on. this means that
1733 // threads do not wait for each other. this effectively allows the cache
1734 // size to balloon proportional to the data that is in flight.
1735 while (get_stat_dirty() + get_stat_tx() > 0 &&
1736 (uint64_t) (get_stat_dirty() + get_stat_tx()) >=
1737 max_dirty
+ get_stat_dirty_waiting()) {
1738 ldout(cct
, 10) << __func__
<< " waiting for dirty|tx "
1739 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1740 << max_dirty
<< " + dirty_waiting "
1741 << get_stat_dirty_waiting() << dendl
;
1742 flusher_cond
.Signal();
1743 stat_dirty_waiting
+= len
;
1744 stat_cond
.Wait(lock
);
1745 stat_dirty_waiting
-= len
;
1747 ldout(cct
, 10) << __func__
<< " woke up" << dendl
;
1749 if (blocked
&& perfcounter
) {
1750 perfcounter
->inc(l_objectcacher_write_ops_blocked
);
1751 perfcounter
->inc(l_objectcacher_write_bytes_blocked
, len
);
1752 ceph::timespan blocked
= ceph::mono_clock::now() - start
;
1753 perfcounter
->tinc(l_objectcacher_write_time_blocked
, blocked
);
1757 // blocking wait for write.
1758 int ObjectCacher::_wait_for_write(OSDWrite
*wr
, uint64_t len
, ObjectSet
*oset
,
1759 Context
*onfreespace
)
1761 assert(lock
.is_locked());
1764 if (max_dirty
> 0) {
1765 if (block_writes_upfront
) {
1766 maybe_wait_for_writeback(len
);
1768 onfreespace
->complete(0);
1770 assert(onfreespace
);
1771 finisher
.queue(new C_WaitForWrite(this, len
, onfreespace
));
1774 // write-thru! flush what we just wrote.
1777 Context
*fin
= block_writes_upfront
?
1778 new C_Cond(&cond
, &done
, &ret
) : onfreespace
;
1780 bool flushed
= flush_set(oset
, wr
->extents
, fin
);
1781 assert(!flushed
); // we just dirtied it, and didn't drop our lock!
1782 ldout(cct
, 10) << "wait_for_write waiting on write-thru of " << len
1783 << " bytes" << dendl
;
1784 if (block_writes_upfront
) {
1787 ldout(cct
, 10) << "wait_for_write woke up, ret " << ret
<< dendl
;
1789 onfreespace
->complete(ret
);
1793 // start writeback anyway?
1794 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty
) {
1795 ldout(cct
, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1796 << target_dirty
<< ", nudging flusher" << dendl
;
1797 flusher_cond
.Signal();
1802 void ObjectCacher::flusher_entry()
1804 ldout(cct
, 10) << "flusher start" << dendl
;
1806 while (!flusher_stop
) {
1807 loff_t all
= get_stat_tx() + get_stat_rx() + get_stat_clean() +
1809 ldout(cct
, 11) << "flusher "
1810 << all
<< " / " << max_size
<< ": "
1811 << get_stat_tx() << " tx, "
1812 << get_stat_rx() << " rx, "
1813 << get_stat_clean() << " clean, "
1814 << get_stat_dirty() << " dirty ("
1815 << target_dirty
<< " target, "
1816 << max_dirty
<< " max)"
1818 loff_t actual
= get_stat_dirty() + get_stat_dirty_waiting();
1819 if (actual
> 0 && (uint64_t) actual
> target_dirty
) {
1820 // flush some dirty pages
1821 ldout(cct
, 10) << "flusher " << get_stat_dirty() << " dirty + "
1822 << get_stat_dirty_waiting() << " dirty_waiting > target "
1823 << target_dirty
<< ", flushing some dirty bhs" << dendl
;
1824 flush(actual
- target_dirty
);
1826 // check tail of lru for old dirty items
1827 ceph::real_time cutoff
= ceph::real_clock::now();
1828 cutoff
-= max_dirty_age
;
1830 int max
= MAX_FLUSH_UNDER_LOCK
;
1831 while ((bh
= static_cast<BufferHead
*>(bh_lru_dirty
.
1832 lru_get_next_expire())) != 0 &&
1833 bh
->last_write
<= cutoff
&&
1835 ldout(cct
, 10) << "flusher flushing aged dirty bh " << *bh
<< dendl
;
1836 if (scattered_write
) {
1837 bh_write_adjacencies(bh
, cutoff
, NULL
, &max
);
1844 // back off the lock to avoid starving other threads
1853 flusher_cond
.WaitInterval(lock
, seconds(1));
1856 /* Wait for reads to finish. This is only possible if handling
1857 * -ENOENT made some read completions finish before their rados read
1858 * came back. If we don't wait for them, and destroy the cache, when
1859 * the rados reads do come back their callback will try to access the
1860 * no-longer-valid ObjectCacher.
1862 while (reads_outstanding
> 0) {
1863 ldout(cct
, 10) << "Waiting for all reads to complete. Number left: "
1864 << reads_outstanding
<< dendl
;
1865 read_cond
.Wait(lock
);
1869 ldout(cct
, 10) << "flusher finish" << dendl
;
1873 // -------------------------------------------------
1875 bool ObjectCacher::set_is_empty(ObjectSet
*oset
)
1877 assert(lock
.is_locked());
1878 if (oset
->objects
.empty())
1881 for (xlist
<Object
*>::iterator p
= oset
->objects
.begin(); !p
.end(); ++p
)
1882 if (!(*p
)->is_empty())
1888 bool ObjectCacher::set_is_cached(ObjectSet
*oset
)
1890 assert(lock
.is_locked());
1891 if (oset
->objects
.empty())
1894 for (xlist
<Object
*>::iterator p
= oset
->objects
.begin();
1897 for (map
<loff_t
,BufferHead
*>::iterator q
= ob
->data
.begin();
1898 q
!= ob
->data
.end();
1900 BufferHead
*bh
= q
->second
;
1901 if (!bh
->is_dirty() && !bh
->is_tx())
1909 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet
*oset
)
1911 assert(lock
.is_locked());
1912 if (oset
->objects
.empty())
1915 for (xlist
<Object
*>::iterator i
= oset
->objects
.begin();
1919 for (map
<loff_t
,BufferHead
*>::iterator p
= ob
->data
.begin();
1920 p
!= ob
->data
.end();
1922 BufferHead
*bh
= p
->second
;
1923 if (bh
->is_dirty() || bh
->is_tx())
1932 // purge. non-blocking. violently removes dirty buffers from cache.
1933 void ObjectCacher::purge(Object
*ob
)
1935 assert(lock
.is_locked());
1936 ldout(cct
, 10) << "purge " << *ob
<< dendl
;
1942 // flush. non-blocking. no callback.
1943 // true if clean, already flushed.
1944 // false if we wrote something.
1945 // be sloppy about the ranges and flush any buffer it touches
1946 bool ObjectCacher::flush(Object
*ob
, loff_t offset
, loff_t length
)
1948 assert(lock
.is_locked());
1949 list
<BufferHead
*> blist
;
1951 ldout(cct
, 10) << "flush " << *ob
<< " " << offset
<< "~" << length
<< dendl
;
1952 for (map
<loff_t
,BufferHead
*>::const_iterator p
= ob
->data_lower_bound(offset
);
1953 p
!= ob
->data
.end();
1955 BufferHead
*bh
= p
->second
;
1956 ldout(cct
, 20) << "flush " << *bh
<< dendl
;
1957 if (length
&& bh
->start() > offset
+length
) {
1964 if (!bh
->is_dirty()) {
1968 if (scattered_write
)
1969 blist
.push_back(bh
);
1974 if (scattered_write
&& !blist
.empty())
1975 bh_write_scattered(blist
);
1980 bool ObjectCacher::_flush_set_finish(C_GatherBuilder
*gather
,
1983 assert(lock
.is_locked());
1984 if (gather
->has_subs()) {
1985 gather
->set_finisher(onfinish
);
1990 ldout(cct
, 10) << "flush_set has no dirty|tx bhs" << dendl
;
1991 onfinish
->complete(0);
1995 // flush. non-blocking, takes callback.
1996 // returns true if already flushed
1997 bool ObjectCacher::flush_set(ObjectSet
*oset
, Context
*onfinish
)
1999 assert(lock
.is_locked());
2000 assert(onfinish
!= NULL
);
2001 if (oset
->objects
.empty()) {
2002 ldout(cct
, 10) << "flush_set on " << oset
<< " dne" << dendl
;
2003 onfinish
->complete(0);
2007 ldout(cct
, 10) << "flush_set " << oset
<< dendl
;
2009 // we'll need to wait for all objects to flush!
2010 C_GatherBuilder
gather(cct
);
2011 set
<Object
*> waitfor_commit
;
2013 list
<BufferHead
*> blist
;
2014 Object
*last_ob
= NULL
;
2015 set
<BufferHead
*, BufferHead::ptr_lt
>::const_iterator it
, p
, q
;
2017 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2018 // order. But items in oset->objects are not sorted. So the iterator can
2019 // point to any buffer head in the ObjectSet
2020 BufferHead
key(*oset
->objects
.begin());
2021 it
= dirty_or_tx_bh
.lower_bound(&key
);
2024 bool backwards
= true;
2025 if (it
!= dirty_or_tx_bh
.begin())
2030 for (; p
!= dirty_or_tx_bh
.end(); p
= q
) {
2032 BufferHead
*bh
= *p
;
2033 if (bh
->ob
->oset
!= oset
)
2035 waitfor_commit
.insert(bh
->ob
);
2036 if (bh
->is_dirty()) {
2037 if (scattered_write
) {
2038 if (last_ob
!= bh
->ob
) {
2039 if (!blist
.empty()) {
2040 bh_write_scattered(blist
);
2045 blist
.push_back(bh
);
2053 for(p
= q
= it
; true; p
= q
) {
2054 if (q
!= dirty_or_tx_bh
.begin())
2058 BufferHead
*bh
= *p
;
2059 if (bh
->ob
->oset
!= oset
)
2061 waitfor_commit
.insert(bh
->ob
);
2062 if (bh
->is_dirty()) {
2063 if (scattered_write
) {
2064 if (last_ob
!= bh
->ob
) {
2065 if (!blist
.empty()) {
2066 bh_write_scattered(blist
);
2071 blist
.push_front(bh
);
2081 if (scattered_write
&& !blist
.empty())
2082 bh_write_scattered(blist
);
2084 for (set
<Object
*>::iterator i
= waitfor_commit
.begin();
2085 i
!= waitfor_commit
.end(); ++i
) {
2088 // we'll need to gather...
2089 ldout(cct
, 10) << "flush_set " << oset
<< " will wait for ack tid "
2090 << ob
->last_write_tid
<< " on " << *ob
<< dendl
;
2091 ob
->waitfor_commit
[ob
->last_write_tid
].push_back(gather
.new_sub());
2094 return _flush_set_finish(&gather
, onfinish
);
2097 // flush. non-blocking, takes callback.
2098 // returns true if already flushed
2099 bool ObjectCacher::flush_set(ObjectSet
*oset
, vector
<ObjectExtent
>& exv
,
2102 assert(lock
.is_locked());
2103 assert(onfinish
!= NULL
);
2104 if (oset
->objects
.empty()) {
2105 ldout(cct
, 10) << "flush_set on " << oset
<< " dne" << dendl
;
2106 onfinish
->complete(0);
2110 ldout(cct
, 10) << "flush_set " << oset
<< " on " << exv
.size()
2111 << " ObjectExtents" << dendl
;
2113 // we'll need to wait for all objects to flush!
2114 C_GatherBuilder
gather(cct
);
2116 for (vector
<ObjectExtent
>::iterator p
= exv
.begin();
2119 ObjectExtent
&ex
= *p
;
2120 sobject_t
soid(ex
.oid
, CEPH_NOSNAP
);
2121 if (objects
[oset
->poolid
].count(soid
) == 0)
2123 Object
*ob
= objects
[oset
->poolid
][soid
];
2125 ldout(cct
, 20) << "flush_set " << oset
<< " ex " << ex
<< " ob " << soid
2126 << " " << ob
<< dendl
;
2128 if (!flush(ob
, ex
.offset
, ex
.length
)) {
2129 // we'll need to gather...
2130 ldout(cct
, 10) << "flush_set " << oset
<< " will wait for ack tid "
2131 << ob
->last_write_tid
<< " on " << *ob
<< dendl
;
2132 ob
->waitfor_commit
[ob
->last_write_tid
].push_back(gather
.new_sub());
2136 return _flush_set_finish(&gather
, onfinish
);
2139 // flush all dirty data. non-blocking, takes callback.
2140 // returns true if already flushed
2141 bool ObjectCacher::flush_all(Context
*onfinish
)
2143 assert(lock
.is_locked());
2144 assert(onfinish
!= NULL
);
2146 ldout(cct
, 10) << "flush_all " << dendl
;
2148 // we'll need to wait for all objects to flush!
2149 C_GatherBuilder
gather(cct
);
2150 set
<Object
*> waitfor_commit
;
2152 list
<BufferHead
*> blist
;
2153 Object
*last_ob
= NULL
;
2154 set
<BufferHead
*, BufferHead::ptr_lt
>::iterator next
, it
;
2155 next
= it
= dirty_or_tx_bh
.begin();
2156 while (it
!= dirty_or_tx_bh
.end()) {
2158 BufferHead
*bh
= *it
;
2159 waitfor_commit
.insert(bh
->ob
);
2161 if (bh
->is_dirty()) {
2162 if (scattered_write
) {
2163 if (last_ob
!= bh
->ob
) {
2164 if (!blist
.empty()) {
2165 bh_write_scattered(blist
);
2170 blist
.push_back(bh
);
2179 if (scattered_write
&& !blist
.empty())
2180 bh_write_scattered(blist
);
2182 for (set
<Object
*>::iterator i
= waitfor_commit
.begin();
2183 i
!= waitfor_commit
.end();
2187 // we'll need to gather...
2188 ldout(cct
, 10) << "flush_all will wait for ack tid "
2189 << ob
->last_write_tid
<< " on " << *ob
<< dendl
;
2190 ob
->waitfor_commit
[ob
->last_write_tid
].push_back(gather
.new_sub());
2193 return _flush_set_finish(&gather
, onfinish
);
2196 void ObjectCacher::purge_set(ObjectSet
*oset
)
2198 assert(lock
.is_locked());
2199 if (oset
->objects
.empty()) {
2200 ldout(cct
, 10) << "purge_set on " << oset
<< " dne" << dendl
;
2204 ldout(cct
, 10) << "purge_set " << oset
<< dendl
;
2205 const bool were_dirty
= oset
->dirty_or_tx
> 0;
2207 for (xlist
<Object
*>::iterator i
= oset
->objects
.begin();
2213 // Although we have purged rather than flushed, caller should still
2214 // drop any resources associate with dirty data.
2215 assert(oset
->dirty_or_tx
== 0);
2216 if (flush_set_callback
&& were_dirty
) {
2217 flush_set_callback(flush_set_callback_arg
, oset
);
2222 loff_t
ObjectCacher::release(Object
*ob
)
2224 assert(lock
.is_locked());
2225 list
<BufferHead
*> clean
;
2226 loff_t o_unclean
= 0;
2228 for (map
<loff_t
,BufferHead
*>::iterator p
= ob
->data
.begin();
2229 p
!= ob
->data
.end();
2231 BufferHead
*bh
= p
->second
;
2232 if (bh
->is_clean() || bh
->is_zero() || bh
->is_error())
2233 clean
.push_back(bh
);
2235 o_unclean
+= bh
->length();
2238 for (list
<BufferHead
*>::iterator p
= clean
.begin();
2245 if (ob
->can_close()) {
2246 ldout(cct
, 10) << "release trimming " << *ob
<< dendl
;
2248 assert(o_unclean
== 0);
2253 ldout(cct
, 10) << "release clearing complete on " << *ob
<< dendl
;
2254 ob
->complete
= false;
2257 ldout(cct
, 10) << "release setting exists on " << *ob
<< dendl
;
2264 loff_t
ObjectCacher::release_set(ObjectSet
*oset
)
2266 assert(lock
.is_locked());
2267 // return # bytes not clean (and thus not released).
2270 if (oset
->objects
.empty()) {
2271 ldout(cct
, 10) << "release_set on " << oset
<< " dne" << dendl
;
2275 ldout(cct
, 10) << "release_set " << oset
<< dendl
;
2277 xlist
<Object
*>::iterator q
;
2278 for (xlist
<Object
*>::iterator p
= oset
->objects
.begin();
2284 loff_t o_unclean
= release(ob
);
2285 unclean
+= o_unclean
;
2288 ldout(cct
, 10) << "release_set " << oset
<< " " << *ob
2289 << " has " << o_unclean
<< " bytes left"
2295 ldout(cct
, 10) << "release_set " << oset
2296 << ", " << unclean
<< " bytes left" << dendl
;
2303 uint64_t ObjectCacher::release_all()
2305 assert(lock
.is_locked());
2306 ldout(cct
, 10) << "release_all" << dendl
;
2307 uint64_t unclean
= 0;
2309 vector
<ceph::unordered_map
<sobject_t
, Object
*> >::iterator i
2311 while (i
!= objects
.end()) {
2312 ceph::unordered_map
<sobject_t
, Object
*>::iterator p
= i
->begin();
2313 while (p
!= i
->end()) {
2314 ceph::unordered_map
<sobject_t
, Object
*>::iterator n
= p
;
2317 Object
*ob
= p
->second
;
2319 loff_t o_unclean
= release(ob
);
2320 unclean
+= o_unclean
;
2323 ldout(cct
, 10) << "release_all " << *ob
2324 << " has " << o_unclean
<< " bytes left"
2332 ldout(cct
, 10) << "release_all unclean " << unclean
<< " bytes left"
2339 void ObjectCacher::clear_nonexistence(ObjectSet
*oset
)
2341 assert(lock
.is_locked());
2342 ldout(cct
, 10) << "clear_nonexistence() " << oset
<< dendl
;
2344 for (xlist
<Object
*>::iterator p
= oset
->objects
.begin();
2348 ldout(cct
, 10) << " setting exists and complete on " << *ob
<< dendl
;
2350 ob
->complete
= false;
2352 for (xlist
<C_ReadFinish
*>::iterator q
= ob
->reads
.begin();
2354 C_ReadFinish
*comp
= *q
;
2355 comp
->distrust_enoent();
2361 * discard object extents from an ObjectSet by removing the objects in
2362 * exls from the in-memory oset.
2364 void ObjectCacher::discard_set(ObjectSet
*oset
, const vector
<ObjectExtent
>& exls
)
2366 assert(lock
.is_locked());
2367 if (oset
->objects
.empty()) {
2368 ldout(cct
, 10) << "discard_set on " << oset
<< " dne" << dendl
;
2372 ldout(cct
, 10) << "discard_set " << oset
<< dendl
;
2374 bool were_dirty
= oset
->dirty_or_tx
> 0;
2376 for (vector
<ObjectExtent
>::const_iterator p
= exls
.begin();
2379 ldout(cct
, 10) << "discard_set " << oset
<< " ex " << *p
<< dendl
;
2380 const ObjectExtent
&ex
= *p
;
2381 sobject_t
soid(ex
.oid
, CEPH_NOSNAP
);
2382 if (objects
[oset
->poolid
].count(soid
) == 0)
2384 Object
*ob
= objects
[oset
->poolid
][soid
];
2386 ob
->discard(ex
.offset
, ex
.length
);
2389 // did we truncate off dirty data?
2390 if (flush_set_callback
&&
2391 were_dirty
&& oset
->dirty_or_tx
== 0)
2392 flush_set_callback(flush_set_callback_arg
, oset
);
2395 void ObjectCacher::verify_stats() const
2397 assert(lock
.is_locked());
2398 ldout(cct
, 10) << "verify_stats" << dendl
;
2400 loff_t clean
= 0, zero
= 0, dirty
= 0, rx
= 0, tx
= 0, missing
= 0,
2402 for (vector
<ceph::unordered_map
<sobject_t
, Object
*> >::const_iterator i
2406 for (ceph::unordered_map
<sobject_t
, Object
*>::const_iterator p
2410 Object
*ob
= p
->second
;
2411 for (map
<loff_t
, BufferHead
*>::const_iterator q
= ob
->data
.begin();
2412 q
!= ob
->data
.end();
2414 BufferHead
*bh
= q
->second
;
2415 switch (bh
->get_state()) {
2416 case BufferHead::STATE_MISSING
:
2417 missing
+= bh
->length();
2419 case BufferHead::STATE_CLEAN
:
2420 clean
+= bh
->length();
2422 case BufferHead::STATE_ZERO
:
2423 zero
+= bh
->length();
2425 case BufferHead::STATE_DIRTY
:
2426 dirty
+= bh
->length();
2428 case BufferHead::STATE_TX
:
2431 case BufferHead::STATE_RX
:
2434 case BufferHead::STATE_ERROR
:
2435 error
+= bh
->length();
2444 ldout(cct
, 10) << " clean " << clean
<< " rx " << rx
<< " tx " << tx
2445 << " dirty " << dirty
<< " missing " << missing
2446 << " error " << error
<< dendl
;
2447 assert(clean
== stat_clean
);
2448 assert(rx
== stat_rx
);
2449 assert(tx
== stat_tx
);
2450 assert(dirty
== stat_dirty
);
2451 assert(missing
== stat_missing
);
2452 assert(zero
== stat_zero
);
2453 assert(error
== stat_error
);
2456 void ObjectCacher::bh_stat_add(BufferHead
*bh
)
2458 assert(lock
.is_locked());
2459 switch (bh
->get_state()) {
2460 case BufferHead::STATE_MISSING
:
2461 stat_missing
+= bh
->length();
2463 case BufferHead::STATE_CLEAN
:
2464 stat_clean
+= bh
->length();
2466 case BufferHead::STATE_ZERO
:
2467 stat_zero
+= bh
->length();
2469 case BufferHead::STATE_DIRTY
:
2470 stat_dirty
+= bh
->length();
2471 bh
->ob
->dirty_or_tx
+= bh
->length();
2472 bh
->ob
->oset
->dirty_or_tx
+= bh
->length();
2474 case BufferHead::STATE_TX
:
2475 stat_tx
+= bh
->length();
2476 bh
->ob
->dirty_or_tx
+= bh
->length();
2477 bh
->ob
->oset
->dirty_or_tx
+= bh
->length();
2479 case BufferHead::STATE_RX
:
2480 stat_rx
+= bh
->length();
2482 case BufferHead::STATE_ERROR
:
2483 stat_error
+= bh
->length();
2486 assert(0 == "bh_stat_add: invalid bufferhead state");
2488 if (get_stat_dirty_waiting() > 0)
2492 void ObjectCacher::bh_stat_sub(BufferHead
*bh
)
2494 assert(lock
.is_locked());
2495 switch (bh
->get_state()) {
2496 case BufferHead::STATE_MISSING
:
2497 stat_missing
-= bh
->length();
2499 case BufferHead::STATE_CLEAN
:
2500 stat_clean
-= bh
->length();
2502 case BufferHead::STATE_ZERO
:
2503 stat_zero
-= bh
->length();
2505 case BufferHead::STATE_DIRTY
:
2506 stat_dirty
-= bh
->length();
2507 bh
->ob
->dirty_or_tx
-= bh
->length();
2508 bh
->ob
->oset
->dirty_or_tx
-= bh
->length();
2510 case BufferHead::STATE_TX
:
2511 stat_tx
-= bh
->length();
2512 bh
->ob
->dirty_or_tx
-= bh
->length();
2513 bh
->ob
->oset
->dirty_or_tx
-= bh
->length();
2515 case BufferHead::STATE_RX
:
2516 stat_rx
-= bh
->length();
2518 case BufferHead::STATE_ERROR
:
2519 stat_error
-= bh
->length();
2522 assert(0 == "bh_stat_sub: invalid bufferhead state");
2526 void ObjectCacher::bh_set_state(BufferHead
*bh
, int s
)
2528 assert(lock
.is_locked());
2529 int state
= bh
->get_state();
2530 // move between lru lists?
2531 if (s
== BufferHead::STATE_DIRTY
&& state
!= BufferHead::STATE_DIRTY
) {
2532 bh_lru_rest
.lru_remove(bh
);
2533 bh_lru_dirty
.lru_insert_top(bh
);
2534 } else if (s
!= BufferHead::STATE_DIRTY
&&state
== BufferHead::STATE_DIRTY
) {
2535 bh_lru_dirty
.lru_remove(bh
);
2536 if (bh
->get_dontneed())
2537 bh_lru_rest
.lru_insert_bot(bh
);
2539 bh_lru_rest
.lru_insert_top(bh
);
2542 if ((s
== BufferHead::STATE_TX
||
2543 s
== BufferHead::STATE_DIRTY
) &&
2544 state
!= BufferHead::STATE_TX
&&
2545 state
!= BufferHead::STATE_DIRTY
) {
2546 dirty_or_tx_bh
.insert(bh
);
2547 } else if ((state
== BufferHead::STATE_TX
||
2548 state
== BufferHead::STATE_DIRTY
) &&
2549 s
!= BufferHead::STATE_TX
&&
2550 s
!= BufferHead::STATE_DIRTY
) {
2551 dirty_or_tx_bh
.erase(bh
);
2554 if (s
!= BufferHead::STATE_ERROR
&&
2555 state
== BufferHead::STATE_ERROR
) {
2565 void ObjectCacher::bh_add(Object
*ob
, BufferHead
*bh
)
2567 assert(lock
.is_locked());
2568 ldout(cct
, 30) << "bh_add " << *ob
<< " " << *bh
<< dendl
;
2570 if (bh
->is_dirty()) {
2571 bh_lru_dirty
.lru_insert_top(bh
);
2572 dirty_or_tx_bh
.insert(bh
);
2574 if (bh
->get_dontneed())
2575 bh_lru_rest
.lru_insert_bot(bh
);
2577 bh_lru_rest
.lru_insert_top(bh
);
2581 dirty_or_tx_bh
.insert(bh
);
2586 void ObjectCacher::bh_remove(Object
*ob
, BufferHead
*bh
)
2588 assert(lock
.is_locked());
2589 assert(bh
->get_journal_tid() == 0);
2590 ldout(cct
, 30) << "bh_remove " << *ob
<< " " << *bh
<< dendl
;
2592 if (bh
->is_dirty()) {
2593 bh_lru_dirty
.lru_remove(bh
);
2594 dirty_or_tx_bh
.erase(bh
);
2596 bh_lru_rest
.lru_remove(bh
);
2600 dirty_or_tx_bh
.erase(bh
);
2603 if (get_stat_dirty_waiting() > 0)