1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <limits.h>
5
6#include "msg/Messenger.h"
7#include "ObjectCacher.h"
8#include "WritebackHandler.h"
9#include "common/errno.h"
10#include "common/perf_counters.h"
11
12#include "include/assert.h"
13
14#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
15#define BUFFER_MEMORY_WEIGHT 12 // memory usage of BufferHead, count in (1<<n)
16
17using std::chrono::seconds;
18 /// while holding the lock
19
20/*** ObjectCacher::BufferHead ***/
21
22
23/*** ObjectCacher::Object ***/
24
25#define dout_subsys ceph_subsys_objectcacher
26#undef dout_prefix
27#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
28
29
30
31class ObjectCacher::C_ReadFinish : public Context {
32 ObjectCacher *oc;
33 int64_t poolid;
34 sobject_t oid;
35 loff_t start;
36 uint64_t length;
37 xlist<C_ReadFinish*>::item set_item;
38 bool trust_enoent;
39 ceph_tid_t tid;
40 ZTracer::Trace trace;
41
42public:
43 bufferlist bl;
44 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
45 uint64_t l, const ZTracer::Trace &trace) :
46 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
47 set_item(this), trust_enoent(true),
48 tid(t), trace(trace) {
49 ob->reads.push_back(&set_item);
50 }
51
52 void finish(int r) override {
53 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
54 trace.event("finish");
55
56 // object destructor clears the list
57 if (set_item.is_on_list())
58 set_item.remove_myself();
59 }
60
61 void distrust_enoent() {
62 trust_enoent = false;
63 }
64};
65
66class ObjectCacher::C_RetryRead : public Context {
67 ObjectCacher *oc;
68 OSDRead *rd;
69 ObjectSet *oset;
70 Context *onfinish;
71 ZTracer::Trace trace;
72public:
73 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
74 const ZTracer::Trace &trace)
75 : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
76 }
77 void finish(int r) override {
78 if (r >= 0) {
79 r = oc->_readx(rd, oset, onfinish, false, &trace);
80 }
81
82 if (r == 0) {
83 // read is still in-progress
84 return;
85 }
86
87 trace.event("finish");
88 if (onfinish) {
89 onfinish->complete(r);
90 }
91 }
92};
93
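// Split 'left' at byte offset 'off' and return the newly created right-hand
// bh.  For example, splitting a bh covering [0,100) at off=40 shrinks 'left'
// to [0,40) and returns a new bh covering [40,100).  State, snapc, tids and
// the dontneed/nocache hints are copied to the right half, the buffer data
// is divided, and read waiters at offsets >= off are moved over as well.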
94ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
95 loff_t off)
96{
97 assert(oc->lock.is_locked());
98 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
99
100 // split off right
101 ObjectCacher::BufferHead *right = new BufferHead(this);
102
103 // inherit the cache hints; a later access clears them automatically
104 right->set_dontneed(left->get_dontneed());
105 right->set_nocache(left->get_nocache());
106
107 right->last_write_tid = left->last_write_tid;
108 right->last_read_tid = left->last_read_tid;
109 right->set_state(left->get_state());
110 right->snapc = left->snapc;
111 right->set_journal_tid(left->journal_tid);
112
113 loff_t newleftlen = off - left->start();
114 right->set_start(off);
115 right->set_length(left->length() - newleftlen);
116
117 // shorten left
118 oc->bh_stat_sub(left);
119 left->set_length(newleftlen);
120 oc->bh_stat_add(left);
121
122 // add right
123 oc->bh_add(this, right);
124
125 // split buffers too
126 bufferlist bl;
127 bl.claim(left->bl);
128 if (bl.length()) {
129 assert(bl.length() == (left->length() + right->length()));
130 right->bl.substr_of(bl, left->length(), right->length());
131 left->bl.substr_of(bl, 0, left->length());
132 }
133
134 // move read waiters
135 if (!left->waitfor_read.empty()) {
136 map<loff_t, list<Context*> >::iterator start_remove
137 = left->waitfor_read.begin();
138 while (start_remove != left->waitfor_read.end() &&
139 start_remove->first < right->start())
140 ++start_remove;
141 for (map<loff_t, list<Context*> >::iterator p = start_remove;
142 p != left->waitfor_read.end(); ++p) {
143 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
144 << " to right bh" << dendl;
145 right->waitfor_read[p->first].swap( p->second );
146 assert(p->second.empty());
147 }
148 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
149 }
150
151 ldout(oc->cct, 20) << "split left is " << *left << dendl;
152 ldout(oc->cct, 20) << "split right is " << *right << dendl;
153 return right;
154}
155
156
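// Absorb 'right' into 'left' and delete 'right'.  Callers must ensure the
// two bhs are adjacent, in the same state and have compatible journal tids
// (see the asserts below); data, read waiters and cache hints are carried
// over to 'left'.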
157void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
158{
159 assert(oc->lock.is_locked());
160 assert(left->end() == right->start());
161 assert(left->get_state() == right->get_state());
162 assert(left->can_merge_journal(right));
163
164 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
165 if (left->get_journal_tid() == 0) {
166 left->set_journal_tid(right->get_journal_tid());
167 }
168 right->set_journal_tid(0);
169
170 oc->bh_remove(this, right);
171 oc->bh_stat_sub(left);
172 left->set_length(left->length() + right->length());
173 oc->bh_stat_add(left);
174
175 // data
176 left->bl.claim_append(right->bl);
177
178 // version
179 // note: this is sorta busted, but should only be used for dirty buffers
180 left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
181 left->last_write = MAX( left->last_write, right->last_write );
182
183 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
184 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
185
186 // waiters
187 for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
188 p != right->waitfor_read.end();
189 ++p)
190 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
191 p->second );
192
193 // hose right
194 delete right;
195
196 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
197}
198
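// Try to coalesce 'bh' with its immediate neighbours in the data map.  A
// neighbour is merged only if it is contiguous, in the same state and has a
// compatible journal tid; rx buffers are never merged because their
// last_read_tid may not match.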
199void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
200{
201 assert(oc->lock.is_locked());
202 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
203
204 // do not merge rx buffers; last_read_tid may not match
205 if (bh->is_rx())
206 return;
207
208 // to the left?
209 map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
210 assert(p->second == bh);
211 if (p != data.begin()) {
212 --p;
213 if (p->second->end() == bh->start() &&
214 p->second->get_state() == bh->get_state() &&
215 p->second->can_merge_journal(bh)) {
216 merge_left(p->second, bh);
217 bh = p->second;
218 } else {
219 ++p;
220 }
221 }
222 // to the right?
223 assert(p->second == bh);
224 ++p;
225 if (p != data.end() &&
226 p->second->start() == bh->end() &&
227 p->second->get_state() == bh->get_state() &&
228 p->second->can_merge_journal(bh))
229 merge_left(bh, p->second);
230}
231
232/*
233 * return true if we have the entire given range cached
234 */
235bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
236{
237 assert(oc->lock.is_locked());
238 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
239 while (left > 0) {
240 if (p == data.end())
241 return false;
242
243 if (p->first <= cur) {
244 // have part of it
245 loff_t lenfromcur = MIN(p->second->end() - cur, left);
246 cur += lenfromcur;
247 left -= lenfromcur;
248 ++p;
249 continue;
250 } else if (p->first > cur) {
251 // gap
252 return false;
253 } else
254 ceph_abort();
255 }
256
257 return true;
258}
259
260/*
261 * return true if all of this object's cached data lies within [off, off+len]
262 */
263bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
264{
265 assert(oc->lock.is_locked());
266 if (data.empty())
267 return true;
268 map<loff_t, BufferHead*>::iterator first = data.begin();
269 map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
270 if (first->second->start() >= off && last->second->end() <= (off + len))
271 return true;
272 else
273 return false;
274}
275
276/*
277 * map a range of bytes into buffer_heads.
278 * - create missing buffer_heads as necessary.
279 */
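// The four output maps are keyed by logical offset within the object:
//  - hits:    bh's whose data is already usable (clean, dirty, tx or zero)
//  - missing: newly created bh's that still need a read from the OSD
//  - rx:      bh's that already have a read in flight
//  - errors:  bh's whose previous read returned an error
// If the object is marked complete, holes are instead filled with zero bh's
// and returned as hits.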
280int ObjectCacher::Object::map_read(ObjectExtent &ex,
281 map<loff_t, BufferHead*>& hits,
282 map<loff_t, BufferHead*>& missing,
283 map<loff_t, BufferHead*>& rx,
284 map<loff_t, BufferHead*>& errors)
285{
286 assert(oc->lock.is_locked());
287 ldout(oc->cct, 10) << "map_read " << ex.oid << " "
288 << ex.offset << "~" << ex.length << dendl;
289
290 loff_t cur = ex.offset;
291 loff_t left = ex.length;
292
293 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
294 while (left > 0) {
295 // at end?
296 if (p == data.end()) {
297 // rest is a miss.
298 BufferHead *n = new BufferHead(this);
299 n->set_start(cur);
300 n->set_length(left);
301 oc->bh_add(this, n);
302 if (complete) {
303 oc->mark_zero(n);
304 hits[cur] = n;
305 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
306 } else {
307 missing[cur] = n;
308 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
309 }
310 cur += left;
311 assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
312 break; // no more.
313 }
314
315 if (p->first <= cur) {
316 // have it (or part of it)
317 BufferHead *e = p->second;
318
319 if (e->is_clean() ||
320 e->is_dirty() ||
321 e->is_tx() ||
322 e->is_zero()) {
323 hits[cur] = e; // readable!
324 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
325 } else if (e->is_rx()) {
326 rx[cur] = e; // missing, not readable.
327 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
328 } else if (e->is_error()) {
329 errors[cur] = e;
330 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
331 } else {
332 ceph_abort();
333 }
334
335 loff_t lenfromcur = MIN(e->end() - cur, left);
336 cur += lenfromcur;
337 left -= lenfromcur;
338 ++p;
339 continue; // more?
340
341 } else if (p->first > cur) {
342 // gap.. miss
343 loff_t next = p->first;
344 BufferHead *n = new BufferHead(this);
345 loff_t len = MIN(next - cur, left);
346 n->set_start(cur);
347 n->set_length(len);
348 oc->bh_add(this,n);
349 if (complete) {
350 oc->mark_zero(n);
351 hits[cur] = n;
352 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
353 } else {
354 missing[cur] = n;
355 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
356 }
357 cur += MIN(left, n->length());
358 left -= MIN(left, n->length());
359 continue; // more?
360 } else {
361 ceph_abort();
362 }
363 }
364 return 0;
365}
366
367void ObjectCacher::Object::audit_buffers()
368{
369 loff_t offset = 0;
370 for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
371 it != data.end(); ++it) {
372 if (it->first != it->second->start()) {
373 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
374 << " does not match bh start position: "
375 << *it->second << dendl;
376 assert(it->first == it->second->start());
377 }
378 if (it->first < offset) {
379 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
380 << " overlaps with previous bh " << *((--it)->second)
381 << dendl;
382 assert(it->first >= offset);
383 }
384 BufferHead *bh = it->second;
385 map<loff_t, list<Context*> >::const_iterator w_it;
386 for (w_it = bh->waitfor_read.begin();
387 w_it != bh->waitfor_read.end(); ++w_it) {
388 if (w_it->first < bh->start() ||
389 w_it->first >= bh->start() + bh->length()) {
390 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
391 << " is not within bh " << *bh << dendl;
392 assert(w_it->first >= bh->start());
393 assert(w_it->first < bh->start() + bh->length());
394 }
395 }
396 offset = it->first + it->second->length();
397 }
398}
399
400/*
401 * map a range of extents on an object's buffer cache.
402 * - combine any bh's we're writing into one
403 * - break up bufferheads that don't fall completely within the range
404 * - return a single bh covering exactly the written range (it no longer
405 * also includes other dirty data to the left and/or right).
406 */
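// Note (summary of the call site, see writex() below): each ObjectExtent of
// a write is funneled through map_write(), and writex() then copies the
// caller's data into the returned bh and marks it dirty, so the bh returned
// here always spans the full extent being written.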
407ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
408 ceph_tid_t tid)
409{
410 assert(oc->lock.is_locked());
411 BufferHead *final = 0;
412
413 ldout(oc->cct, 10) << "map_write oex " << ex.oid
414 << " " << ex.offset << "~" << ex.length << dendl;
415
416 loff_t cur = ex.offset;
417 loff_t left = ex.length;
418
419 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
420 while (left > 0) {
421 loff_t max = left;
422
423 // at end ?
424 if (p == data.end()) {
425 if (final == NULL) {
426 final = new BufferHead(this);
427 replace_journal_tid(final, tid);
428 final->set_start( cur );
429 final->set_length( max );
430 oc->bh_add(this, final);
431 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
432 } else {
433 oc->bh_stat_sub(final);
434 final->set_length(final->length() + max);
435 oc->bh_stat_add(final);
436 }
437 left -= max;
438 cur += max;
439 continue;
440 }
441
442 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
443 //oc->verify_stats();
444
445 if (p->first <= cur) {
446 BufferHead *bh = p->second;
447 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
448
449 if (p->first < cur) {
450 assert(final == 0);
451 if (cur + max >= bh->end()) {
452 // we want right bit (one splice)
453 final = split(bh, cur); // just split it, take right half.
454 replace_journal_tid(final, tid);
455 ++p;
456 assert(p->second == final);
457 } else {
458 // we want middle bit (two splices)
459 final = split(bh, cur);
460 ++p;
461 assert(p->second == final);
462 split(final, cur+max);
463 replace_journal_tid(final, tid);
464 }
465 } else {
466 assert(p->first == cur);
467 if (bh->length() <= max) {
468 // whole bufferhead, piece of cake.
469 } else {
470 // we want left bit (one splice)
471 split(bh, cur + max); // just split
472 }
473 if (final) {
474 oc->mark_dirty(bh);
475 oc->mark_dirty(final);
476 --p; // move iterator back to final
477 assert(p->second == final);
478 replace_journal_tid(bh, tid);
479 merge_left(final, bh);
480 } else {
481 final = bh;
482 replace_journal_tid(final, tid);
483 }
484 }
485
486 // keep going.
487 loff_t lenfromcur = final->end() - cur;
488 cur += lenfromcur;
489 left -= lenfromcur;
490 ++p;
491 continue;
492 } else {
493 // gap!
494 loff_t next = p->first;
495 loff_t glen = MIN(next - cur, max);
496 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
497 if (final) {
498 oc->bh_stat_sub(final);
499 final->set_length(final->length() + glen);
500 oc->bh_stat_add(final);
501 } else {
502 final = new BufferHead(this);
503 replace_journal_tid(final, tid);
504 final->set_start( cur );
505 final->set_length( glen );
506 oc->bh_add(this, final);
507 }
508
509 cur += glen;
510 left -= glen;
511 continue; // more?
512 }
513 }
514
515 // set version
516 assert(final);
517 assert(final->get_journal_tid() == tid);
518 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
519
520 return final;
521}
522
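// Record a new journal tid on 'bh'.  If the bh already carried a different
// (older) tid, the writeback handler is told via overwrite_extent() that the
// journal should no longer expect a writeback for that extent.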
523void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
524 ceph_tid_t tid) {
525 ceph_tid_t bh_tid = bh->get_journal_tid();
526
527 assert(tid == 0 || bh_tid <= tid);
528 if (bh_tid != 0 && bh_tid != tid) {
529 // inform journal that it should not expect a writeback from this extent
530 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
531 bh->length(), bh_tid, tid);
532 }
533 bh->set_journal_tid(tid);
534}
535
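// Drop all cached data at or beyond offset 's'.  A bh straddling the
// truncation point is split first so that only the part past 's' is removed.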
536void ObjectCacher::Object::truncate(loff_t s)
537{
538 assert(oc->lock.is_locked());
539 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
540
541 while (!data.empty()) {
542 BufferHead *bh = data.rbegin()->second;
543 if (bh->end() <= s)
544 break;
545
546 // split bh at truncation point?
547 if (bh->start() < s) {
548 split(bh, s);
549 continue;
550 }
551
552 // remove bh entirely
553 assert(bh->start() >= s);
554 assert(bh->waitfor_read.empty());
555 replace_journal_tid(bh, 0);
556 oc->bh_remove(this, bh);
557 delete bh;
558 }
559}
560
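// Drop any cached data overlapping [off, off+len).  Bhs straddling either
// boundary are split so only the overlapping pieces are removed; the
// object's 'complete' flag is cleared since we no longer hold all its data.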
561void ObjectCacher::Object::discard(loff_t off, loff_t len)
562{
563 assert(oc->lock.is_locked());
564 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
565 << dendl;
566
567 if (!exists) {
568 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
569 exists = true;
570 }
571 if (complete) {
572 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
573 complete = false;
574 }
575
576 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
577 while (p != data.end()) {
578 BufferHead *bh = p->second;
579 if (bh->start() >= off + len)
580 break;
581
582 // split bh at truncation point?
583 if (bh->start() < off) {
584 split(bh, off);
585 ++p;
586 continue;
587 }
588
589 assert(bh->start() >= off);
590 if (bh->end() > off + len) {
591 split(bh, off + len);
592 }
593
594 ++p;
595 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
596 assert(bh->waitfor_read.empty());
597 replace_journal_tid(bh, 0);
598 oc->bh_remove(this, bh);
599 delete bh;
600 }
601}
602
603
604
605/*** ObjectCacher ***/
606
607#undef dout_prefix
608#define dout_prefix *_dout << "objectcacher "
609
610
611ObjectCacher::ObjectCacher(CephContext *cct_, string name,
612 WritebackHandler& wb, Mutex& l,
613 flush_set_callback_t flush_callback,
614 void *flush_callback_arg, uint64_t max_bytes,
615 uint64_t max_objects, uint64_t max_dirty,
616 uint64_t target_dirty, double max_dirty_age,
617 bool block_writes_upfront)
618 : perfcounter(NULL),
619 cct(cct_), writeback_handler(wb), name(name), lock(l),
620 max_dirty(max_dirty), target_dirty(target_dirty),
621 max_size(max_bytes), max_objects(max_objects),
622 max_dirty_age(ceph::make_timespan(max_dirty_age)),
623 block_writes_upfront(block_writes_upfront),
624 trace_endpoint("ObjectCacher"),
625 flush_set_callback(flush_callback),
626 flush_set_callback_arg(flush_callback_arg),
627 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
628 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
629 stat_missing(0), stat_error(0), stat_dirty_waiting(0),
630 stat_nr_dirty_waiters(0), reads_outstanding(0)
631{
632 perf_start();
633 finisher.start();
634 scattered_write = writeback_handler.can_scattered_write();
635}
636
637ObjectCacher::~ObjectCacher()
638{
639 finisher.stop();
640 perf_stop();
641 // we should be empty.
642 for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
643 = objects.begin();
644 i != objects.end();
645 ++i)
646 assert(i->empty());
647 assert(bh_lru_rest.lru_get_size() == 0);
648 assert(bh_lru_dirty.lru_get_size() == 0);
649 assert(ob_lru.lru_get_size() == 0);
650 assert(dirty_or_tx_bh.empty());
651}
652
653void ObjectCacher::perf_start()
654{
655 string n = "objectcacher-" + name;
656 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
657
658 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
659 "cache_ops_hit", "Hit operations");
660 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
661 "cache_ops_miss", "Miss operations");
662 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
663 "cache_bytes_hit", "Hit data");
664 plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
665 "cache_bytes_miss", "Miss data");
666 plb.add_u64_counter(l_objectcacher_data_read,
667 "data_read", "Read data");
668 plb.add_u64_counter(l_objectcacher_data_written,
669 "data_written", "Data written to cache");
670 plb.add_u64_counter(l_objectcacher_data_flushed,
671 "data_flushed", "Data flushed");
672 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
673 "data_overwritten_while_flushing",
674 "Data overwritten while flushing");
675 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
676 "Write operations, delayed due to dirty limits");
677 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
678 "write_bytes_blocked",
679 "Write data blocked on dirty limit");
680 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
681 "Time spent blocking a write due to dirty limits");
682
683 perfcounter = plb.create_perf_counters();
684 cct->get_perfcounters_collection()->add(perfcounter);
685}
686
687void ObjectCacher::perf_stop()
688{
689 assert(perfcounter);
690 cct->get_perfcounters_collection()->remove(perfcounter);
691 delete perfcounter;
692}
693
694/* private */
695ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
696 uint64_t object_no,
697 ObjectSet *oset,
698 object_locator_t &l,
699 uint64_t truncate_size,
700 uint64_t truncate_seq)
701{
702 // XXX: Add handling of nspace in object_locator_t in cache
703 assert(lock.is_locked());
704 // have it?
705 if ((uint32_t)l.pool < objects.size()) {
706 if (objects[l.pool].count(oid)) {
707 Object *o = objects[l.pool][oid];
708 o->object_no = object_no;
709 o->truncate_size = truncate_size;
710 o->truncate_seq = truncate_seq;
711 return o;
712 }
713 } else {
714 objects.resize(l.pool+1);
715 }
716
717 // create it.
718 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
719 truncate_seq);
720 objects[l.pool][oid] = o;
721 ob_lru.lru_insert_top(o);
722 return o;
723}
724
725void ObjectCacher::close_object(Object *ob)
726{
727 assert(lock.is_locked());
728 ldout(cct, 10) << "close_object " << *ob << dendl;
729 assert(ob->can_close());
730
731 // ok!
732 ob_lru.lru_remove(ob);
733 objects[ob->oloc.pool].erase(ob->get_soid());
734 ob->set_item.remove_myself();
735 delete ob;
736}
737
738void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
739 const ZTracer::Trace &parent_trace)
740{
741 assert(lock.is_locked());
742 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
743 << reads_outstanding << dendl;
744
745 ZTracer::Trace trace;
746 if (parent_trace.valid()) {
747 trace.init("", &trace_endpoint, &parent_trace);
748 trace.copy_name("bh_read " + bh->ob->get_oid().name);
749 trace.event("start");
750 }
751
752 mark_rx(bh);
753 bh->last_read_tid = ++last_read_tid;
754
755 // finisher
756 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
757 bh->start(), bh->length(), trace);
758 // go
759 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
760 bh->ob->get_oloc(), bh->start(), bh->length(),
761 bh->ob->get_snap(), &onfinish->bl,
762 bh->ob->truncate_size, bh->ob->truncate_seq,
763 op_flags, trace, onfinish);
764
765 ++reads_outstanding;
766}
767
768void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
769 ceph_tid_t tid, loff_t start,
770 uint64_t length, bufferlist &bl, int r,
771 bool trust_enoent)
772{
773 assert(lock.is_locked());
774 ldout(cct, 7) << "bh_read_finish "
775 << oid
776 << " tid " << tid
777 << " " << start << "~" << length
778 << " (bl is " << bl.length() << ")"
779 << " returned " << r
780 << " outstanding reads " << reads_outstanding
781 << dendl;
782
783 if (r >= 0 && bl.length() < length) {
784 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
785 << length << " with " << length - bl.length() << " bytes of zeroes"
786 << dendl;
787 bl.append_zero(length - bl.length());
788 }
789
790 list<Context*> ls;
791 int err = 0;
792
793 if (objects[poolid].count(oid) == 0) {
794 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
795 } else {
796 Object *ob = objects[poolid][oid];
797
798 if (r == -ENOENT && !ob->complete) {
799 // wake up *all* rx waiters, or else we risk reordering
800 // identical reads. e.g.
801 // read 1~1
802 // reply to unrelated 3~1 -> !exists
803 // read 1~1 -> immediate ENOENT
804 // reply to first 1~1 -> ooo ENOENT
805 bool allzero = true;
806 for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
807 p != ob->data.end(); ++p) {
808 BufferHead *bh = p->second;
809 for (map<loff_t, list<Context*> >::iterator p
810 = bh->waitfor_read.begin();
811 p != bh->waitfor_read.end();
812 ++p)
813 ls.splice(ls.end(), p->second);
814 bh->waitfor_read.clear();
815 if (!bh->is_zero() && !bh->is_rx())
816 allzero = false;
817 }
818
819 // just pass through and retry all waiters if we don't trust
820 // -ENOENT for this read
821 if (trust_enoent) {
822 ldout(cct, 7)
823 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
824 << dendl;
825 ob->complete = true;
826 ob->exists = false;
827
828 /* If all the bhs are effectively zero, get rid of them. All
829 * the waiters will be retried and get -ENOENT immediately, so
830 * it's safe to clean up the unneeded bh's now. Since we know
831 * it's safe to remove them now, do so, so they aren't hanging
832 * around waiting for more -ENOENTs from rados while the cache
833 * is being shut down.
834 *
835 * Only do this when all the bhs are rx or clean, to match the
836 * condition in _readx(). If there are any non-rx or non-clean
837 * bhs, _readx() will wait for the final result instead of
838 * returning -ENOENT immediately.
839 */
840 if (allzero) {
841 ldout(cct, 10)
842 << "bh_read_finish ENOENT and allzero, getting rid of "
843 << "bhs for " << *ob << dendl;
844 map<loff_t, BufferHead*>::iterator p = ob->data.begin();
845 while (p != ob->data.end()) {
846 BufferHead *bh = p->second;
847 // current iterator will be invalidated by bh_remove()
848 ++p;
849 bh_remove(ob, bh);
850 delete bh;
851 }
852 }
853 }
854 }
855
856 // apply to bh's!
857 loff_t opos = start;
858 while (true) {
859 map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
860 if (p == ob->data.end())
861 break;
862 if (opos >= start+(loff_t)length) {
863 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
864 << start << "+" << length << "=" << start+(loff_t)length
865 << dendl;
866 break;
867 }
868
869 BufferHead *bh = p->second;
870 ldout(cct, 20) << "checking bh " << *bh << dendl;
871
872 // finishers?
873 for (map<loff_t, list<Context*> >::iterator it
874 = bh->waitfor_read.begin();
875 it != bh->waitfor_read.end();
876 ++it)
877 ls.splice(ls.end(), it->second);
878 bh->waitfor_read.clear();
879
880 if (bh->start() > opos) {
881 ldout(cct, 1) << "bh_read_finish skipping gap "
882 << opos << "~" << bh->start() - opos
883 << dendl;
884 opos = bh->start();
885 continue;
886 }
887
888 if (!bh->is_rx()) {
889 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
890 opos = bh->end();
891 continue;
892 }
893
894 if (bh->last_read_tid != tid) {
895 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
896 << bh->last_read_tid << " != tid " << tid
897 << ", skipping" << dendl;
898 opos = bh->end();
899 continue;
900 }
901
902 assert(opos >= bh->start());
903 assert(bh->start() == opos); // we don't merge rx bh's... yet!
904 assert(bh->length() <= start+(loff_t)length-opos);
905
906 if (bh->error < 0)
907 err = bh->error;
908
909 opos = bh->end();
910
911 if (r == -ENOENT) {
912 if (trust_enoent) {
913 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
914 bh_remove(ob, bh);
915 delete bh;
916 } else {
917 ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
918 << *bh << dendl;
919 }
920 continue;
921 }
922
923 if (r < 0) {
924 bh->error = r;
925 mark_error(bh);
926 } else {
927 bh->bl.substr_of(bl,
928 bh->start() - start,
929 bh->length());
930 mark_clean(bh);
931 }
932
933 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
934
935 ob->try_merge_bh(bh);
936 }
937 }
938
939 // called with lock held.
940 ldout(cct, 20) << "finishing waiters " << ls << dendl;
941
942 finish_contexts(cct, ls, err);
943 retry_waiting_reads();
944
945 --reads_outstanding;
946 read_cond.Signal();
947}
948
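// Scattered-writeback helper: starting from 'bh', walk forward and backward
// through dirty_or_tx_bh collecting dirty neighbours of the same object that
// are old enough (last_write <= cutoff), bounded by the optional
// *max_count / *max_amount budgets, and submit them as one scattered write.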
949void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
950 int64_t *max_amount, int *max_count)
951{
952 list<BufferHead*> blist;
953
954 int count = 0;
955 int64_t total_len = 0;
956 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
957 assert(it != dirty_or_tx_bh.end());
958 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
959 p != dirty_or_tx_bh.end();
960 ++p) {
961 BufferHead *obh = *p;
962 if (obh->ob != bh->ob)
963 break;
964 if (obh->is_dirty() && obh->last_write <= cutoff) {
965 blist.push_back(obh);
966 ++count;
967 total_len += obh->length();
968 if ((max_count && count > *max_count) ||
969 (max_amount && total_len > *max_amount))
970 break;
971 }
972 }
973
974 while (it != dirty_or_tx_bh.begin()) {
975 --it;
976 BufferHead *obh = *it;
977 if (obh->ob != bh->ob)
978 break;
979 if (obh->is_dirty() && obh->last_write <= cutoff) {
980 blist.push_front(obh);
981 ++count;
982 total_len += obh->length();
983 if ((max_count && count > *max_count) ||
984 (max_amount && total_len > *max_amount))
985 break;
986 }
987 }
988 if (max_count)
989 *max_count -= count;
990 if (max_amount)
991 *max_amount -= total_len;
992
993 bh_write_scattered(blist);
994}
995
996class ObjectCacher::C_WriteCommit : public Context {
997 ObjectCacher *oc;
998 int64_t poolid;
999 sobject_t oid;
1000 vector<pair<loff_t, uint64_t> > ranges;
1001 ZTracer::Trace trace;
1002public:
1003 ceph_tid_t tid = 0;
1004 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1005 uint64_t l, const ZTracer::Trace &trace) :
1006 oc(c), poolid(_poolid), oid(o), trace(trace) {
1007 ranges.push_back(make_pair(s, l));
1008 }
1009 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1010 vector<pair<loff_t, uint64_t> >& _ranges) :
1011 oc(c), poolid(_poolid), oid(o), tid(0) {
1012 ranges.swap(_ranges);
1013 }
1014 void finish(int r) override {
1015 oc->bh_write_commit(poolid, oid, ranges, tid, r);
1016 trace.event("finish");
1017 }
1018};
1019void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1020{
1021 assert(lock.is_locked());
1022
1023 Object *ob = blist.front()->ob;
1024 ob->get();
1025
1026 ceph::real_time last_write;
1027 SnapContext snapc;
1028 vector<pair<loff_t, uint64_t> > ranges;
1029 vector<pair<uint64_t, bufferlist> > io_vec;
1030
1031 ranges.reserve(blist.size());
1032 io_vec.reserve(blist.size());
1033
1034 uint64_t total_len = 0;
1035 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1036 BufferHead *bh = *p;
1037 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1038 assert(bh->ob == ob);
1039 assert(bh->bl.length() == bh->length());
1040 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1041
1042 int n = io_vec.size();
1043 io_vec.resize(n + 1);
1044 io_vec[n].first = bh->start();
1045 io_vec[n].second = bh->bl;
1046
1047 total_len += bh->length();
1048 if (bh->snapc.seq > snapc.seq)
1049 snapc = bh->snapc;
1050 if (bh->last_write > last_write)
1051 last_write = bh->last_write;
1052 }
1053
1054 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1055
1056 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1057 io_vec, snapc, last_write,
1058 ob->truncate_size, ob->truncate_seq,
1059 oncommit);
1060 oncommit->tid = tid;
1061 ob->last_write_tid = tid;
1062 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1063 BufferHead *bh = *p;
1064 bh->last_write_tid = tid;
1065 mark_tx(bh);
1066 }
1067
1068 if (perfcounter)
1069 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1070}
1071
1072void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1073{
1074 assert(lock.is_locked());
1075 ldout(cct, 7) << "bh_write " << *bh << dendl;
1076
1077 bh->ob->get();
1078
1079 ZTracer::Trace trace;
1080 if (parent_trace.valid()) {
1081 trace.init("", &trace_endpoint, &parent_trace);
1082 trace.copy_name("bh_write " + bh->ob->get_oid().name);
1083 trace.event("start");
1084 }
1085
1086 // finishers
1087 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1088 bh->ob->get_soid(), bh->start(),
1089 bh->length(), trace);
1090 // go
1091 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1092 bh->ob->get_oloc(),
1093 bh->start(), bh->length(),
1094 bh->snapc, bh->bl, bh->last_write,
1095 bh->ob->truncate_size,
1096 bh->ob->truncate_seq,
1097 bh->journal_tid, trace, oncommit);
1098 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1099
1100 // set bh last_write_tid
1101 oncommit->tid = tid;
1102 bh->ob->last_write_tid = tid;
1103 bh->last_write_tid = tid;
1104
1105 if (perfcounter) {
1106 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1107 }
1108
1109 mark_tx(bh);
1110}
1111
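// Completion path for bh_write()/bh_write_scattered(): for every tx bh in
// the acked ranges whose last_write_tid matches 'tid', mark it clean on
// success or dirty again on error, wake contexts waiting on this commit, and
// invoke the flush_set callback if the whole ObjectSet just became clean.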
1112void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1113 vector<pair<loff_t, uint64_t> >& ranges,
1114 ceph_tid_t tid, int r)
1115{
1116 assert(lock.is_locked());
1117 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1118 << " ranges " << ranges << " returned " << r << dendl;
1119
1120 if (objects[poolid].count(oid) == 0) {
1121 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1122 return;
1123 }
1124
1125 Object *ob = objects[poolid][oid];
1126 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1127
1128 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1129 p != ranges.end();
1130 ++p) {
1131 loff_t start = p->first;
1132 uint64_t length = p->second;
1133 if (!ob->exists) {
1134 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1135 ob->exists = true;
1136
1137 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1138 ob->get_snap())) {
1139 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1140 "complete on " << *ob << dendl;
1141 ob->complete = false;
1142 }
1143 }
1144
1145 vector<pair<loff_t, BufferHead*>> hit;
1146 // apply to bh's!
1147 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1148 p != ob->data.end();
1149 ++p) {
1150 BufferHead *bh = p->second;
1151
1152 if (bh->start() > start+(loff_t)length)
1153 break;
1154
1155 if (bh->start() < start &&
1156 bh->end() > start+(loff_t)length) {
1157 ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
1158 continue;
1159 }
1160
1161 // make sure bh is tx
1162 if (!bh->is_tx()) {
1163 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1164 continue;
1165 }
1166
1167 // make sure bh tid matches
1168 if (bh->last_write_tid != tid) {
1169 assert(bh->last_write_tid > tid);
1170 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1171 continue;
1172 }
1173
1174 if (r >= 0) {
1175 // ok! mark bh clean and error-free
1176 mark_clean(bh);
1177 bh->set_journal_tid(0);
1178 if (bh->get_nocache())
1179 bh_lru_rest.lru_bottouch(bh);
1180 hit.push_back(make_pair(bh->start(), bh));
1181 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1182 } else {
1183 mark_dirty(bh);
1184 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1185 << *bh << " r = " << r << " " << cpp_strerror(-r)
1186 << dendl;
1187 }
1188 }
1189
1190 for (auto& p : hit) {
1191 // p.second may have been merged and deleted by merge_left
1192 if (ob->data.count(p.first))
1193 ob->try_merge_bh(p.second);
1194 }
1195 }
1196
1197 // update last_commit.
1198 assert(ob->last_commit_tid < tid);
1199 ob->last_commit_tid = tid;
1200
1201 // waiters?
1202 list<Context*> ls;
1203 if (ob->waitfor_commit.count(tid)) {
1204 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1205 ob->waitfor_commit.erase(tid);
1206 }
1207
1208 // is the entire object set now clean and fully committed?
1209 ObjectSet *oset = ob->oset;
1210 ob->put();
1211
1212 if (flush_set_callback &&
1213 was_dirty_or_tx > 0 &&
1214 oset->dirty_or_tx == 0) { // nothing dirty/tx
1215 flush_set_callback(flush_set_callback_arg, oset);
1216 }
1217
1218 if (!ls.empty())
1219 finish_contexts(cct, ls, r);
1220}
1221
1222void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1223{
1224 assert(trace != nullptr);
1225 assert(lock.is_locked());
1226 ceph::real_time cutoff = ceph::real_clock::now();
1227
1228 ldout(cct, 10) << "flush " << amount << dendl;
1229
1230 /*
1231 * NOTE: we aren't actually pulling things off the LRU here, just
1232 * looking at the tail item. Then we call bh_write, which moves it
1233 * to the other LRU, so that we can call
1234 * lru_dirty.lru_get_next_expire() again.
1235 */
1236 int64_t left = amount;
1237 while (amount == 0 || left > 0) {
1238 BufferHead *bh = static_cast<BufferHead*>(
1239 bh_lru_dirty.lru_get_next_expire());
1240 if (!bh) break;
1241 if (bh->last_write > cutoff) break;
1242
1243 if (scattered_write) {
1244 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1245 } else {
1246 left -= bh->length();
1247 bh_write(bh, *trace);
1248 }
1249 }
1250}
1251
1252
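// Enforce the cache limits: evict clean/zero/error bhs from the rest LRU
// while the clean byte count exceeds max_size or the number of unpinned
// clean bhs exceeds max_size >> BUFFER_MEMORY_WEIGHT, then close whole
// objects from the object LRU until we are back under max_objects.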
1253void ObjectCacher::trim()
1254{
1255 assert(lock.is_locked());
1256 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1257 << get_stat_clean() << ", objects: max " << max_objects
1258 << " current " << ob_lru.lru_get_size() << dendl;
1259
1260 uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1261 uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
1262 while (get_stat_clean() > 0 &&
1263 ((uint64_t)get_stat_clean() > max_size ||
1264 nr_clean_bh > max_clean_bh)) {
1265 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1266 if (!bh)
1267 break;
1268
1269 ldout(cct, 10) << "trim trimming " << *bh << dendl;
1270 assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1271
1272 Object *ob = bh->ob;
1273 bh_remove(ob, bh);
1274 delete bh;
1275
1276 --nr_clean_bh;
1277
1278 if (ob->complete) {
1279 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1280 ob->complete = false;
1281 }
1282 }
1283
1284 while (ob_lru.lru_get_size() > max_objects) {
1285 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1286 if (!ob)
1287 break;
1288
1289 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1290 close_object(ob);
1291 }
1292
1293 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1294 << get_stat_clean() << ", objects: max " << max_objects
1295 << " current " << ob_lru.lru_get_size() << dendl;
1296}
1297
1298
1299
1300/* public */
1301
1302bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1303 snapid_t snapid)
1304{
1305 assert(lock.is_locked());
1306 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1307 ex_it != extents.end();
1308 ++ex_it) {
1309 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1310
1311 // get Object cache
1312 sobject_t soid(ex_it->oid, snapid);
1313 Object *o = get_object_maybe(soid, ex_it->oloc);
1314 if (!o)
1315 return false;
1316 if (!o->is_cached(ex_it->offset, ex_it->length))
1317 return false;
1318 }
1319 return true;
1320}
1321
1322
1323/*
1324 * returns # bytes read (if in cache). onfinish is untouched (caller
1325 * must delete it)
1326 * returns 0 if doing async read
1327 */
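// When some of the data is missing or still in flight, _readx() registers a
// C_RetryRead on the relevant buffers (or on the pending commit) and returns
// 0; each retry re-runs _readx() until everything is resident, and the retry
// path then completes onfinish itself.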
1328int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1329 ZTracer::Trace *parent_trace)
1330{
1331 ZTracer::Trace trace;
1332 if (parent_trace != nullptr) {
1333 trace.init("read", &trace_endpoint, parent_trace);
1334 trace.event("start");
1335 }
1336
1337 int r =_readx(rd, oset, onfinish, true, &trace);
1338 if (r < 0) {
1339 trace.event("finish");
1340 }
1341 return r;
1342}
1343
1344int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1345 bool external_call, ZTracer::Trace *trace)
1346{
1347 assert(trace != nullptr);
1348 assert(lock.is_locked());
1349 bool success = true;
1350 int error = 0;
1351 uint64_t bytes_in_cache = 0;
1352 uint64_t bytes_not_in_cache = 0;
1353 uint64_t total_bytes_read = 0;
1354 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1355 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1356 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1357
1358 /*
1359 * WARNING: we can only meaningfully return ENOENT if the read request
1360 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1361 * zeroed buffers needs to feed single extents into readx().
1362 */
1363 assert(!oset->return_enoent || rd->extents.size() == 1);
1364
1365 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1366 ex_it != rd->extents.end();
1367 ++ex_it) {
1368 ldout(cct, 10) << "readx " << *ex_it << dendl;
1369
1370 total_bytes_read += ex_it->length;
1371
1372 // get Object cache
1373 sobject_t soid(ex_it->oid, rd->snap);
1374 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1375 ex_it->truncate_size, oset->truncate_seq);
1376 if (external_call)
1377 touch_ob(o);
1378
1379 // does not exist and no hits?
1380 if (oset->return_enoent && !o->exists) {
1381 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1382
1383 // should we worry about COW underneath us?
1384 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1385 ex_it->length, soid.snap)) {
1386 ldout(cct, 20) << "readx may copy on write" << dendl;
1387 bool wait = false;
1388 list<BufferHead*> blist;
1389 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1390 bh_it != o->data.end();
1391 ++bh_it) {
1392 BufferHead *bh = bh_it->second;
1393 if (bh->is_dirty() || bh->is_tx()) {
1394 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1395 wait = true;
1396 if (bh->is_dirty()) {
1397 if (scattered_write)
1398 blist.push_back(bh);
1399 else
1400 bh_write(bh, *trace);
1401 }
1402 }
1403 }
1404 if (scattered_write && !blist.empty())
1405 bh_write_scattered(blist);
1406 if (wait) {
1407 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1408 << " on " << *o << dendl;
1409 o->waitfor_commit[o->last_write_tid].push_back(
1410 new C_RetryRead(this, rd, oset, onfinish, *trace));
1411 // FIXME: perfcounter!
1412 return 0;
1413 }
1414 }
1415
1416 // can we return ENOENT?
1417 bool allzero = true;
1418 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1419 bh_it != o->data.end();
1420 ++bh_it) {
1421 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1422 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1423 allzero = false;
1424 break;
1425 }
1426 }
1427 if (allzero) {
1428 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1429 << dendl;
1430 delete rd;
1431 if (dontneed)
1432 bottouch_ob(o);
1433 return -ENOENT;
1434 }
1435 }
1436
1437 // map extent into bufferheads
1438 map<loff_t, BufferHead*> hits, missing, rx, errors;
1439 o->map_read(*ex_it, hits, missing, rx, errors);
1440 if (external_call) {
1441 // retry reading error buffers
1442 missing.insert(errors.begin(), errors.end());
1443 } else {
1444 // some reads had errors, fail later so completions
1445 // are cleaned up properly
1446 // TODO: make read path not call _readx for every completion
1447 hits.insert(errors.begin(), errors.end());
1448 }
1449
1450 if (!missing.empty() || !rx.empty()) {
1451 // read missing
1452 map<loff_t, BufferHead*>::iterator last = missing.end();
1453 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1454 bh_it != missing.end();
1455 ++bh_it) {
1456 uint64_t rx_bytes = static_cast<uint64_t>(
1457 stat_rx + bh_it->second->length());
1458 bytes_not_in_cache += bh_it->second->length();
1459 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1460 // cache is full with concurrent reads -- wait for rx's to complete
1461 // to constrain memory growth (especially during copy-ups)
1462 if (success) {
1463 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1464 << waitfor_read.size() << " blocked reads, "
1465 << (MAX(rx_bytes, max_size) - max_size)
1466 << " read bytes" << dendl;
1467 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1468 *trace));
1469 }
1470
1471 bh_remove(o, bh_it->second);
1472 delete bh_it->second;
1473 } else {
1474 bh_it->second->set_nocache(nocache);
1475 bh_read(bh_it->second, rd->fadvise_flags, *trace);
1476 if ((success && onfinish) || last != missing.end())
1477 last = bh_it;
1478 }
1479 success = false;
1480 }
1481
1482 // add the waiter to the last missed bh to avoid waking up early, since reads complete in order
1483 if (last != missing.end()) {
1484 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1485 << " off " << last->first << dendl;
1486 last->second->waitfor_read[last->first].push_back(
1487 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1488
1489 }
1490
1491 // bump rx
1492 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1493 bh_it != rx.end();
1494 ++bh_it) {
1495 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1496 if (success && onfinish) {
1497 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1498 << " off " << bh_it->first << dendl;
1499 bh_it->second->waitfor_read[bh_it->first].push_back(
1500 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1501 }
1502 bytes_not_in_cache += bh_it->second->length();
1503 success = false;
1504 }
1505
1506 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1507 bh_it != hits.end(); ++bh_it)
1508 // bump in lru, so we don't lose it on a later read
1509 touch_bh(bh_it->second);
1510
1511 } else {
1512 assert(!hits.empty());
1513
1514 // make a plain list
1515 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1516 bh_it != hits.end();
1517 ++bh_it) {
1518 BufferHead *bh = bh_it->second;
1519 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1520 if (bh->is_error() && bh->error)
1521 error = bh->error;
1522 bytes_in_cache += bh->length();
1523
1524 if (bh->get_nocache() && bh->is_clean())
1525 bh_lru_rest.lru_bottouch(bh);
1526 else
1527 touch_bh(bh);
1528 // must come after touch_bh, because touch_bh sets dontneed to false
1529 if (dontneed &&
1530 ((loff_t)ex_it->offset <= bh->start() &&
1531 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1532 bh->set_dontneed(true); //if dirty
1533 if (bh->is_clean())
1534 bh_lru_rest.lru_bottouch(bh);
1535 }
1536 }
1537
1538 if (!error) {
1539 // create reverse map of buffer offset -> object for the
1540 // eventual result. this is over a single ObjectExtent, so we
1541 // know that
1542 // - the bh's are contiguous
1543 // - the buffer frags need not be (and almost certainly aren't)
1544 loff_t opos = ex_it->offset;
1545 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1546 assert(bh_it->second->start() <= opos);
1547 uint64_t bhoff = opos - bh_it->second->start();
1548 vector<pair<uint64_t,uint64_t> >::iterator f_it
1549 = ex_it->buffer_extents.begin();
1550 uint64_t foff = 0;
1551 while (1) {
1552 BufferHead *bh = bh_it->second;
1553 assert(opos == (loff_t)(bh->start() + bhoff));
1554
1555 uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
1556 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1557 << bhoff << " frag " << f_it->first << "~"
1558 << f_it->second << " +" << foff << "~" << len
1559 << dendl;
1560
1561 bufferlist bit;
1562 // put substr here first, since substr_of clobbers, and we
1563 // may get multiple bh's at this stripe_map position
1564 if (bh->is_zero()) {
1565 stripe_map[f_it->first].append_zero(len);
1566 } else {
1567 bit.substr_of(bh->bl,
1568 opos - bh->start(),
1569 len);
1570 stripe_map[f_it->first].claim_append(bit);
1571 }
1572
1573 opos += len;
1574 bhoff += len;
1575 foff += len;
1576 if (opos == bh->end()) {
1577 ++bh_it;
1578 bhoff = 0;
1579 }
1580 if (foff == f_it->second) {
1581 ++f_it;
1582 foff = 0;
1583 }
1584 if (bh_it == hits.end()) break;
1585 if (f_it == ex_it->buffer_extents.end())
1586 break;
1587 }
1588 assert(f_it == ex_it->buffer_extents.end());
1589 assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1590 }
1591
1592 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1593 bottouch_ob(o);
1594 }
1595 }
1596
1597 if (!success) {
1598 if (perfcounter && external_call) {
1599 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1600 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1601 perfcounter->inc(l_objectcacher_cache_ops_miss);
1602 }
1603 if (onfinish) {
1604 ldout(cct, 20) << "readx defer " << rd << dendl;
1605 } else {
1606 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1607 << dendl;
1608 delete rd;
1609 }
1610 return 0; // wait!
1611 }
1612 if (perfcounter && external_call) {
1613 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1614 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1615 perfcounter->inc(l_objectcacher_cache_ops_hit);
1616 }
1617
1618 // no misses... success! do the read.
1619 ldout(cct, 10) << "readx has all buffers" << dendl;
1620
1621 // ok, assemble into result buffer.
1622 uint64_t pos = 0;
1623 if (rd->bl && !error) {
1624 rd->bl->clear();
1625 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1626 i != stripe_map.end();
1627 ++i) {
1628 assert(pos == i->first);
1629 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1630 << " at " << pos << dendl;
1631 pos += i->second.length();
1632 rd->bl->claim_append(i->second);
1633 assert(rd->bl->length() == pos);
1634 }
1635 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1636 } else if (!error) {
1637 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1638 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1639 pos = i->first + i->second.length();
1640 }
1641
1642 // done with read.
1643 int ret = error ? error : pos;
1644 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1645 assert(pos <= (uint64_t) INT_MAX);
1646
1647 delete rd;
1648
1649 trim();
1650
1651 return ret;
1652}
1653
1654void ObjectCacher::retry_waiting_reads()
1655{
1656 list<Context *> ls;
1657 ls.swap(waitfor_read);
1658
1659 while (!ls.empty() && waitfor_read.empty()) {
1660 Context *ctx = ls.front();
1661 ls.pop_front();
1662 ctx->complete(0);
1663 }
1664 waitfor_read.splice(waitfor_read.end(), ls);
1665}
1666
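// Write path: map each ObjectExtent onto a single dirty bh via map_write(),
// copy the caller's buffer into it, then block (or queue a waiter) in
// _wait_for_write() against the dirty/tx limits before returning.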
1667int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1668 ZTracer::Trace *parent_trace)
1669{
1670 assert(lock.is_locked());
1671 ceph::real_time now = ceph::real_clock::now();
1672 uint64_t bytes_written = 0;
1673 uint64_t bytes_written_in_flush = 0;
1674 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1675 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1676
1677 ZTracer::Trace trace;
1678 if (parent_trace != nullptr) {
1679 trace.init("write", &trace_endpoint, parent_trace);
1680 trace.event("start");
1681 }
1682
1683 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1684 ex_it != wr->extents.end();
1685 ++ex_it) {
1686 // get object cache
1687 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1688 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1689 ex_it->truncate_size, oset->truncate_seq);
1690
1691 // map it all into a single bufferhead.
1692 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1693 bool missing = bh->is_missing();
1694 bh->snapc = wr->snapc;
1695
1696 bytes_written += ex_it->length;
1697 if (bh->is_tx()) {
1698 bytes_written_in_flush += ex_it->length;
1699 }
1700
1701 // adjust buffer pointers (ie "copy" data into my cache)
1702 // this is over a single ObjectExtent, so we know that
1703 // - there is one contiguous bh
1704 // - the buffer frags need not be (and almost certainly aren't)
1705 // note: i assume striping is monotonic... no jumps backwards, ever!
1706 loff_t opos = ex_it->offset;
1707 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1708 = ex_it->buffer_extents.begin();
1709 f_it != ex_it->buffer_extents.end();
1710 ++f_it) {
1711 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1712 << f_it->second << " into " << *bh << " at " << opos
1713 << dendl;
1714 uint64_t bhoff = bh->start() - opos;
1715 assert(f_it->second <= bh->length() - bhoff);
1716
1717 // get the frag we're mapping in
1718 bufferlist frag;
1719 frag.substr_of(wr->bl,
1720 f_it->first, f_it->second);
1721
1722 // keep anything left of bhoff
1723 bufferlist newbl;
1724 if (bhoff)
1725 newbl.substr_of(bh->bl, 0, bhoff);
1726 newbl.claim_append(frag);
1727 bh->bl.swap(newbl);
1728
1729 opos += f_it->second;
1730 }
1731
1732 // ok, now bh is dirty.
1733 mark_dirty(bh);
1734 if (dontneed)
1735 bh->set_dontneed(true);
1736 else if (nocache && missing)
1737 bh->set_nocache(true);
1738 else
1739 touch_bh(bh);
1740
1741 bh->last_write = now;
1742
1743 o->try_merge_bh(bh);
1744 }
1745
1746 if (perfcounter) {
1747 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1748 if (bytes_written_in_flush) {
1749 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1750 bytes_written_in_flush);
1751 }
1752 }
1753
1754 int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1755 delete wr;
1756
1757 //verify_stats();
1758 trim();
1759 return r;
1760}
1761
1762class ObjectCacher::C_WaitForWrite : public Context {
1763public:
1764 C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1765 const ZTracer::Trace &trace, Context *onfinish) :
1766 m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1767 void finish(int r) override;
1768private:
1769 ObjectCacher *m_oc;
1770 uint64_t m_len;
1771 ZTracer::Trace m_trace;
1772 Context *m_onfinish;
1773};
1774
1775void ObjectCacher::C_WaitForWrite::finish(int r)
1776{
1777 Mutex::Locker l(m_oc->lock);
1778 m_oc->maybe_wait_for_writeback(m_len, &m_trace);
1779 m_onfinish->complete(r);
1780}
1781
1782void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
1783 ZTracer::Trace *trace)
1784{
1785 assert(lock.is_locked());
1786 ceph::mono_time start = ceph::mono_clock::now();
1787 int blocked = 0;
1788 // wait for writeback?
1789 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1790 // - do not wait for bytes other waiters are waiting on. this means that
1791 // threads do not wait for each other. this effectively allows the cache
1792 // size to balloon proportional to the data that is in flight.
1793
1794 uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
1795 while (get_stat_dirty() + get_stat_tx() > 0 &&
1796 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1797 max_dirty + get_stat_dirty_waiting()) ||
1798 (dirty_or_tx_bh.size() >=
1799 max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1800
1801 if (blocked == 0) {
1802 trace->event("start wait for writeback");
1803 }
1804 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1805 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1806 << max_dirty << " + dirty_waiting "
1807 << get_stat_dirty_waiting() << dendl;
1808 flusher_cond.Signal();
1809 stat_dirty_waiting += len;
1810 ++stat_nr_dirty_waiters;
1811 stat_cond.Wait(lock);
1812 stat_dirty_waiting -= len;
1813 --stat_nr_dirty_waiters;
1814 ++blocked;
1815 ldout(cct, 10) << __func__ << " woke up" << dendl;
1816 }
1817 if (blocked > 0) {
1818 trace->event("finish wait for writeback");
1819 }
1820 if (blocked && perfcounter) {
1821 perfcounter->inc(l_objectcacher_write_ops_blocked);
1822 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1823 ceph::timespan blocked = ceph::mono_clock::now() - start;
1824 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1825 }
1826}
1827
1828// blocking wait for write.
1829int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1830 ZTracer::Trace *trace, Context *onfreespace)
1831{
1832 assert(lock.is_locked());
1833 assert(trace != nullptr);
1834 int ret = 0;
1835
1836 if (max_dirty > 0) {
1837 if (block_writes_upfront) {
1838 maybe_wait_for_writeback(len, trace);
1839 if (onfreespace)
1840 onfreespace->complete(0);
1841 } else {
1842 assert(onfreespace);
1843 finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
1844 }
1845 } else {
1846 // write-thru! flush what we just wrote.
1847 Cond cond;
1848 bool done = false;
1849 Context *fin = block_writes_upfront ?
1850 new C_Cond(&cond, &done, &ret) : onfreespace;
1851 assert(fin);
31f18b77 1852 bool flushed = flush_set(oset, wr->extents, trace, fin);
1853 assert(!flushed); // we just dirtied it, and didn't drop our lock!
1854 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1855 << " bytes" << dendl;
1856 if (block_writes_upfront) {
1857 while (!done)
1858 cond.Wait(lock);
1859 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1860 if (onfreespace)
1861 onfreespace->complete(ret);
1862 }
1863 }
1864
1865 // start writeback anyway?
1866 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1867 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1868 << target_dirty << ", nudging flusher" << dendl;
1869 flusher_cond.Signal();
1870 }
1871 return ret;
1872}
1873
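// Summary of the three throttling paths above (descriptive only):
//  - max_dirty > 0 and block_writes_upfront: the writer blocks in
//    maybe_wait_for_writeback() until the dirty/tx level drops, then
//    onfreespace (if any) completes immediately with 0.
//  - max_dirty > 0 and !block_writes_upfront: the wait is handed to the
//    finisher thread via C_WaitForWrite, so the writer returns right away
//    and onfreespace completes once the throttle would have admitted it.
//  - max_dirty == 0 (write-through): the just-written extents are flushed
//    immediately; we either wait for that flush here or let onfreespace
//    observe its result.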
1874void ObjectCacher::flusher_entry()
1875{
1876 ldout(cct, 10) << "flusher start" << dendl;
1877 lock.Lock();
1878 while (!flusher_stop) {
1879 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1880 get_stat_dirty();
1881 ldout(cct, 11) << "flusher "
1882 << all << " / " << max_size << ": "
1883 << get_stat_tx() << " tx, "
1884 << get_stat_rx() << " rx, "
1885 << get_stat_clean() << " clean, "
1886 << get_stat_dirty() << " dirty ("
1887 << target_dirty << " target, "
1888 << max_dirty << " max)"
1889 << dendl;
1890 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1891
1892 ZTracer::Trace trace;
1893 if (cct->_conf->osdc_blkin_trace_all) {
1894 trace.init("flusher", &trace_endpoint);
1895 trace.event("start");
1896 }
1897
1898 if (actual > 0 && (uint64_t) actual > target_dirty) {
1899 // flush some dirty pages
1900 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1901 << get_stat_dirty_waiting() << " dirty_waiting > target "
1902 << target_dirty << ", flushing some dirty bhs" << dendl;
31f18b77 1903 flush(&trace, actual - target_dirty);
1904 } else {
1905 // check tail of lru for old dirty items
1906 ceph::real_time cutoff = ceph::real_clock::now();
1907 cutoff -= max_dirty_age;
1908 BufferHead *bh = 0;
1909 int max = MAX_FLUSH_UNDER_LOCK;
1910 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1911 lru_get_next_expire())) != 0 &&
1912 bh->last_write <= cutoff &&
1913 max > 0) {
1914 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1915 if (scattered_write) {
1916 bh_write_adjacencies(bh, cutoff, NULL, &max);
1917 } else {
31f18b77 1918 bh_write(bh, trace);
1919 --max;
1920 }
1921 }
1922 if (!max) {
1923 // back off the lock to avoid starving other threads
31f18b77 1924 trace.event("backoff");
1925 lock.Unlock();
1926 lock.Lock();
1927 continue;
1928 }
1929 }
1930
1931 trace.event("finish");
1932 if (flusher_stop)
1933 break;
1934
1935 flusher_cond.WaitInterval(lock, seconds(1));
1936 }
1937
1938 /* Wait for reads to finish. This is only possible if handling
1939 * -ENOENT made some read completions finish before their rados read
1940 * came back. If we don't wait for them, and destroy the cache, when
1941 * the rados reads do come back their callback will try to access the
1942 * no-longer-valid ObjectCacher.
1943 */
1944 while (reads_outstanding > 0) {
1945 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1946 << reads_outstanding << dendl;
1947 read_cond.Wait(lock);
1948 }
1949
1950 lock.Unlock();
1951 ldout(cct, 10) << "flusher finish" << dendl;
1952}
1953
1954
1955// -------------------------------------------------
1956
1957bool ObjectCacher::set_is_empty(ObjectSet *oset)
1958{
1959 assert(lock.is_locked());
1960 if (oset->objects.empty())
1961 return true;
1962
1963 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
1964 if (!(*p)->is_empty())
1965 return false;
1966
1967 return true;
1968}
1969
1970bool ObjectCacher::set_is_cached(ObjectSet *oset)
1971{
1972 assert(lock.is_locked());
1973 if (oset->objects.empty())
1974 return false;
1975
1976 for (xlist<Object*>::iterator p = oset->objects.begin();
1977 !p.end(); ++p) {
1978 Object *ob = *p;
1979 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
1980 q != ob->data.end();
1981 ++q) {
1982 BufferHead *bh = q->second;
1983 if (!bh->is_dirty() && !bh->is_tx())
1984 return true;
1985 }
1986 }
1987
1988 return false;
1989}
1990
1991bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
1992{
1993 assert(lock.is_locked());
1994 if (oset->objects.empty())
1995 return false;
1996
1997 for (xlist<Object*>::iterator i = oset->objects.begin();
1998 !i.end(); ++i) {
1999 Object *ob = *i;
2000
2001 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2002 p != ob->data.end();
2003 ++p) {
2004 BufferHead *bh = p->second;
2005 if (bh->is_dirty() || bh->is_tx())
2006 return true;
2007 }
2008 }
2009
2010 return false;
2011}
2012
2013
2014// purge. non-blocking. violently removes dirty buffers from cache.
2015void ObjectCacher::purge(Object *ob)
2016{
2017 assert(lock.is_locked());
2018 ldout(cct, 10) << "purge " << *ob << dendl;
2019
2020 ob->truncate(0);
2021}
2022
2023
2024// flush. non-blocking. no callback.
2025// true if clean, already flushed.
2026// false if we wrote something.
2027// we are sloppy about the ranges and flush any buffer the range touches
2028bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2029 ZTracer::Trace *trace)
7c673cae 2030{
31f18b77 2031 assert(trace != nullptr);
2032 assert(lock.is_locked());
2033 list<BufferHead*> blist;
2034 bool clean = true;
2035 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2036 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2037 p != ob->data.end();
2038 ++p) {
2039 BufferHead *bh = p->second;
2040 ldout(cct, 20) << "flush " << *bh << dendl;
2041 if (length && bh->start() > offset+length) {
2042 break;
2043 }
2044 if (bh->is_tx()) {
2045 clean = false;
2046 continue;
2047 }
2048 if (!bh->is_dirty()) {
2049 continue;
2050 }
2051
2052 if (scattered_write)
2053 blist.push_back(bh);
2054 else
31f18b77 2055 bh_write(bh, *trace);
2056 clean = false;
2057 }
2058 if (scattered_write && !blist.empty())
2059 bh_write_scattered(blist);
2060
2061 return clean;
2062}
2063
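// Note (descriptive): with scattered_write enabled the dirty BufferHeads in
// the range are batched into a single bh_write_scattered() call; otherwise
// each one is written back individually via bh_write().  Either way this
// only starts writeback; the return value reports whether the range was
// already clean (no dirty or in-flight buffers).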
2064bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2065 Context *onfinish)
2066{
2067 assert(lock.is_locked());
2068 if (gather->has_subs()) {
2069 gather->set_finisher(onfinish);
2070 gather->activate();
2071 return false;
2072 }
2073
2074 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2075 onfinish->complete(0);
2076 return true;
2077}
2078
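// The flush_set()/flush_all() variants below share one pattern: start
// writeback, register a gather sub-Context on every object that still has an
// outstanding write tid, and let _flush_set_finish() either arm the gather
// with the caller's Context or complete it right away.  A minimal sketch of
// that pattern (hypothetical, simplified from the code below):
//
//   C_GatherBuilder gather(cct);
//   for (Object *ob : objects_with_pending_writes)   // hypothetical list
//     ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
//   bool already_flushed = _flush_set_finish(&gather, onfinish);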
2079// flush. non-blocking, takes callback.
2080// returns true if already flushed
2081bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2082{
2083 assert(lock.is_locked());
2084 assert(onfinish != NULL);
2085 if (oset->objects.empty()) {
2086 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2087 onfinish->complete(0);
2088 return true;
2089 }
2090
2091 ldout(cct, 10) << "flush_set " << oset << dendl;
2092
2093 // we'll need to wait for all objects to flush!
2094 C_GatherBuilder gather(cct);
2095 set<Object*> waitfor_commit;
2096
2097 list<BufferHead*> blist;
2098 Object *last_ob = NULL;
2099 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2100
2101 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2102 // order. But items in oset->objects are not sorted. So the iterator can
2103 // point to any buffer head in the ObjectSet
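  // To cover the whole set no matter which object the key happened to pick,
  // the code below first walks forward from the lower bound until it leaves
  // this oset, then (when possible) walks backward from the entry just
  // before the lower bound until it leaves the oset again.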
2104 BufferHead key(*oset->objects.begin());
2105 it = dirty_or_tx_bh.lower_bound(&key);
2106 p = q = it;
2107
2108 bool backwards = true;
2109 if (it != dirty_or_tx_bh.begin())
2110 --it;
2111 else
2112 backwards = false;
2113
2114 for (; p != dirty_or_tx_bh.end(); p = q) {
2115 ++q;
2116 BufferHead *bh = *p;
2117 if (bh->ob->oset != oset)
2118 break;
2119 waitfor_commit.insert(bh->ob);
2120 if (bh->is_dirty()) {
2121 if (scattered_write) {
2122 if (last_ob != bh->ob) {
2123 if (!blist.empty()) {
2124 bh_write_scattered(blist);
2125 blist.clear();
2126 }
2127 last_ob = bh->ob;
2128 }
2129 blist.push_back(bh);
2130 } else {
31f18b77 2131 bh_write(bh, {});
2132 }
2133 }
2134 }
2135
2136 if (backwards) {
2137 for(p = q = it; true; p = q) {
2138 if (q != dirty_or_tx_bh.begin())
2139 --q;
2140 else
2141 backwards = false;
2142 BufferHead *bh = *p;
2143 if (bh->ob->oset != oset)
2144 break;
2145 waitfor_commit.insert(bh->ob);
2146 if (bh->is_dirty()) {
2147 if (scattered_write) {
2148 if (last_ob != bh->ob) {
2149 if (!blist.empty()) {
2150 bh_write_scattered(blist);
2151 blist.clear();
2152 }
2153 last_ob = bh->ob;
2154 }
2155 blist.push_front(bh);
2156 } else {
31f18b77 2157 bh_write(bh, {});
2158 }
2159 }
2160 if (!backwards)
2161 break;
2162 }
2163 }
2164
2165 if (scattered_write && !blist.empty())
2166 bh_write_scattered(blist);
2167
2168 for (set<Object*>::iterator i = waitfor_commit.begin();
2169 i != waitfor_commit.end(); ++i) {
2170 Object *ob = *i;
2171
2172 // we'll need to gather...
2173 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2174 << ob->last_write_tid << " on " << *ob << dendl;
2175 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2176 }
2177
2178 return _flush_set_finish(&gather, onfinish);
2179}
2180
2181// flush. non-blocking, takes callback.
2182// returns true if already flushed
2183bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
31f18b77 2184 ZTracer::Trace *trace, Context *onfinish)
2185{
2186 assert(lock.is_locked());
31f18b77 2187 assert(trace != nullptr);
2188 assert(onfinish != NULL);
2189 if (oset->objects.empty()) {
2190 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2191 onfinish->complete(0);
2192 return true;
2193 }
2194
2195 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2196 << " ObjectExtents" << dendl;
2197
2198 // we'll need to wait for all objects to flush!
2199 C_GatherBuilder gather(cct);
2200
2201 for (vector<ObjectExtent>::iterator p = exv.begin();
2202 p != exv.end();
2203 ++p) {
2204 ObjectExtent &ex = *p;
2205 sobject_t soid(ex.oid, CEPH_NOSNAP);
2206 if (objects[oset->poolid].count(soid) == 0)
2207 continue;
2208 Object *ob = objects[oset->poolid][soid];
2209
2210 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2211 << " " << ob << dendl;
2212
31f18b77 2213 if (!flush(ob, ex.offset, ex.length, trace)) {
2214 // we'll need to gather...
2215 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2216 << ob->last_write_tid << " on " << *ob << dendl;
2217 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2218 }
2219 }
2220
2221 return _flush_set_finish(&gather, onfinish);
2222}
2223
2224// flush all dirty data. non-blocking, takes callback.
2225// returns true if already flushed
2226bool ObjectCacher::flush_all(Context *onfinish)
2227{
2228 assert(lock.is_locked());
2229 assert(onfinish != NULL);
2230
2231 ldout(cct, 10) << "flush_all " << dendl;
2232
2233 // we'll need to wait for all objects to flush!
2234 C_GatherBuilder gather(cct);
2235 set<Object*> waitfor_commit;
2236
2237 list<BufferHead*> blist;
2238 Object *last_ob = NULL;
2239 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2240 next = it = dirty_or_tx_bh.begin();
2241 while (it != dirty_or_tx_bh.end()) {
2242 ++next;
2243 BufferHead *bh = *it;
2244 waitfor_commit.insert(bh->ob);
2245
2246 if (bh->is_dirty()) {
2247 if (scattered_write) {
2248 if (last_ob != bh->ob) {
2249 if (!blist.empty()) {
2250 bh_write_scattered(blist);
2251 blist.clear();
2252 }
2253 last_ob = bh->ob;
2254 }
2255 blist.push_back(bh);
2256 } else {
31f18b77 2257 bh_write(bh, {});
2258 }
2259 }
2260
2261 it = next;
2262 }
2263
2264 if (scattered_write && !blist.empty())
2265 bh_write_scattered(blist);
2266
2267 for (set<Object*>::iterator i = waitfor_commit.begin();
2268 i != waitfor_commit.end();
2269 ++i) {
2270 Object *ob = *i;
2271
2272 // we'll need to gather...
2273 ldout(cct, 10) << "flush_all will wait for ack tid "
2274 << ob->last_write_tid << " on " << *ob << dendl;
2275 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2276 }
2277
2278 return _flush_set_finish(&gather, onfinish);
2279}
2280
2281void ObjectCacher::purge_set(ObjectSet *oset)
2282{
2283 assert(lock.is_locked());
2284 if (oset->objects.empty()) {
2285 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2286 return;
2287 }
2288
2289 ldout(cct, 10) << "purge_set " << oset << dendl;
2290 const bool were_dirty = oset->dirty_or_tx > 0;
2291
2292 for (xlist<Object*>::iterator i = oset->objects.begin();
2293 !i.end(); ++i) {
2294 Object *ob = *i;
2295 purge(ob);
2296 }
2297
2298 // Although we have purged rather than flushed, caller should still
2299 // drop any resources associated with dirty data.
2300 assert(oset->dirty_or_tx == 0);
2301 if (flush_set_callback && were_dirty) {
2302 flush_set_callback(flush_set_callback_arg, oset);
2303 }
2304}
2305
2306
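// release. non-blocking. frees any clean/zero/error buffers on the object
// and returns the number of bytes that were not releasable (e.g. dirty, tx
// or rx buffers). closes the object entirely if nothing remains; otherwise
// resets its cached complete/exists hints, which may no longer be valid.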
2307loff_t ObjectCacher::release(Object *ob)
2308{
2309 assert(lock.is_locked());
2310 list<BufferHead*> clean;
2311 loff_t o_unclean = 0;
2312
2313 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2314 p != ob->data.end();
2315 ++p) {
2316 BufferHead *bh = p->second;
2317 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2318 clean.push_back(bh);
2319 else
2320 o_unclean += bh->length();
2321 }
2322
2323 for (list<BufferHead*>::iterator p = clean.begin();
2324 p != clean.end();
2325 ++p) {
2326 bh_remove(ob, *p);
2327 delete *p;
2328 }
2329
2330 if (ob->can_close()) {
2331 ldout(cct, 10) << "release trimming " << *ob << dendl;
2332 close_object(ob);
2333 assert(o_unclean == 0);
2334 return 0;
2335 }
2336
2337 if (ob->complete) {
2338 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2339 ob->complete = false;
2340 }
2341 if (!ob->exists) {
2342 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2343 ob->exists = true;
2344 }
2345
2346 return o_unclean;
2347}
2348
2349loff_t ObjectCacher::release_set(ObjectSet *oset)
2350{
2351 assert(lock.is_locked());
2352 // return # bytes not clean (and thus not released).
2353 loff_t unclean = 0;
2354
2355 if (oset->objects.empty()) {
2356 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2357 return 0;
2358 }
2359
2360 ldout(cct, 10) << "release_set " << oset << dendl;
2361
2362 xlist<Object*>::iterator q;
2363 for (xlist<Object*>::iterator p = oset->objects.begin();
2364 !p.end(); ) {
2365 q = p;
2366 ++q;
2367 Object *ob = *p;
2368
2369 loff_t o_unclean = release(ob);
2370 unclean += o_unclean;
2371
2372 if (o_unclean)
2373 ldout(cct, 10) << "release_set " << oset << " " << *ob
2374 << " has " << o_unclean << " bytes left"
2375 << dendl;
2376 p = q;
2377 }
2378
2379 if (unclean) {
2380 ldout(cct, 10) << "release_set " << oset
2381 << ", " << unclean << " bytes left" << dendl;
2382 }
2383
2384 return unclean;
2385}
2386
2387
2388uint64_t ObjectCacher::release_all()
2389{
2390 assert(lock.is_locked());
2391 ldout(cct, 10) << "release_all" << dendl;
2392 uint64_t unclean = 0;
2393
2394 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2395 = objects.begin();
2396 while (i != objects.end()) {
2397 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2398 while (p != i->end()) {
2399 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2400 ++n;
2401
2402 Object *ob = p->second;
2403
2404 loff_t o_unclean = release(ob);
2405 unclean += o_unclean;
2406
2407 if (o_unclean)
2408 ldout(cct, 10) << "release_all " << *ob
2409 << " has " << o_unclean << " bytes left"
2410 << dendl;
2411 p = n;
2412 }
2413 ++i;
2414 }
2415
2416 if (unclean) {
2417 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2418 << dendl;
2419 }
2420
2421 return unclean;
2422}
2423
2424void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2425{
2426 assert(lock.is_locked());
2427 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2428
2429 for (xlist<Object*>::iterator p = oset->objects.begin();
2430 !p.end(); ++p) {
2431 Object *ob = *p;
2432 if (!ob->exists) {
2433 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2434 ob->exists = true;
2435 ob->complete = false;
2436 }
2437 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2438 !q.end(); ++q) {
2439 C_ReadFinish *comp = *q;
2440 comp->distrust_enoent();
2441 }
2442 }
2443}
2444
2445/**
2446 * discard the cached data covered by the object extents in exls from the
2447 * in-memory oset. non-blocking; dirty buffers in those ranges are dropped.
2448 */
2449void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2450{
2451 assert(lock.is_locked());
2452 if (oset->objects.empty()) {
2453 ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
2454 return;
2455 }
2456
2457 ldout(cct, 10) << "discard_set " << oset << dendl;
2458
2459 bool were_dirty = oset->dirty_or_tx > 0;
2460
2461 for (vector<ObjectExtent>::const_iterator p = exls.begin();
2462 p != exls.end();
2463 ++p) {
2464 ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
2465 const ObjectExtent &ex = *p;
2466 sobject_t soid(ex.oid, CEPH_NOSNAP);
2467 if (objects[oset->poolid].count(soid) == 0)
2468 continue;
2469 Object *ob = objects[oset->poolid][soid];
2470
2471 ob->discard(ex.offset, ex.length);
2472 }
2473
2474 // did we truncate off dirty data?
2475 if (flush_set_callback &&
2476 were_dirty && oset->dirty_or_tx == 0)
2477 flush_set_callback(flush_set_callback_arg, oset);
2478}
2479
2480void ObjectCacher::verify_stats() const
2481{
2482 assert(lock.is_locked());
2483 ldout(cct, 10) << "verify_stats" << dendl;
2484
2485 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2486 error = 0;
2487 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2488 = objects.begin();
2489 i != objects.end();
2490 ++i) {
2491 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2492 = i->begin();
2493 p != i->end();
2494 ++p) {
2495 Object *ob = p->second;
2496 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2497 q != ob->data.end();
2498 ++q) {
2499 BufferHead *bh = q->second;
2500 switch (bh->get_state()) {
2501 case BufferHead::STATE_MISSING:
2502 missing += bh->length();
2503 break;
2504 case BufferHead::STATE_CLEAN:
2505 clean += bh->length();
2506 break;
2507 case BufferHead::STATE_ZERO:
2508 zero += bh->length();
2509 break;
2510 case BufferHead::STATE_DIRTY:
2511 dirty += bh->length();
2512 break;
2513 case BufferHead::STATE_TX:
2514 tx += bh->length();
2515 break;
2516 case BufferHead::STATE_RX:
2517 rx += bh->length();
2518 break;
2519 case BufferHead::STATE_ERROR:
2520 error += bh->length();
2521 break;
2522 default:
2523 ceph_abort();
2524 }
2525 }
2526 }
2527 }
2528
2529 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2530 << " dirty " << dirty << " missing " << missing
2531 << " error " << error << dendl;
2532 assert(clean == stat_clean);
2533 assert(rx == stat_rx);
2534 assert(tx == stat_tx);
2535 assert(dirty == stat_dirty);
2536 assert(missing == stat_missing);
2537 assert(zero == stat_zero);
2538 assert(error == stat_error);
2539}
2540
2541void ObjectCacher::bh_stat_add(BufferHead *bh)
2542{
2543 assert(lock.is_locked());
2544 switch (bh->get_state()) {
2545 case BufferHead::STATE_MISSING:
2546 stat_missing += bh->length();
2547 break;
2548 case BufferHead::STATE_CLEAN:
2549 stat_clean += bh->length();
2550 break;
2551 case BufferHead::STATE_ZERO:
2552 stat_zero += bh->length();
2553 break;
2554 case BufferHead::STATE_DIRTY:
2555 stat_dirty += bh->length();
2556 bh->ob->dirty_or_tx += bh->length();
2557 bh->ob->oset->dirty_or_tx += bh->length();
2558 break;
2559 case BufferHead::STATE_TX:
2560 stat_tx += bh->length();
2561 bh->ob->dirty_or_tx += bh->length();
2562 bh->ob->oset->dirty_or_tx += bh->length();
2563 break;
2564 case BufferHead::STATE_RX:
2565 stat_rx += bh->length();
2566 break;
2567 case BufferHead::STATE_ERROR:
2568 stat_error += bh->length();
2569 break;
2570 default:
2571 assert(0 == "bh_stat_add: invalid bufferhead state");
2572 }
2573 if (get_stat_dirty_waiting() > 0)
2574 stat_cond.Signal();
2575}
2576
2577void ObjectCacher::bh_stat_sub(BufferHead *bh)
2578{
2579 assert(lock.is_locked());
2580 switch (bh->get_state()) {
2581 case BufferHead::STATE_MISSING:
2582 stat_missing -= bh->length();
2583 break;
2584 case BufferHead::STATE_CLEAN:
2585 stat_clean -= bh->length();
2586 break;
2587 case BufferHead::STATE_ZERO:
2588 stat_zero -= bh->length();
2589 break;
2590 case BufferHead::STATE_DIRTY:
2591 stat_dirty -= bh->length();
2592 bh->ob->dirty_or_tx -= bh->length();
2593 bh->ob->oset->dirty_or_tx -= bh->length();
2594 break;
2595 case BufferHead::STATE_TX:
2596 stat_tx -= bh->length();
2597 bh->ob->dirty_or_tx -= bh->length();
2598 bh->ob->oset->dirty_or_tx -= bh->length();
2599 break;
2600 case BufferHead::STATE_RX:
2601 stat_rx -= bh->length();
2602 break;
2603 case BufferHead::STATE_ERROR:
2604 stat_error -= bh->length();
2605 break;
2606 default:
2607 assert(0 == "bh_stat_sub: invalid bufferhead state");
2608 }
2609}
2610
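// bh_set_state keeps three structures consistent with a BufferHead's state:
// the dirty vs. rest LRU lists, the dirty_or_tx_bh set consumed by
// flush_set()/flush_all() and the writeback throttle, and the per-state byte
// counters maintained by bh_stat_sub()/bh_stat_add().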
2611void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2612{
2613 assert(lock.is_locked());
2614 int state = bh->get_state();
2615 // move between lru lists?
2616 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2617 bh_lru_rest.lru_remove(bh);
2618 bh_lru_dirty.lru_insert_top(bh);
2619 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2620 bh_lru_dirty.lru_remove(bh);
2621 if (bh->get_dontneed())
2622 bh_lru_rest.lru_insert_bot(bh);
2623 else
2624 bh_lru_rest.lru_insert_top(bh);
2625 }
2626
2627 if ((s == BufferHead::STATE_TX ||
2628 s == BufferHead::STATE_DIRTY) &&
2629 state != BufferHead::STATE_TX &&
2630 state != BufferHead::STATE_DIRTY) {
2631 dirty_or_tx_bh.insert(bh);
2632 } else if ((state == BufferHead::STATE_TX ||
2633 state == BufferHead::STATE_DIRTY) &&
2634 s != BufferHead::STATE_TX &&
2635 s != BufferHead::STATE_DIRTY) {
2636 dirty_or_tx_bh.erase(bh);
2637 }
2638
2639 if (s != BufferHead::STATE_ERROR &&
2640 state == BufferHead::STATE_ERROR) {
2641 bh->error = 0;
2642 }
2643
2644 // set state
2645 bh_stat_sub(bh);
2646 bh->set_state(s);
2647 bh_stat_add(bh);
2648}
2649
2650void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2651{
2652 assert(lock.is_locked());
2653 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2654 ob->add_bh(bh);
2655 if (bh->is_dirty()) {
2656 bh_lru_dirty.lru_insert_top(bh);
2657 dirty_or_tx_bh.insert(bh);
2658 } else {
2659 if (bh->get_dontneed())
2660 bh_lru_rest.lru_insert_bot(bh);
2661 else
2662 bh_lru_rest.lru_insert_top(bh);
2663 }
2664
2665 if (bh->is_tx()) {
2666 dirty_or_tx_bh.insert(bh);
2667 }
2668 bh_stat_add(bh);
2669}
2670
2671void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2672{
2673 assert(lock.is_locked());
2674 assert(bh->get_journal_tid() == 0);
2675 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2676 ob->remove_bh(bh);
2677 if (bh->is_dirty()) {
2678 bh_lru_dirty.lru_remove(bh);
2679 dirty_or_tx_bh.erase(bh);
2680 } else {
2681 bh_lru_rest.lru_remove(bh);
2682 }
2683
2684 if (bh->is_tx()) {
2685 dirty_or_tx_bh.erase(bh);
2686 }
2687 bh_stat_sub(bh);
2688 if (get_stat_dirty_waiting() > 0)
2689 stat_cond.Signal();
2690}
2691