[ceph.git] / ceph / src / osdc / ObjectCacher.cc (commit 27df042f2f86c5c6c90c625a2fb492cb068cf2cc)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits.h>
5
6 #include "msg/Messenger.h"
7 #include "ObjectCacher.h"
8 #include "WritebackHandler.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11
12 #include "include/ceph_assert.h"
13
14 #define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on while holding the lock
15 #define BUFFER_MEMORY_WEIGHT CEPH_PAGE_SHIFT // memory usage of BufferHead, count in (1<<n)
16
17 using std::chrono::seconds;
18
19
20 /*** ObjectCacher::BufferHead ***/
21
22
23 /*** ObjectCacher::Object ***/
24
25 #define dout_subsys ceph_subsys_objectcacher
26 #undef dout_prefix
27 #define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
28
29
30
31 class ObjectCacher::C_ReadFinish : public Context {
32 ObjectCacher *oc;
33 int64_t poolid;
34 sobject_t oid;
35 loff_t start;
36 uint64_t length;
37 xlist<C_ReadFinish*>::item set_item;
38 bool trust_enoent;
39 ceph_tid_t tid;
40 ZTracer::Trace trace;
41
42 public:
43 bufferlist bl;
44 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
45 uint64_t l, const ZTracer::Trace &trace) :
46 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
47 set_item(this), trust_enoent(true),
48 tid(t), trace(trace) {
49 ob->reads.push_back(&set_item);
50 }
51
52 void finish(int r) override {
53 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
54 trace.event("finish");
55
56 // object destructor clears the list
57 if (set_item.is_on_list())
58 set_item.remove_myself();
59 }
60
61 void distrust_enoent() {
62 trust_enoent = false;
63 }
64 };
65
66 class ObjectCacher::C_RetryRead : public Context {
67 ObjectCacher *oc;
68 OSDRead *rd;
69 ObjectSet *oset;
70 Context *onfinish;
71 ZTracer::Trace trace;
72 public:
73 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
74 const ZTracer::Trace &trace)
75 : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
76 }
77 void finish(int r) override {
78 if (r >= 0) {
79 r = oc->_readx(rd, oset, onfinish, false, &trace);
80 }
81
82 if (r == 0) {
83 // read is still in-progress
84 return;
85 }
86
87 trace.event("finish");
88 if (onfinish) {
89 onfinish->complete(r);
90 }
91 }
92 };
93
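// Split *left at byte offset off and return the new right-hand BufferHead.
// The right half inherits left's state, last write/read tids, snapc and the
// dontneed/nocache hints; buffer contents and read waiters at or beyond off
// are moved to the new bh, and the cache statistics are updated accordingly.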
94 ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
95 loff_t off)
96 {
97 ceph_assert(ceph_mutex_is_locked(oc->lock));
98 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
99
100 // split off right
101 ObjectCacher::BufferHead *right = new BufferHead(this);
102
103 // inherit the dontneed/nocache hints; a later access clears them automatically.
104 right->set_dontneed(left->get_dontneed());
105 right->set_nocache(left->get_nocache());
106
107 right->last_write_tid = left->last_write_tid;
108 right->last_read_tid = left->last_read_tid;
109 right->set_state(left->get_state());
110 right->snapc = left->snapc;
111 right->set_journal_tid(left->journal_tid);
112
113 loff_t newleftlen = off - left->start();
114 right->set_start(off);
115 right->set_length(left->length() - newleftlen);
116
117 // shorten left
118 oc->bh_stat_sub(left);
119 left->set_length(newleftlen);
120 oc->bh_stat_add(left);
121
122 // add right
123 oc->bh_add(this, right);
124
125 // split buffers too
126 bufferlist bl;
127 bl.claim(left->bl);
128 if (bl.length()) {
129 ceph_assert(bl.length() == (left->length() + right->length()));
130 right->bl.substr_of(bl, left->length(), right->length());
131 left->bl.substr_of(bl, 0, left->length());
132 }
133
134 // move read waiters
135 if (!left->waitfor_read.empty()) {
136 map<loff_t, list<Context*> >::iterator start_remove
137 = left->waitfor_read.begin();
138 while (start_remove != left->waitfor_read.end() &&
139 start_remove->first < right->start())
140 ++start_remove;
141 for (map<loff_t, list<Context*> >::iterator p = start_remove;
142 p != left->waitfor_read.end(); ++p) {
143 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
144 << " to right bh" << dendl;
145 right->waitfor_read[p->first].swap( p->second );
146 ceph_assert(p->second.empty());
147 }
148 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
149 }
150
151 ldout(oc->cct, 20) << "split left is " << *left << dendl;
152 ldout(oc->cct, 20) << "split right is " << *right << dendl;
153 return right;
154 }
155
156
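// Absorb *right into *left: extend left to cover both ranges, append right's
// data, keep the newer last_write tid/time, move right's read waiters over,
// and delete right. Callers ensure the two bhs are adjacent and compatible
// (see can_merge_bh).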
157 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
158 {
159 ceph_assert(ceph_mutex_is_locked(oc->lock));
160
161 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
162 if (left->get_journal_tid() == 0) {
163 left->set_journal_tid(right->get_journal_tid());
164 }
165 right->set_journal_tid(0);
166
167 oc->bh_remove(this, right);
168 oc->bh_stat_sub(left);
169 left->set_length(left->length() + right->length());
170 oc->bh_stat_add(left);
171
172 // data
173 left->bl.claim_append(right->bl);
174
175 // version
176 // note: taking the max here is somewhat imprecise, but these fields should only be used for dirty buffers
177 left->last_write_tid = std::max( left->last_write_tid, right->last_write_tid );
178 left->last_write = std::max( left->last_write, right->last_write );
179
180 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
181 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
182
183 // waiters
184 for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
185 p != right->waitfor_read.end();
186 ++p)
187 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
188 p->second );
189
190 // hose right
191 delete right;
192
193 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
194 }
195
196 bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
197 {
198 if (left->end() != right->start() ||
199 left->get_state() != right->get_state() ||
200 !left->can_merge_journal(right))
201 return false;
202 if (left->is_tx() && left->last_write_tid != right->last_write_tid)
203 return false;
204 return true;
205 }
206
207 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
208 {
209 ceph_assert(ceph_mutex_is_locked(oc->lock));
210 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
211
212 // do not merge rx buffers; last_read_tid may not match
213 if (bh->is_rx())
214 return;
215
216 // to the left?
217 map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
218 ceph_assert(p->second == bh);
219 if (p != data.begin()) {
220 --p;
221 if (can_merge_bh(p->second, bh)) {
222 merge_left(p->second, bh);
223 bh = p->second;
224 } else {
225 ++p;
226 }
227 }
228 // to the right?
229 ceph_assert(p->second == bh);
230 ++p;
231 if (p != data.end() && can_merge_bh(bh, p->second))
232 merge_left(bh, p->second);
233
234 maybe_rebuild_buffer(bh);
235 }
236
237 void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
238 {
239 auto& bl = bh->bl;
240 if (bl.get_num_buffers() <= 1)
241 return;
242
243 auto wasted = bl.get_wasted_space();
244 if (wasted * 2 > bl.length() &&
245 wasted > (1U << BUFFER_MEMORY_WEIGHT))
246 bl.rebuild();
247 }
248
249 /*
250 * check whether we have the entire given range cached
251 */
252 bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
253 {
254 ceph_assert(ceph_mutex_is_locked(oc->lock));
255 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
256 while (left > 0) {
257 if (p == data.end())
258 return false;
259
260 if (p->first <= cur) {
261 // have part of it
262 loff_t lenfromcur = std::min(p->second->end() - cur, left);
263 cur += lenfromcur;
264 left -= lenfromcur;
265 ++p;
266 continue;
267 } else if (p->first > cur) {
268 // gap
269 return false;
270 } else
271 ceph_abort();
272 }
273
274 return true;
275 }
276
277 /*
278 * return true if all of this object's cached data lies within the range [off, off+len]
279 */
280 bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
281 {
282 ceph_assert(ceph_mutex_is_locked(oc->lock));
283 if (data.empty())
284 return true;
285 map<loff_t, BufferHead*>::iterator first = data.begin();
286 map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
287 if (first->second->start() >= off && last->second->end() <= (off + len))
288 return true;
289 else
290 return false;
291 }
292
293 /*
294 * map a range of bytes into buffer_heads.
295 * - create missing buffer_heads as necessary.
296 */
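// Results are returned in four maps keyed by buffer offset: hits (readable
// clean/dirty/tx/zero bhs), missing (newly created bhs that still need a
// read), rx (bhs with a read already in flight) and errors (bhs in the
// error state).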
297 int ObjectCacher::Object::map_read(ObjectExtent &ex,
298 map<loff_t, BufferHead*>& hits,
299 map<loff_t, BufferHead*>& missing,
300 map<loff_t, BufferHead*>& rx,
301 map<loff_t, BufferHead*>& errors)
302 {
303 ceph_assert(ceph_mutex_is_locked(oc->lock));
304 ldout(oc->cct, 10) << "map_read " << ex.oid << " "
305 << ex.offset << "~" << ex.length << dendl;
306
307 loff_t cur = ex.offset;
308 loff_t left = ex.length;
309
310 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
311 while (left > 0) {
312 // at end?
313 if (p == data.end()) {
314 // rest is a miss.
315 BufferHead *n = new BufferHead(this);
316 n->set_start(cur);
317 n->set_length(left);
318 oc->bh_add(this, n);
319 if (complete) {
320 oc->mark_zero(n);
321 hits[cur] = n;
322 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
323 } else {
324 missing[cur] = n;
325 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
326 }
327 cur += left;
328 ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
329 break; // no more.
330 }
331
332 if (p->first <= cur) {
333 // have it (or part of it)
334 BufferHead *e = p->second;
335
336 if (e->is_clean() ||
337 e->is_dirty() ||
338 e->is_tx() ||
339 e->is_zero()) {
340 hits[cur] = e; // readable!
341 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
342 } else if (e->is_rx()) {
343 rx[cur] = e; // missing, not readable.
344 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
345 } else if (e->is_error()) {
346 errors[cur] = e;
347 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
348 } else {
349 ceph_abort();
350 }
351
352 loff_t lenfromcur = std::min(e->end() - cur, left);
353 cur += lenfromcur;
354 left -= lenfromcur;
355 ++p;
356 continue; // more?
357
358 } else if (p->first > cur) {
359 // gap.. miss
360 loff_t next = p->first;
361 BufferHead *n = new BufferHead(this);
362 loff_t len = std::min(next - cur, left);
363 n->set_start(cur);
364 n->set_length(len);
365 oc->bh_add(this,n);
366 if (complete) {
367 oc->mark_zero(n);
368 hits[cur] = n;
369 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
370 } else {
371 missing[cur] = n;
372 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
373 }
374 cur += std::min(left, n->length());
375 left -= std::min(left, n->length());
376 continue; // more?
377 } else {
378 ceph_abort();
379 }
380 }
381 return 0;
382 }
383
384 void ObjectCacher::Object::audit_buffers()
385 {
386 loff_t offset = 0;
387 for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
388 it != data.end(); ++it) {
389 if (it->first != it->second->start()) {
390 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
391 << " does not match bh start position: "
392 << *it->second << dendl;
393 ceph_assert(it->first == it->second->start());
394 }
395 if (it->first < offset) {
396 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
397 << " overlaps with previous bh " << *((--it)->second)
398 << dendl;
399 ceph_assert(it->first >= offset);
400 }
401 BufferHead *bh = it->second;
402 map<loff_t, list<Context*> >::const_iterator w_it;
403 for (w_it = bh->waitfor_read.begin();
404 w_it != bh->waitfor_read.end(); ++w_it) {
405 if (w_it->first < bh->start() ||
406 w_it->first >= bh->start() + bh->length()) {
407 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
408 << " is not within bh " << *bh << dendl;
409 ceph_assert(w_it->first >= bh->start());
410 ceph_assert(w_it->first < bh->start() + bh->length());
411 }
412 }
413 offset = it->first + it->second->length();
414 }
415 }
416
417 /*
418 * map a range of extents on an object's buffer cache.
419 * - combine any bh's we're writing into one
420 * - break up bufferheads that don't fall completely within the range
421 * (note: we do NOT return a bh that covers the write plus other
422 * dirty data to the left and/or right)
423 */
424 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
425 ceph_tid_t tid)
426 {
427 ceph_assert(ceph_mutex_is_locked(oc->lock));
428 BufferHead *final = 0;
429
430 ldout(oc->cct, 10) << "map_write oex " << ex.oid
431 << " " << ex.offset << "~" << ex.length << dendl;
432
433 loff_t cur = ex.offset;
434 loff_t left = ex.length;
435
436 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
437 while (left > 0) {
438 loff_t max = left;
439
440 // at end ?
441 if (p == data.end()) {
442 if (final == NULL) {
443 final = new BufferHead(this);
444 replace_journal_tid(final, tid);
445 final->set_start( cur );
446 final->set_length( max );
447 oc->bh_add(this, final);
448 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
449 } else {
450 oc->bh_stat_sub(final);
451 final->set_length(final->length() + max);
452 oc->bh_stat_add(final);
453 }
454 left -= max;
455 cur += max;
456 continue;
457 }
458
459 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
460 //oc->verify_stats();
461
462 if (p->first <= cur) {
463 BufferHead *bh = p->second;
464 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
465
466 if (p->first < cur) {
467 ceph_assert(final == 0);
468 if (cur + max >= bh->end()) {
469 // we want right bit (one splice)
470 final = split(bh, cur); // just split it, take right half.
471 maybe_rebuild_buffer(bh);
472 replace_journal_tid(final, tid);
473 ++p;
474 ceph_assert(p->second == final);
475 } else {
476 // we want middle bit (two splices)
477 final = split(bh, cur);
478 maybe_rebuild_buffer(bh);
479 ++p;
480 ceph_assert(p->second == final);
481 auto right = split(final, cur+max);
482 maybe_rebuild_buffer(right);
483 replace_journal_tid(final, tid);
484 }
485 } else {
486 ceph_assert(p->first == cur);
487 if (bh->length() <= max) {
488 // whole bufferhead, piece of cake.
489 } else {
490 // we want left bit (one splice)
491 auto right = split(bh, cur + max); // just split
492 maybe_rebuild_buffer(right);
493 }
494 if (final) {
495 oc->mark_dirty(bh);
496 oc->mark_dirty(final);
497 --p; // move iterator back to final
498 ceph_assert(p->second == final);
499 replace_journal_tid(bh, tid);
500 merge_left(final, bh);
501 } else {
502 final = bh;
503 replace_journal_tid(final, tid);
504 }
505 }
506
507 // keep going.
508 loff_t lenfromcur = final->end() - cur;
509 cur += lenfromcur;
510 left -= lenfromcur;
511 ++p;
512 continue;
513 } else {
514 // gap!
515 loff_t next = p->first;
516 loff_t glen = std::min(next - cur, max);
517 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
518 if (final) {
519 oc->bh_stat_sub(final);
520 final->set_length(final->length() + glen);
521 oc->bh_stat_add(final);
522 } else {
523 final = new BufferHead(this);
524 replace_journal_tid(final, tid);
525 final->set_start( cur );
526 final->set_length( glen );
527 oc->bh_add(this, final);
528 }
529
530 cur += glen;
531 left -= glen;
532 continue; // more?
533 }
534 }
535
536 // set version
537 ceph_assert(final);
538 ceph_assert(final->get_journal_tid() == tid);
539 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
540
541 return final;
542 }
543
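// Record a new journal tid on bh. If the bh already carries a different
// (older) tid, first tell the writeback handler that the journalled extent
// has been overwritten, so no writeback should be expected for it.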
544 void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
545 ceph_tid_t tid) {
546 ceph_tid_t bh_tid = bh->get_journal_tid();
547
548 ceph_assert(tid == 0 || bh_tid <= tid);
549 if (bh_tid != 0 && bh_tid != tid) {
550 // inform journal that it should not expect a writeback from this extent
551 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
552 bh->length(), bh_tid, tid);
553 }
554 bh->set_journal_tid(tid);
555 }
556
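// Drop all cached buffers at or beyond offset s, splitting any bh that
// straddles the truncation point and cancelling pending journal writebacks.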
557 void ObjectCacher::Object::truncate(loff_t s)
558 {
559 ceph_assert(ceph_mutex_is_locked(oc->lock));
560 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
561
562 while (!data.empty()) {
563 BufferHead *bh = data.rbegin()->second;
564 if (bh->end() <= s)
565 break;
566
567 // split bh at truncation point?
568 if (bh->start() < s) {
569 split(bh, s);
570 maybe_rebuild_buffer(bh);
571 continue;
572 }
573
574 // remove bh entirely
575 ceph_assert(bh->start() >= s);
576 ceph_assert(bh->waitfor_read.empty());
577 replace_journal_tid(bh, 0);
578 oc->bh_remove(this, bh);
579 delete bh;
580 }
581 }
582
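// Invalidate cached data in [off, off+len): split partially covered bhs,
// clear their journal tids, and remove them. Tx bhs can be tracked through
// commit_gather so the caller may wait for the in-flight writeback; rx bhs
// cannot be removed while a read is in flight, so they are zeroed and marked
// nocache instead.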
583 void ObjectCacher::Object::discard(loff_t off, loff_t len,
584 C_GatherBuilder* commit_gather)
585 {
586 ceph_assert(ceph_mutex_is_locked(oc->lock));
587 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
588 << dendl;
589
590 if (!exists) {
591 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
592 exists = true;
593 }
594 if (complete) {
595 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
596 complete = false;
597 }
598
599 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
600 while (p != data.end()) {
601 BufferHead *bh = p->second;
602 if (bh->start() >= off + len)
603 break;
604
605 // split bh at truncation point?
606 if (bh->start() < off) {
607 split(bh, off);
608 maybe_rebuild_buffer(bh);
609 ++p;
610 continue;
611 }
612
613 ceph_assert(bh->start() >= off);
614 if (bh->end() > off + len) {
615 auto right = split(bh, off + len);
616 maybe_rebuild_buffer(right);
617 }
618
619 ++p;
620 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
621 replace_journal_tid(bh, 0);
622
623 if (bh->is_tx() && commit_gather != nullptr) {
624 // wait for the writeback to commit
625 waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
626 } else if (bh->is_rx()) {
627 // cannot remove bh with in-flight read, but we can ensure the
628 // read won't overwrite the discard
629 bh->last_read_tid = ++oc->last_read_tid;
630 bh->bl.clear();
631 bh->set_nocache(true);
632 oc->mark_zero(bh);
633 // keep the (now zeroed) rx bh rather than removing it below
634 continue;
635 } else {
636 ceph_assert(bh->waitfor_read.empty());
637 }
638
639 oc->bh_remove(this, bh);
640 delete bh;
641 }
642 }
643
644
645
646 /*** ObjectCacher ***/
647
648 #undef dout_prefix
649 #define dout_prefix *_dout << "objectcacher "
650
651
652 ObjectCacher::ObjectCacher(CephContext *cct_, string name,
653 WritebackHandler& wb, ceph::mutex& l,
654 flush_set_callback_t flush_callback,
655 void *flush_callback_arg, uint64_t max_bytes,
656 uint64_t max_objects, uint64_t max_dirty,
657 uint64_t target_dirty, double max_dirty_age,
658 bool block_writes_upfront)
659 : perfcounter(NULL),
660 cct(cct_), writeback_handler(wb), name(name), lock(l),
661 max_dirty(max_dirty), target_dirty(target_dirty),
662 max_size(max_bytes), max_objects(max_objects),
663 max_dirty_age(ceph::make_timespan(max_dirty_age)),
664 block_writes_upfront(block_writes_upfront),
665 trace_endpoint("ObjectCacher"),
666 flush_set_callback(flush_callback),
667 flush_set_callback_arg(flush_callback_arg),
668 last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
669 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
670 stat_missing(0), stat_error(0), stat_dirty_waiting(0),
671 stat_nr_dirty_waiters(0), reads_outstanding(0)
672 {
673 perf_start();
674 finisher.start();
675 scattered_write = writeback_handler.can_scattered_write();
676 }
677
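// A minimal construction sketch (illustrative only: my_wb_handler, flush_cb
// and the limits below are hypothetical; real users such as librbd and the
// cephfs client wire this up through their own code):
//
//   ceph::mutex cache_lock = ceph::make_mutex("ObjectCacher::demo");
//   ObjectCacher oc(cct, "demo", my_wb_handler, cache_lock,
//                   flush_cb, flush_cb_arg,
//                   64 << 20,   // max_bytes
//                   1024,       // max_objects
//                   32 << 20,   // max_dirty
//                   16 << 20,   // target_dirty
//                   1.0,        // max_dirty_age (seconds)
//                   true);      // block_writes_upfront
//   oc.start();                 // start the flusher thread
//   // ... readx()/writex() calls are made with cache_lock held ...
//   oc.stop();                  // stop the flusher before destruction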
678 ObjectCacher::~ObjectCacher()
679 {
680 finisher.stop();
681 perf_stop();
682 // we should be empty.
683 for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
684 = objects.begin();
685 i != objects.end();
686 ++i)
687 ceph_assert(i->empty());
688 ceph_assert(bh_lru_rest.lru_get_size() == 0);
689 ceph_assert(bh_lru_dirty.lru_get_size() == 0);
690 ceph_assert(ob_lru.lru_get_size() == 0);
691 ceph_assert(dirty_or_tx_bh.empty());
692 }
693
694 void ObjectCacher::perf_start()
695 {
696 string n = "objectcacher-" + name;
697 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
698
699 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
700 "cache_ops_hit", "Hit operations");
701 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
702 "cache_ops_miss", "Miss operations");
703 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
704 "cache_bytes_hit", "Hit data", NULL, 0, unit_t(UNIT_BYTES));
705 plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
706 "cache_bytes_miss", "Miss data", NULL, 0, unit_t(UNIT_BYTES));
707 plb.add_u64_counter(l_objectcacher_data_read,
708 "data_read", "Read data");
709 plb.add_u64_counter(l_objectcacher_data_written,
710 "data_written", "Data written to cache");
711 plb.add_u64_counter(l_objectcacher_data_flushed,
712 "data_flushed", "Data flushed");
713 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
714 "data_overwritten_while_flushing",
715 "Data overwritten while flushing");
716 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
717 "Write operations, delayed due to dirty limits");
718 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
719 "write_bytes_blocked",
720 "Write data blocked on dirty limit", NULL, 0, unit_t(UNIT_BYTES));
721 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
722 "Time spent blocking a write due to dirty limits");
723
724 perfcounter = plb.create_perf_counters();
725 cct->get_perfcounters_collection()->add(perfcounter);
726 }
727
728 void ObjectCacher::perf_stop()
729 {
730 ceph_assert(perfcounter);
731 cct->get_perfcounters_collection()->remove(perfcounter);
732 delete perfcounter;
733 }
734
735 /* private */
736 ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
737 uint64_t object_no,
738 ObjectSet *oset,
739 object_locator_t &l,
740 uint64_t truncate_size,
741 uint64_t truncate_seq)
742 {
743 // XXX: Add handling of nspace in object_locator_t in cache
744 ceph_assert(ceph_mutex_is_locked(lock));
745 // have it?
746 if ((uint32_t)l.pool < objects.size()) {
747 if (objects[l.pool].count(oid)) {
748 Object *o = objects[l.pool][oid];
749 o->object_no = object_no;
750 o->truncate_size = truncate_size;
751 o->truncate_seq = truncate_seq;
752 return o;
753 }
754 } else {
755 objects.resize(l.pool+1);
756 }
757
758 // create it.
759 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
760 truncate_seq);
761 objects[l.pool][oid] = o;
762 ob_lru.lru_insert_top(o);
763 return o;
764 }
765
766 void ObjectCacher::close_object(Object *ob)
767 {
768 ceph_assert(ceph_mutex_is_locked(lock));
769 ldout(cct, 10) << "close_object " << *ob << dendl;
770 ceph_assert(ob->can_close());
771
772 // ok!
773 ob_lru.lru_remove(ob);
774 objects[ob->oloc.pool].erase(ob->get_soid());
775 ob->set_item.remove_myself();
776 delete ob;
777 }
778
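// Mark bh rx, assign it a fresh read tid, and issue an asynchronous read
// through the writeback handler; completion runs C_ReadFinish, which feeds
// the result into bh_read_finish().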
779 void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
780 const ZTracer::Trace &parent_trace)
781 {
782 ceph_assert(ceph_mutex_is_locked(lock));
783 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
784 << reads_outstanding << dendl;
785
786 ZTracer::Trace trace;
787 if (parent_trace.valid()) {
788 trace.init("", &trace_endpoint, &parent_trace);
789 trace.copy_name("bh_read " + bh->ob->get_oid().name);
790 trace.event("start");
791 }
792
793 mark_rx(bh);
794 bh->last_read_tid = ++last_read_tid;
795
796 // finisher
797 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
798 bh->start(), bh->length(), trace);
799 // go
800 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
801 bh->ob->get_oloc(), bh->start(), bh->length(),
802 bh->ob->get_snap(), &onfinish->bl,
803 bh->ob->truncate_size, bh->ob->truncate_seq,
804 op_flags, trace, onfinish);
805
806 ++reads_outstanding;
807 }
808
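// Apply the result of an object read: pad short reads with zeroes, handle
// -ENOENT (marking the object complete and !exists when the result is
// trusted), fill the rx bhs whose last_read_tid matches this read, mark them
// clean (or error), and wake any waiting readers.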
809 void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
810 ceph_tid_t tid, loff_t start,
811 uint64_t length, bufferlist &bl, int r,
812 bool trust_enoent)
813 {
814 ceph_assert(ceph_mutex_is_locked(lock));
815 ldout(cct, 7) << "bh_read_finish "
816 << oid
817 << " tid " << tid
818 << " " << start << "~" << length
819 << " (bl is " << bl.length() << ")"
820 << " returned " << r
821 << " outstanding reads " << reads_outstanding
822 << dendl;
823
824 if (r >= 0 && bl.length() < length) {
825 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
826 << length << " with " << length - bl.length() << " bytes of zeroes"
827 << dendl;
828 bl.append_zero(length - bl.length());
829 }
830
831 list<Context*> ls;
832 int err = 0;
833
834 if (objects[poolid].count(oid) == 0) {
835 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
836 } else {
837 Object *ob = objects[poolid][oid];
838
839 if (r == -ENOENT && !ob->complete) {
840 // wake up *all* rx waiters, or else we risk reordering
841 // identical reads. e.g.
842 // read 1~1
843 // reply to unrelated 3~1 -> !exists
844 // read 1~1 -> immediate ENOENT
845 // reply to first 1~1 -> ooo ENOENT
846 bool allzero = true;
847 for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
848 p != ob->data.end(); ++p) {
849 BufferHead *bh = p->second;
850 for (map<loff_t, list<Context*> >::iterator p
851 = bh->waitfor_read.begin();
852 p != bh->waitfor_read.end();
853 ++p)
854 ls.splice(ls.end(), p->second);
855 bh->waitfor_read.clear();
856 if (!bh->is_zero() && !bh->is_rx())
857 allzero = false;
858 }
859
860 // just pass through and retry all waiters if we don't trust
861 // -ENOENT for this read
862 if (trust_enoent) {
863 ldout(cct, 7)
864 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
865 << dendl;
866 ob->complete = true;
867 ob->exists = false;
868
869 /* If all the bhs are effectively zero, get rid of them. All
870 * the waiters will be retried and get -ENOENT immediately, so
871 * it's safe to clean up the unneeded bh's now. Since we know
872 * it's safe to remove them now, do so, so they aren't hanging
873 * around waiting for more -ENOENTs from rados while the cache
874 * is being shut down.
875 *
876 * Only do this when all the bhs are rx or clean, to match the
877 * condition in _readx(). If there are any non-rx or non-clean
878 * bhs, _readx() will wait for the final result instead of
879 * returning -ENOENT immediately.
880 */
881 if (allzero) {
882 ldout(cct, 10)
883 << "bh_read_finish ENOENT and allzero, getting rid of "
884 << "bhs for " << *ob << dendl;
885 map<loff_t, BufferHead*>::iterator p = ob->data.begin();
886 while (p != ob->data.end()) {
887 BufferHead *bh = p->second;
888 // current iterator will be invalidated by bh_remove()
889 ++p;
890 bh_remove(ob, bh);
891 delete bh;
892 }
893 }
894 }
895 }
896
897 // apply to bh's!
898 loff_t opos = start;
899 while (true) {
900 map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
901 if (p == ob->data.end())
902 break;
903 if (opos >= start+(loff_t)length) {
904 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
905 << start << "+" << length << "=" << start+(loff_t)length
906 << dendl;
907 break;
908 }
909
910 BufferHead *bh = p->second;
911 ldout(cct, 20) << "checking bh " << *bh << dendl;
912
913 // finishers?
914 for (map<loff_t, list<Context*> >::iterator it
915 = bh->waitfor_read.begin();
916 it != bh->waitfor_read.end();
917 ++it)
918 ls.splice(ls.end(), it->second);
919 bh->waitfor_read.clear();
920
921 if (bh->start() > opos) {
922 ldout(cct, 1) << "bh_read_finish skipping gap "
923 << opos << "~" << bh->start() - opos
924 << dendl;
925 opos = bh->start();
926 continue;
927 }
928
929 if (!bh->is_rx()) {
930 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
931 opos = bh->end();
932 continue;
933 }
934
935 if (bh->last_read_tid != tid) {
936 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
937 << bh->last_read_tid << " != tid " << tid
938 << ", skipping" << dendl;
939 opos = bh->end();
940 continue;
941 }
942
943 ceph_assert(opos >= bh->start());
944 ceph_assert(bh->start() == opos); // we don't merge rx bh's... yet!
945 ceph_assert(bh->length() <= start+(loff_t)length-opos);
946
947 if (bh->error < 0)
948 err = bh->error;
949
950 opos = bh->end();
951
952 if (r == -ENOENT) {
953 if (trust_enoent) {
954 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
955 bh_remove(ob, bh);
956 delete bh;
957 } else {
958 ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
959 << *bh << dendl;
960 }
961 continue;
962 }
963
964 if (r < 0) {
965 bh->error = r;
966 mark_error(bh);
967 } else {
968 bh->bl.substr_of(bl,
969 bh->start() - start,
970 bh->length());
971 mark_clean(bh);
972 }
973
974 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
975
976 ob->try_merge_bh(bh);
977 }
978 }
979
980 // called with lock held.
981 ldout(cct, 20) << "finishing waiters " << ls << dendl;
982
983 finish_contexts(cct, ls, err);
984 retry_waiting_reads();
985
986 --reads_outstanding;
987 read_cond.notify_all();
988 }
989
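// Starting from bh, walk the dirty_or_tx set in both directions collecting
// dirty bhs of the same object whose last_write is at or before cutoff,
// stopping at the max_count/max_amount limits, and submit them as one
// scattered write.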
990 void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
991 int64_t *max_amount, int *max_count)
992 {
993 list<BufferHead*> blist;
994
995 int count = 0;
996 int64_t total_len = 0;
997 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
998 ceph_assert(it != dirty_or_tx_bh.end());
999 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
1000 p != dirty_or_tx_bh.end();
1001 ++p) {
1002 BufferHead *obh = *p;
1003 if (obh->ob != bh->ob)
1004 break;
1005 if (obh->is_dirty() && obh->last_write <= cutoff) {
1006 blist.push_back(obh);
1007 ++count;
1008 total_len += obh->length();
1009 if ((max_count && count > *max_count) ||
1010 (max_amount && total_len > *max_amount))
1011 break;
1012 }
1013 }
1014
1015 while (it != dirty_or_tx_bh.begin()) {
1016 --it;
1017 BufferHead *obh = *it;
1018 if (obh->ob != bh->ob)
1019 break;
1020 if (obh->is_dirty() && obh->last_write <= cutoff) {
1021 blist.push_front(obh);
1022 ++count;
1023 total_len += obh->length();
1024 if ((max_count && count > *max_count) ||
1025 (max_amount && total_len > *max_amount))
1026 break;
1027 }
1028 }
1029 if (max_count)
1030 *max_count -= count;
1031 if (max_amount)
1032 *max_amount -= total_len;
1033
1034 bh_write_scattered(blist);
1035 }
1036
1037 class ObjectCacher::C_WriteCommit : public Context {
1038 ObjectCacher *oc;
1039 int64_t poolid;
1040 sobject_t oid;
1041 vector<pair<loff_t, uint64_t> > ranges;
1042 ZTracer::Trace trace;
1043 public:
1044 ceph_tid_t tid = 0;
1045 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1046 uint64_t l, const ZTracer::Trace &trace) :
1047 oc(c), poolid(_poolid), oid(o), trace(trace) {
1048 ranges.push_back(make_pair(s, l));
1049 }
1050 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1051 vector<pair<loff_t, uint64_t> >& _ranges) :
1052 oc(c), poolid(_poolid), oid(o), tid(0) {
1053 ranges.swap(_ranges);
1054 }
1055 void finish(int r) override {
1056 oc->bh_write_commit(poolid, oid, ranges, tid, r);
1057 trace.event("finish");
1058 }
1059 };
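// Issue one scattered write covering every bh in blist (all belonging to the
// same object): build the range and io vectors, use the newest snap context
// and write time among them, then mark each bh tx with the returned tid.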
1060 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1061 {
1062 ceph_assert(ceph_mutex_is_locked(lock));
1063
1064 Object *ob = blist.front()->ob;
1065 ob->get();
1066
1067 ceph::real_time last_write;
1068 SnapContext snapc;
1069 vector<pair<loff_t, uint64_t> > ranges;
1070 vector<pair<uint64_t, bufferlist> > io_vec;
1071
1072 ranges.reserve(blist.size());
1073 io_vec.reserve(blist.size());
1074
1075 uint64_t total_len = 0;
1076 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1077 BufferHead *bh = *p;
1078 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1079 ceph_assert(bh->ob == ob);
1080 ceph_assert(bh->bl.length() == bh->length());
1081 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1082
1083 int n = io_vec.size();
1084 io_vec.resize(n + 1);
1085 io_vec[n].first = bh->start();
1086 io_vec[n].second = bh->bl;
1087
1088 total_len += bh->length();
1089 if (bh->snapc.seq > snapc.seq)
1090 snapc = bh->snapc;
1091 if (bh->last_write > last_write)
1092 last_write = bh->last_write;
1093 }
1094
1095 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1096
1097 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1098 io_vec, snapc, last_write,
1099 ob->truncate_size, ob->truncate_seq,
1100 oncommit);
1101 oncommit->tid = tid;
1102 ob->last_write_tid = tid;
1103 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1104 BufferHead *bh = *p;
1105 bh->last_write_tid = tid;
1106 mark_tx(bh);
1107 }
1108
1109 if (perfcounter)
1110 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1111 }
1112
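// Issue an asynchronous write of a single bh via the writeback handler,
// record the returned tid on the bh and its object, bump the flushed-bytes
// counter, and mark the bh tx.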
1113 void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1114 {
1115 ceph_assert(ceph_mutex_is_locked(lock));
1116 ldout(cct, 7) << "bh_write " << *bh << dendl;
1117
1118 bh->ob->get();
1119
1120 ZTracer::Trace trace;
1121 if (parent_trace.valid()) {
1122 trace.init("", &trace_endpoint, &parent_trace);
1123 trace.copy_name("bh_write " + bh->ob->get_oid().name);
1124 trace.event("start");
1125 }
1126
1127 // finishers
1128 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1129 bh->ob->get_soid(), bh->start(),
1130 bh->length(), trace);
1131 // go
1132 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1133 bh->ob->get_oloc(),
1134 bh->start(), bh->length(),
1135 bh->snapc, bh->bl, bh->last_write,
1136 bh->ob->truncate_size,
1137 bh->ob->truncate_seq,
1138 bh->journal_tid, trace, oncommit);
1139 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1140
1141 // set bh last_write_tid
1142 oncommit->tid = tid;
1143 bh->ob->last_write_tid = tid;
1144 bh->last_write_tid = tid;
1145
1146 if (perfcounter) {
1147 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1148 }
1149
1150 mark_tx(bh);
1151 }
1152
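// Handle completion of a write: for each committed range, mark matching tx
// bhs clean (or dirty again on error), clear their journal tids, try to merge
// neighbours, update the object's last_commit_tid, complete commit waiters,
// and fire the flush_set callback if the object set has become fully clean.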
1153 void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1154 vector<pair<loff_t, uint64_t> >& ranges,
1155 ceph_tid_t tid, int r)
1156 {
1157 ceph_assert(ceph_mutex_is_locked(lock));
1158 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1159 << " ranges " << ranges << " returned " << r << dendl;
1160
1161 if (objects[poolid].count(oid) == 0) {
1162 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1163 return;
1164 }
1165
1166 Object *ob = objects[poolid][oid];
1167 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1168
1169 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1170 p != ranges.end();
1171 ++p) {
1172 loff_t start = p->first;
1173 uint64_t length = p->second;
1174 if (!ob->exists) {
1175 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1176 ob->exists = true;
1177
1178 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1179 ob->get_snap())) {
1180 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1181 "complete on " << *ob << dendl;
1182 ob->complete = false;
1183 }
1184 }
1185
1186 vector<pair<loff_t, BufferHead*>> hit;
1187 // apply to bh's!
1188 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1189 p != ob->data.end();
1190 ++p) {
1191 BufferHead *bh = p->second;
1192
1193 if (bh->start() >= start+(loff_t)length)
1194 break;
1195
1196 // make sure bh is tx
1197 if (!bh->is_tx()) {
1198 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1199 continue;
1200 }
1201
1202 // make sure bh tid matches
1203 if (bh->last_write_tid != tid) {
1204 ceph_assert(bh->last_write_tid > tid);
1205 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1206 continue;
1207 }
1208
1209 // we don't merge tx buffers. tx buffer should be within the range
1210 ceph_assert(bh->start() >= start);
1211 ceph_assert(bh->end() <= start+(loff_t)length);
1212
1213 if (r >= 0) {
1214 // ok! mark bh clean and error-free
1215 mark_clean(bh);
1216 bh->set_journal_tid(0);
1217 if (bh->get_nocache())
1218 bh_lru_rest.lru_bottouch(bh);
1219 hit.push_back(make_pair(bh->start(), bh));
1220 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1221 } else {
1222 mark_dirty(bh);
1223 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1224 << *bh << " r = " << r << " " << cpp_strerror(-r)
1225 << dendl;
1226 }
1227 }
1228
1229 for (auto& p : hit) {
1230 // p.second may have been merged into a neighbour and deleted by merge_left
1231 if (ob->data.count(p.first))
1232 ob->try_merge_bh(p.second);
1233 }
1234 }
1235
1236 // update last_commit.
1237 ceph_assert(ob->last_commit_tid < tid);
1238 ob->last_commit_tid = tid;
1239
1240 // waiters?
1241 list<Context*> ls;
1242 if (ob->waitfor_commit.count(tid)) {
1243 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1244 ob->waitfor_commit.erase(tid);
1245 }
1246
1247 // is the entire object set now clean and fully committed?
1248 ObjectSet *oset = ob->oset;
1249 ob->put();
1250
1251 if (flush_set_callback &&
1252 was_dirty_or_tx > 0 &&
1253 oset->dirty_or_tx == 0) { // nothing dirty/tx
1254 flush_set_callback(flush_set_callback_arg, oset);
1255 }
1256
1257 if (!ls.empty())
1258 finish_contexts(cct, ls, r);
1259 }
1260
1261 void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1262 {
1263 ceph_assert(trace != nullptr);
1264 ceph_assert(ceph_mutex_is_locked(lock));
1265 ceph::real_time cutoff = ceph::real_clock::now();
1266
1267 ldout(cct, 10) << "flush " << amount << dendl;
1268
1269 /*
1270 * NOTE: we aren't actually pulling things off the LRU here, just
1271 * looking at the tail item. Then we call bh_write, which moves it
1272 * to the other LRU, so that we can call
1273 * lru_dirty.lru_get_next_expire() again.
1274 */
1275 int64_t left = amount;
1276 while (amount == 0 || left > 0) {
1277 BufferHead *bh = static_cast<BufferHead*>(
1278 bh_lru_dirty.lru_get_next_expire());
1279 if (!bh) break;
1280 if (bh->last_write > cutoff) break;
1281
1282 if (scattered_write) {
1283 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1284 } else {
1285 left -= bh->length();
1286 bh_write(bh, *trace);
1287 }
1288 }
1289 }
1290
1291
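// Evict clean/zero/error bhs from the rest LRU until both the clean-byte
// limit (max_size) and the clean-bh count limit are met, clearing the
// complete flag of any object that loses data, then close LRU objects in
// excess of max_objects.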
1292 void ObjectCacher::trim()
1293 {
1294 ceph_assert(ceph_mutex_is_locked(lock));
1295 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1296 << get_stat_clean() << ", objects: max " << max_objects
1297 << " current " << ob_lru.lru_get_size() << dendl;
1298
1299 uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1300 uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
1301 while (get_stat_clean() > 0 &&
1302 ((uint64_t)get_stat_clean() > max_size ||
1303 nr_clean_bh > max_clean_bh)) {
1304 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1305 if (!bh)
1306 break;
1307
1308 ldout(cct, 10) << "trim trimming " << *bh << dendl;
1309 ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1310
1311 Object *ob = bh->ob;
1312 bh_remove(ob, bh);
1313 delete bh;
1314
1315 --nr_clean_bh;
1316
1317 if (ob->complete) {
1318 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1319 ob->complete = false;
1320 }
1321 }
1322
1323 while (ob_lru.lru_get_size() > max_objects) {
1324 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1325 if (!ob)
1326 break;
1327
1328 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1329 close_object(ob);
1330 }
1331
1332 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1333 << get_stat_clean() << ", objects: max " << max_objects
1334 << " current " << ob_lru.lru_get_size() << dendl;
1335 }
1336
1337
1338
1339 /* public */
1340
1341 bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1342 snapid_t snapid)
1343 {
1344 ceph_assert(ceph_mutex_is_locked(lock));
1345 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1346 ex_it != extents.end();
1347 ++ex_it) {
1348 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1349
1350 // get Object cache
1351 sobject_t soid(ex_it->oid, snapid);
1352 Object *o = get_object_maybe(soid, ex_it->oloc);
1353 if (!o)
1354 return false;
1355 if (!o->is_cached(ex_it->offset, ex_it->length))
1356 return false;
1357 }
1358 return true;
1359 }
1360
1361
1362 /*
1363 * returns # bytes read (if in cache). onfinish is untouched (caller
1364 * must delete it)
1365 * returns 0 if doing async read
1366 */
1367 int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1368 ZTracer::Trace *parent_trace)
1369 {
1370 ZTracer::Trace trace;
1371 if (parent_trace != nullptr) {
1372 trace.init("read", &trace_endpoint, parent_trace);
1373 trace.event("start");
1374 }
1375
1376 int r = _readx(rd, oset, onfinish, true, &trace);
1377 if (r < 0) {
1378 trace.event("finish");
1379 }
1380 return r;
1381 }
1382
1383 int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1384 bool external_call, ZTracer::Trace *trace)
1385 {
1386 ceph_assert(trace != nullptr);
1387 ceph_assert(ceph_mutex_is_locked(lock));
1388 bool success = true;
1389 int error = 0;
1390 uint64_t bytes_in_cache = 0;
1391 uint64_t bytes_not_in_cache = 0;
1392 uint64_t total_bytes_read = 0;
1393 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1394 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1395 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1396
1397 /*
1398 * WARNING: we can only meaningfully return ENOENT if the read request
1399 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1400 * zeroed buffers needs to feed single extents into readx().
1401 */
1402 ceph_assert(!oset->return_enoent || rd->extents.size() == 1);
1403
1404 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1405 ex_it != rd->extents.end();
1406 ++ex_it) {
1407 ldout(cct, 10) << "readx " << *ex_it << dendl;
1408
1409 total_bytes_read += ex_it->length;
1410
1411 // get Object cache
1412 sobject_t soid(ex_it->oid, rd->snap);
1413 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1414 ex_it->truncate_size, oset->truncate_seq);
1415 if (external_call)
1416 touch_ob(o);
1417
1418 // does not exist and no hits?
1419 if (oset->return_enoent && !o->exists) {
1420 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1421
1422 // should we worry about COW underneath us?
1423 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1424 ex_it->length, soid.snap)) {
1425 ldout(cct, 20) << "readx may copy on write" << dendl;
1426 bool wait = false;
1427 list<BufferHead*> blist;
1428 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1429 bh_it != o->data.end();
1430 ++bh_it) {
1431 BufferHead *bh = bh_it->second;
1432 if (bh->is_dirty() || bh->is_tx()) {
1433 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1434 wait = true;
1435 if (bh->is_dirty()) {
1436 if (scattered_write)
1437 blist.push_back(bh);
1438 else
1439 bh_write(bh, *trace);
1440 }
1441 }
1442 }
1443 if (scattered_write && !blist.empty())
1444 bh_write_scattered(blist);
1445 if (wait) {
1446 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1447 << " on " << *o << dendl;
1448 o->waitfor_commit[o->last_write_tid].push_back(
1449 new C_RetryRead(this,rd, oset, onfinish, *trace));
1450 // FIXME: perfcounter!
1451 return 0;
1452 }
1453 }
1454
1455 // can we return ENOENT?
1456 bool allzero = true;
1457 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1458 bh_it != o->data.end();
1459 ++bh_it) {
1460 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1461 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1462 allzero = false;
1463 break;
1464 }
1465 }
1466 if (allzero) {
1467 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1468 << dendl;
1469 delete rd;
1470 if (dontneed)
1471 bottouch_ob(o);
1472 return -ENOENT;
1473 }
1474 }
1475
1476 // map extent into bufferheads
1477 map<loff_t, BufferHead*> hits, missing, rx, errors;
1478 o->map_read(*ex_it, hits, missing, rx, errors);
1479 if (external_call) {
1480 // retry reading error buffers
1481 missing.insert(errors.begin(), errors.end());
1482 } else {
1483 // some reads had errors, fail later so completions
1484 // are cleaned up properly
1485 // TODO: make read path not call _readx for every completion
1486 hits.insert(errors.begin(), errors.end());
1487 }
1488
1489 if (!missing.empty() || !rx.empty()) {
1490 // read missing
1491 map<loff_t, BufferHead*>::iterator last = missing.end();
1492 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1493 bh_it != missing.end();
1494 ++bh_it) {
1495 uint64_t rx_bytes = static_cast<uint64_t>(
1496 stat_rx + bh_it->second->length());
1497 bytes_not_in_cache += bh_it->second->length();
1498 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1499 // cache is full with concurrent reads -- wait for rx's to complete
1500 // to constrain memory growth (especially during copy-ups)
1501 if (success) {
1502 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1503 << waitfor_read.size() << " blocked reads, "
1504 << (std::max(rx_bytes, max_size) - max_size)
1505 << " read bytes" << dendl;
1506 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1507 *trace));
1508 }
1509
1510 bh_remove(o, bh_it->second);
1511 delete bh_it->second;
1512 } else {
1513 bh_it->second->set_nocache(nocache);
1514 bh_read(bh_it->second, rd->fadvise_flags, *trace);
1515 if ((success && onfinish) || last != missing.end())
1516 last = bh_it;
1517 }
1518 success = false;
1519 }
1520
1521 // add the waiter to the last missing bh to avoid waking up too early, since reads complete in order
1522 if (last != missing.end()) {
1523 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1524 << " off " << last->first << dendl;
1525 last->second->waitfor_read[last->first].push_back(
1526 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1527
1528 }
1529
1530 // bump rx
1531 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1532 bh_it != rx.end();
1533 ++bh_it) {
1534 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1535 if (success && onfinish) {
1536 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1537 << " off " << bh_it->first << dendl;
1538 bh_it->second->waitfor_read[bh_it->first].push_back(
1539 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1540 }
1541 bytes_not_in_cache += bh_it->second->length();
1542 success = false;
1543 }
1544
1545 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1546 bh_it != hits.end(); ++bh_it)
1547 // bump in lru, so we don't lose it on a later read
1548 touch_bh(bh_it->second);
1549
1550 } else {
1551 ceph_assert(!hits.empty());
1552
1553 // make a plain list
1554 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1555 bh_it != hits.end();
1556 ++bh_it) {
1557 BufferHead *bh = bh_it->second;
1558 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1559 if (bh->is_error() && bh->error)
1560 error = bh->error;
1561 bytes_in_cache += bh->length();
1562
1563 if (bh->get_nocache() && bh->is_clean())
1564 bh_lru_rest.lru_bottouch(bh);
1565 else
1566 touch_bh(bh);
1567 // must come after touch_bh, because touch_bh sets dontneed to false
1568 if (dontneed &&
1569 ((loff_t)ex_it->offset <= bh->start() &&
1570 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1571 bh->set_dontneed(true); //if dirty
1572 if (bh->is_clean())
1573 bh_lru_rest.lru_bottouch(bh);
1574 }
1575 }
1576
1577 if (!error) {
1578 // create reverse map of buffer offset -> object for the
1579 // eventual result. this is over a single ObjectExtent, so we
1580 // know that
1581 // - the bh's are contiguous
1582 // - the buffer frags need not be (and almost certainly aren't)
1583 loff_t opos = ex_it->offset;
1584 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1585 ceph_assert(bh_it->second->start() <= opos);
1586 uint64_t bhoff = opos - bh_it->second->start();
1587 vector<pair<uint64_t,uint64_t> >::iterator f_it
1588 = ex_it->buffer_extents.begin();
1589 uint64_t foff = 0;
1590 while (1) {
1591 BufferHead *bh = bh_it->second;
1592 ceph_assert(opos == (loff_t)(bh->start() + bhoff));
1593
1594 uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
1595 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1596 << bhoff << " frag " << f_it->first << "~"
1597 << f_it->second << " +" << foff << "~" << len
1598 << dendl;
1599
1600 bufferlist bit;
1601 // put substr here first, since substr_of clobbers, and we
1602 // may get multiple bh's at this stripe_map position
1603 if (bh->is_zero()) {
1604 stripe_map[f_it->first].append_zero(len);
1605 } else {
1606 bit.substr_of(bh->bl,
1607 opos - bh->start(),
1608 len);
1609 stripe_map[f_it->first].claim_append(bit);
1610 }
1611
1612 opos += len;
1613 bhoff += len;
1614 foff += len;
1615 if (opos == bh->end()) {
1616 ++bh_it;
1617 bhoff = 0;
1618 }
1619 if (foff == f_it->second) {
1620 ++f_it;
1621 foff = 0;
1622 }
1623 if (bh_it == hits.end()) break;
1624 if (f_it == ex_it->buffer_extents.end())
1625 break;
1626 }
1627 ceph_assert(f_it == ex_it->buffer_extents.end());
1628 ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1629 }
1630
1631 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1632 bottouch_ob(o);
1633 }
1634 }
1635
1636 if (!success) {
1637 if (perfcounter && external_call) {
1638 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1639 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1640 perfcounter->inc(l_objectcacher_cache_ops_miss);
1641 }
1642 if (onfinish) {
1643 ldout(cct, 20) << "readx defer " << rd << dendl;
1644 } else {
1645 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1646 << dendl;
1647 delete rd;
1648 }
1649 return 0; // wait!
1650 }
1651 if (perfcounter && external_call) {
1652 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1653 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1654 perfcounter->inc(l_objectcacher_cache_ops_hit);
1655 }
1656
1657 // no misses... success! do the read.
1658 ldout(cct, 10) << "readx has all buffers" << dendl;
1659
1660 // ok, assemble into result buffer.
1661 uint64_t pos = 0;
1662 if (rd->bl && !error) {
1663 rd->bl->clear();
1664 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1665 i != stripe_map.end();
1666 ++i) {
1667 ceph_assert(pos == i->first);
1668 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1669 << " at " << pos << dendl;
1670 pos += i->second.length();
1671 rd->bl->claim_append(i->second);
1672 ceph_assert(rd->bl->length() == pos);
1673 }
1674 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1675 } else if (!error) {
1676 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1677 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1678 pos = i->first + i->second.length();
1679 }
1680
1681 // done with read.
1682 int ret = error ? error : pos;
1683 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1684 ceph_assert(pos <= (uint64_t) INT_MAX);
1685
1686 delete rd;
1687
1688 trim();
1689
1690 return ret;
1691 }
1692
1693 void ObjectCacher::retry_waiting_reads()
1694 {
1695 list<Context *> ls;
1696 ls.swap(waitfor_read);
1697
1698 while (!ls.empty() && waitfor_read.empty()) {
1699 Context *ctx = ls.front();
1700 ls.pop_front();
1701 ctx->complete(0);
1702 }
1703 waitfor_read.splice(waitfor_read.end(), ls);
1704 }
1705
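// Map each extent onto a single bufferhead via map_write, copy the caller's
// data into it, mark it dirty (honouring the DONTNEED/NOCACHE fadvise hints),
// wake readers that were waiting on the overwritten range, and finally apply
// the dirty-limit throttling in _wait_for_write.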
1706 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1707 ZTracer::Trace *parent_trace)
1708 {
1709 ceph_assert(ceph_mutex_is_locked(lock));
1710 ceph::real_time now = ceph::real_clock::now();
1711 uint64_t bytes_written = 0;
1712 uint64_t bytes_written_in_flush = 0;
1713 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1714 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1715
1716 ZTracer::Trace trace;
1717 if (parent_trace != nullptr) {
1718 trace.init("write", &trace_endpoint, parent_trace);
1719 trace.event("start");
1720 }
1721
1722 list<Context*> wait_for_reads;
1723 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1724 ex_it != wr->extents.end();
1725 ++ex_it) {
1726 // get object cache
1727 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1728 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1729 ex_it->truncate_size, oset->truncate_seq);
1730
1731 // map it all into a single bufferhead.
1732 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1733 bool missing = bh->is_missing();
1734 bh->snapc = wr->snapc;
1735
1736 // readers that need to be woken up due to an overwrite
1737 for (auto& [_, wait_for_read] : bh->waitfor_read) {
1738 wait_for_reads.splice(wait_for_reads.end(), wait_for_read);
1739 }
1740 bh->waitfor_read.clear();
1741
1742 bytes_written += ex_it->length;
1743 if (bh->is_tx()) {
1744 bytes_written_in_flush += ex_it->length;
1745 }
1746
1747 // adjust buffer pointers (ie "copy" data into my cache)
1748 // this is over a single ObjectExtent, so we know that
1749 // - there is one contiguous bh
1750 // - the buffer frags need not be (and almost certainly aren't)
1751 // note: i assume striping is monotonic... no jumps backwards, ever!
1752 loff_t opos = ex_it->offset;
1753 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1754 = ex_it->buffer_extents.begin();
1755 f_it != ex_it->buffer_extents.end();
1756 ++f_it) {
1757 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1758 << f_it->second << " into " << *bh << " at " << opos
1759 << dendl;
1760 uint64_t bhoff = opos - bh->start();
1761 ceph_assert(f_it->second <= bh->length() - bhoff);
1762
1763 // get the frag we're mapping in
1764 bufferlist frag;
1765 frag.substr_of(wr->bl, f_it->first, f_it->second);
1766
1767 // keep anything left of bhoff
1768 if (!bhoff)
1769 bh->bl.swap(frag);
1770 else
1771 bh->bl.claim_append(frag);
1772
1773 opos += f_it->second;
1774 }
1775
1776 // ok, now bh is dirty.
1777 mark_dirty(bh);
1778 if (dontneed)
1779 bh->set_dontneed(true);
1780 else if (nocache && missing)
1781 bh->set_nocache(true);
1782 else
1783 touch_bh(bh);
1784
1785 bh->last_write = now;
1786
1787 o->try_merge_bh(bh);
1788 }
1789
1790 if (perfcounter) {
1791 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1792 if (bytes_written_in_flush) {
1793 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1794 bytes_written_in_flush);
1795 }
1796 }
1797
1798 int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1799 delete wr;
1800
1801 finish_contexts(cct, wait_for_reads, 0);
1802
1803 //verify_stats();
1804 trim();
1805 return r;
1806 }
1807
1808 class ObjectCacher::C_WaitForWrite : public Context {
1809 public:
1810 C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1811 const ZTracer::Trace &trace, Context *onfinish) :
1812 m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1813 void finish(int r) override;
1814 private:
1815 ObjectCacher *m_oc;
1816 uint64_t m_len;
1817 ZTracer::Trace m_trace;
1818 Context *m_onfinish;
1819 };
1820
1821 void ObjectCacher::C_WaitForWrite::finish(int r)
1822 {
1823 std::lock_guard l(m_oc->lock);
1824 m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
1825 m_onfinish->complete(r);
1826 }
1827
1828 void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
1829 ZTracer::Trace *trace)
1830 {
1831 ceph_assert(ceph_mutex_is_locked(lock));
1832 ceph::mono_time start = ceph::mono_clock::now();
1833 int blocked = 0;
1834 // wait for writeback?
1835 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1836 // - do not wait for bytes other waiters are waiting on. this means that
1837 // threads do not wait for each other. this effectively allows the cache
1838 // size to balloon proportional to the data that is in flight.
1839
1840 uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
1841 while (get_stat_dirty() + get_stat_tx() > 0 &&
1842 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1843 max_dirty + get_stat_dirty_waiting()) ||
1844 (dirty_or_tx_bh.size() >=
1845 max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1846
1847 if (blocked == 0) {
1848 trace->event("start wait for writeback");
1849 }
1850 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1851 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1852 << max_dirty << " + dirty_waiting "
1853 << get_stat_dirty_waiting() << dendl;
1854 flusher_cond.notify_all();
1855 stat_dirty_waiting += len;
1856 ++stat_nr_dirty_waiters;
1857 std::unique_lock l{lock, std::adopt_lock};
1858 stat_cond.wait(l);
1859 l.release();
1860 stat_dirty_waiting -= len;
1861 --stat_nr_dirty_waiters;
1862 ++blocked;
1863 ldout(cct, 10) << __func__ << " woke up" << dendl;
1864 }
1865 if (blocked > 0) {
1866 trace->event("finish wait for writeback");
1867 }
1868 if (blocked && perfcounter) {
1869 perfcounter->inc(l_objectcacher_write_ops_blocked);
1870 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1871 ceph::timespan blocked_time = ceph::mono_clock::now() - start;
1872 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked_time);
1873 }
1874 }
1875
1876 // blocking wait for write.
1877 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1878 ZTracer::Trace *trace, Context *onfreespace)
1879 {
1880 ceph_assert(ceph_mutex_is_locked(lock));
1881 ceph_assert(trace != nullptr);
1882 int ret = 0;
1883
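// Buffered path unless FUA was requested or the cache is configured
// write-through (max_dirty == 0): just throttle against the dirty limit and
// let the flusher write the data back later.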
1884 if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
1885 if (block_writes_upfront) {
1886 _maybe_wait_for_writeback(len, trace);
1887 if (onfreespace)
1888 onfreespace->complete(0);
1889 } else {
1890 ceph_assert(onfreespace);
1891 finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
1892 }
1893 } else {
1894 // write-thru! flush what we just wrote.
1895 ceph::condition_variable cond;
1896 bool done = false;
1897 Context *fin = block_writes_upfront ?
1898 new C_Cond(cond, &done, &ret) : onfreespace;
1899 ceph_assert(fin);
1900 bool flushed = flush_set(oset, wr->extents, trace, fin);
1901 ceph_assert(!flushed); // we just dirtied it, and didn't drop our lock!
1902 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1903 << " bytes" << dendl;
1904 if (block_writes_upfront) {
1905 std::unique_lock l{lock, std::adopt_lock};
1906 cond.wait(l, [&done] { return done; });
1907 l.release();
1908 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1909 if (onfreespace)
1910 onfreespace->complete(ret);
1911 }
1912 }
1913
1914 // start writeback anyway?
1915 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1916 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1917 << target_dirty << ", nudging flusher" << dendl;
1918 flusher_cond.notify_all();
1919 }
1920 return ret;
1921 }
1922
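// Body of the dedicated flusher thread: writes dirty data back once it
// exceeds target_dirty and ages out buffers older than max_dirty_age,
// until flusher_stop is set.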
1923 void ObjectCacher::flusher_entry()
1924 {
1925 ldout(cct, 10) << "flusher start" << dendl;
1926 std::unique_lock l{lock};
1927 while (!flusher_stop) {
1928 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1929 get_stat_dirty();
1930 ldout(cct, 11) << "flusher "
1931 << all << " / " << max_size << ": "
1932 << get_stat_tx() << " tx, "
1933 << get_stat_rx() << " rx, "
1934 << get_stat_clean() << " clean, "
1935 << get_stat_dirty() << " dirty ("
1936 << target_dirty << " target, "
1937 << max_dirty << " max)"
1938 << dendl;
1939 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1940
1941 ZTracer::Trace trace;
1942 if (cct->_conf->osdc_blkin_trace_all) {
1943 trace.init("flusher", &trace_endpoint);
1944 trace.event("start");
1945 }
1946
1947 if (actual > 0 && (uint64_t) actual > target_dirty) {
1948 // flush some dirty pages
1949 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1950 << get_stat_dirty_waiting() << " dirty_waiting > target "
1951 << target_dirty << ", flushing some dirty bhs" << dendl;
1952 flush(&trace, actual - target_dirty);
1953 } else {
1954 // check tail of lru for old dirty items
1955 ceph::real_time cutoff = ceph::real_clock::now();
1956 cutoff -= max_dirty_age;
1957 BufferHead *bh = 0;
1958 int max = MAX_FLUSH_UNDER_LOCK;
1959 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1960 lru_get_next_expire())) != 0 &&
1961 bh->last_write <= cutoff &&
1962 max > 0) {
1963 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1964 if (scattered_write) {
1965 bh_write_adjacencies(bh, cutoff, NULL, &max);
1966 } else {
1967 bh_write(bh, trace);
1968 --max;
1969 }
1970 }
1971 if (!max) {
1972 // back off the lock to avoid starving other threads
1973 trace.event("backoff");
1974 l.unlock();
1975 l.lock();
1976 continue;
1977 }
1978 }
1979
1980 trace.event("finish");
1981 if (flusher_stop)
1982 break;
1983
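// sleep for up to a second; flusher_cond is also signalled when writers
// cross the dirty thresholds.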
1984 flusher_cond.wait_for(l, 1s);
1985 }
1986
1987 /* Wait for reads to finish. This is only possible if handling
1988 * -ENOENT made some read completions finish before their rados read
1989 * came back. If we destroy the cache without waiting for them, their
1990 * callbacks will, when the rados reads eventually do come back, try to
1991 * access a no-longer-valid ObjectCacher.
1992 */
1993 read_cond.wait(l, [this] {
1994 if (reads_outstanding > 0) {
1995 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1996 << reads_outstanding << dendl;
1997 return false;
1998 } else {
1999 return true;
2000 }
2001 });
2002 ldout(cct, 10) << "flusher finish" << dendl;
2003 }
2004
2005
2006 // -------------------------------------------------
2007
2008 bool ObjectCacher::set_is_empty(ObjectSet *oset)
2009 {
2010 ceph_assert(ceph_mutex_is_locked(lock));
2011 if (oset->objects.empty())
2012 return true;
2013
2014 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
2015 if (!(*p)->is_empty())
2016 return false;
2017
2018 return true;
2019 }
2020
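// true if the set holds at least one buffer that is neither dirty nor in
// flight, i.e. there is some plain cached data.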
2021 bool ObjectCacher::set_is_cached(ObjectSet *oset)
2022 {
2023 ceph_assert(ceph_mutex_is_locked(lock));
2024 if (oset->objects.empty())
2025 return false;
2026
2027 for (xlist<Object*>::iterator p = oset->objects.begin();
2028 !p.end(); ++p) {
2029 Object *ob = *p;
2030 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
2031 q != ob->data.end();
2032 ++q) {
2033 BufferHead *bh = q->second;
2034 if (!bh->is_dirty() && !bh->is_tx())
2035 return true;
2036 }
2037 }
2038
2039 return false;
2040 }
2041
2042 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
2043 {
2044 ceph_assert(ceph_mutex_is_locked(lock));
2045 if (oset->objects.empty())
2046 return false;
2047
2048 for (xlist<Object*>::iterator i = oset->objects.begin();
2049 !i.end(); ++i) {
2050 Object *ob = *i;
2051
2052 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2053 p != ob->data.end();
2054 ++p) {
2055 BufferHead *bh = p->second;
2056 if (bh->is_dirty() || bh->is_tx())
2057 return true;
2058 }
2059 }
2060
2061 return false;
2062 }
2063
2064
2065 // purge. non-blocking. violently removes dirty buffers from cache.
2066 void ObjectCacher::purge(Object *ob)
2067 {
2068 ceph_assert(ceph_mutex_is_locked(lock));
2069 ldout(cct, 10) << "purge " << *ob << dendl;
2070
2071 ob->truncate(0);
2072 }
2073
2074
2075 // flush. non-blocking. no callback.
2076 // returns true if the range was already clean (nothing to flush),
2077 // false if we started writeback on something.
2078 // be sloppy about the range and flush any buffer it touches
2079 bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2080 ZTracer::Trace *trace)
2081 {
2082 ceph_assert(trace != nullptr);
2083 ceph_assert(ceph_mutex_is_locked(lock));
2084 list<BufferHead*> blist;
2085 bool clean = true;
2086 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2087 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2088 p != ob->data.end();
2089 ++p) {
2090 BufferHead *bh = p->second;
2091 ldout(cct, 20) << "flush " << *bh << dendl;
2092 if (length && bh->start() > offset+length) {
2093 break;
2094 }
2095 if (bh->is_tx()) {
2096 clean = false;
2097 continue;
2098 }
2099 if (!bh->is_dirty()) {
2100 continue;
2101 }
2102
2103 if (scattered_write)
2104 blist.push_back(bh);
2105 else
2106 bh_write(bh, *trace);
2107 clean = false;
2108 }
2109 if (scattered_write && !blist.empty())
2110 bh_write_scattered(blist);
2111
2112 return clean;
2113 }
2114
2115 bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2116 Context *onfinish)
2117 {
2118 ceph_assert(ceph_mutex_is_locked(lock));
2119 if (gather->has_subs()) {
2120 gather->set_finisher(onfinish);
2121 gather->activate();
2122 return false;
2123 }
2124
2125 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2126 onfinish->complete(0);
2127 return true;
2128 }
2129
2130 // flush. non-blocking, takes callback.
2131 // returns true if already flushed
2132 bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2133 {
2134 ceph_assert(ceph_mutex_is_locked(lock));
2135 ceph_assert(onfinish != NULL);
2136 if (oset->objects.empty()) {
2137 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2138 onfinish->complete(0);
2139 return true;
2140 }
2141
2142 ldout(cct, 10) << "flush_set " << oset << dendl;
2143
2144 // we'll need to wait for all objects to flush!
2145 C_GatherBuilder gather(cct);
2146 set<Object*> waitfor_commit;
2147
2148 list<BufferHead*> blist;
2149 Object *last_ob = NULL;
2150 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2151
2152 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2153 // order. But items in oset->objects are not sorted. So the iterator can
2154 // point to any buffer head in the ObjectSet
2155 BufferHead key(*oset->objects.begin());
2156 it = dirty_or_tx_bh.lower_bound(&key);
2157 p = q = it;
2158
2159 bool backwards = true;
2160 if (it != dirty_or_tx_bh.begin())
2161 --it;
2162 else
2163 backwards = false;
2164
2165 for (; p != dirty_or_tx_bh.end(); p = q) {
2166 ++q;
2167 BufferHead *bh = *p;
2168 if (bh->ob->oset != oset)
2169 break;
2170 waitfor_commit.insert(bh->ob);
2171 if (bh->is_dirty()) {
2172 if (scattered_write) {
2173 if (last_ob != bh->ob) {
2174 if (!blist.empty()) {
2175 bh_write_scattered(blist);
2176 blist.clear();
2177 }
2178 last_ob = bh->ob;
2179 }
2180 blist.push_back(bh);
2181 } else {
2182 bh_write(bh, {});
2183 }
2184 }
2185 }
2186
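// Also walk backwards from the lower_bound position: since the key was
// built from an arbitrary object in the set, some of this ObjectSet's
// buffer heads may sort before it.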
2187 if (backwards) {
2188 for (p = q = it; true; p = q) {
2189 if (q != dirty_or_tx_bh.begin())
2190 --q;
2191 else
2192 backwards = false;
2193 BufferHead *bh = *p;
2194 if (bh->ob->oset != oset)
2195 break;
2196 waitfor_commit.insert(bh->ob);
2197 if (bh->is_dirty()) {
2198 if (scattered_write) {
2199 if (last_ob != bh->ob) {
2200 if (!blist.empty()) {
2201 bh_write_scattered(blist);
2202 blist.clear();
2203 }
2204 last_ob = bh->ob;
2205 }
2206 blist.push_front(bh);
2207 } else {
2208 bh_write(bh, {});
2209 }
2210 }
2211 if (!backwards)
2212 break;
2213 }
2214 }
2215
2216 if (scattered_write && !blist.empty())
2217 bh_write_scattered(blist);
2218
2219 for (set<Object*>::iterator i = waitfor_commit.begin();
2220 i != waitfor_commit.end(); ++i) {
2221 Object *ob = *i;
2222
2223 // we'll need to gather...
2224 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2225 << ob->last_write_tid << " on " << *ob << dendl;
2226 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2227 }
2228
2229 return _flush_set_finish(&gather, onfinish);
2230 }
2231
2232 // flush. non-blocking, takes callback.
2233 // returns true if already flushed
2234 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
2235 ZTracer::Trace *trace, Context *onfinish)
2236 {
2237 ceph_assert(ceph_mutex_is_locked(lock));
2238 ceph_assert(trace != nullptr);
2239 ceph_assert(onfinish != NULL);
2240 if (oset->objects.empty()) {
2241 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2242 onfinish->complete(0);
2243 return true;
2244 }
2245
2246 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2247 << " ObjectExtents" << dendl;
2248
2249 // we'll need to wait for all objects to flush!
2250 C_GatherBuilder gather(cct);
2251
2252 for (vector<ObjectExtent>::iterator p = exv.begin();
2253 p != exv.end();
2254 ++p) {
2255 ObjectExtent &ex = *p;
2256 sobject_t soid(ex.oid, CEPH_NOSNAP);
2257 if (objects[oset->poolid].count(soid) == 0)
2258 continue;
2259 Object *ob = objects[oset->poolid][soid];
2260
2261 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2262 << " " << ob << dendl;
2263
2264 if (!flush(ob, ex.offset, ex.length, trace)) {
2265 // we'll need to gather...
2266 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2267 << ob->last_write_tid << " on " << *ob << dendl;
2268 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2269 }
2270 }
2271
2272 return _flush_set_finish(&gather, onfinish);
2273 }
2274
2275 // flush all dirty data. non-blocking, takes callback.
2276 // returns true if already flushed
2277 bool ObjectCacher::flush_all(Context *onfinish)
2278 {
2279 ceph_assert(ceph_mutex_is_locked(lock));
2280 ceph_assert(onfinish != NULL);
2281
2282 ldout(cct, 10) << "flush_all " << dendl;
2283
2284 // we'll need to wait for all objects to flush!
2285 C_GatherBuilder gather(cct);
2286 set<Object*> waitfor_commit;
2287
2288 list<BufferHead*> blist;
2289 Object *last_ob = NULL;
2290 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2291 next = it = dirty_or_tx_bh.begin();
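// grab the successor before touching *it, in case writing the bh out
// disturbs its position in dirty_or_tx_bh.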
2292 while (it != dirty_or_tx_bh.end()) {
2293 ++next;
2294 BufferHead *bh = *it;
2295 waitfor_commit.insert(bh->ob);
2296
2297 if (bh->is_dirty()) {
2298 if (scattered_write) {
2299 if (last_ob != bh->ob) {
2300 if (!blist.empty()) {
2301 bh_write_scattered(blist);
2302 blist.clear();
2303 }
2304 last_ob = bh->ob;
2305 }
2306 blist.push_back(bh);
2307 } else {
2308 bh_write(bh, {});
2309 }
2310 }
2311
2312 it = next;
2313 }
2314
2315 if (scattered_write && !blist.empty())
2316 bh_write_scattered(blist);
2317
2318 for (set<Object*>::iterator i = waitfor_commit.begin();
2319 i != waitfor_commit.end();
2320 ++i) {
2321 Object *ob = *i;
2322
2323 // we'll need to gather...
2324 ldout(cct, 10) << "flush_all will wait for ack tid "
2325 << ob->last_write_tid << " on " << *ob << dendl;
2326 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2327 }
2328
2329 return _flush_set_finish(&gather, onfinish);
2330 }
2331
2332 void ObjectCacher::purge_set(ObjectSet *oset)
2333 {
2334 ceph_assert(ceph_mutex_is_locked(lock));
2335 if (oset->objects.empty()) {
2336 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2337 return;
2338 }
2339
2340 ldout(cct, 10) << "purge_set " << oset << dendl;
2341 const bool were_dirty = oset->dirty_or_tx > 0;
2342
2343 for (xlist<Object*>::iterator i = oset->objects.begin();
2344 !i.end(); ++i) {
2345 Object *ob = *i;
2346 purge(ob);
2347 }
2348
2349 // Although we have purged rather than flushed, caller should still
2350 // drop any resources associated with dirty data.
2351 ceph_assert(oset->dirty_or_tx == 0);
2352 if (flush_set_callback && were_dirty) {
2353 flush_set_callback(flush_set_callback_arg, oset);
2354 }
2355 }
2356
2357
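// Drop every clean/zero/error buffer belonging to this object (closing the
// object entirely if that empties it); returns the number of bytes that
// could not be released.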
2358 loff_t ObjectCacher::release(Object *ob)
2359 {
2360 ceph_assert(ceph_mutex_is_locked(lock));
2361 list<BufferHead*> clean;
2362 loff_t o_unclean = 0;
2363
2364 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2365 p != ob->data.end();
2366 ++p) {
2367 BufferHead *bh = p->second;
2368 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2369 clean.push_back(bh);
2370 else
2371 o_unclean += bh->length();
2372 }
2373
2374 for (list<BufferHead*>::iterator p = clean.begin();
2375 p != clean.end();
2376 ++p) {
2377 bh_remove(ob, *p);
2378 delete *p;
2379 }
2380
2381 if (ob->can_close()) {
2382 ldout(cct, 10) << "release trimming " << *ob << dendl;
2383 close_object(ob);
2384 ceph_assert(o_unclean == 0);
2385 return 0;
2386 }
2387
2388 if (ob->complete) {
2389 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2390 ob->complete = false;
2391 }
2392 if (!ob->exists) {
2393 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2394 ob->exists = true;
2395 }
2396
2397 return o_unclean;
2398 }
2399
2400 loff_t ObjectCacher::release_set(ObjectSet *oset)
2401 {
2402 ceph_assert(ceph_mutex_is_locked(lock));
2403 // return # bytes not clean (and thus not released).
2404 loff_t unclean = 0;
2405
2406 if (oset->objects.empty()) {
2407 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2408 return 0;
2409 }
2410
2411 ldout(cct, 10) << "release_set " << oset << dendl;
2412
2413 xlist<Object*>::iterator q;
2414 for (xlist<Object*>::iterator p = oset->objects.begin();
2415 !p.end(); ) {
2416 q = p;
2417 ++q;
2418 Object *ob = *p;
2419
2420 loff_t o_unclean = release(ob);
2421 unclean += o_unclean;
2422
2423 if (o_unclean)
2424 ldout(cct, 10) << "release_set " << oset << " " << *ob
2425 << " has " << o_unclean << " bytes left"
2426 << dendl;
2427 p = q;
2428 }
2429
2430 if (unclean) {
2431 ldout(cct, 10) << "release_set " << oset
2432 << ", " << unclean << " bytes left" << dendl;
2433 }
2434
2435 return unclean;
2436 }
2437
2438
2439 uint64_t ObjectCacher::release_all()
2440 {
2441 ceph_assert(ceph_mutex_is_locked(lock));
2442 ldout(cct, 10) << "release_all" << dendl;
2443 uint64_t unclean = 0;
2444
2445 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2446 = objects.begin();
2447 while (i != objects.end()) {
2448 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2449 while (p != i->end()) {
2450 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2451 ++n;
2452
2453 Object *ob = p->second;
2454
2455 loff_t o_unclean = release(ob);
2456 unclean += o_unclean;
2457
2458 if (o_unclean)
2459 ldout(cct, 10) << "release_all " << *ob
2460 << " has " << o_unclean << " bytes left"
2461 << dendl;
2462 p = n;
2463 }
2464 ++i;
2465 }
2466
2467 if (unclean) {
2468 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2469 << dendl;
2470 }
2471
2472 return unclean;
2473 }
2474
2475 void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2476 {
2477 ceph_assert(ceph_mutex_is_locked(lock));
2478 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2479
2480 for (xlist<Object*>::iterator p = oset->objects.begin();
2481 !p.end(); ++p) {
2482 Object *ob = *p;
2483 if (!ob->exists) {
2484 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2485 ob->exists = true;
2486 ob->complete = false;
2487 }
2488 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2489 !q.end(); ++q) {
2490 C_ReadFinish *comp = *q;
2491 comp->distrust_enoent();
2492 }
2493 }
2494 }
2495
2496 /**
2497 * discard object extents from an ObjectSet by removing the extents in
2498 * exls from the in-memory oset.
2499 */
2500 void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2501 {
2502 ceph_assert(ceph_mutex_is_locked(lock));
2503 bool was_dirty = oset->dirty_or_tx > 0;
2504
2505 _discard(oset, exls, nullptr);
2506 _discard_finish(oset, was_dirty, nullptr);
2507 }
2508
2509 /**
2510 * discard object extents from an ObjectSet by removing the extents in
2511 * exls from the in-memory oset. If a bh is in TX state, the discard
2512 * will wait for the write to commit prior to invoking on_finish.
2513 */
2514 void ObjectCacher::discard_writeback(ObjectSet *oset,
2515 const vector<ObjectExtent>& exls,
2516 Context* on_finish)
2517 {
2518 ceph_assert(ceph_mutex_is_locked(lock));
2519 bool was_dirty = oset->dirty_or_tx > 0;
2520
2521 C_GatherBuilder gather(cct);
2522 _discard(oset, exls, &gather);
2523
2524 if (gather.has_subs()) {
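// record now whether this discard took the set from dirty to clean; the
// flush-set callback fires only after the gathered writebacks complete.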
2525 bool flushed = was_dirty && oset->dirty_or_tx == 0;
2526 gather.set_finisher(new LambdaContext(
2527 [this, oset, flushed, on_finish](int) {
2528 ceph_assert(ceph_mutex_is_locked(lock));
2529 if (flushed && flush_set_callback)
2530 flush_set_callback(flush_set_callback_arg, oset);
2531 if (on_finish)
2532 on_finish->complete(0);
2533 }));
2534 gather.activate();
2535 return;
2536 }
2537
2538 _discard_finish(oset, was_dirty, on_finish);
2539 }
2540
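// Remove the given extents from the cached objects. When a gather is
// supplied, buffers still in TX state register completions on it so the
// caller can wait for the in-flight writeback (see discard_writeback above).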
2541 void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
2542 C_GatherBuilder* gather)
2543 {
2544 if (oset->objects.empty()) {
2545 ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
2546 return;
2547 }
2548
2549 ldout(cct, 10) << __func__ << " " << oset << dendl;
2550
2551 for (auto& ex : exls) {
2552 ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
2553 sobject_t soid(ex.oid, CEPH_NOSNAP);
2554 if (objects[oset->poolid].count(soid) == 0)
2555 continue;
2556 Object *ob = objects[oset->poolid][soid];
2557
2558 ob->discard(ex.offset, ex.length, gather);
2559 }
2560 }
2561
2562 void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
2563 Context* on_finish)
2564 {
2565 ceph_assert(ceph_mutex_is_locked(lock));
2566
2567 // did we truncate off dirty data?
2568 if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
2569 flush_set_callback(flush_set_callback_arg, oset);
2570 }
2571
2572 // notify that in-flight writeback has completed
2573 if (on_finish != nullptr) {
2574 on_finish->complete(0);
2575 }
2576 }
2577
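// Debug helper: recompute the per-state byte totals from every BufferHead
// and assert they match the cached stat_* counters.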
2578 void ObjectCacher::verify_stats() const
2579 {
2580 ceph_assert(ceph_mutex_is_locked(lock));
2581 ldout(cct, 10) << "verify_stats" << dendl;
2582
2583 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2584 error = 0;
2585 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2586 = objects.begin();
2587 i != objects.end();
2588 ++i) {
2589 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2590 = i->begin();
2591 p != i->end();
2592 ++p) {
2593 Object *ob = p->second;
2594 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2595 q != ob->data.end();
2596 ++q) {
2597 BufferHead *bh = q->second;
2598 switch (bh->get_state()) {
2599 case BufferHead::STATE_MISSING:
2600 missing += bh->length();
2601 break;
2602 case BufferHead::STATE_CLEAN:
2603 clean += bh->length();
2604 break;
2605 case BufferHead::STATE_ZERO:
2606 zero += bh->length();
2607 break;
2608 case BufferHead::STATE_DIRTY:
2609 dirty += bh->length();
2610 break;
2611 case BufferHead::STATE_TX:
2612 tx += bh->length();
2613 break;
2614 case BufferHead::STATE_RX:
2615 rx += bh->length();
2616 break;
2617 case BufferHead::STATE_ERROR:
2618 error += bh->length();
2619 break;
2620 default:
2621 ceph_abort();
2622 }
2623 }
2624 }
2625 }
2626
2627 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2628 << " dirty " << dirty << " missing " << missing
2629 << " error " << error << dendl;
2630 ceph_assert(clean == stat_clean);
2631 ceph_assert(rx == stat_rx);
2632 ceph_assert(tx == stat_tx);
2633 ceph_assert(dirty == stat_dirty);
2634 ceph_assert(missing == stat_missing);
2635 ceph_assert(zero == stat_zero);
2636 ceph_assert(error == stat_error);
2637 }
2638
2639 void ObjectCacher::bh_stat_add(BufferHead *bh)
2640 {
2641 ceph_assert(ceph_mutex_is_locked(lock));
2642 switch (bh->get_state()) {
2643 case BufferHead::STATE_MISSING:
2644 stat_missing += bh->length();
2645 break;
2646 case BufferHead::STATE_CLEAN:
2647 stat_clean += bh->length();
2648 break;
2649 case BufferHead::STATE_ZERO:
2650 stat_zero += bh->length();
2651 break;
2652 case BufferHead::STATE_DIRTY:
2653 stat_dirty += bh->length();
2654 bh->ob->dirty_or_tx += bh->length();
2655 bh->ob->oset->dirty_or_tx += bh->length();
2656 break;
2657 case BufferHead::STATE_TX:
2658 stat_tx += bh->length();
2659 bh->ob->dirty_or_tx += bh->length();
2660 bh->ob->oset->dirty_or_tx += bh->length();
2661 break;
2662 case BufferHead::STATE_RX:
2663 stat_rx += bh->length();
2664 break;
2665 case BufferHead::STATE_ERROR:
2666 stat_error += bh->length();
2667 break;
2668 default:
2669 ceph_abort_msg("bh_stat_add: invalid bufferhead state");
2670 }
2671 if (get_stat_dirty_waiting() > 0)
2672 stat_cond.notify_all();
2673 }
2674
2675 void ObjectCacher::bh_stat_sub(BufferHead *bh)
2676 {
2677 ceph_assert(ceph_mutex_is_locked(lock));
2678 switch (bh->get_state()) {
2679 case BufferHead::STATE_MISSING:
2680 stat_missing -= bh->length();
2681 break;
2682 case BufferHead::STATE_CLEAN:
2683 stat_clean -= bh->length();
2684 break;
2685 case BufferHead::STATE_ZERO:
2686 stat_zero -= bh->length();
2687 break;
2688 case BufferHead::STATE_DIRTY:
2689 stat_dirty -= bh->length();
2690 bh->ob->dirty_or_tx -= bh->length();
2691 bh->ob->oset->dirty_or_tx -= bh->length();
2692 break;
2693 case BufferHead::STATE_TX:
2694 stat_tx -= bh->length();
2695 bh->ob->dirty_or_tx -= bh->length();
2696 bh->ob->oset->dirty_or_tx -= bh->length();
2697 break;
2698 case BufferHead::STATE_RX:
2699 stat_rx -= bh->length();
2700 break;
2701 case BufferHead::STATE_ERROR:
2702 stat_error -= bh->length();
2703 break;
2704 default:
2705 ceph_abort_msg("bh_stat_sub: invalid bufferhead state");
2706 }
2707 }
2708
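// Move a BufferHead to state s, keeping the clean/dirty LRUs, the
// dirty_or_tx_bh index and the per-state byte counters consistent.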
2709 void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2710 {
2711 ceph_assert(ceph_mutex_is_locked(lock));
2712 int state = bh->get_state();
2713 // move between lru lists?
2714 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2715 bh_lru_rest.lru_remove(bh);
2716 bh_lru_dirty.lru_insert_top(bh);
2717 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2718 bh_lru_dirty.lru_remove(bh);
2719 if (bh->get_dontneed())
2720 bh_lru_rest.lru_insert_bot(bh);
2721 else
2722 bh_lru_rest.lru_insert_top(bh);
2723 }
2724
2725 if ((s == BufferHead::STATE_TX ||
2726 s == BufferHead::STATE_DIRTY) &&
2727 state != BufferHead::STATE_TX &&
2728 state != BufferHead::STATE_DIRTY) {
2729 dirty_or_tx_bh.insert(bh);
2730 } else if ((state == BufferHead::STATE_TX ||
2731 state == BufferHead::STATE_DIRTY) &&
2732 s != BufferHead::STATE_TX &&
2733 s != BufferHead::STATE_DIRTY) {
2734 dirty_or_tx_bh.erase(bh);
2735 }
2736
2737 if (s != BufferHead::STATE_ERROR &&
2738 state == BufferHead::STATE_ERROR) {
2739 bh->error = 0;
2740 }
2741
2742 // set state
2743 bh_stat_sub(bh);
2744 bh->set_state(s);
2745 bh_stat_add(bh);
2746 }
2747
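// Attach a new BufferHead to its object and register it with the LRUs, the
// dirty_or_tx_bh index and the stat counters.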
2748 void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2749 {
2750 ceph_assert(ceph_mutex_is_locked(lock));
2751 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2752 ob->add_bh(bh);
2753 if (bh->is_dirty()) {
2754 bh_lru_dirty.lru_insert_top(bh);
2755 dirty_or_tx_bh.insert(bh);
2756 } else {
2757 if (bh->get_dontneed())
2758 bh_lru_rest.lru_insert_bot(bh);
2759 else
2760 bh_lru_rest.lru_insert_top(bh);
2761 }
2762
2763 if (bh->is_tx()) {
2764 dirty_or_tx_bh.insert(bh);
2765 }
2766 bh_stat_add(bh);
2767 }
2768
2769 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2770 {
2771 ceph_assert(ceph_mutex_is_locked(lock));
2772 ceph_assert(bh->get_journal_tid() == 0);
2773 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2774 ob->remove_bh(bh);
2775 if (bh->is_dirty()) {
2776 bh_lru_dirty.lru_remove(bh);
2777 dirty_or_tx_bh.erase(bh);
2778 } else {
2779 bh_lru_rest.lru_remove(bh);
2780 }
2781
2782 if (bh->is_tx()) {
2783 dirty_or_tx_bh.erase(bh);
2784 }
2785 bh_stat_sub(bh);
2786 if (get_stat_dirty_waiting() > 0)
2787 stat_cond.notify_all();
2788 }
2789