// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <limits.h>

#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"

#include "include/assert.h"

#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
                                 ///  while holding the lock
#define BUFFER_MEMORY_WEIGHT 12  // memory usage of BufferHead, count in (1<<n)

using std::chrono::seconds;
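// note: BUFFER_MEMORY_WEIGHT is a shift count, not a byte value: trim()
// and maybe_wait_for_writeback() below derive per-bh limits such as
//   max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT
// i.e. each BufferHead is charged roughly (1<<12) = 4 KiB of overhead,
// so a 64 MiB cache tolerates at most 16384 clean bhs.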
/*** ObjectCacher::BufferHead ***/


/*** ObjectCacher::Object ***/

#define dout_subsys ceph_subsys_objectcacher
#undef dout_prefix
#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
class ObjectCacher::C_ReadFinish : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  loff_t start;
  uint64_t length;
  xlist<C_ReadFinish*>::item set_item;
  bool trust_enoent;
  ceph_tid_t tid;
  ZTracer::Trace trace;

public:
  bufferlist bl;
  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
               uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
    set_item(this), trust_enoent(true),
    tid(t), trace(trace) {
    ob->reads.push_back(&set_item);
  }

  void finish(int r) override {
    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
    trace.event("finish");

    // object destructor clears the list
    if (set_item.is_on_list())
      set_item.remove_myself();
  }

  void distrust_enoent() {
    trust_enoent = false;
  }
};
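// Instances register themselves on ob->reads (via set_item) so that
// clear_nonexistence() can reach in-flight reads and call
// distrust_enoent(); bh_read_finish() then retries the waiters instead
// of caching an -ENOENT reply that may be stale.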
class ObjectCacher::C_RetryRead : public Context {
  ObjectCacher *oc;
  OSDRead *rd;
  ObjectSet *oset;
  Context *onfinish;
  ZTracer::Trace trace;
public:
  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
              const ZTracer::Trace &trace)
    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
  }
  void finish(int r) override {
    if (r >= 0) {
      r = oc->_readx(rd, oset, onfinish, false, &trace);
    }

    if (r == 0) {
      // read is still in-progress
      return;
    }

    trace.event("finish");
    if (onfinish) {
      onfinish->complete(r);
    }
  }
};
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
                                                      loff_t off)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;

  // split off right
  ObjectCacher::BufferHead *right = new BufferHead(this);

  // inherit the dontneed/nocache flags; a later access clears them again.
  right->set_dontneed(left->get_dontneed());
  right->set_nocache(left->get_nocache());

  right->last_write_tid = left->last_write_tid;
  right->last_read_tid = left->last_read_tid;
  right->set_state(left->get_state());
  right->snapc = left->snapc;
  right->set_journal_tid(left->journal_tid);

  loff_t newleftlen = off - left->start();
  right->set_start(off);
  right->set_length(left->length() - newleftlen);

  // shorten left
  oc->bh_stat_sub(left);
  left->set_length(newleftlen);
  oc->bh_stat_add(left);

  // add right
  oc->bh_add(this, right);

  // split buffers too
  bufferlist bl;
  bl.claim(left->bl);
  if (bl.length()) {
    assert(bl.length() == (left->length() + right->length()));
    right->bl.substr_of(bl, left->length(), right->length());
    left->bl.substr_of(bl, 0, left->length());
  }

  // move read waiters
  if (!left->waitfor_read.empty()) {
    map<loff_t, list<Context*> >::iterator start_remove
      = left->waitfor_read.begin();
    while (start_remove != left->waitfor_read.end() &&
           start_remove->first < right->start())
      ++start_remove;
    for (map<loff_t, list<Context*> >::iterator p = start_remove;
         p != left->waitfor_read.end(); ++p) {
      ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
                         << " to right bh" << dendl;
      right->waitfor_read[p->first].swap(p->second);
      assert(p->second.empty());
    }
    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
  }

  ldout(oc->cct, 20) << "split left is " << *left << dendl;
  ldout(oc->cct, 20) << "split right is " << *right << dendl;
  return right;
}
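// example: splitting a bh covering 0~8 at off=5 leaves left as 0~5 and
// right as 5~3, with identical state/snapc/tids; a waiter queued at
// byte 6 migrates to right, while a waiter at byte 2 stays on left.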
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
  assert(oc->lock.is_locked());

  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
  if (left->get_journal_tid() == 0) {
    left->set_journal_tid(right->get_journal_tid());
  }
  right->set_journal_tid(0);

  oc->bh_remove(this, right);
  oc->bh_stat_sub(left);
  left->set_length(left->length() + right->length());
  oc->bh_stat_add(left);

  // data
  left->bl.claim_append(right->bl);

  // version
  // note: this is sorta busted, but should only be used for dirty buffers
  left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
  left->last_write = MAX( left->last_write, right->last_write );

  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);

  // waiters
  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
       p != right->waitfor_read.end();
       ++p)
    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
                                        p->second);

  // hose right
  delete right;

  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
}
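// note the conservative flag merge above: the result keeps dontneed or
// nocache only if the right bh has it set *and* the left bh already had
// it; otherwise the merged bh stays fully cacheable.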
bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
{
  if (left->end() != right->start() ||
      left->get_state() != right->get_state() ||
      !left->can_merge_journal(right))
    return false;
  if (left->is_tx() && left->last_write_tid != right->last_write_tid)
    return false;
  return true;
}
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;

  // do not merge rx buffers; last_read_tid may not match
  if (bh->is_rx())
    return;

  // to the left?
  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
  assert(p->second == bh);
  if (p != data.begin()) {
    --p;
    if (can_merge_bh(p->second, bh)) {
      merge_left(p->second, bh);
      bh = p->second;
    } else {
      ++p;
    }
  }
  // to the right?
  assert(p->second == bh);
  ++p;
  if (p != data.end() && can_merge_bh(bh, p->second))
    merge_left(bh, p->second);
}
/*
 * count bytes we have cached in given range
 */
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
  assert(oc->lock.is_locked());
  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
  while (left > 0) {
    if (p == data.end())
      return false;

    if (p->first <= cur) {
      // have part of it
      loff_t lenfromcur = MIN(p->second->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else if (p->first > cur) {
      // gap
      return false;
    } else
      ceph_abort();
  }

  return true;
}
/*
 * all cached data in this range [off, off+len]
 */
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
  assert(oc->lock.is_locked());
  if (data.empty())
    return true;
  map<loff_t, BufferHead*>::iterator first = data.begin();
  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
  if (first->second->start() >= off && last->second->end() <= (off + len))
    return true;
  else
    return false;
}
/*
 * map a range of bytes into buffer_heads.
 * - create missing buffer_heads as necessary.
 */
int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& hits,
                                   map<loff_t, BufferHead*>& missing,
                                   map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
                     << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    // at end?
    if (p == data.end()) {
      // rest is a miss.
      BufferHead *n = new BufferHead(this);
      n->set_start(cur);
      n->set_length(left);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
      }
      cur += left;
      assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
      break;  // no more.
    }

    if (p->first <= cur) {
      // have it (or part of it)
      BufferHead *e = p->second;

      if (e->is_clean() ||
          e->is_dirty() ||
          e->is_tx() ||
          e->is_zero()) {
        hits[cur] = e;     // readable!
        ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
      } else if (e->is_rx()) {
        rx[cur] = e;       // missing, not readable.
        ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
      } else if (e->is_error()) {
        errors[cur] = e;
        ldout(oc->cct, 20) << "map_read error " << *e << dendl;
      } else {
        ceph_abort();
      }

      loff_t lenfromcur = MIN(e->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;  // more?

    } else if (p->first > cur) {
      // gap.. miss
      loff_t next = p->first;
      BufferHead *n = new BufferHead(this);
      loff_t len = MIN(next - cur, left);
      n->set_start(cur);
      n->set_length(len);
      oc->bh_add(this, n);
      if (complete) {
        oc->mark_zero(n);
        hits[cur] = n;
        ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
      } else {
        missing[cur] = n;
        ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
      }
      cur += MIN(left, n->length());
      left -= MIN(left, n->length());
      continue;    // more?
    } else {
      ceph_abort();
    }
  }
  return 0;
}
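// map_read() post-condition: every byte of ex is covered by exactly one
// bh in hits, missing, rx or errors; gaps get freshly allocated bhs,
// which become zero-filled hits when the object is known complete.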
void ObjectCacher::Object::audit_buffers()
{
  loff_t offset = 0;
  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
       it != data.end(); ++it) {
    if (it->first != it->second->start()) {
      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
                     << " does not match bh start position: "
                     << *it->second << dendl;
      assert(it->first == it->second->start());
    }
    if (it->first < offset) {
      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
                     << " overlaps with previous bh " << *((--it)->second)
                     << dendl;
      assert(it->first >= offset);
    }
    BufferHead *bh = it->second;
    map<loff_t, list<Context*> >::const_iterator w_it;
    for (w_it = bh->waitfor_read.begin();
         w_it != bh->waitfor_read.end(); ++w_it) {
      if (w_it->first < bh->start() ||
          w_it->first >= bh->start() + bh->length()) {
        lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
                       << " is not within bh " << *bh << dendl;
        assert(w_it->first >= bh->start());
        assert(w_it->first < bh->start() + bh->length());
      }
    }
    offset = it->first + it->second->length();
  }
}
/*
 * map a range of extents on an object's buffer cache.
 * - combine any bh's we're writing into one
 * - break up bufferheads that don't fall completely within the range
 * //no! - return a bh that includes the write.  may also include
 * other dirty data to left and/or right.
 */
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
                                                          ceph_tid_t tid)
{
  assert(oc->lock.is_locked());
  BufferHead *final = 0;

  ldout(oc->cct, 10) << "map_write oex " << ex.oid
                     << " " << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    loff_t max = left;

    // at end ?
    if (p == data.end()) {
      if (final == NULL) {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( max );
        oc->bh_add(this, final);
        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
      } else {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + max);
        oc->bh_stat_add(final);
      }
      left -= max;
      cur += max;
      continue;
    }

    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
    //oc->verify_stats();

    if (p->first <= cur) {
      BufferHead *bh = p->second;
      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;

      if (p->first < cur) {
        assert(final == 0);
        if (cur + max >= bh->end()) {
          // we want right bit (one splice)
          final = split(bh, cur);   // just split it, take right half.
          replace_journal_tid(final, tid);
          ++p;
          assert(p->second == final);
        } else {
          // we want middle bit (two splices)
          final = split(bh, cur);
          ++p;
          assert(p->second == final);
          split(final, cur+max);
          replace_journal_tid(final, tid);
        }
      } else {
        assert(p->first == cur);
        if (bh->length() <= max) {
          // whole bufferhead, piece of cake.
        } else {
          // we want left bit (one splice)
          split(bh, cur + max);        // just split
        }
        if (final) {
          oc->mark_dirty(bh);
          oc->mark_dirty(final);
          --p;  // move iterator back to final
          assert(p->second == final);
          replace_journal_tid(bh, tid);
          merge_left(final, bh);
        } else {
          final = bh;
          replace_journal_tid(final, tid);
        }
      }

      // keep going.
      loff_t lenfromcur = final->end() - cur;
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else {
      // gap!
      loff_t next = p->first;
      loff_t glen = MIN(next - cur, max);
      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
      if (final) {
        oc->bh_stat_sub(final);
        final->set_length(final->length() + glen);
        oc->bh_stat_add(final);
      } else {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start( cur );
        final->set_length( glen );
        oc->bh_add(this, final);
      }

      cur += glen;
      left -= glen;
      continue;    // more?
    }
  }

  // set version
  assert(final);
  assert(final->get_journal_tid() == tid);
  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;

  return final;
}
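// map_write() returns one bh spanning exactly [ex.offset,
// ex.offset+ex.length): neighbours are split at the boundaries and
// merged into final, which is what lets writex() below copy the payload
// in with simple offset arithmetic.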
void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
                                               ceph_tid_t tid) {
  ceph_tid_t bh_tid = bh->get_journal_tid();

  assert(tid == 0 || bh_tid <= tid);
  if (bh_tid != 0 && bh_tid != tid) {
    // inform journal that it should not expect a writeback from this extent
    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
                                           bh->length(), bh_tid, tid);
  }
  bh->set_journal_tid(tid);
}
void ObjectCacher::Object::truncate(loff_t s)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;

  while (!data.empty()) {
    BufferHead *bh = data.rbegin()->second;
    if (bh->end() <= s)
      break;

    // split bh at truncation point?
    if (bh->start() < s) {
      split(bh, s);
      continue;
    }

    // remove bh entirely
    assert(bh->start() >= s);
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}
void ObjectCacher::Object::discard(loff_t off, loff_t len)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
                     << dendl;

  if (!exists) {
    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
    exists = true;
  }
  if (complete) {
    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
    complete = false;
  }

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
  while (p != data.end()) {
    BufferHead *bh = p->second;
    if (bh->start() >= off + len)
      break;

    // split bh at truncation point?
    if (bh->start() < off) {
      split(bh, off);
      ++p;
      continue;
    }

    assert(bh->start() >= off);
    if (bh->end() > off + len) {
      split(bh, off + len);
    }

    ++p;
    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}
/*** ObjectCacher ***/

#undef dout_prefix
#define dout_prefix *_dout << "objectcacher "


ObjectCacher::ObjectCacher(CephContext *cct_, string name,
                           WritebackHandler& wb, Mutex& l,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg, uint64_t max_bytes,
                           uint64_t max_objects, uint64_t max_dirty,
                           uint64_t target_dirty, double max_dirty_age,
                           bool block_writes_upfront)
  : perfcounter(NULL),
    cct(cct_), writeback_handler(wb), name(name), lock(l),
    max_dirty(max_dirty), target_dirty(target_dirty),
    max_size(max_bytes), max_objects(max_objects),
    max_dirty_age(ceph::make_timespan(max_dirty_age)),
    block_writes_upfront(block_writes_upfront),
    trace_endpoint("ObjectCacher"),
    flush_set_callback(flush_callback),
    flush_set_callback_arg(flush_callback_arg),
    last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
    stat_nr_dirty_waiters(0), reads_outstanding(0)
{
  perf_start();
  finisher.start();
  scattered_write = writeback_handler.can_scattered_write();
}
ObjectCacher::~ObjectCacher()
{
  finisher.stop();
  perf_stop();
  // we should be empty.
  for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
         = objects.begin();
       i != objects.end();
       ++i)
    assert(i->empty());
  assert(bh_lru_rest.lru_get_size() == 0);
  assert(bh_lru_dirty.lru_get_size() == 0);
  assert(ob_lru.lru_get_size() == 0);
  assert(dirty_or_tx_bh.empty());
}
void ObjectCacher::perf_start()
{
  string n = "objectcacher-" + name;
  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);

  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
                      "cache_ops_hit", "Hit operations");
  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
                      "cache_ops_miss", "Miss operations");
  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
                      "cache_bytes_hit", "Hit data");
  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
                      "cache_bytes_miss", "Miss data");
  plb.add_u64_counter(l_objectcacher_data_read,
                      "data_read", "Read data");
  plb.add_u64_counter(l_objectcacher_data_written,
                      "data_written", "Data written to cache");
  plb.add_u64_counter(l_objectcacher_data_flushed,
                      "data_flushed", "Data flushed");
  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
                      "data_overwritten_while_flushing",
                      "Data overwritten while flushing");
  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
                      "Write operations, delayed due to dirty limits");
  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
                      "write_bytes_blocked",
                      "Write data blocked on dirty limit");
  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
               "Time spent blocking a write due to dirty limits");

  perfcounter = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perfcounter);
}
void ObjectCacher::perf_stop()
{
  assert(perfcounter);
  cct->get_perfcounters_collection()->remove(perfcounter);
  delete perfcounter;
}
ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
                                               uint64_t object_no,
                                               ObjectSet *oset,
                                               object_locator_t &l,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
{
  // XXX: Add handling of nspace in object_locator_t in cache
  assert(lock.is_locked());
  // have it?
  if ((uint32_t)l.pool < objects.size()) {
    if (objects[l.pool].count(oid)) {
      Object *o = objects[l.pool][oid];
      o->object_no = object_no;
      o->truncate_size = truncate_size;
      o->truncate_seq = truncate_seq;
      return o;
    }
  } else {
    objects.resize(l.pool+1);
  }

  // create it.
  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
                         truncate_seq);
  objects[l.pool][oid] = o;
  ob_lru.lru_insert_top(o);
  return o;
}
void ObjectCacher::close_object(Object *ob)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "close_object " << *ob << dendl;
  assert(ob->can_close());

  // ok!
  ob_lru.lru_remove(ob);
  objects[ob->oloc.pool].erase(ob->get_soid());
  ob->set_item.remove_myself();
  delete ob;
}
void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
                           const ZTracer::Trace &parent_trace)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_read " + bh->ob->get_oid().name);
    trace.event("start");
  }

  mark_rx(bh);
  bh->last_read_tid = ++last_read_tid;

  // finisher
  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
                                            bh->start(), bh->length(), trace);
  // go
  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
                         bh->ob->get_oloc(), bh->start(), bh->length(),
                         bh->ob->get_snap(), &onfinish->bl,
                         bh->ob->truncate_size, bh->ob->truncate_seq,
                         op_flags, trace, onfinish);

  ++reads_outstanding;
}
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
                                  ceph_tid_t tid, loff_t start,
                                  uint64_t length, bufferlist &bl, int r,
                                  bool trust_enoent)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read_finish "
                << oid
                << " tid " << tid
                << " " << start << "~" << length
                << " (bl is " << bl.length() << ")"
                << " returned " << r
                << " outstanding reads " << reads_outstanding
                << dendl;

  if (r >= 0 && bl.length() < length) {
    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
                  << length << " with " << length - bl.length()
                  << " bytes of zeroes" << dendl;
    bl.append_zero(length - bl.length());
  }

  list<Context*> ls;
  int err = 0;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
  } else {
    Object *ob = objects[poolid][oid];

    if (r == -ENOENT && !ob->complete) {
      // wake up *all* rx waiters, or else we risk reordering
      // identical reads. e.g.
      //   read 1~1
      //   reply to unrelated 3~1 -> !exists
      //   read 1~1 -> immediate ENOENT
      //   reply to first 1~1 -> ooo ENOENT
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
           p != ob->data.end(); ++p) {
        BufferHead *bh = p->second;
        for (map<loff_t, list<Context*> >::iterator p
               = bh->waitfor_read.begin();
             p != bh->waitfor_read.end();
             ++p)
          ls.splice(ls.end(), p->second);
        bh->waitfor_read.clear();
        if (!bh->is_zero() && !bh->is_rx())
          allzero = false;
      }

      // just pass through and retry all waiters if we don't trust
      // -ENOENT for this read
      if (trust_enoent) {
        ldout(cct, 7)
          << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
          << dendl;
        ob->complete = true;
        ob->exists = false;

        /* If all the bhs are effectively zero, get rid of them.  All
         * the waiters will be retried and get -ENOENT immediately, so
         * it's safe to clean up the unneeded bh's now. Since we know
         * it's safe to remove them now, do so, so they aren't hanging
         * around waiting for more -ENOENTs from rados while the cache
         * is being shut down.
         *
         * Only do this when all the bhs are rx or clean, to match the
         * condition in _readx(). If there are any non-rx or non-clean
         * bhs, _readx() will wait for the final result instead of
         * returning -ENOENT immediately.
         */
        if (allzero) {
          ldout(cct, 10)
            << "bh_read_finish ENOENT and allzero, getting rid of "
            << "bhs for " << *ob << dendl;
          map<loff_t, BufferHead*>::iterator p = ob->data.begin();
          while (p != ob->data.end()) {
            BufferHead *bh = p->second;
            // current iterator will be invalidated by bh_remove()
            ++p;
            bh_remove(ob, bh);
            delete bh;
          }
        }
      }
    }

    // apply to bh's!
    loff_t opos = start;
    while (true) {
      map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
      if (p == ob->data.end())
        break;
      if (opos >= start+(loff_t)length) {
        ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
                       << start << "+" << length << "=" << start+(loff_t)length
                       << dendl;
        break;
      }

      BufferHead *bh = p->second;
      ldout(cct, 20) << "checking bh " << *bh << dendl;

      // finishers?
      for (map<loff_t, list<Context*> >::iterator it
             = bh->waitfor_read.begin();
           it != bh->waitfor_read.end();
           ++it)
        ls.splice(ls.end(), it->second);
      bh->waitfor_read.clear();

      if (bh->start() > opos) {
        ldout(cct, 1) << "bh_read_finish skipping gap "
                      << opos << "~" << bh->start() - opos
                      << dendl;
        opos = bh->start();
        continue;
      }

      if (!bh->is_rx()) {
        ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
        opos = bh->end();
        continue;
      }

      if (bh->last_read_tid != tid) {
        ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
                       << bh->last_read_tid << " != tid " << tid
                       << ", skipping" << dendl;
        opos = bh->end();
        continue;
      }

      assert(opos >= bh->start());
      assert(bh->start() == opos);   // we don't merge rx bh's... yet!
      assert(bh->length() <= start+(loff_t)length-opos);

      if (bh->error < 0)
        err = bh->error;

      opos = bh->end();

      if (r == -ENOENT) {
        if (trust_enoent) {
          ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
          bh_remove(ob, bh);
          delete bh;
        } else {
          ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
                         << *bh << dendl;
        }
        continue;
      }

      if (r < 0) {
        bh->error = r;
        mark_error(bh);
      } else {
        bh->bl.substr_of(bl,
                         bh->start() - start,
                         bh->length());
        mark_clean(bh);
      }

      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;

      ob->try_merge_bh(bh);
    }
  }

  // called with lock held.
  ldout(cct, 20) << "finishing waiters " << ls << dendl;

  finish_contexts(cct, ls, err);
  retry_waiting_reads();

  --reads_outstanding;
  read_cond.Signal();
}
void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
                                        int64_t *max_amount, int *max_count)
{
  list<BufferHead*> blist;

  int count = 0;
  int64_t total_len = 0;
  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
  assert(it != dirty_or_tx_bh.end());
  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
       p != dirty_or_tx_bh.end();
       ++p) {
    BufferHead *obh = *p;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_back(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }

  while (it != dirty_or_tx_bh.begin()) {
    --it;
    BufferHead *obh = *it;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_front(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))
        break;
    }
  }
  if (max_count)
    *max_count -= count;
  if (max_amount)
    *max_amount -= total_len;

  bh_write_scattered(blist);
}
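// scanning forward and then backward from the found bh works because
// dirty_or_tx_bh is ordered by BufferHead::ptr_lt (ObjectSet, Object,
// offset), so all bhs of bh->ob form one contiguous run in the set.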
class ObjectCacher::C_WriteCommit : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  vector<pair<loff_t, uint64_t> > ranges;
  ZTracer::Trace trace;
public:
  ceph_tid_t tid = 0;
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
                uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(_poolid), oid(o), trace(trace) {
    ranges.push_back(make_pair(s, l));
  }
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
                vector<pair<loff_t, uint64_t> >& _ranges) :
    oc(c), poolid(_poolid), oid(o), tid(0) {
    ranges.swap(_ranges);
  }
  void finish(int r) override {
    oc->bh_write_commit(poolid, oid, ranges, tid, r);
    trace.event("finish");
  }
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
  assert(lock.is_locked());

  Object *ob = blist.front()->ob;
  ob->get();

  ceph::real_time last_write;
  SnapContext snapc;
  vector<pair<loff_t, uint64_t> > ranges;
  vector<pair<uint64_t, bufferlist> > io_vec;

  ranges.reserve(blist.size());
  io_vec.reserve(blist.size());

  uint64_t total_len = 0;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
    assert(bh->ob == ob);
    assert(bh->bl.length() == bh->length());
    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));

    int n = io_vec.size();
    io_vec.resize(n + 1);
    io_vec[n].first = bh->start();
    io_vec[n].second = bh->bl;

    total_len += bh->length();
    if (bh->snapc.seq > snapc.seq)
      snapc = bh->snapc;
    if (bh->last_write > last_write)
      last_write = bh->last_write;
  }

  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool,
                                              ob->get_soid(), ranges);

  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
                                           io_vec, snapc, last_write,
                                           ob->truncate_size, ob->truncate_seq,
                                           oncommit);
  oncommit->tid = tid;
  ob->last_write_tid = tid;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    bh->last_write_tid = tid;
    mark_tx(bh);
  }

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write " << *bh << dendl;

  bh->ob->get();

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_write " + bh->ob->get_oid().name);
    trace.event("start");
  }

  // finishers
  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                              bh->ob->get_soid(), bh->start(),
                                              bh->length(), trace);
  // go
  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
                                           bh->ob->get_oloc(),
                                           bh->start(), bh->length(),
                                           bh->snapc, bh->bl, bh->last_write,
                                           bh->ob->truncate_size,
                                           bh->ob->truncate_seq,
                                           bh->journal_tid, trace, oncommit);
  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;

  // set bh last_write_tid
  oncommit->tid = tid;
  bh->ob->last_write_tid = tid;
  bh->last_write_tid = tid;

  mark_tx(bh);

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
}
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
                                   vector<pair<loff_t, uint64_t> >& ranges,
                                   ceph_tid_t tid, int r)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
                << " ranges " << ranges << " returned " << r << dendl;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
    return;
  }

  Object *ob = objects[poolid][oid];
  int was_dirty_or_tx = ob->oset->dirty_or_tx;

  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
       p != ranges.end();
       ++p) {
    loff_t start = p->first;
    uint64_t length = p->second;
    if (!ob->exists) {
      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
      ob->exists = true;

      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
                                              ob->get_snap())) {
        ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
          "complete on " << *ob << dendl;
        ob->complete = false;
      }
    }

    vector<pair<loff_t, BufferHead*>> hit;
    // apply to bh's!
    for (map<loff_t, BufferHead*>::const_iterator p
           = ob->data_lower_bound(start);
         p != ob->data.end();
         ++p) {
      BufferHead *bh = p->second;

      if (bh->start() >= start+(loff_t)length)
        break;

      // make sure bh is tx
      if (!bh->is_tx()) {
        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
        continue;
      }

      // make sure bh tid matches
      if (bh->last_write_tid != tid) {
        assert(bh->last_write_tid > tid);
        ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
        continue;
      }

      // we don't merge tx buffers. tx buffer should be within the range
      assert(bh->start() >= start);
      assert(bh->end() <= start+(loff_t)length);

      if (r >= 0) {
        // ok!  mark bh clean and error-free
        mark_clean(bh);
        bh->set_journal_tid(0);
        if (bh->get_nocache())
          bh_lru_rest.lru_bottouch(bh);
        hit.push_back(make_pair(bh->start(), bh));
        ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
      } else {
        mark_dirty(bh);
        ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
                       << *bh << " r = " << r << " " << cpp_strerror(-r)
                       << dendl;
      }
    }

    for (auto& p : hit) {
      // p.second may have been merged and deleted by merge_left()
      if (ob->data.count(p.first))
        ob->try_merge_bh(p.second);
    }
  }

  // update last_commit.
  assert(ob->last_commit_tid < tid);
  ob->last_commit_tid = tid;

  // waiters?
  list<Context*> ls;
  if (ob->waitfor_commit.count(tid)) {
    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
    ob->waitfor_commit.erase(tid);
  }

  // is the entire object set now clean and fully committed?
  ObjectSet *oset = ob->oset;
  ob->put();

  if (flush_set_callback &&
      was_dirty_or_tx > 0 &&
      oset->dirty_or_tx == 0) {        // nothing dirty/tx
    flush_set_callback(flush_set_callback_arg, oset);
  }

  if (!ls.empty())
    finish_contexts(cct, ls, r);
}
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  ceph::real_time cutoff = ceph::real_clock::now();

  ldout(cct, 10) << "flush " << amount << dendl;

  /*
   * NOTE: we aren't actually pulling things off the LRU here, just
   * looking at the tail item.  Then we call bh_write, which moves it
   * to the other LRU, so that we can call
   * lru_dirty.lru_get_next_expire() again.
   */
  int64_t left = amount;
  while (amount == 0 || left > 0) {
    BufferHead *bh = static_cast<BufferHead*>(
      bh_lru_dirty.lru_get_next_expire());
    if (!bh) break;
    if (bh->last_write > cutoff) break;

    if (scattered_write) {
      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
    } else {
      left -= bh->length();
      bh_write(bh, *trace);
    }
  }
}
void ObjectCacher::trim()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;

  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() -
    bh_lru_rest.lru_get_num_pinned();
  while (get_stat_clean() > 0 &&
         ((uint64_t)get_stat_clean() > max_size ||
          nr_clean_bh > max_clean_bh)) {
    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
    if (!bh)
      break;

    ldout(cct, 10) << "trim trimming " << *bh << dendl;
    assert(bh->is_clean() || bh->is_zero() || bh->is_error());

    Object *ob = bh->ob;
    bh_remove(ob, bh);
    delete bh;

    --nr_clean_bh;

    if (ob->complete) {
      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
      ob->complete = false;
    }
  }

  while (ob_lru.lru_get_size() > max_objects) {
    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
    if (!ob)
      break;

    ldout(cct, 10) << "trim trimming " << *ob << dendl;
    close_object(ob);
  }

  ldout(cct, 10) << "trim finish:  max " << max_size << "  clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
}
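// trim() thus enforces two independent ceilings: total clean bytes
// (max_size) and clean bh count (max_size >> BUFFER_MEMORY_WEIGHT); LRU
// expiry stops once both are satisfied, and whole objects are closed
// only when ob_lru exceeds max_objects.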
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
                             snapid_t snapid)
{
  assert(lock.is_locked());
  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
       ex_it != extents.end();
       ++ex_it) {
    ldout(cct, 10) << "is_cached " << *ex_it << dendl;

    // get Object cache
    sobject_t soid(ex_it->oid, snapid);
    Object *o = get_object_maybe(soid, ex_it->oloc);
    if (!o)
      return false;
    if (!o->is_cached(ex_it->offset, ex_it->length))
      return false;
  }
  return true;
}
/*
 * returns # bytes read (if in cache).  onfinish is untouched (caller
 *  must delete it)
 * returns 0 if doing async read
 */
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                        ZTracer::Trace *parent_trace)
{
  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("read", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  int r = _readx(rd, oset, onfinish, true, &trace);
  if (r < 0) {
    trace.event("finish");
  }
  return r;
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                         bool external_call, ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  bool success = true;
  int error = 0;
  uint64_t bytes_in_cache = 0;
  uint64_t bytes_not_in_cache = 0;
  uint64_t total_bytes_read = 0;
  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  /*
   * WARNING: we can only meaningfully return ENOENT if the read request
   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
   * zeroed buffers needs to feed single extents into readx().
   */
  assert(!oset->return_enoent || rd->extents.size() == 1);

  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
       ex_it != rd->extents.end();
       ++ex_it) {
    ldout(cct, 10) << "readx " << *ex_it << dendl;

    total_bytes_read += ex_it->length;

    // get Object cache
    sobject_t soid(ex_it->oid, rd->snap);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);
    if (external_call)
      touch_ob(o);

    // does not exist and no hits?
    if (oset->return_enoent && !o->exists) {
      ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;

      // should we worry about COW underneath us?
      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
                                              ex_it->length, soid.snap)) {
        ldout(cct, 20) << "readx  may copy on write" << dendl;
        bool wait = false;
        list<BufferHead*> blist;
        for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
             bh_it != o->data.end();
             ++bh_it) {
          BufferHead *bh = bh_it->second;
          if (bh->is_dirty() || bh->is_tx()) {
            ldout(cct, 10) << "readx  flushing " << *bh << dendl;
            wait = true;
            if (bh->is_dirty()) {
              if (scattered_write)
                blist.push_back(bh);
              else
                bh_write(bh, *trace);
            }
          }
        }
        if (scattered_write && !blist.empty())
          bh_write_scattered(blist);
        if (wait) {
          ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
          o->waitfor_commit[o->last_write_tid].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace));
          // FIXME: perfcounter!
          return 0;
        }
      }

      // can we return ENOENT?
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
           bh_it != o->data.end();
           ++bh_it) {
        ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
        if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
          allzero = false;
          break;
        }
      }
      if (allzero) {
        ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
                       << dendl;
        delete rd;
        if (dontneed)
          bottouch_ob(o);
        return -ENOENT;
      }
    }

    // map extent into bufferheads
    map<loff_t, BufferHead*> hits, missing, rx, errors;
    o->map_read(*ex_it, hits, missing, rx, errors);
    if (external_call) {
      // retry reading error buffers
      missing.insert(errors.begin(), errors.end());
    } else {
      // some reads had errors, fail later so completions
      // are cleaned up properly
      // TODO: make read path not call _readx for every completion
      hits.insert(errors.begin(), errors.end());
    }

    if (!missing.empty() || !rx.empty()) {
      // read missing
      map<loff_t, BufferHead*>::iterator last = missing.end();
      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
           bh_it != missing.end();
           ++bh_it) {
        uint64_t rx_bytes = static_cast<uint64_t>(
          stat_rx + bh_it->second->length());
        bytes_not_in_cache += bh_it->second->length();
        if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
          // cache is full with concurrent reads -- wait for rx's to complete
          // to constrain memory growth (especially during copy-ups)
          if (success) {
            ldout(cct, 10) << "readx missed, waiting on cache to complete "
                           << waitfor_read.size() << " blocked reads, "
                           << (MAX(rx_bytes, max_size) - max_size)
                           << " read bytes" << dendl;
            waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
                                                   *trace));
          }

          bh_remove(o, bh_it->second);
          delete bh_it->second;
        } else {
          bh_it->second->set_nocache(nocache);
          bh_read(bh_it->second, rd->fadvise_flags, *trace);
          if ((success && onfinish) || last != missing.end())
            last = bh_it;
        }
        success = false;
      }

      // add the wait to the last missing bh so we don't wake up early:
      // reads complete in order
      if (last != missing.end()) {
        ldout(cct, 10) << "readx missed, waiting on " << *last->second
                       << " off " << last->first << dendl;
        last->second->waitfor_read[last->first].push_back(
          new C_RetryRead(this, rd, oset, onfinish, *trace) );
      }

      // bump rx
      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
           bh_it != rx.end();
           ++bh_it) {
        touch_bh(bh_it->second); // bump in lru, so we don't lose it.
        if (success && onfinish) {
          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
                         << " off " << bh_it->first << dendl;
          bh_it->second->waitfor_read[bh_it->first].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace) );
        }
        bytes_not_in_cache += bh_it->second->length();
        success = false;
      }

      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end(); ++bh_it)
        // bump in lru, so we don't lose it on a later read
        touch_bh(bh_it->second);

    } else {
      assert(!hits.empty());

      // make a plain list
      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end();
           ++bh_it) {
        BufferHead *bh = bh_it->second;
        ldout(cct, 10) << "readx hit bh " << *bh << dendl;
        if (bh->is_error() && bh->error)
          error = bh->error;
        bytes_in_cache += bh->length();

        if (bh->get_nocache() && bh->is_clean())
          bh_lru_rest.lru_bottouch(bh);
        else
          touch_bh(bh);
        // must come after touch_bh(), because touch_bh() resets dontneed
        if (dontneed &&
            ((loff_t)ex_it->offset <= bh->start() &&
             (bh->end() <= (loff_t)(ex_it->offset + ex_it->length)))) {
          bh->set_dontneed(true); //if dirty
          if (bh->is_clean())
            bh_lru_rest.lru_bottouch(bh);
        }
      }

      // create reverse map of buffer offset -> object for the
      // eventual result.  this is over a single ObjectExtent, so we
      // know that
      //  - the bh's are contiguous
      //  - the buffer frags need not be (and almost certainly aren't)
      loff_t opos = ex_it->offset;
      map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
      assert(bh_it->second->start() <= opos);
      uint64_t bhoff = opos - bh_it->second->start();
      vector<pair<uint64_t,uint64_t> >::iterator f_it
        = ex_it->buffer_extents.begin();
      uint64_t foff = 0;
      while (1) {
        BufferHead *bh = bh_it->second;
        assert(opos == (loff_t)(bh->start() + bhoff));

        uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
        ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
                       << bhoff << " frag " << f_it->first << "~"
                       << f_it->second << " +" << foff << "~" << len
                       << dendl;

        bufferlist bit;
        // put substr here first, since substr_of clobbers, and we
        // may get multiple bh's at this stripe_map position
        if (bh->is_zero()) {
          stripe_map[f_it->first].append_zero(len);
        } else {
          bit.substr_of(bh->bl,
                        opos - bh->start(),
                        len);
          stripe_map[f_it->first].claim_append(bit);
        }

        opos += len;
        bhoff += len;
        foff += len;
        if (opos == bh->end()) {
          ++bh_it;
          bhoff = 0;
        }
        if (foff == f_it->second) {
          ++f_it;
          foff = 0;
        }
        if (bh_it == hits.end()) break;
        if (f_it == ex_it->buffer_extents.end())
          break;
      }
      assert(f_it == ex_it->buffer_extents.end());
      assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
    }

    if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
      bottouch_ob(o);
  }

  if (!success) {
    if (perfcounter && external_call) {
      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
      perfcounter->inc(l_objectcacher_cache_ops_miss);
    }
    if (onfinish) {
      ldout(cct, 20) << "readx defer " << rd << dendl;
    } else {
      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
                     << dendl;
      delete rd;
    }
    return 0;  // wait!
  }
  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_hit);
  }

  // no misses... success!  do the read.
  ldout(cct, 10) << "readx has all buffers" << dendl;

  // ok, assemble into result buffer.
  uint64_t pos = 0;
  if (rd->bl && !error) {
    rd->bl->clear();
    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
         i != stripe_map.end();
         ++i) {
      assert(pos == i->first);
      ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
                     << " at " << pos << dendl;
      pos += i->second.length();
      rd->bl->claim_append(i->second);
      assert(rd->bl->length() == pos);
    }
    ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
  } else if (!error) {
    ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
    pos = i->first + i->second.length();
  }

  // done with read.
  int ret = error ? error : pos;
  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
  assert(pos <= (uint64_t) INT_MAX);

  delete rd;

  trim();

  return ret;
}
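// _readx() return contract: >0 = bytes served entirely from cache,
// 0 = read went async (a C_RetryRead/onfinish will fire later), <0 =
// immediate error such as -ENOENT when oset->return_enoent is set.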
void ObjectCacher::retry_waiting_reads()
{
  list<Context *> ls;
  ls.swap(waitfor_read);

  while (!ls.empty() && waitfor_read.empty()) {
    Context *ctx = ls.front();
    ls.pop_front();
    ctx->complete(0);
  }
  waitfor_read.splice(waitfor_read.end(), ls);
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
                         ZTracer::Trace *parent_trace)
{
  assert(lock.is_locked());
  ceph::real_time now = ceph::real_clock::now();
  uint64_t bytes_written = 0;
  uint64_t bytes_written_in_flush = 0;
  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("write", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
       ex_it != wr->extents.end();
       ++ex_it) {
    // get object cache
    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);

    // map it all into a single bufferhead.
    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
    bool missing = bh->is_missing();
    bh->snapc = wr->snapc;

    bytes_written += ex_it->length;
    if (bh->is_tx()) {
      bytes_written_in_flush += ex_it->length;
    }

    // adjust buffer pointers (ie "copy" data into my cache)
    // this is over a single ObjectExtent, so we know that
    //  - there is one contiguous bh
    //  - the buffer frags need not be (and almost certainly aren't)
    // note: i assume striping is monotonic... no jumps backwards, ever!
    loff_t opos = ex_it->offset;
    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
           = ex_it->buffer_extents.begin();
         f_it != ex_it->buffer_extents.end();
         ++f_it) {
      ldout(cct, 10) << "writex writing " << f_it->first << "~"
                     << f_it->second << " into " << *bh << " at " << opos
                     << dendl;
      uint64_t bhoff = opos - bh->start();
      assert(f_it->second <= bh->length() - bhoff);

      // get the frag we're mapping in
      bufferlist frag;
      frag.substr_of(wr->bl,
                     f_it->first, f_it->second);

      // keep anything left of bhoff
      bufferlist newbl;
      if (bhoff)
        newbl.substr_of(bh->bl, 0, bhoff);
      newbl.claim_append(frag);
      bh->bl.swap(newbl);

      opos += f_it->second;
    }

    // ok, now bh is dirty.
    mark_dirty(bh);
    if (dontneed)
      bh->set_dontneed(true);
    else if (nocache && missing)
      bh->set_nocache(true);
    else
      touch_bh(bh);

    bh->last_write = now;

    o->try_merge_bh(bh);
  }

  if (perfcounter) {
    perfcounter->inc(l_objectcacher_data_written, bytes_written);
    if (bytes_written_in_flush) {
      perfcounter->inc(l_objectcacher_overwritten_in_flush,
                       bytes_written_in_flush);
    }
  }

  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
  delete wr;

  //verify_stats();
  trim();
  return r;
}
class ObjectCacher::C_WaitForWrite : public Context {
public:
  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
                 const ZTracer::Trace &trace, Context *onfinish) :
    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
  void finish(int r) override;
private:
  ObjectCacher *m_oc;
  uint64_t m_len;
  ZTracer::Trace m_trace;
  Context *m_onfinish;
};

void ObjectCacher::C_WaitForWrite::finish(int r)
{
  Mutex::Locker l(m_oc->lock);
  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
  m_onfinish->complete(r);
}
void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
                                            ZTracer::Trace *trace)
{
  assert(lock.is_locked());
  ceph::mono_time start = ceph::mono_clock::now();
  int blocked = 0;
  // wait for writeback?
  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
  //  - do not wait for bytes other waiters are waiting on.  this means that
  //    threads do not wait for each other.  this effectively allows the cache
  //    size to balloon proportional to the data that is in flight.

  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
  while (get_stat_dirty() + get_stat_tx() > 0 &&
         (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
           max_dirty + get_stat_dirty_waiting()) ||
          (dirty_or_tx_bh.size() >=
           max_dirty_bh + get_stat_nr_dirty_waiters()))) {

    if (blocked == 0) {
      trace->event("start wait for writeback");
    }
    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
    flusher_cond.Signal();
    stat_dirty_waiting += len;
    ++stat_nr_dirty_waiters;
    stat_cond.Wait(lock);
    stat_dirty_waiting -= len;
    --stat_nr_dirty_waiters;
    ++blocked;
    ldout(cct, 10) << __func__ << " woke up" << dendl;
  }
  if (blocked > 0) {
    trace->event("finish wait for writeback");
  }
  if (blocked && perfcounter) {
    perfcounter->inc(l_objectcacher_write_ops_blocked);
    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
    ceph::timespan blocked = ceph::mono_clock::now() - start;
    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
  }
}
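// the thresholds above grow by get_stat_dirty_waiting() and
// get_stat_nr_dirty_waiters(), so each writer effectively waits only
// for its own bytes; concurrent writers do not serialize behind one
// another while the flusher drains.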
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                                  ZTracer::Trace *trace, Context *onfreespace)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  int ret = 0;

  if (max_dirty > 0) {
    if (block_writes_upfront) {
      maybe_wait_for_writeback(len, trace);
      if (onfreespace)
        onfreespace->complete(0);
    } else {
      assert(onfreespace);
      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
    }
  } else {
    // write-thru!  flush what we just wrote.
    Cond cond;
    bool done = false;
    Context *fin = block_writes_upfront ?
      new C_Cond(&cond, &done, &ret) : onfreespace;
    assert(fin);
    bool flushed = flush_set(oset, wr->extents, trace, fin);
    assert(!flushed);   // we just dirtied it, and didn't drop our lock!
    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
    if (block_writes_upfront) {
      while (!done)
        cond.Wait(lock);
      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
      if (onfreespace)
        onfreespace->complete(ret);
    }
  }

  // start writeback anyway?
  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
                   << target_dirty << ", nudging flusher" << dendl;
    flusher_cond.Signal();
  }
  return ret;
}
void ObjectCacher::flusher_entry()
{
  ldout(cct, 10) << "flusher start" << dendl;
  lock.Lock();
  while (!flusher_stop) {
    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
      get_stat_dirty();
    ldout(cct, 11) << "flusher "
                   << all << " / " << max_size << ":  "
                   << get_stat_tx() << " tx, "
                   << get_stat_rx() << " rx, "
                   << get_stat_clean() << " clean, "
                   << get_stat_dirty() << " dirty ("
                   << target_dirty << " target, "
                   << max_dirty << " max)"
                   << dendl;
    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();

    ZTracer::Trace trace;
    if (cct->_conf->osdc_blkin_trace_all) {
      trace.init("flusher", &trace_endpoint);
      trace.event("start");
    }

    if (actual > 0 && (uint64_t) actual > target_dirty) {
      // flush some dirty pages
      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
      flush(&trace, actual - target_dirty);
    } else {
      // check tail of lru for old dirty items
      ceph::real_time cutoff = ceph::real_clock::now();
      cutoff -= max_dirty_age;
      BufferHead *bh = 0;
      int max = MAX_FLUSH_UNDER_LOCK;
      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
                                            lru_get_next_expire())) != 0 &&
             bh->last_write <= cutoff &&
             max > 0) {
        ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
        if (scattered_write) {
          bh_write_adjacencies(bh, cutoff, NULL, &max);
        } else {
          bh_write(bh, trace);
          --max;
        }
      }
      if (!max) {
        // back off the lock to avoid starving other threads
        trace.event("backoff");
        lock.Unlock();
        lock.Lock();
        continue;
      }
    }

    trace.event("finish");
    if (flusher_stop)
      break;

    flusher_cond.WaitInterval(lock, seconds(1));
  }

  /* Wait for reads to finish. This is only possible if handling
   * -ENOENT made some read completions finish before their rados read
   * came back. If we don't wait for them, and destroy the cache, when
   * the rados reads do come back their callback will try to access the
   * no-longer-valid ObjectCacher.
   */
  while (reads_outstanding > 0) {
    ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
                   << reads_outstanding << dendl;
    read_cond.Wait(lock);
  }

  lock.Unlock();
  ldout(cct, 10) << "flusher finish" << dendl;
}
// -------------------------------------------------

bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return true;

  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
    if (!(*p)->is_empty())
      return false;

  return true;
}
*oset
)
1973 assert(lock
.is_locked());
1974 if (oset
->objects
.empty())
1977 for (xlist
<Object
*>::iterator p
= oset
->objects
.begin();
1980 for (map
<loff_t
,BufferHead
*>::iterator q
= ob
->data
.begin();
1981 q
!= ob
->data
.end();
1983 BufferHead
*bh
= q
->second
;
1984 if (!bh
->is_dirty() && !bh
->is_tx())
1992 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet
*oset
)
1994 assert(lock
.is_locked());
1995 if (oset
->objects
.empty())
1998 for (xlist
<Object
*>::iterator i
= oset
->objects
.begin();
2002 for (map
<loff_t
,BufferHead
*>::iterator p
= ob
->data
.begin();
2003 p
!= ob
->data
.end();
2005 BufferHead
*bh
= p
->second
;
2006 if (bh
->is_dirty() || bh
->is_tx())
// purge.  non-blocking.  violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "purge " << *ob << dendl;

  ob->truncate(0);
}
// flush.  non-blocking.  no callback.
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
                         ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  list<BufferHead*> blist;
  bool clean = true;
  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
  for (map<loff_t,BufferHead*>::const_iterator p
         = ob->data_lower_bound(offset);
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    ldout(cct, 20) << "flush  " << *bh << dendl;
    if (length && bh->start() > offset+length) {
      break;
    }
    if (bh->is_tx()) {
      clean = false;
      continue;
    }
    if (!bh->is_dirty()) {
      continue;
    }

    if (scattered_write)
      blist.push_back(bh);
    else
      bh_write(bh, *trace);
    clean = false;
  }
  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  return clean;
}
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
                                     Context *onfinish)
{
  assert(lock.is_locked());
  if (gather->has_subs()) {
    gather->set_finisher(onfinish);
    gather->activate();
    return false;
  }

  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
  onfinish->complete(0);
  return true;
}
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;

  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
  // order. But items in oset->objects are not sorted. So the iterator can
  // point to any buffer head in the ObjectSet
  BufferHead key(*oset->objects.begin());
  it = dirty_or_tx_bh.lower_bound(&key);
  p = q = it;

  bool backwards = true;
  if (it != dirty_or_tx_bh.begin())
    --it;
  else
    backwards = false;

  for (; p != dirty_or_tx_bh.end(); p = q) {
    ++q;
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
      break;
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }
  }

  if (backwards) {
    for (p = q = it; true; p = q) {
      if (q != dirty_or_tx_bh.begin())
        --q;
      else
        backwards = false;
      BufferHead *bh = *p;
      if (bh->ob->oset != oset)
        break;
      waitfor_commit.insert(bh->ob);
      if (bh->is_dirty()) {
        if (scattered_write) {
          if (last_ob != bh->ob) {
            if (!blist.empty()) {
              bh_write_scattered(blist);
              blist.clear();
            }
            last_ob = bh->ob;
          }
          blist.push_front(bh);
        } else {
          bh_write(bh, {});
        }
      }
      if (!backwards)
        break;
    }
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end(); ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
                             ZTracer::Trace *trace, Context *onfinish)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
                 << " ObjectExtents" << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);

  for (vector<ObjectExtent>::iterator p = exv.begin();
       p != exv.end();
       ++p) {
    ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
                   << " " << ob << dendl;

    if (!flush(ob, ex.offset, ex.length, trace)) {
      // we'll need to gather...
      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                     << ob->last_write_tid << " on " << *ob << dendl;
      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
    }
  }

  return _flush_set_finish(&gather, onfinish);
}
// flush all dirty data.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);

  ldout(cct, 10) << "flush_all " << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
  next = it = dirty_or_tx_bh.begin();
  while (it != dirty_or_tx_bh.end()) {
    ++next;
    BufferHead *bh = *it;
    waitfor_commit.insert(bh->ob);

    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }

    it = next;
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end();
       ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_all will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
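
// Design note (added commentary): with scattered_write enabled, dirty buffer
// heads are batched per Object in 'blist' and issued as a single
// bh_write_scattered() call; the batch is flushed whenever the owning Object
// changes and once more after the loop.  Without scattered_write each dirty
// bh is written individually via bh_write().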
void ObjectCacher::purge_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "purge_set " << oset << dendl;
  const bool were_dirty = oset->dirty_or_tx > 0;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;
    purge(ob);
  }

  // Although we have purged rather than flushed, caller should still
  // drop any resources associated with dirty data.
  assert(oset->dirty_or_tx == 0);
  if (flush_set_callback && were_dirty) {
    flush_set_callback(flush_set_callback_arg, oset);
  }
}
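
// Note (added commentary): purge_set() discards buffers without writing them
// back, so dirty data is lost by design; the flush callback still fires when
// the set had dirty or in-flight data so the caller can drop whatever
// resources it pinned against that data.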
loff_t ObjectCacher::release(Object *ob)
{
  assert(lock.is_locked());
  list<BufferHead*> clean;
  loff_t o_unclean = 0;

  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    if (bh->is_clean() || bh->is_zero() || bh->is_error())
      clean.push_back(bh);
    else
      o_unclean += bh->length();
  }

  for (list<BufferHead*>::iterator p = clean.begin();
       p != clean.end();
       ++p) {
    bh_remove(ob, *p);
    delete *p;
  }

  if (ob->can_close()) {
    ldout(cct, 10) << "release trimming " << *ob << dendl;
    close_object(ob);
    assert(o_unclean == 0);
    return 0;
  }

  if (ob->complete) {
    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
    ob->complete = false;
  }
  if (!ob->exists) {
    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
    ob->exists = true;
  }

  return o_unclean;
}
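
// Note (added commentary): release() frees only clean/zero/error buffers and
// returns the number of bytes it could not release (dirty or tx).  The
// Object itself is closed only when can_close() holds, in which case no
// unclean bytes can remain, hence the assert above.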
loff_t ObjectCacher::release_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  // return # bytes not clean (and thus not released).
  loff_t unclean = 0;

  if (oset->objects.empty()) {
    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
    return 0;
  }

  ldout(cct, 10) << "release_set " << oset << dendl;

  xlist<Object*>::iterator q;
  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ) {
    q = p;
    ++q;

    Object *ob = *p;

    loff_t o_unclean = release(ob);
    unclean += o_unclean;

    if (o_unclean)
      ldout(cct, 10) << "release_set " << oset << " " << *ob
                     << " has " << o_unclean << " bytes left"
                     << dendl;
    p = q;
  }

  if (unclean) {
    ldout(cct, 10) << "release_set " << oset
                   << ", " << unclean << " bytes left" << dendl;
  }

  return unclean;
}
uint64_t ObjectCacher::release_all()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "release_all" << dendl;
  uint64_t unclean = 0;

  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
    = objects.begin();
  while (i != objects.end()) {
    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
    while (p != i->end()) {
      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
      ++n;

      Object *ob = p->second;

      loff_t o_unclean = release(ob);
      unclean += o_unclean;

      if (o_unclean)
        ldout(cct, 10) << "release_all " << *ob
                       << " has " << o_unclean << " bytes left"
                       << dendl;
      p = n;
    }
    ++i;
  }

  if (unclean) {
    ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
                   << dendl;
  }

  return unclean;
}
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    if (!ob->exists) {
      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
      ob->exists = true;
      ob->complete = false;
    }
    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
         !q.end(); ++q) {
      C_ReadFinish *comp = *q;
      comp->distrust_enoent();
    }
  }
}
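
// Note (added commentary): setting exists and clearing complete undoes any
// cached ENOENT knowledge for the object, and distrust_enoent() tells reads
// already in flight not to trust an ENOENT reply they may be about to
// deliver, since the object may have just been created.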
/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset.
 */
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "discard_set " << oset << dendl;

  bool were_dirty = oset->dirty_or_tx > 0;

  for (vector<ObjectExtent>::const_iterator p = exls.begin();
       p != exls.end();
       ++p) {
    ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
    const ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ob->discard(ex.offset, ex.length);
  }

  // did we truncate off dirty data?
  if (flush_set_callback &&
      were_dirty && oset->dirty_or_tx == 0)
    flush_set_callback(flush_set_callback_arg, oset);
}
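
// Note (added commentary): discard_set() drops the cached extents without
// writeback.  If the discard removed the set's last dirty/tx bytes
// (were_dirty && dirty_or_tx == 0), the flush callback fires so that waiters
// keyed on "set is clean" can make progress.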
void ObjectCacher::verify_stats() const
{
  assert(lock.is_locked());
  ldout(cct, 10) << "verify_stats" << dendl;

  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
    error = 0;
  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
         = objects.begin();
       i != objects.end();
       ++i) {
    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
           = i->begin();
         p != i->end();
         ++p) {
      Object *ob = p->second;
      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
           q != ob->data.end();
           ++q) {
        BufferHead *bh = q->second;
        switch (bh->get_state()) {
        case BufferHead::STATE_MISSING:
          missing += bh->length();
          break;
        case BufferHead::STATE_CLEAN:
          clean += bh->length();
          break;
        case BufferHead::STATE_ZERO:
          zero += bh->length();
          break;
        case BufferHead::STATE_DIRTY:
          dirty += bh->length();
          break;
        case BufferHead::STATE_TX:
          tx += bh->length();
          break;
        case BufferHead::STATE_RX:
          rx += bh->length();
          break;
        case BufferHead::STATE_ERROR:
          error += bh->length();
          break;
        default:
          ceph_abort();
        }
      }
    }
  }

  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
                 << " dirty " << dirty << " missing " << missing
                 << " error " << error << dendl;
  assert(clean == stat_clean);
  assert(rx == stat_rx);
  assert(tx == stat_tx);
  assert(dirty == stat_dirty);
  assert(missing == stat_missing);
  assert(zero == stat_zero);
  assert(error == stat_error);
}
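
// Note (added commentary): verify_stats() recomputes the per-state byte
// totals from scratch and asserts they match the incrementally maintained
// stat_* counters.  It is a debugging aid that walks every buffer head, so
// it should stay off hot paths.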
void ObjectCacher::bh_stat_add(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing += bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean += bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero += bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx += bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error += bh->length();
    break;
  default:
    assert(0 == "bh_stat_add: invalid bufferhead state");
  }
  if (get_stat_dirty_waiting() > 0)
    stat_cond.Signal();
}
void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing -= bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean -= bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero -= bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx -= bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error -= bh->length();
    break;
  default:
    assert(0 == "bh_stat_sub: invalid bufferhead state");
  }
}
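
// Note (added commentary): bh_stat_add() and bh_stat_sub() are exact
// mirrors.  State changes must go through bh_set_state() below, which
// brackets the transition with a sub/add pair so the global stat_* counters
// and the per-Object and per-ObjectSet dirty_or_tx byte counts never drift.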
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
  assert(lock.is_locked());
  int state = bh->get_state();
  // move between lru lists?
  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
    bh_lru_rest.lru_remove(bh);
    bh_lru_dirty.lru_insert_top(bh);
  } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
    bh_lru_dirty.lru_remove(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if ((s == BufferHead::STATE_TX ||
       s == BufferHead::STATE_DIRTY) &&
      state != BufferHead::STATE_TX &&
      state != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.insert(bh);
  } else if ((state == BufferHead::STATE_TX ||
              state == BufferHead::STATE_DIRTY) &&
             s != BufferHead::STATE_TX &&
             s != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.erase(bh);
  }

  if (s != BufferHead::STATE_ERROR &&
      state == BufferHead::STATE_ERROR) {
    bh->error = 0;
  }

  // set state
  bh_stat_sub(bh);
  bh->set_state(s);
  bh_stat_add(bh);
}
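
// Worked example (added commentary): bh_set_state(bh, STATE_DIRTY) on a
// clean bh moves it from bh_lru_rest to the top of bh_lru_dirty, inserts it
// into dirty_or_tx_bh so flush_set() can find it, and shifts its length from
// stat_clean to stat_dirty (also bumping the Object/ObjectSet dirty_or_tx
// totals) via the bh_stat_sub/bh_stat_add pair.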
void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
  ob->add_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_insert_top(bh);
    dirty_or_tx_bh.insert(bh);
  } else {
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);

    if (bh->is_tx()) {
      dirty_or_tx_bh.insert(bh);
    }
  }
  bh_stat_add(bh);
}
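
// Note (added commentary): bh_add() and bh_remove() are the paired entry and
// exit points that keep the LRU lists, the dirty_or_tx_bh index and the stat
// counters consistent; bh_remove() additionally asserts the journal tid is
// zero, i.e. any journaled write has been flushed first.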
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  assert(bh->get_journal_tid() == 0);
  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
  ob->remove_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_remove(bh);
    dirty_or_tx_bh.erase(bh);
  } else {
    bh_lru_rest.lru_remove(bh);
    if (bh->is_tx()) {
      dirty_or_tx_bh.erase(bh);
    }
  }
  bh_stat_sub(bh);
  if (get_stat_dirty_waiting() > 0)