// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"

#include "include/assert.h"

#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
                                 ///< while holding the lock
#define BUFFER_MEMORY_WEIGHT 12  // memory usage of BufferHead, count in (1<<n)

using std::chrono::seconds;
/*** ObjectCacher::BufferHead ***/


/*** ObjectCacher::Object ***/

#define dout_subsys ceph_subsys_objectcacher
#undef dout_prefix
#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
class ObjectCacher::C_ReadFinish : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  loff_t start;
  uint64_t length;
  xlist<C_ReadFinish*>::item set_item;
  bool trust_enoent;
  ceph_tid_t tid;
  ZTracer::Trace trace;

public:
  bufferlist bl;
  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
	       uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
    set_item(this), trust_enoent(true),
    tid(t), trace(trace) {
    ob->reads.push_back(&set_item);
  }

  void finish(int r) override {
    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
    trace.event("finish");

    // object destructor clears the list
    if (set_item.is_on_list())
      set_item.remove_myself();
  }

  void distrust_enoent() {
    trust_enoent = false;
  }
};
class ObjectCacher::C_RetryRead : public Context {
  ObjectCacher *oc;
  OSDRead *rd;
  ObjectSet *oset;
  Context *onfinish;
  ZTracer::Trace trace;

public:
  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
	      const ZTracer::Trace &trace)
    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
  }

  void finish(int r) override {
    if (r >= 0) {
      r = oc->_readx(rd, oset, onfinish, false, &trace);
    }
    if (r == 0) {
      // read is still in-progress
      return;
    }
    trace.event("finish");
    if (onfinish) {
      onfinish->complete(r);
    }
  }
};
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
						      loff_t off)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;

  // split off right
  ObjectCacher::BufferHead *right = new BufferHead(this);

  // inherit the dontneed/nocache hints; a later access will clear them
  right->set_dontneed(left->get_dontneed());
  right->set_nocache(left->get_nocache());

  right->last_write_tid = left->last_write_tid;
  right->last_read_tid = left->last_read_tid;
  right->set_state(left->get_state());
  right->snapc = left->snapc;
  right->set_journal_tid(left->journal_tid);

  loff_t newleftlen = off - left->start();
  right->set_start(off);
  right->set_length(left->length() - newleftlen);

  // shorten left
  oc->bh_stat_sub(left);
  left->set_length(newleftlen);
  oc->bh_stat_add(left);

  // add right
  oc->bh_add(this, right);

  // split buffers too
  bufferlist bl;
  bl.claim(left->bl);
  if (bl.length()) {
    assert(bl.length() == (left->length() + right->length()));
    right->bl.substr_of(bl, left->length(), right->length());
    left->bl.substr_of(bl, 0, left->length());
  }

  // move read waiters
  if (!left->waitfor_read.empty()) {
    map<loff_t, list<Context*> >::iterator start_remove
      = left->waitfor_read.begin();
    while (start_remove != left->waitfor_read.end() &&
	   start_remove->first < right->start())
      ++start_remove;
    for (map<loff_t, list<Context*> >::iterator p = start_remove;
	 p != left->waitfor_read.end(); ++p) {
      ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
			 << " to right bh" << dendl;
      right->waitfor_read[p->first].swap(p->second);
      assert(p->second.empty());
    }
    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
  }

  ldout(oc->cct, 20) << "split left is " << *left << dendl;
  ldout(oc->cct, 20) << "split right is " << *right << dendl;

  return right;
}
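// Example: splitting a bh that spans [0, 4096) at off=1024 shrinks the
// left bh to [0, 1024) and returns a new right bh covering [1024, 4096).
// The buffered bytes are divided with substr_of, and any waitfor_read
// entries at offsets >= 1024 migrate to the right bh.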
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
  assert(oc->lock.is_locked());

  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
  if (left->get_journal_tid() == 0) {
    left->set_journal_tid(right->get_journal_tid());
  }
  right->set_journal_tid(0);

  oc->bh_remove(this, right);
  oc->bh_stat_sub(left);
  left->set_length(left->length() + right->length());
  oc->bh_stat_add(left);

  // data
  left->bl.claim_append(right->bl);

  // version
  // note: this is sorta busted, but should only be used for dirty buffers
  left->last_write_tid = MAX(left->last_write_tid, right->last_write_tid);
  left->last_write = MAX(left->last_write, right->last_write);

  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);

  // waiters
  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
       p != right->waitfor_read.end();
       ++p)
    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
					p->second);

  // hose right
  delete right;

  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
}
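// Note that the dontneed/nocache hints combine conservatively: the merged
// bh keeps a hint only if *both* halves carried it, so a hinted buffer
// never drags a non-hinted neighbour toward early eviction.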
bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
{
  if (left->end() != right->start() ||
      left->get_state() != right->get_state() ||
      !left->can_merge_journal(right))
    return false;
  if (left->is_tx() && left->last_write_tid != right->last_write_tid)
    return false;
  return true;
}

void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;

  // do not merge rx buffers; last_read_tid may not match
  if (bh->is_rx())
    return;

  // to the left?
  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
  assert(p->second == bh);
  if (p != data.begin()) {
    --p;
    if (can_merge_bh(p->second, bh)) {
      merge_left(p->second, bh);
      bh = p->second;
    } else {
      ++p;
    }
  }
  // to the right?
  assert(p->second == bh);
  ++p;
  if (p != data.end() && can_merge_bh(bh, p->second))
    merge_left(bh, p->second);
}
/*
 * count bytes we have cached in given range
 */
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
  assert(oc->lock.is_locked());
  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
  while (left > 0) {
    if (p == data.end())
      return false;

    if (p->first <= cur) {
      // have part of it
      loff_t lenfromcur = MIN(p->second->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else if (p->first > cur) {
      // gap
      return false;
    } else
      ceph_abort();
  }

  return true;
}
/*
 * all cached data in this range [off, off+len]
 */
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
  assert(oc->lock.is_locked());
  if (data.empty())
    return true;
  map<loff_t, BufferHead*>::iterator first = data.begin();
  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
  if (first->second->start() >= off && last->second->end() <= (off + len))
    return true;
  else
    return false;
}
/*
 * map a range of bytes into buffer_heads.
 * - create missing buffer_heads as necessary.
 */
int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& hits,
                                   map<loff_t, BufferHead*>& missing,
                                   map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
		     << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    // at end?
    if (p == data.end()) {
      // rest is a miss.
      BufferHead *n = new BufferHead(this);
      n->set_start(cur);
      n->set_length(left);
      oc->bh_add(this, n);
      if (complete) {
	oc->mark_zero(n);
	hits[cur] = n;
	ldout(oc->cct, 20) << "map_read miss+complete+zero " << left
			   << " left, " << *n << dendl;
      } else {
	missing[cur] = n;
	ldout(oc->cct, 20) << "map_read miss " << left
			   << " left, " << *n << dendl;
      }
      cur += left;
      assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
      break;  // no more.
    }

    if (p->first <= cur) {
      // have it (or part of it)
      BufferHead *e = p->second;

      if (e->is_clean() ||
	  e->is_dirty() ||
	  e->is_tx() ||
	  e->is_zero()) {
	hits[cur] = e;     // readable!
	ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
      } else if (e->is_rx()) {
	rx[cur] = e;       // missing, not readable.
	ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
      } else if (e->is_error()) {
	errors[cur] = e;
	ldout(oc->cct, 20) << "map_read error " << *e << dendl;
      } else {
	ceph_abort();
      }

      loff_t lenfromcur = MIN(e->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;  // more?
    } else if (p->first > cur) {
      // gap.. miss
      loff_t next = p->first;
      BufferHead *n = new BufferHead(this);
      loff_t len = MIN(next - cur, left);
      n->set_start(cur);
      n->set_length(len);
      oc->bh_add(this, n);
      if (complete) {
	oc->mark_zero(n);
	hits[cur] = n;
	ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
      } else {
	missing[cur] = n;
	ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
      }
      cur += MIN(left, n->length());
      left -= MIN(left, n->length());
      continue;    // more?
    } else {
      ceph_abort();
    }
  }
  return 0;
}
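// For instance, reading 0~8192 against an object that caches only
// [0, 4096) yields one entry in 'hits' for the cached bh and one new
// missing bh for [4096, 8192) -- unless the object is marked complete,
// in which case the gap is known to be a hole and comes back as a
// zero-filled hit instead.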
void ObjectCacher::Object::audit_buffers()
{
  loff_t offset = 0;
  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
       it != data.end(); ++it) {
    if (it->first != it->second->start()) {
      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
		     << " does not match bh start position: "
		     << *it->second << dendl;
      assert(it->first == it->second->start());
    }
    if (it->first < offset) {
      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
		     << " overlaps with previous bh " << *((--it)->second)
		     << dendl;
      assert(it->first >= offset);
    }
    BufferHead *bh = it->second;
    map<loff_t, list<Context*> >::const_iterator w_it;
    for (w_it = bh->waitfor_read.begin();
	 w_it != bh->waitfor_read.end(); ++w_it) {
      if (w_it->first < bh->start() ||
	  w_it->first >= bh->start() + bh->length()) {
	lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
		       << " is not within bh " << *bh << dendl;
	assert(w_it->first >= bh->start());
	assert(w_it->first < bh->start() + bh->length());
      }
    }
    offset = it->first + it->second->length();
  }
}
/*
 * map a range of extents on an object's buffer cache.
 * - combine any bh's we're writing into one
 * - break up bufferheads that don't fall completely within the range
 * //no! - return a bh that includes the write.  may also include
 *   other dirty data to left and/or right.
 */
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
							  ceph_tid_t tid)
{
  assert(oc->lock.is_locked());
  BufferHead *final = 0;

  ldout(oc->cct, 10) << "map_write oex " << ex.oid
		     << " " << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
  while (left > 0) {
    loff_t max = left;

    // at end ?
    if (p == data.end()) {
      if (final == NULL) {
	final = new BufferHead(this);
	replace_journal_tid(final, tid);
	final->set_start( cur );
	final->set_length( max );
	oc->bh_add(this, final);
	ldout(oc->cct, 10) << "map_write adding trailing bh " << *final
			   << dendl;
      } else {
	oc->bh_stat_sub(final);
	final->set_length(final->length() + max);
	oc->bh_stat_add(final);
      }
      left -= max;
      cur += max;
      continue;
    }

    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
    //oc->verify_stats();

    if (p->first <= cur) {
      BufferHead *bh = p->second;
      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;

      if (p->first < cur) {
	assert(final == 0);
	if (cur + max >= bh->end()) {
	  // we want right bit (one splice)
	  final = split(bh, cur);   // just split it, take right half.
	  replace_journal_tid(final, tid);
	  ++p;
	  assert(p->second == final);
	} else {
	  // we want middle bit (two splices)
	  final = split(bh, cur);
	  ++p;
	  assert(p->second == final);
	  split(final, cur+max);
	  replace_journal_tid(final, tid);
	}
      } else {
	assert(p->first == cur);
	if (bh->length() <= max) {
	  // whole bufferhead, piece of cake.
	} else {
	  // we want left bit (one splice)
	  split(bh, cur + max);        // just split
	}
	if (final) {
	  oc->mark_dirty(final);
	  oc->mark_dirty(bh);
	  --p;  // move iterator back to final
	  assert(p->second == final);
	  replace_journal_tid(bh, tid);
	  merge_left(final, bh);
	} else {
	  final = bh;
	  replace_journal_tid(final, tid);
	}
      }

      // keep going.
      loff_t lenfromcur = final->end() - cur;
      cur += lenfromcur;
      left -= lenfromcur;
      ++p;
      continue;
    } else {
      // gap!
      loff_t next = p->first;
      loff_t glen = MIN(next - cur, max);
      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
      if (final) {
	oc->bh_stat_sub(final);
	final->set_length(final->length() + glen);
	oc->bh_stat_add(final);
      } else {
	final = new BufferHead(this);
	replace_journal_tid(final, tid);
	final->set_start( cur );
	final->set_length( glen );
	oc->bh_add(this, final);
      }

      cur += glen;
      left -= glen;
      continue;    // more?
    }
  }

  // set version
  assert(final);
  assert(final->get_journal_tid() == tid);
  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;

  return final;
}
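// Example: writing 1024~1024 over a single bh spanning [0, 4096) takes the
// middle-bit path above: split at 1024, then split again at 2048, leaving
// [0,1024) and [2048,4096) untouched and returning [1024,2048) as 'final'.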
void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
					       ceph_tid_t tid) {
  ceph_tid_t bh_tid = bh->get_journal_tid();

  assert(tid == 0 || bh_tid <= tid);
  if (bh_tid != 0 && bh_tid != tid) {
    // inform journal that it should not expect a writeback from this extent
    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
					   bh->length(), bh_tid, tid);
  }
  bh->set_journal_tid(tid);
}
void ObjectCacher::Object::truncate(loff_t s)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;

  while (!data.empty()) {
    BufferHead *bh = data.rbegin()->second;
    if (bh->end() <= s)
      break;

    // split bh at truncation point?
    if (bh->start() < s) {
      split(bh, s);
      continue;
    }

    // remove bh entirely
    assert(bh->start() >= s);
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
    delete bh;
  }
}
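// truncate() walks the map from the highest bh downward (data.rbegin()),
// so each iteration either stops (bh entirely below s), splits the bh
// straddling s, or removes a bh that lies wholly past the new size.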
void ObjectCacher::Object::discard(loff_t off, loff_t len,
				   C_GatherBuilder* commit_gather)
{
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
		     << dendl;

  if (!exists) {
    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
    exists = true;
  }
  if (complete) {
    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
    complete = false;
  }

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
  while (p != data.end()) {
    BufferHead *bh = p->second;
    if (bh->start() >= off + len)
      break;

    // split bh at truncation point?
    if (bh->start() < off) {
      split(bh, off);
      ++p;
      continue;
    }

    assert(bh->start() >= off);
    if (bh->end() > off + len) {
      split(bh, off + len);
    }

    ++p;
    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
    replace_journal_tid(bh, 0);

    if (bh->is_tx() && commit_gather != nullptr) {
      // wait for the writeback to commit
      waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
    } else if (bh->is_rx()) {
      // cannot remove bh with in-flight read, but we can ensure the
      // read won't overwrite the discard
      bh->last_read_tid = ++oc->last_read_tid;
      bh->bl.clear();
      bh->set_nocache(true);
      oc->mark_zero(bh);
      // we should mark all Rx bh to zero
      continue;
    } else {
      assert(bh->waitfor_read.empty());
    }

    oc->bh_remove(this, bh);
    delete bh;
  }
}
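// An rx bh cannot simply be deleted while its rados read is in flight, so
// the branch above instead bumps last_read_tid (making the eventual reply
// a stale no-op) and zeroes the bh, ensuring the read cannot resurrect
// discarded data.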
/*** ObjectCacher ***/

#undef dout_prefix
#define dout_prefix *_dout << "objectcacher "
ObjectCacher::ObjectCacher(CephContext *cct_, string name,
			   WritebackHandler& wb, Mutex& l,
			   flush_set_callback_t flush_callback,
			   void *flush_callback_arg, uint64_t max_bytes,
			   uint64_t max_objects, uint64_t max_dirty,
			   uint64_t target_dirty, double max_dirty_age,
			   bool block_writes_upfront)
  : cct(cct_), writeback_handler(wb), name(name), lock(l),
    max_dirty(max_dirty), target_dirty(target_dirty),
    max_size(max_bytes), max_objects(max_objects),
    max_dirty_age(ceph::make_timespan(max_dirty_age)),
    block_writes_upfront(block_writes_upfront),
    trace_endpoint("ObjectCacher"),
    flush_set_callback(flush_callback),
    flush_set_callback_arg(flush_callback_arg),
    last_read_tid(0), flusher_stop(false), flusher_thread(this),
    finisher(cct),
    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
    stat_nr_dirty_waiters(0), reads_outstanding(0)
{
  scattered_write = writeback_handler.can_scattered_write();
}
ObjectCacher::~ObjectCacher()
{
  // we should be empty.
  for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
	 = objects.begin();
       i != objects.end();
       ++i)
    assert(i->empty());
  assert(bh_lru_rest.lru_get_size() == 0);
  assert(bh_lru_dirty.lru_get_size() == 0);
  assert(ob_lru.lru_get_size() == 0);
  assert(dirty_or_tx_bh.empty());
}
void ObjectCacher::perf_start()
{
  string n = "objectcacher-" + name;
  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);

  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
		      "cache_ops_hit", "Hit operations");
  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
		      "cache_ops_miss", "Miss operations");
  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
		      "cache_bytes_hit", "Hit data");
  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
		      "cache_bytes_miss", "Miss data");
  plb.add_u64_counter(l_objectcacher_data_read,
		      "data_read", "Read data");
  plb.add_u64_counter(l_objectcacher_data_written,
		      "data_written", "Data written to cache");
  plb.add_u64_counter(l_objectcacher_data_flushed,
		      "data_flushed", "Data flushed");
  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
		      "data_overwritten_while_flushing",
		      "Data overwritten while flushing");
  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
		      "Write operations, delayed due to dirty limits");
  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
		      "write_bytes_blocked",
		      "Write data blocked on dirty limit");
  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
	       "Time spent blocking a write due to dirty limits");

  perfcounter = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perfcounter);
}
void ObjectCacher::perf_stop()
{
  assert(perfcounter);
  cct->get_perfcounters_collection()->remove(perfcounter);
  delete perfcounter;
}
ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
					       uint64_t object_no,
					       ObjectSet *oset,
					       object_locator_t &l,
					       uint64_t truncate_size,
					       uint64_t truncate_seq)
{
  // XXX: Add handling of nspace in object_locator_t in cache
  assert(lock.is_locked());
  // have it?
  if ((uint32_t)l.pool < objects.size()) {
    if (objects[l.pool].count(oid)) {
      Object *o = objects[l.pool][oid];
      o->object_no = object_no;
      o->truncate_size = truncate_size;
      o->truncate_seq = truncate_seq;
      return o;
    }
  } else {
    objects.resize(l.pool+1);
  }

  // create it.
  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
			 truncate_seq);
  objects[l.pool][oid] = o;
  ob_lru.lru_insert_top(o);
  return o;
}
*ob
)
747 assert(lock
.is_locked());
748 ldout(cct
, 10) << "close_object " << *ob
<< dendl
;
749 assert(ob
->can_close());
752 ob_lru
.lru_remove(ob
);
753 objects
[ob
->oloc
.pool
].erase(ob
->get_soid());
754 ob
->set_item
.remove_myself();
758 void ObjectCacher::bh_read(BufferHead
*bh
, int op_flags
,
759 const ZTracer::Trace
&parent_trace
)
761 assert(lock
.is_locked());
762 ldout(cct
, 7) << "bh_read on " << *bh
<< " outstanding reads "
763 << reads_outstanding
<< dendl
;
765 ZTracer::Trace trace
;
766 if (parent_trace
.valid()) {
767 trace
.init("", &trace_endpoint
, &parent_trace
);
768 trace
.copy_name("bh_read " + bh
->ob
->get_oid().name
);
769 trace
.event("start");
773 bh
->last_read_tid
= ++last_read_tid
;
776 C_ReadFinish
*onfinish
= new C_ReadFinish(this, bh
->ob
, bh
->last_read_tid
,
777 bh
->start(), bh
->length(), trace
);
779 writeback_handler
.read(bh
->ob
->get_oid(), bh
->ob
->get_object_number(),
780 bh
->ob
->get_oloc(), bh
->start(), bh
->length(),
781 bh
->ob
->get_snap(), &onfinish
->bl
,
782 bh
->ob
->truncate_size
, bh
->ob
->truncate_seq
,
783 op_flags
, trace
, onfinish
);
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
				  ceph_tid_t tid, loff_t start,
				  uint64_t length, bufferlist &bl, int r,
				  bool trust_enoent)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read_finish "
		<< oid
		<< " " << start << "~" << length
		<< " (bl is " << bl.length() << ")"
		<< " returned " << r
		<< " outstanding reads " << reads_outstanding
		<< dendl;

  if (r >= 0 && bl.length() < length) {
    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
		  << length << " with " << length - bl.length()
		  << " bytes of zeroes" << dendl;
    bl.append_zero(length - bl.length());
  }

  list<Context*> ls;
  int err = 0;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
  } else {
    Object *ob = objects[poolid][oid];

    if (r == -ENOENT && !ob->complete) {
      // wake up *all* rx waiters, or else we risk reordering
      // identical reads. e.g.
      //   read 1~1
      //   reply to unrelated 3~1 -> !exists
      //   read 1~1 -> immediate ENOENT
      //   reply to first 1~1 -> ooo ENOENT
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
	   p != ob->data.end(); ++p) {
	BufferHead *bh = p->second;
	for (map<loff_t, list<Context*> >::iterator p
	       = bh->waitfor_read.begin();
	     p != bh->waitfor_read.end();
	     ++p)
	  ls.splice(ls.end(), p->second);
	bh->waitfor_read.clear();
	if (!bh->is_zero() && !bh->is_rx())
	  allzero = false;
      }

      // just pass through and retry all waiters if we don't trust
      // -ENOENT for this read
      if (trust_enoent) {
	ldout(cct, 7)
	  << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
	  << dendl;
	ob->complete = true;
	ob->exists = false;

	/* If all the bhs are effectively zero, get rid of them.  All
	 * the waiters will be retried and get -ENOENT immediately, so
	 * it's safe to clean up the unneeded bh's now. Since we know
	 * it's safe to remove them now, do so, so they aren't hanging
	 * around waiting for more -ENOENTs from rados while the cache
	 * is being shut down.
	 *
	 * Only do this when all the bhs are rx or clean, to match the
	 * condition in _readx(). If there are any non-rx or non-clean
	 * bhs, _readx() will wait for the final result instead of
	 * returning -ENOENT immediately.
	 */
	if (allzero) {
	  ldout(cct, 10)
	    << "bh_read_finish ENOENT and allzero, getting rid of "
	    << "bhs for " << *ob << dendl;
	  map<loff_t, BufferHead*>::iterator p = ob->data.begin();
	  while (p != ob->data.end()) {
	    BufferHead *bh = p->second;
	    // current iterator will be invalidated by bh_remove()
	    ++p;
	    bh_remove(ob, bh);
	    delete bh;
	  }
	}
      }
    }

    // apply to bh's!
    loff_t opos = start;
    while (true) {
      map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
      if (p == ob->data.end())
	break;
      if (opos >= start+(loff_t)length) {
	ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
		       << start << "+" << length << "=" << start+(loff_t)length
		       << dendl;
	break;
      }

      BufferHead *bh = p->second;
      ldout(cct, 20) << "checking bh " << *bh << dendl;

      // finishers?
      for (map<loff_t, list<Context*> >::iterator it
	     = bh->waitfor_read.begin();
	   it != bh->waitfor_read.end();
	   ++it)
	ls.splice(ls.end(), it->second);
      bh->waitfor_read.clear();

      if (bh->start() > opos) {
	ldout(cct, 1) << "bh_read_finish skipping gap "
		      << opos << "~" << bh->start() - opos
		      << dendl;
	opos = bh->start();
	continue;
      }

      if (!bh->is_rx()) {
	ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
	opos = bh->end();
	continue;
      }

      if (bh->last_read_tid != tid) {
	ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
		       << bh->last_read_tid << " != tid " << tid
		       << ", skipping" << dendl;
	opos = bh->end();
	continue;
      }

      assert(opos >= bh->start());
      assert(bh->start() == opos);   // we don't merge rx bh's... yet!
      assert(bh->length() <= start+(loff_t)length-opos);

      opos = bh->end();

      if (r == -ENOENT) {
	if (trust_enoent) {
	  ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
	  bh_remove(ob, bh);
	  delete bh;
	} else {
	  ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
			 << *bh << dendl;
	}
	continue;
      }

      if (r < 0) {
	bh->error = r;
	err = r;
	mark_error(bh);
      } else {
	bh->bl.substr_of(bl, bh->start() - start, bh->length());
	mark_clean(bh);
      }

      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;

      ob->try_merge_bh(bh);
    }
  }

  // called with lock held.
  ldout(cct, 20) << "finishing waiters " << ls << dendl;

  finish_contexts(cct, ls, err);
  retry_waiting_reads();

  --reads_outstanding;
  read_cond.Signal();
}
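// The 'reads' registration in C_ReadFinish exists so in-flight reads can be
// flagged via distrust_enoent(), presumably when an overlapping write or
// discard invalidates them before the reply arrives; an untrusted -ENOENT
// (handled above) leaves the rx bhs in place and simply retries the
// waiters rather than caching a possibly-spurious hole.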
void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
					int64_t *max_amount, int *max_count)
{
  list<BufferHead*> blist;

  int count = 0;
  int64_t total_len = 0;
  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
  assert(it != dirty_or_tx_bh.end());
  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
       p != dirty_or_tx_bh.end();
       ++p) {
    BufferHead *obh = *p;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_back(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
	  (max_amount && total_len > *max_amount))
	break;
    }
  }

  while (it != dirty_or_tx_bh.begin()) {
    --it;
    BufferHead *obh = *it;
    if (obh->ob != bh->ob)
      break;
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_front(obh);
      ++count;
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
	  (max_amount && total_len > *max_amount))
	break;
    }
  }
  if (max_count)
    *max_count -= count;
  if (max_amount)
    *max_amount -= total_len;

  bh_write_scattered(blist);
}
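// Starting from 'bh', this scans dirty_or_tx_bh (sorted by ptr_lt, i.e.
// object then offset) forward and then backward, collecting neighbouring
// dirty bhs of the same object until the optional byte/count budgets are
// exhausted, and submits them as one scattered write.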
class ObjectCacher::C_WriteCommit : public Context {
  ObjectCacher *oc;
  int64_t poolid;
  sobject_t oid;
  vector<pair<loff_t, uint64_t> > ranges;
  ZTracer::Trace trace;
public:
  ceph_tid_t tid = 0;
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
		uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(_poolid), oid(o), trace(trace) {
    ranges.push_back(make_pair(s, l));
  }
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
		vector<pair<loff_t, uint64_t> >& _ranges) :
    oc(c), poolid(_poolid), oid(o), tid(0) {
    ranges.swap(_ranges);
  }
  void finish(int r) override {
    oc->bh_write_commit(poolid, oid, ranges, tid, r);
    trace.event("finish");
  }
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
  assert(lock.is_locked());

  Object *ob = blist.front()->ob;

  ceph::real_time last_write;
  SnapContext snapc;
  vector<pair<loff_t, uint64_t> > ranges;
  vector<pair<uint64_t, bufferlist> > io_vec;

  ranges.reserve(blist.size());
  io_vec.reserve(blist.size());

  uint64_t total_len = 0;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
    assert(bh->ob == ob);
    assert(bh->bl.length() == bh->length());
    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));

    int n = io_vec.size();
    io_vec.resize(n + 1);
    io_vec[n].first = bh->start();
    io_vec[n].second = bh->bl;

    total_len += bh->length();
    if (bh->snapc.seq > snapc.seq)
      snapc = bh->snapc;
    if (bh->last_write > last_write)
      last_write = bh->last_write;
  }

  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool,
					      ob->get_soid(), ranges);

  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
					   io_vec, snapc, last_write,
					   ob->truncate_size, ob->truncate_seq,
					   oncommit);
  oncommit->tid = tid;
  ob->last_write_tid = tid;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    bh->last_write_tid = tid;
    mark_tx(bh);
  }

  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write " << *bh << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_write " + bh->ob->get_oid().name);
    trace.event("start");
  }

  // finishers
  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
					      bh->ob->get_soid(), bh->start(),
					      bh->length(), trace);
  // go
  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
					   bh->ob->get_oloc(),
					   bh->start(), bh->length(),
					   bh->snapc, bh->bl, bh->last_write,
					   bh->ob->truncate_size,
					   bh->ob->truncate_seq,
					   bh->journal_tid, trace, oncommit);
  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;

  // set bh last_write_tid
  oncommit->tid = tid;
  bh->ob->last_write_tid = tid;
  bh->last_write_tid = tid;

  mark_tx(bh);
  if (perfcounter)
    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
}
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
				   vector<pair<loff_t, uint64_t> >& ranges,
				   ceph_tid_t tid, int r)
{
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
		<< " ranges " << ranges << " returned " << r << dendl;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
    return;
  }

  Object *ob = objects[poolid][oid];
  int was_dirty_or_tx = ob->oset->dirty_or_tx;

  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
       p != ranges.end();
       ++p) {
    loff_t start = p->first;
    uint64_t length = p->second;
    if (!ob->exists) {
      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
      ob->exists = true;

      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
					      ob->get_snap())) {
	ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
	  "complete on " << *ob << dendl;
	ob->complete = false;
      }
    }

    vector<pair<loff_t, BufferHead*>> hit;
    // apply to bh's!
    for (map<loff_t, BufferHead*>::const_iterator p
	   = ob->data_lower_bound(start);
	 p != ob->data.end();
	 ++p) {
      BufferHead *bh = p->second;

      if (bh->start() >= start+(loff_t)length)
	break;

      // make sure bh is tx
      if (!bh->is_tx()) {
	ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
	continue;
      }

      // make sure bh tid matches
      if (bh->last_write_tid != tid) {
	assert(bh->last_write_tid > tid);
	ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
	continue;
      }

      // we don't merge tx buffers. tx buffer should be within the range
      assert(bh->start() >= start);
      assert(bh->end() <= start+(loff_t)length);

      if (r >= 0) {
	// ok!  mark bh clean and error-free
	mark_clean(bh);
	bh->set_journal_tid(0);
	if (bh->get_nocache())
	  bh_lru_rest.lru_bottouch(bh);
	hit.push_back(make_pair(bh->start(), bh));
	ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
      } else {
	mark_dirty(bh);
	ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
		       << *bh << " r = " << r << " " << cpp_strerror(-r)
		       << dendl;
      }
    }

    for (auto& p : hit) {
      // p.second may have been merged and deleted in merge_left
      if (ob->data.count(p.first))
	ob->try_merge_bh(p.second);
    }
  }

  // update last_commit.
  assert(ob->last_commit_tid < tid);
  ob->last_commit_tid = tid;

  // waiters?
  list<Context*> ls;
  if (ob->waitfor_commit.count(tid)) {
    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
    ob->waitfor_commit.erase(tid);
  }

  // is the entire object set now clean and fully committed?
  ObjectSet *oset = ob->oset;

  if (flush_set_callback &&
      was_dirty_or_tx > 0 &&
      oset->dirty_or_tx == 0) {        // nothing dirty/tx
    flush_set_callback(flush_set_callback_arg, oset);
  }

  if (!ls.empty())
    finish_contexts(cct, ls, r);
}
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  ceph::real_time cutoff = ceph::real_clock::now();

  ldout(cct, 10) << "flush " << amount << dendl;

  /*
   * NOTE: we aren't actually pulling things off the LRU here, just
   * looking at the tail item.  Then we call bh_write, which moves it
   * to the other LRU, so that we can call
   * lru_dirty.lru_get_next_expire() again.
   */
  int64_t left = amount;
  while (amount == 0 || left > 0) {
    BufferHead *bh = static_cast<BufferHead*>(
      bh_lru_dirty.lru_get_next_expire());
    if (!bh) break;
    if (bh->last_write > cutoff) break;

    if (scattered_write) {
      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
    } else {
      left -= bh->length();
      bh_write(bh, *trace);
    }
  }
}
void ObjectCacher::trim()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
		 << get_stat_clean() << ", objects: max " << max_objects
		 << " current " << ob_lru.lru_get_size() << dendl;

  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() -
    bh_lru_rest.lru_get_num_pinned();
  while (get_stat_clean() > 0 &&
	 ((uint64_t)get_stat_clean() > max_size ||
	  nr_clean_bh > max_clean_bh)) {
    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
    if (!bh)
      break;

    ldout(cct, 10) << "trim trimming " << *bh << dendl;
    assert(bh->is_clean() || bh->is_zero() || bh->is_error());

    Object *ob = bh->ob;
    bh_remove(ob, bh);
    delete bh;

    --nr_clean_bh;

    if (ob->complete) {
      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
      ob->complete = false;
    }
  }

  while (ob_lru.lru_get_size() > max_objects) {
    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
    if (!ob)
      break;

    ldout(cct, 10) << "trim trimming " << *ob << dendl;
    close_object(ob);
  }

  ldout(cct, 10) << "trim finish:  max " << max_size << "  clean "
		 << get_stat_clean() << ", objects: max " << max_objects
		 << " current " << ob_lru.lru_get_size() << dendl;
}
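// With BUFFER_MEMORY_WEIGHT = 12, max_clean_bh is max_size / 4096; e.g. a
// 64 MiB cache allows at most 16384 clean bufferheads, bounding per-bh
// overhead independently of how many bytes the bhs actually hold.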
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
			     snapid_t snapid)
{
  assert(lock.is_locked());
  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
       ex_it != extents.end();
       ++ex_it) {
    ldout(cct, 10) << "is_cached " << *ex_it << dendl;

    // get Object cache
    sobject_t soid(ex_it->oid, snapid);
    Object *o = get_object_maybe(soid, ex_it->oloc);
    if (!o)
      return false;
    if (!o->is_cached(ex_it->offset, ex_it->length))
      return false;
  }
  return true;
}
/*
 * returns # bytes read (if in cache).  onfinish is untouched (caller
 *  must delete it)
 * returns 0 if doing async read
 */
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
			ZTracer::Trace *parent_trace)
{
  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("read", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  int r = _readx(rd, oset, onfinish, true, &trace);
  if (r < 0) {
    trace.event("finish");
  }
  return r;
}

int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
			 bool external_call, ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  bool success = true;
  int error = 0;
  uint64_t bytes_in_cache = 0;
  uint64_t bytes_not_in_cache = 0;
  uint64_t total_bytes_read = 0;
  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  /*
   * WARNING: we can only meaningfully return ENOENT if the read request
   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
   * zeroed buffers needs to feed single extents into readx().
   */
  assert(!oset->return_enoent || rd->extents.size() == 1);

  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
       ex_it != rd->extents.end();
       ++ex_it) {
    ldout(cct, 10) << "readx " << *ex_it << dendl;

    total_bytes_read += ex_it->length;

    // get Object cache
    sobject_t soid(ex_it->oid, rd->snap);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
			   ex_it->truncate_size, oset->truncate_seq);

    // does not exist and no hits?
    if (oset->return_enoent && !o->exists) {
      ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;

      // should we worry about COW underneath us?
      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
					      ex_it->length, soid.snap)) {
	ldout(cct, 20) << "readx  may copy on write" << dendl;
	bool wait = false;
	list<BufferHead*> blist;
	for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
	     bh_it != o->data.end();
	     ++bh_it) {
	  BufferHead *bh = bh_it->second;
	  if (bh->is_dirty() || bh->is_tx()) {
	    ldout(cct, 10) << "readx  flushing " << *bh << dendl;
	    wait = true;
	    if (bh->is_dirty()) {
	      if (scattered_write)
		blist.push_back(bh);
	      else
		bh_write(bh, *trace);
	    }
	  }
	}
	if (scattered_write && !blist.empty())
	  bh_write_scattered(blist);
	if (wait) {
	  ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
			 << " on " << *o << dendl;
	  o->waitfor_commit[o->last_write_tid].push_back(
	    new C_RetryRead(this, rd, oset, onfinish, *trace));
	  // FIXME: perfcounter!
	  return 0;
	}
      }

      // can we return ENOENT?
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
	   bh_it != o->data.end();
	   ++bh_it) {
	ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
	if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
	  allzero = false;
	  break;
	}
      }
      if (allzero) {
	ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
		       << dendl;
	delete rd;
	return -ENOENT;
      }
    }

    // map extent into bufferheads
    map<loff_t, BufferHead*> hits, missing, rx, errors;
    o->map_read(*ex_it, hits, missing, rx, errors);
    if (external_call) {
      // retry reading error buffers
      missing.insert(errors.begin(), errors.end());
    } else {
      // some reads had errors, fail later so completions
      // are cleaned up properly
      // TODO: make read path not call _readx for every completion
      hits.insert(errors.begin(), errors.end());
    }

    if (!missing.empty() || !rx.empty()) {
      // read missing
      map<loff_t, BufferHead*>::iterator last = missing.end();
      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
	   bh_it != missing.end();
	   ++bh_it) {
	uint64_t rx_bytes = static_cast<uint64_t>(
	  stat_rx + bh_it->second->length());
	bytes_not_in_cache += bh_it->second->length();
	if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
	  // cache is full with concurrent reads -- wait for rx's to complete
	  // to constrain memory growth (especially during copy-ups)
	  if (success) {
	    ldout(cct, 10) << "readx missed, waiting on cache to complete "
			   << waitfor_read.size() << " blocked reads, "
			   << (MAX(rx_bytes, max_size) - max_size)
			   << " read bytes" << dendl;
	    waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
						   *trace));
	  }

	  bh_remove(o, bh_it->second);
	  delete bh_it->second;
	} else {
	  bh_it->second->set_nocache(nocache);
	  bh_read(bh_it->second, rd->fadvise_flags, *trace);
	  if ((success && onfinish) || last != missing.end())
	    last = bh_it;
	}
	success = false;
      }

      // add the wait to the last missed bh so we don't wake up early,
      // because reads complete in order
      if (last != missing.end()) {
	ldout(cct, 10) << "readx missed, waiting on " << *last->second
		       << " off " << last->first << dendl;
	last->second->waitfor_read[last->first].push_back(
	  new C_RetryRead(this, rd, oset, onfinish, *trace) );
      }

      // bump rx
      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
	   bh_it != rx.end();
	   ++bh_it) {
	touch_bh(bh_it->second); // bump in lru, so we don't lose it.
	if (success && onfinish) {
	  ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
			 << " off " << bh_it->first << dendl;
	  bh_it->second->waitfor_read[bh_it->first].push_back(
	    new C_RetryRead(this, rd, oset, onfinish, *trace) );
	}
	bytes_not_in_cache += bh_it->second->length();
	success = false;
      }

      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
	   bh_it != hits.end(); ++bh_it)
	// bump in lru, so we don't lose it on a later read
	touch_bh(bh_it->second);

    } else {
      assert(!hits.empty());

      // make a plain list
      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
	   bh_it != hits.end();
	   ++bh_it) {
	BufferHead *bh = bh_it->second;
	ldout(cct, 10) << "readx hit bh " << *bh << dendl;
	if (bh->is_error() && bh->error)
	  error = bh->error;
	bytes_in_cache += bh->length();

	if (bh->get_nocache() && bh->is_clean())
	  bh_lru_rest.lru_bottouch(bh);
	else
	  touch_bh(bh);
	// must come after touch_bh, because touch_bh clears dontneed
	if (dontneed &&
	    ((loff_t)ex_it->offset <= bh->start() &&
	     (bh->end() <= (loff_t)(ex_it->offset + ex_it->length)))) {
	  bh->set_dontneed(true); // if dirty
	  if (bh->is_clean())
	    bh_lru_rest.lru_bottouch(bh);
	}
      }

      // create reverse map of buffer offset -> object for the
      // eventual result.  this is over a single ObjectExtent, so we
      // know that
      //  - the bh's are contiguous
      //  - the buffer frags need not be (and almost certainly aren't)
      loff_t opos = ex_it->offset;
      map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
      assert(bh_it->second->start() <= opos);
      uint64_t bhoff = opos - bh_it->second->start();
      vector<pair<uint64_t,uint64_t> >::iterator f_it
	= ex_it->buffer_extents.begin();
      uint64_t foff = 0;
      while (1) {
	BufferHead *bh = bh_it->second;
	assert(opos == (loff_t)(bh->start() + bhoff));

	uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
	ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
		       << bhoff << " frag " << f_it->first << "~"
		       << f_it->second << " +" << foff << "~" << len
		       << dendl;

	bufferlist bit;
	// put substr here first, since substr_of clobbers, and we
	// may get multiple bh's at this stripe_map position
	if (bh->is_zero()) {
	  stripe_map[f_it->first].append_zero(len);
	} else {
	  bit.substr_of(bh->bl, opos - bh->start(), len);
	  stripe_map[f_it->first].claim_append(bit);
	}

	opos += len;
	bhoff += len;
	foff += len;
	if (opos == bh->end()) {
	  ++bh_it;
	  bhoff = 0;
	}
	if (foff == f_it->second) {
	  ++f_it;
	  foff = 0;
	}
	if (bh_it == hits.end()) break;
	if (f_it == ex_it->buffer_extents.end())
	  break;
      }
      assert(f_it == ex_it->buffer_extents.end());
      assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
    }

    if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
      bottouch_ob(o);
  }

  if (!success) {
    if (perfcounter && external_call) {
      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
      perfcounter->inc(l_objectcacher_cache_ops_miss);
    }
    if (onfinish) {
      ldout(cct, 20) << "readx defer " << rd << dendl;
    } else {
      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
		     << dendl;
      delete rd;
    }
    return 0;  // wait!
  }
  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_hit);
  }

  // no misses... success!  do the read.
  ldout(cct, 10) << "readx has all buffers" << dendl;

  // ok, assemble into result buffer.
  uint64_t pos = 0;
  if (rd->bl && !error) {
    rd->bl->clear();
    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
	 i != stripe_map.end();
	 ++i) {
      assert(pos == i->first);
      ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
		     << " at " << pos << dendl;
      pos += i->second.length();
      rd->bl->claim_append(i->second);
      assert(rd->bl->length() == pos);
    }
    ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
  } else if (!error) {
    ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
    pos = i->first + i->second.length();
  }

  // done with read.
  int ret = error ? error : pos;
  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
  assert(pos <= (uint64_t) INT_MAX);

  delete rd;
  return ret;
}
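// stripe_map is keyed by the destination offset in the caller's buffer
// (f_it->first), so the final loop above can claim_append the fragments in
// ascending key order and reproduce the file-level byte order even though
// the object-level bhs were visited per extent.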
void ObjectCacher::retry_waiting_reads()
{
  list<Context *> ls;
  ls.swap(waitfor_read);

  while (!ls.empty() && waitfor_read.empty()) {
    Context *ctx = ls.front();
    ls.pop_front();
    ctx->complete(0);
  }
  waitfor_read.splice(waitfor_read.end(), ls);
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
			 ZTracer::Trace *parent_trace)
{
  assert(lock.is_locked());
  ceph::real_time now = ceph::real_clock::now();
  uint64_t bytes_written = 0;
  uint64_t bytes_written_in_flush = 0;
  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("write", &trace_endpoint, parent_trace);
    trace.event("start");
  }

  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
       ex_it != wr->extents.end();
       ++ex_it) {
    // get object cache
    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
			   ex_it->truncate_size, oset->truncate_seq);

    // map it all into a single bufferhead.
    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
    bool missing = bh->is_missing();
    bh->snapc = wr->snapc;

    bytes_written += ex_it->length;
    if (bh->is_tx()) {
      bytes_written_in_flush += ex_it->length;
    }

    // adjust buffer pointers (ie "copy" data into my cache)
    // this is over a single ObjectExtent, so we know that
    //  - there is one contiguous bh
    //  - the buffer frags need not be (and almost certainly aren't)
    // note: i assume striping is monotonic... no jumps backwards, ever!
    loff_t opos = ex_it->offset;
    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
	   = ex_it->buffer_extents.begin();
	 f_it != ex_it->buffer_extents.end();
	 ++f_it) {
      ldout(cct, 10) << "writex writing " << f_it->first << "~"
		     << f_it->second << " into " << *bh << " at " << opos
		     << dendl;
      uint64_t bhoff = opos - bh->start();
      assert(f_it->second <= bh->length() - bhoff);

      // get the frag we're mapping in
      bufferlist frag;
      frag.substr_of(wr->bl, f_it->first, f_it->second);

      // keep anything left of bhoff
      bufferlist newbl;
      if (bhoff)
	newbl.substr_of(bh->bl, 0, bhoff);
      newbl.claim_append(frag);
      bh->bl.swap(newbl);

      opos += f_it->second;
    }

    // ok, now bh is dirty.
    mark_dirty(bh);
    if (dontneed)
      bh->set_dontneed(true);
    else if (nocache && missing)
      bh->set_nocache(true);
    else
      touch_bh(bh);

    bh->last_write = now;

    o->try_merge_bh(bh);
  }

  if (perfcounter) {
    perfcounter->inc(l_objectcacher_data_written, bytes_written);
    if (bytes_written_in_flush) {
      perfcounter->inc(l_objectcacher_overwritten_in_flush,
		       bytes_written_in_flush);
    }
  }

  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
  delete wr;

  trim();
  return r;
}
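// Each ObjectExtent names a contiguous object range plus buffer_extents,
// the (possibly discontiguous) source ranges in wr->bl that stripe into
// it; writex copies those fragments into the single bh returned by
// map_write in ascending object offset.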
class ObjectCacher::C_WaitForWrite : public Context {
public:
  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
		 const ZTracer::Trace &trace, Context *onfinish) :
    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
  void finish(int r) override;
private:
  ObjectCacher *m_oc;
  uint64_t m_len;
  ZTracer::Trace m_trace;
  Context *m_onfinish;
};

void ObjectCacher::C_WaitForWrite::finish(int r)
{
  Mutex::Locker l(m_oc->lock);
  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
  m_onfinish->complete(r);
}
void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
					    ZTracer::Trace *trace)
{
  assert(lock.is_locked());
  ceph::mono_time start = ceph::mono_clock::now();
  int blocked = 0;
  // wait for writeback?
  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
  //  - do not wait for bytes other waiters are waiting on.  this means that
  //    threads do not wait for each other.  this effectively allows the cache
  //    size to balloon proportional to the data that is in flight.

  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
  while (get_stat_dirty() + get_stat_tx() > 0 &&
	 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
	   max_dirty + get_stat_dirty_waiting()) ||
	  (dirty_or_tx_bh.size() >=
	   max_dirty_bh + get_stat_nr_dirty_waiters()))) {

    if (blocked == 0) {
      trace->event("start wait for writeback");
    }
    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
		   << (get_stat_dirty() + get_stat_tx()) << " >= max "
		   << max_dirty << " + dirty_waiting "
		   << get_stat_dirty_waiting() << dendl;
    flusher_cond.Signal();
    stat_dirty_waiting += len;
    ++stat_nr_dirty_waiters;
    stat_cond.Wait(lock);
    stat_dirty_waiting -= len;
    --stat_nr_dirty_waiters;
    ++blocked;
    ldout(cct, 10) << __func__ << " woke up" << dendl;
  }
  if (blocked > 0) {
    trace->event("finish wait for writeback");
  }
  if (blocked && perfcounter) {
    perfcounter->inc(l_objectcacher_write_ops_blocked);
    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
    ceph::timespan blocked = ceph::mono_clock::now() - start;
    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
  }
}
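// Example: with max_dirty = 24 MiB, max_dirty_bh is 24 MiB >> 12 = 6144
// bufferheads.  Because each waiter adds its own length to
// stat_dirty_waiting (and itself to the waiter count), the thresholds rise
// with the number of blocked writers, so writers wait on actual writeback
// progress rather than on each other.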
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
				  ZTracer::Trace *trace, Context *onfreespace)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  int ret = 0;

  if (max_dirty > 0) {
    if (block_writes_upfront) {
      maybe_wait_for_writeback(len, trace);
      if (onfreespace)
	onfreespace->complete(0);
    } else {
      assert(onfreespace);
      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
    }
  } else {
    // write-thru!  flush what we just wrote.
    Cond cond;
    bool done = false;
    Context *fin = block_writes_upfront ?
      new C_Cond(&cond, &done, &ret) : onfreespace;
    assert(fin);
    bool flushed = flush_set(oset, wr->extents, trace, fin);
    assert(!flushed);   // we just dirtied it, and didn't drop our lock!
    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
		   << " bytes" << dendl;
    if (block_writes_upfront) {
      while (!done)
	cond.Wait(lock);
      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
      if (onfreespace)
	onfreespace->complete(ret);
    }
  }

  // start writeback anyway?
  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
		   << target_dirty << ", nudging flusher" << dendl;
    flusher_cond.Signal();
  }
  return ret;
}
void ObjectCacher::flusher_entry()
{
  ldout(cct, 10) << "flusher start" << dendl;
  lock.Lock();
  while (!flusher_stop) {
    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
      get_stat_dirty();
    ldout(cct, 11) << "flusher "
		   << all << " / " << max_size << ":  "
		   << get_stat_tx() << " tx, "
		   << get_stat_rx() << " rx, "
		   << get_stat_clean() << " clean, "
		   << get_stat_dirty() << " dirty ("
		   << target_dirty << " target, "
		   << max_dirty << " max)"
		   << dendl;
    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();

    ZTracer::Trace trace;
    if (cct->_conf->osdc_blkin_trace_all) {
      trace.init("flusher", &trace_endpoint);
      trace.event("start");
    }

    if (actual > 0 && (uint64_t) actual > target_dirty) {
      // flush some dirty pages
      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
		     << get_stat_dirty_waiting() << " dirty_waiting > target "
		     << target_dirty << ", flushing some dirty bhs" << dendl;
      flush(&trace, actual - target_dirty);
    } else {
      // check tail of lru for old dirty items
      ceph::real_time cutoff = ceph::real_clock::now();
      cutoff -= max_dirty_age;
      BufferHead *bh = 0;
      int max = MAX_FLUSH_UNDER_LOCK;
      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
					    lru_get_next_expire())) != 0 &&
	     bh->last_write <= cutoff &&
	     max > 0) {
	ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
	if (scattered_write) {
	  bh_write_adjacencies(bh, cutoff, NULL, &max);
	} else {
	  bh_write(bh, trace);
	  --max;
	}
      }
      if (!max) {
	// back off the lock to avoid starving other threads
	trace.event("backoff");
	lock.Unlock();
	lock.Lock();
	continue;
      }
    }

    trace.event("finish");
    if (flusher_stop)
      break;

    flusher_cond.WaitInterval(lock, seconds(1));
  }

  /* Wait for reads to finish. This is only possible if handling
   * -ENOENT made some read completions finish before their rados read
   * came back. If we don't wait for them, and destroy the cache, when
   * the rados reads do come back their callback will try to access the
   * no-longer-valid ObjectCacher.
   */
  while (reads_outstanding > 0) {
    ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
		   << reads_outstanding << dendl;
    read_cond.Wait(lock);
  }

  lock.Unlock();
  ldout(cct, 10) << "flusher finish" << dendl;
}
// -------------------------------------------------

bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return true;

  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
    if (!(*p)->is_empty())
      return false;

  return true;
}
bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
	 q != ob->data.end();
	 ++q) {
      BufferHead *bh = q->second;
      if (!bh->is_dirty() && !bh->is_tx())
	return true;
    }
  }

  return false;
}
bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty())
    return false;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;

    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
	 p != ob->data.end();
	 ++p) {
      BufferHead *bh = p->second;
      if (bh->is_dirty() || bh->is_tx())
	return true;
    }
  }

  return false;
}
// purge.  non-blocking.  violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "purge " << *ob << dendl;

  ob->truncate(0);
}
// flush.  non-blocking.  no callback.
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
			 ZTracer::Trace *trace)
{
  assert(trace != nullptr);
  assert(lock.is_locked());
  list<BufferHead*> blist;
  bool clean = true;
  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
  for (map<loff_t,BufferHead*>::const_iterator p
	 = ob->data_lower_bound(offset);
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    ldout(cct, 20) << "flush  " << *bh << dendl;
    if (length && bh->start() > offset+length) {
      break;
    }
    if (bh->is_tx()) {
      clean = false;
      continue;
    }
    if (!bh->is_dirty()) {
      continue;
    }

    if (scattered_write)
      blist.push_back(bh);
    else
      bh_write(bh, *trace);
    clean = false;
  }
  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  return clean;
}
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
				     Context *onfinish)
{
  assert(lock.is_locked());
  if (gather->has_subs()) {
    gather->set_finisher(onfinish);
    gather->activate();
    return false;
  }

  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
  onfinish->complete(0);
  return true;
}
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;

  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
  // order. But items in oset->objects are not sorted. So the iterator can
  // point to any buffer head in the ObjectSet
  BufferHead key(*oset->objects.begin());
  it = dirty_or_tx_bh.lower_bound(&key);
  p = q = it;

  bool backwards = true;
  if (it != dirty_or_tx_bh.begin())
    --it;
  else
    backwards = false;

  for (; p != dirty_or_tx_bh.end(); p = q) {
    ++q;
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
      break;
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
	if (last_ob != bh->ob) {
	  if (!blist.empty()) {
	    bh_write_scattered(blist);
	    blist.clear();
	  }
	  last_ob = bh->ob;
	}
	blist.push_back(bh);
      } else {
	bh_write(bh, {});
      }
    }
  }

  if (backwards) {
    for (p = q = it; true; p = q) {
      if (q != dirty_or_tx_bh.begin())
	--q;
      else
	backwards = false;
      BufferHead *bh = *p;
      if (bh->ob->oset != oset)
	break;
      waitfor_commit.insert(bh->ob);
      if (bh->is_dirty()) {
	if (scattered_write) {
	  if (last_ob != bh->ob) {
	    if (!blist.empty()) {
	      bh_write_scattered(blist);
	      blist.clear();
	    }
	    last_ob = bh->ob;
	  }
	  blist.push_front(bh);
	} else {
	  bh_write(bh, {});
	}
      }
      if (!backwards)
	break;
    }
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end(); ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
		   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
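// Because dirty_or_tx_bh is ordered by ObjectSet/Object/offset while
// oset->objects is unordered, lower_bound on an arbitrary object's key can
// land anywhere inside this set's run of bhs; hence the forward sweep plus
// the backward sweep above to cover both sides of the starting point.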
// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
			     ZTracer::Trace *trace, Context *onfinish)
{
  assert(lock.is_locked());
  assert(trace != nullptr);
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);
    return true;
  }

  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
		 << " ObjectExtents" << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);

  for (vector<ObjectExtent>::iterator p = exv.begin();
       p != exv.end();
       ++p) {
    ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;
    Object *ob = objects[oset->poolid][soid];

    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
		   << " " << ob << dendl;

    if (!flush(ob, ex.offset, ex.length, trace)) {
      // we'll need to gather...
      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
		     << ob->last_write_tid << " on " << *ob << dendl;
      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
    }
  }

  return _flush_set_finish(&gather, onfinish);
}
// flush all dirty data.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
  assert(lock.is_locked());
  assert(onfinish != NULL);

  ldout(cct, 10) << "flush_all " << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
  next = it = dirty_or_tx_bh.begin();
  while (it != dirty_or_tx_bh.end()) {
    ++next;
    BufferHead *bh = *it;
    waitfor_commit.insert(bh->ob);

    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
            blist.clear();
          }
          last_ob = bh->ob;
        }
        blist.push_back(bh);
      } else {
        bh_write(bh, {});
      }
    }

    it = next;
  }

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end();
       ++i) {
    Object *ob = *i;

    // we'll need to gather...
    ldout(cct, 10) << "flush_all will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
  }

  return _flush_set_finish(&gather, onfinish);
}
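// When scattered_write is enabled, the loop above batches consecutive dirty
// buffers of the same Object into blist and submits each batch with a single
// bh_write_scattered() call; otherwise every dirty buffer gets its own
// bh_write().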
void ObjectCacher::purge_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << "purge_set " << oset << dendl;
  const bool were_dirty = oset->dirty_or_tx > 0;

  for (xlist<Object*>::iterator i = oset->objects.begin();
       !i.end(); ++i) {
    Object *ob = *i;
    purge(ob);
  }

  // Although we have purged rather than flushed, caller should still
  // drop any resources associated with dirty data.
  assert(oset->dirty_or_tx == 0);
  if (flush_set_callback && were_dirty) {
    flush_set_callback(flush_set_callback_arg, oset);
  }
}
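// purge_set() drops cached data without writing it back; the callback above
// only gives the cache owner a chance to release whatever resources it was
// holding against the dirty bytes that just vanished.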
loff_t ObjectCacher::release(Object *ob)
{
  assert(lock.is_locked());
  list<BufferHead*> clean;
  loff_t o_unclean = 0;

  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
       p != ob->data.end();
       ++p) {
    BufferHead *bh = p->second;
    if (bh->is_clean() || bh->is_zero() || bh->is_error())
      clean.push_back(bh);
    else
      o_unclean += bh->length();
  }

  for (list<BufferHead*>::iterator p = clean.begin();
       p != clean.end();
       ++p) {
    bh_remove(ob, *p);
    delete *p;
  }

  if (ob->can_close()) {
    ldout(cct, 10) << "release trimming " << *ob << dendl;
    close_object(ob);
    assert(o_unclean == 0);
    return 0;
  }

  if (ob->complete) {
    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
    ob->complete = false;
  }
  if (!ob->exists) {
    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
    ob->exists = true;
  }

  return o_unclean;
}
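// release() frees only buffers that are clean, zero-filled, or in error; its
// return value reports the dirty/tx bytes it had to leave behind, which
// release_set() and release_all() below aggregate for their callers.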
loff_t ObjectCacher::release_set(ObjectSet *oset)
{
  assert(lock.is_locked());
  // return # bytes not clean (and thus not released).
  loff_t unclean = 0;

  if (oset->objects.empty()) {
    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
    return 0;
  }

  ldout(cct, 10) << "release_set " << oset << dendl;

  xlist<Object*>::iterator q;
  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ) {
    q = p;
    ++q;
    Object *ob = *p;

    loff_t o_unclean = release(ob);
    unclean += o_unclean;

    if (o_unclean)
      ldout(cct, 10) << "release_set " << oset << " " << *ob
                     << " has " << o_unclean << " bytes left"
                     << dendl;
    p = q;
  }

  ldout(cct, 10) << "release_set " << oset
                 << ", " << unclean << " bytes left" << dendl;

  return unclean;
}
uint64_t ObjectCacher::release_all()
{
  assert(lock.is_locked());
  ldout(cct, 10) << "release_all" << dendl;
  uint64_t unclean = 0;

  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
    = objects.begin();
  while (i != objects.end()) {
    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
    while (p != i->end()) {
      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
      ++n;

      Object *ob = p->second;

      loff_t o_unclean = release(ob);
      unclean += o_unclean;

      if (o_unclean)
        ldout(cct, 10) << "release_all " << *ob
                       << " has " << o_unclean << " bytes left"
                       << dendl;
      p = n;
    }
    ++i;
  }

  ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
                 << dendl;

  return unclean;
}
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
  assert(lock.is_locked());
  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;

  for (xlist<Object*>::iterator p = oset->objects.begin();
       !p.end(); ++p) {
    Object *ob = *p;
    if (!ob->exists) {
      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
      ob->exists = true;
      ob->complete = false;
    }
    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
         !q.end(); ++q) {
      C_ReadFinish *comp = *q;
      comp->distrust_enoent();
    }
  }
}
/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset.
 */
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
  assert(lock.is_locked());
  bool was_dirty = oset->dirty_or_tx > 0;

  _discard(oset, exls, nullptr);
  _discard_finish(oset, was_dirty, nullptr);
}
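// discard_set() completes immediately and never waits on writeback; use
// discard_writeback() below when the caller must not proceed until any TX
// (in-flight) buffers in the discarded range have committed.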
/**
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset. If the bh is in TX state, the discard
 * will wait for the write to commit prior to invoking on_finish.
 */
void ObjectCacher::discard_writeback(ObjectSet *oset,
                                     const vector<ObjectExtent>& exls,
                                     Context* on_finish)
{
  assert(lock.is_locked());
  bool was_dirty = oset->dirty_or_tx > 0;

  C_GatherBuilder gather(cct);
  _discard(oset, exls, &gather);

  if (gather.has_subs()) {
    bool flushed = was_dirty && oset->dirty_or_tx == 0;
    gather.set_finisher(new FunctionContext(
      [this, oset, flushed, on_finish](int) {
        assert(lock.is_locked());
        if (flushed && flush_set_callback)
          flush_set_callback(flush_set_callback_arg, oset);
        on_finish->complete(0);
      }));
    gather.activate();
    return;
  }

  _discard_finish(oset, was_dirty, on_finish);
}
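// Illustrative only (not from this file): a truncate path might defer its
// completion until in-flight writeback of the doomed range commits, e.g.:
//
//   vector<ObjectExtent> exls;
//   // ... map the truncated byte range onto exls ...
//   discard_writeback(oset, exls, new FunctionContext([](int) {
//     // safe to reuse the range now; any TX buffers have committed
//   }));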
void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
                            C_GatherBuilder* gather)
{
  if (oset->objects.empty()) {
    ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
    return;
  }

  ldout(cct, 10) << __func__ << " " << oset << dendl;

  for (auto& ex : exls) {
    ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
      continue;

    Object *ob = objects[oset->poolid][soid];

    ob->discard(ex.offset, ex.length, gather);
  }
}
void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
                                   Context* on_finish)
{
  assert(lock.is_locked());

  // did we truncate off dirty data?
  if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
    flush_set_callback(flush_set_callback_arg, oset);
  }

  // notify that in-flight writeback has completed
  if (on_finish != nullptr) {
    on_finish->complete(0);
  }
}
void ObjectCacher::verify_stats() const
{
  assert(lock.is_locked());
  ldout(cct, 10) << "verify_stats" << dendl;

  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
    error = 0;
  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
         = objects.begin();
       i != objects.end();
       ++i) {
    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
           = i->begin();
         p != i->end();
         ++p) {
      Object *ob = p->second;
      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
           q != ob->data.end();
           ++q) {
        BufferHead *bh = q->second;
        switch (bh->get_state()) {
        case BufferHead::STATE_MISSING:
          missing += bh->length();
          break;
        case BufferHead::STATE_CLEAN:
          clean += bh->length();
          break;
        case BufferHead::STATE_ZERO:
          zero += bh->length();
          break;
        case BufferHead::STATE_DIRTY:
          dirty += bh->length();
          break;
        case BufferHead::STATE_TX:
          tx += bh->length();
          break;
        case BufferHead::STATE_RX:
          rx += bh->length();
          break;
        case BufferHead::STATE_ERROR:
          error += bh->length();
          break;
        default:
          ceph_abort();
        }
      }
    }
  }

  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
                 << " dirty " << dirty << " missing " << missing
                 << " error " << error << dendl;
  assert(clean == stat_clean);
  assert(rx == stat_rx);
  assert(tx == stat_tx);
  assert(dirty == stat_dirty);
  assert(missing == stat_missing);
  assert(zero == stat_zero);
  assert(error == stat_error);
}
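// verify_stats() is a debugging invariant check: it recounts every cached
// BufferHead by state and asserts that the totals match the stat_* counters
// maintained incrementally by bh_stat_add()/bh_stat_sub() below.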
void ObjectCacher::bh_stat_add(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing += bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean += bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero += bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx += bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error += bh->length();
    break;
  default:
    assert(0 == "bh_stat_add: invalid bufferhead state");
  }
  if (get_stat_dirty_waiting() > 0)
    stat_cond.Signal();
}
void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing -= bh->length();
    break;
  case BufferHead::STATE_CLEAN:
    stat_clean -= bh->length();
    break;
  case BufferHead::STATE_ZERO:
    stat_zero -= bh->length();
    break;
  case BufferHead::STATE_DIRTY:
    stat_dirty -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_TX:
    stat_tx -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
    break;
  case BufferHead::STATE_RX:
    stat_rx -= bh->length();
    break;
  case BufferHead::STATE_ERROR:
    stat_error -= bh->length();
    break;
  default:
    assert(0 == "bh_stat_sub: invalid bufferhead state");
  }
}
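// bh_stat_sub() and bh_stat_add() are used as a bracket around every state
// change (see bh_set_state() below): subtract under the old state, mutate,
// then add under the new one, which keeps the per-Object and per-ObjectSet
// dirty_or_tx totals consistent.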
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
  assert(lock.is_locked());
  int state = bh->get_state();
  // move between lru lists?
  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
    bh_lru_rest.lru_remove(bh);
    bh_lru_dirty.lru_insert_top(bh);
  } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
    bh_lru_dirty.lru_remove(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if ((s == BufferHead::STATE_TX ||
       s == BufferHead::STATE_DIRTY) &&
      state != BufferHead::STATE_TX &&
      state != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.insert(bh);
  } else if ((state == BufferHead::STATE_TX ||
              state == BufferHead::STATE_DIRTY) &&
             s != BufferHead::STATE_TX &&
             s != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.erase(bh);
  }

  if (s != BufferHead::STATE_ERROR &&
      state == BufferHead::STATE_ERROR) {
    bh->error = 0;
  }

  // set state
  bh_stat_sub(bh);
  bh->set_state(s);
  bh_stat_add(bh);
}
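// Membership in dirty_or_tx_bh mirrors the DIRTY/TX states exactly: the
// transitions above insert a bh when it enters either state and erase it
// when it leaves both, which is what lets flush_set() and flush_all() find
// every flushable buffer in one sorted set.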
void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
  ob->add_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_insert_top(bh);
    dirty_or_tx_bh.insert(bh);
  } else {
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
    else
      bh_lru_rest.lru_insert_top(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.insert(bh);
  }
  bh_stat_add(bh);
}
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
  assert(lock.is_locked());
  assert(bh->get_journal_tid() == 0);
  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
  ob->remove_bh(bh);
  if (bh->is_dirty()) {
    bh_lru_dirty.lru_remove(bh);
    dirty_or_tx_bh.erase(bh);
  } else {
    bh_lru_rest.lru_remove(bh);
  }

  if (bh->is_tx()) {
    dirty_or_tx_bh.erase(bh);
  }
  bh_stat_sub(bh);
  if (get_stat_dirty_waiting() > 0)