1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits.h>
5
6 #include "msg/Messenger.h"
7 #include "ObjectCacher.h"
8 #include "WritebackHandler.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11
12 #include "include/ceph_assert.h"
13
14 #define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
15 /// while holding the lock
16 #define BUFFER_MEMORY_WEIGHT CEPH_PAGE_SHIFT // memory usage of BufferHead, count in (1<<n)
17
18 using std::chrono::seconds;
19 using std::list;
20 using std::map;
21 using std::make_pair;
22 using std::pair;
23 using std::set;
24 using std::string;
25 using std::vector;
26
27 using ceph::bufferlist;
28
29 using namespace std::literals;
30
31 /*** ObjectCacher::BufferHead ***/
32
33
34 /*** ObjectCacher::Object ***/
35
36 #define dout_subsys ceph_subsys_objectcacher
37 #undef dout_prefix
38 #define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
39
40
41
42 class ObjectCacher::C_ReadFinish : public Context {
43 ObjectCacher *oc;
44 int64_t poolid;
45 sobject_t oid;
46 loff_t start;
47 uint64_t length;
48 xlist<C_ReadFinish*>::item set_item;
49 bool trust_enoent;
50 ceph_tid_t tid;
51 ZTracer::Trace trace;
52
53 public:
54 bufferlist bl;
55 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
56 uint64_t l, const ZTracer::Trace &trace) :
57 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
58 set_item(this), trust_enoent(true),
59 tid(t), trace(trace) {
60 ob->reads.push_back(&set_item);
61 }
62
63 void finish(int r) override {
64 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
65 trace.event("finish");
66
67 // object destructor clears the list
68 if (set_item.is_on_list())
69 set_item.remove_myself();
70 }
71
72 void distrust_enoent() {
73 trust_enoent = false;
74 }
75 };
76
77 class ObjectCacher::C_RetryRead : public Context {
78 ObjectCacher *oc;
79 OSDRead *rd;
80 ObjectSet *oset;
81 Context *onfinish;
82 ZTracer::Trace trace;
83 public:
84 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
85 const ZTracer::Trace &trace)
86 : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
87 }
88 void finish(int r) override {
89 if (r >= 0) {
90 r = oc->_readx(rd, oset, onfinish, false, &trace);
91 }
92
93 if (r == 0) {
94 // read is still in-progress
95 return;
96 }
97
98 trace.event("finish");
99 if (onfinish) {
100 onfinish->complete(r);
101 }
102 }
103 };
104
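// Split 'left' at absolute offset 'off': the new right-hand bh inherits the
// left bh's state, flags and snap context, takes the tail of its buffer, and
// any read waiters registered at or beyond 'off' are moved onto it.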
105 ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
106 loff_t off)
107 {
108 ceph_assert(ceph_mutex_is_locked(oc->lock));
109 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
110
111 // split off right
112 ObjectCacher::BufferHead *right = new BufferHead(this);
113
114 // inherit the dontneed/nocache hints; they are cleared automatically if the bh is accessed later
115 right->set_dontneed(left->get_dontneed());
116 right->set_nocache(left->get_nocache());
117
118 right->last_write_tid = left->last_write_tid;
119 right->last_read_tid = left->last_read_tid;
120 right->set_state(left->get_state());
121 right->set_error(left->error);
122 right->snapc = left->snapc;
123 right->set_journal_tid(left->journal_tid);
124
125 loff_t newleftlen = off - left->start();
126 right->set_start(off);
127 right->set_length(left->length() - newleftlen);
128
129 // shorten left
130 oc->bh_stat_sub(left);
131 left->set_length(newleftlen);
132 oc->bh_stat_add(left);
133
134 // add right
135 oc->bh_add(this, right);
136
137 // split buffers too
138 bufferlist bl;
139 bl = std::move(left->bl);
140 if (bl.length()) {
141 ceph_assert(bl.length() == (left->length() + right->length()));
142 right->bl.substr_of(bl, left->length(), right->length());
143 left->bl.substr_of(bl, 0, left->length());
144 }
145
146 // move read waiters
147 if (!left->waitfor_read.empty()) {
148 auto start_remove = left->waitfor_read.begin();
149 while (start_remove != left->waitfor_read.end() &&
150 start_remove->first < right->start())
151 ++start_remove;
152 for (auto p = start_remove; p != left->waitfor_read.end(); ++p) {
153 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
154 << " to right bh" << dendl;
155 right->waitfor_read[p->first].swap( p->second );
156 ceph_assert(p->second.empty());
157 }
158 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
159 }
160
161 ldout(oc->cct, 20) << "split left is " << *left << dendl;
162 ldout(oc->cct, 20) << "split right is " << *right << dendl;
163 return right;
164 }
165
166
167 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
168 {
169 ceph_assert(ceph_mutex_is_locked(oc->lock));
170
171 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
172 if (left->get_journal_tid() == 0) {
173 left->set_journal_tid(right->get_journal_tid());
174 }
175 right->set_journal_tid(0);
176
177 oc->bh_remove(this, right);
178 oc->bh_stat_sub(left);
179 left->set_length(left->length() + right->length());
180 oc->bh_stat_add(left);
181
182 // data
183 left->bl.claim_append(right->bl);
184
185 // version
186 // note: this is sorta busted, but should only be used for dirty buffers
187 left->last_write_tid = std::max( left->last_write_tid, right->last_write_tid );
188 left->last_write = std::max( left->last_write, right->last_write );
189
190 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
191 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
192
193 // waiters
194 for (auto p = right->waitfor_read.begin();
195 p != right->waitfor_read.end();
196 ++p)
197 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
198 p->second );
199
200 // hose right
201 delete right;
202
203 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
204 }
205
206 bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
207 {
208 if (left->end() != right->start() ||
209 left->get_state() != right->get_state() ||
210 !left->can_merge_journal(right))
211 return false;
212 if (left->is_tx() && left->last_write_tid != right->last_write_tid)
213 return false;
214 return true;
215 }
216
217 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
218 {
219 ceph_assert(ceph_mutex_is_locked(oc->lock));
220 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
221
222 // do not merge rx buffers; last_read_tid may not match
223 if (bh->is_rx())
224 return;
225
226 // to the left?
227 auto p = data.find(bh->start());
228 ceph_assert(p->second == bh);
229 if (p != data.begin()) {
230 --p;
231 if (can_merge_bh(p->second, bh)) {
232 merge_left(p->second, bh);
233 bh = p->second;
234 } else {
235 ++p;
236 }
237 }
238 // to the right?
239 ceph_assert(p->second == bh);
240 ++p;
241 if (p != data.end() && can_merge_bh(bh, p->second))
242 merge_left(bh, p->second);
243
244 maybe_rebuild_buffer(bh);
245 }
246
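// Compact a bh's bufferlist when splits/merges have left more than half of its
// data length (and at least one page) as wasted space.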
247 void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
248 {
249 auto& bl = bh->bl;
250 if (bl.get_num_buffers() <= 1)
251 return;
252
253 auto wasted = bl.get_wasted_space();
254 if (wasted * 2 > bl.length() &&
255 wasted > (1U << BUFFER_MEMORY_WEIGHT))
256 bl.rebuild();
257 }
258
259 /*
260 * return true if the entire given range is cached
261 */
262 bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
263 {
264 ceph_assert(ceph_mutex_is_locked(oc->lock));
265 auto p = data_lower_bound(cur);
266 while (left > 0) {
267 if (p == data.end())
268 return false;
269
270 if (p->first <= cur) {
271 // have part of it
272 loff_t lenfromcur = std::min(p->second->end() - cur, left);
273 cur += lenfromcur;
274 left -= lenfromcur;
275 ++p;
276 continue;
277 } else if (p->first > cur) {
278 // gap
279 return false;
280 } else
281 ceph_abort();
282 }
283
284 return true;
285 }
286
287 /*
288 * return true if all cached data for this object lies within [off, off+len]
289 */
290 bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
291 {
292 ceph_assert(ceph_mutex_is_locked(oc->lock));
293 if (data.empty())
294 return true;
295 auto first = data.begin();
296 auto last = data.rbegin();
297 if (first->second->start() >= off && last->second->end() <= (off + len))
298 return true;
299 else
300 return false;
301 }
302
303 /*
304 * map a range of bytes into buffer_heads.
305 * - create missing buffer_heads as necessary.
306 */
307 int ObjectCacher::Object::map_read(ObjectExtent &ex,
308 map<loff_t, BufferHead*>& hits,
309 map<loff_t, BufferHead*>& missing,
310 map<loff_t, BufferHead*>& rx,
311 map<loff_t, BufferHead*>& errors)
312 {
313 ceph_assert(ceph_mutex_is_locked(oc->lock));
314 ldout(oc->cct, 10) << "map_read " << ex.oid << " "
315 << ex.offset << "~" << ex.length << dendl;
316
317 loff_t cur = ex.offset;
318 loff_t left = ex.length;
319
320 auto p = data_lower_bound(ex.offset);
321 while (left > 0) {
322 // at end?
323 if (p == data.end()) {
324 // rest is a miss.
325 BufferHead *n = new BufferHead(this);
326 n->set_start(cur);
327 n->set_length(left);
328 oc->bh_add(this, n);
329 if (complete) {
330 oc->mark_zero(n);
331 hits[cur] = n;
332 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
333 } else {
334 missing[cur] = n;
335 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
336 }
337 cur += left;
338 ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
339 break; // no more.
340 }
341
342 if (p->first <= cur) {
343 // have it (or part of it)
344 BufferHead *e = p->second;
345
346 if (e->is_clean() ||
347 e->is_dirty() ||
348 e->is_tx() ||
349 e->is_zero()) {
350 hits[cur] = e; // readable!
351 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
352 } else if (e->is_rx()) {
353 rx[cur] = e; // missing, not readable.
354 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
355 } else if (e->is_error()) {
356 errors[cur] = e;
357 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
358 } else {
359 ceph_abort();
360 }
361
362 loff_t lenfromcur = std::min(e->end() - cur, left);
363 cur += lenfromcur;
364 left -= lenfromcur;
365 ++p;
366 continue; // more?
367
368 } else if (p->first > cur) {
369 // gap.. miss
370 loff_t next = p->first;
371 BufferHead *n = new BufferHead(this);
372 loff_t len = std::min(next - cur, left);
373 n->set_start(cur);
374 n->set_length(len);
375 oc->bh_add(this,n);
376 if (complete) {
377 oc->mark_zero(n);
378 hits[cur] = n;
379 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
380 } else {
381 missing[cur] = n;
382 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
383 }
384 cur += std::min(left, n->length());
385 left -= std::min(left, n->length());
386 continue; // more?
387 } else {
388 ceph_abort();
389 }
390 }
391 return 0;
392 }
393
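// Debug sanity check: every map key matches its bh's start offset, bhs do not
// overlap, and each read waiter's offset falls inside its bh.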
394 void ObjectCacher::Object::audit_buffers()
395 {
396 loff_t offset = 0;
397 for (auto it = data.begin(); it != data.end(); ++it) {
398 if (it->first != it->second->start()) {
399 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
400 << " does not match bh start position: "
401 << *it->second << dendl;
402 ceph_assert(it->first == it->second->start());
403 }
404 if (it->first < offset) {
405 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
406 << " overlaps with previous bh " << *((--it)->second)
407 << dendl;
408 ceph_assert(it->first >= offset);
409 }
410 BufferHead *bh = it->second;
411 for (auto w_it = bh->waitfor_read.begin();
412 w_it != bh->waitfor_read.end(); ++w_it) {
413 if (w_it->first < bh->start() ||
414 w_it->first >= bh->start() + bh->length()) {
415 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
416 << " is not within bh " << *bh << dendl;
417 ceph_assert(w_it->first >= bh->start());
418 ceph_assert(w_it->first < bh->start() + bh->length());
419 }
420 }
421 offset = it->first + it->second->length();
422 }
423 }
424
425 /*
426 * map a range of extents on an object's buffer cache.
427 * - combine any bh's we're writing into one
428 * - break up bufferheads that don't fall completely within the range
429 * //no! - return a bh that includes the write. may also include
430 * other dirty data to left and/or right.
431 */
432 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
433 ceph_tid_t tid)
434 {
435 ceph_assert(ceph_mutex_is_locked(oc->lock));
436 BufferHead *final = 0;
437
438 ldout(oc->cct, 10) << "map_write oex " << ex.oid
439 << " " << ex.offset << "~" << ex.length << dendl;
440
441 loff_t cur = ex.offset;
442 loff_t left = ex.length;
443
444 auto p = data_lower_bound(ex.offset);
445 while (left > 0) {
446 loff_t max = left;
447
448 // at end ?
449 if (p == data.end()) {
450 if (final == NULL) {
451 final = new BufferHead(this);
452 replace_journal_tid(final, tid);
453 final->set_start( cur );
454 final->set_length( max );
455 oc->bh_add(this, final);
456 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
457 } else {
458 oc->bh_stat_sub(final);
459 final->set_length(final->length() + max);
460 oc->bh_stat_add(final);
461 }
462 left -= max;
463 cur += max;
464 continue;
465 }
466
467 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
468 //oc->verify_stats();
469
470 if (p->first <= cur) {
471 BufferHead *bh = p->second;
472 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
473
474 if (p->first < cur) {
475 ceph_assert(final == 0);
476 if (cur + max >= bh->end()) {
477 // we want right bit (one splice)
478 final = split(bh, cur); // just split it, take right half.
479 maybe_rebuild_buffer(bh);
480 replace_journal_tid(final, tid);
481 ++p;
482 ceph_assert(p->second == final);
483 } else {
484 // we want middle bit (two splices)
485 final = split(bh, cur);
486 maybe_rebuild_buffer(bh);
487 ++p;
488 ceph_assert(p->second == final);
489 auto right = split(final, cur+max);
490 maybe_rebuild_buffer(right);
491 replace_journal_tid(final, tid);
492 }
493 } else {
494 ceph_assert(p->first == cur);
495 if (bh->length() <= max) {
496 // whole bufferhead, piece of cake.
497 } else {
498 // we want left bit (one splice)
499 auto right = split(bh, cur + max); // just split
500 maybe_rebuild_buffer(right);
501 }
502 if (final) {
503 oc->mark_dirty(bh);
504 oc->mark_dirty(final);
505 --p; // move iterator back to final
506 ceph_assert(p->second == final);
507 replace_journal_tid(bh, tid);
508 merge_left(final, bh);
509 } else {
510 final = bh;
511 replace_journal_tid(final, tid);
512 }
513 }
514
515 // keep going.
516 loff_t lenfromcur = final->end() - cur;
517 cur += lenfromcur;
518 left -= lenfromcur;
519 ++p;
520 continue;
521 } else {
522 // gap!
523 loff_t next = p->first;
524 loff_t glen = std::min(next - cur, max);
525 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
526 if (final) {
527 oc->bh_stat_sub(final);
528 final->set_length(final->length() + glen);
529 oc->bh_stat_add(final);
530 } else {
531 final = new BufferHead(this);
532 replace_journal_tid(final, tid);
533 final->set_start( cur );
534 final->set_length( glen );
535 oc->bh_add(this, final);
536 }
537
538 cur += glen;
539 left -= glen;
540 continue; // more?
541 }
542 }
543
544 // set version
545 ceph_assert(final);
546 ceph_assert(final->get_journal_tid() == tid);
547 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
548
549 return final;
550 }
551
552 void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
553 ceph_tid_t tid) {
554 ceph_tid_t bh_tid = bh->get_journal_tid();
555
556 ceph_assert(tid == 0 || bh_tid <= tid);
557 if (bh_tid != 0 && bh_tid != tid) {
558 // inform journal that it should not expect a writeback from this extent
559 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
560 bh->length(), bh_tid, tid);
561 }
562 bh->set_journal_tid(tid);
563 }
564
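// Drop all cached data at or beyond offset s, splitting any bh that straddles
// the truncation point; read waiters on removed bhs are restarted.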
565 void ObjectCacher::Object::truncate(loff_t s)
566 {
567 ceph_assert(ceph_mutex_is_locked(oc->lock));
568 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
569
570 std::list<Context*> waiting_for_read;
571 while (!data.empty()) {
572 BufferHead *bh = data.rbegin()->second;
573 if (bh->end() <= s)
574 break;
575
576 // split bh at truncation point?
577 if (bh->start() < s) {
578 split(bh, s);
579 maybe_rebuild_buffer(bh);
580 continue;
581 }
582
583 // remove bh entirely
584 ceph_assert(bh->start() >= s);
585 for ([[maybe_unused]] auto& [off, ctxs] : bh->waitfor_read) {
586 waiting_for_read.splice(waiting_for_read.end(), ctxs);
587 }
588 bh->waitfor_read.clear();
589 replace_journal_tid(bh, 0);
590 oc->bh_remove(this, bh);
591 delete bh;
592 }
593 if (!waiting_for_read.empty()) {
594 ldout(oc->cct, 10) << "restarting reads post-truncate" << dendl;
595 }
596 finish_contexts(oc->cct, waiting_for_read, 0);
597 }
598
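// Invalidate cached data in [off, off+len): bhs are split at the range
// boundaries; a bh with writeback in flight (tx) can be waited on via
// commit_gather, an in-flight read (rx) is zeroed in place rather than
// removed, and everything else is dropped with its read waiters restarted.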
599 void ObjectCacher::Object::discard(loff_t off, loff_t len,
600 C_GatherBuilder* commit_gather)
601 {
602 ceph_assert(ceph_mutex_is_locked(oc->lock));
603 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
604 << dendl;
605
606 if (!exists) {
607 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
608 exists = true;
609 }
610 if (complete) {
611 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
612 complete = false;
613 }
614
615 std::list<Context*> waiting_for_read;
616 auto p = data_lower_bound(off);
617 while (p != data.end()) {
618 BufferHead *bh = p->second;
619 if (bh->start() >= off + len)
620 break;
621
622 // split bh at truncation point?
623 if (bh->start() < off) {
624 split(bh, off);
625 maybe_rebuild_buffer(bh);
626 ++p;
627 continue;
628 }
629
630 ceph_assert(bh->start() >= off);
631 if (bh->end() > off + len) {
632 auto right = split(bh, off + len);
633 maybe_rebuild_buffer(right);
634 }
635
636 ++p;
637 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
638 replace_journal_tid(bh, 0);
639
640 if (bh->is_tx() && commit_gather != nullptr) {
641 // wait for the writeback to commit
642 waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
643 } else if (bh->is_rx()) {
644 // cannot remove bh with in-flight read, but we can ensure the
645 // read won't overwrite the discard
646 bh->last_read_tid = ++oc->last_read_tid;
647 bh->bl.clear();
648 bh->set_nocache(true);
649 oc->mark_zero(bh);
650 // mark the rx bh as zero so the discarded range reads back as zeros
651 continue;
652 } else {
653 for ([[maybe_unused]] auto& [off, ctxs] : bh->waitfor_read) {
654 waiting_for_read.splice(waiting_for_read.end(), ctxs);
655 }
656 bh->waitfor_read.clear();
657 }
658
659 oc->bh_remove(this, bh);
660 delete bh;
661 }
662 if (!waiting_for_read.empty()) {
663 ldout(oc->cct, 10) << "restarting reads post-discard" << dendl;
664 }
665 finish_contexts(oc->cct, waiting_for_read, 0); /* restart reads */
666 }
667
668
669
670 /*** ObjectCacher ***/
671
672 #undef dout_prefix
673 #define dout_prefix *_dout << "objectcacher "
674
675
676 ObjectCacher::ObjectCacher(CephContext *cct_, string name,
677 WritebackHandler& wb, ceph::mutex& l,
678 flush_set_callback_t flush_callback,
679 void *flush_callback_arg, uint64_t max_bytes,
680 uint64_t max_objects, uint64_t max_dirty,
681 uint64_t target_dirty, double max_dirty_age,
682 bool block_writes_upfront)
683 : perfcounter(NULL),
684 cct(cct_), writeback_handler(wb), name(name), lock(l),
685 max_dirty(max_dirty), target_dirty(target_dirty),
686 max_size(max_bytes), max_objects(max_objects),
687 max_dirty_age(ceph::make_timespan(max_dirty_age)),
688 block_writes_upfront(block_writes_upfront),
689 trace_endpoint("ObjectCacher"),
690 flush_set_callback(flush_callback),
691 flush_set_callback_arg(flush_callback_arg),
692 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
693 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
694 stat_missing(0), stat_error(0), stat_dirty_waiting(0),
695 stat_nr_dirty_waiters(0), reads_outstanding(0)
696 {
697 perf_start();
698 finisher.start();
699 scattered_write = writeback_handler.can_scattered_write();
700 }
701
702 ObjectCacher::~ObjectCacher()
703 {
704 finisher.stop();
705 perf_stop();
706 // we should be empty.
707 for (auto i = objects.begin(); i != objects.end(); ++i)
708 ceph_assert(i->empty());
709 ceph_assert(bh_lru_rest.lru_get_size() == 0);
710 ceph_assert(bh_lru_dirty.lru_get_size() == 0);
711 ceph_assert(ob_lru.lru_get_size() == 0);
712 ceph_assert(dirty_or_tx_bh.empty());
713 }
714
715 void ObjectCacher::perf_start()
716 {
717 string n = "objectcacher-" + name;
718 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
719
720 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
721 "cache_ops_hit", "Hit operations");
722 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
723 "cache_ops_miss", "Miss operations");
724 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
725 "cache_bytes_hit", "Hit data", NULL, 0, unit_t(UNIT_BYTES));
726 plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
727 "cache_bytes_miss", "Miss data", NULL, 0, unit_t(UNIT_BYTES));
728 plb.add_u64_counter(l_objectcacher_data_read,
729 "data_read", "Read data");
730 plb.add_u64_counter(l_objectcacher_data_written,
731 "data_written", "Data written to cache");
732 plb.add_u64_counter(l_objectcacher_data_flushed,
733 "data_flushed", "Data flushed");
734 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
735 "data_overwritten_while_flushing",
736 "Data overwritten while flushing");
737 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
738 "Write operations, delayed due to dirty limits");
739 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
740 "write_bytes_blocked",
741 "Write data blocked on dirty limit", NULL, 0, unit_t(UNIT_BYTES));
742 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
743 "Time spent blocking a write due to dirty limits");
744
745 perfcounter = plb.create_perf_counters();
746 cct->get_perfcounters_collection()->add(perfcounter);
747 }
748
749 void ObjectCacher::perf_stop()
750 {
751 ceph_assert(perfcounter);
752 cct->get_perfcounters_collection()->remove(perfcounter);
753 delete perfcounter;
754 }
755
756 /* private */
757 ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
758 uint64_t object_no,
759 ObjectSet *oset,
760 object_locator_t &l,
761 uint64_t truncate_size,
762 uint64_t truncate_seq)
763 {
764 // XXX: Add handling of nspace in object_locator_t in cache
765 ceph_assert(ceph_mutex_is_locked(lock));
766 // have it?
767 if ((uint32_t)l.pool < objects.size()) {
768 if (objects[l.pool].count(oid)) {
769 Object *o = objects[l.pool][oid];
770 o->object_no = object_no;
771 o->truncate_size = truncate_size;
772 o->truncate_seq = truncate_seq;
773 return o;
774 }
775 } else {
776 objects.resize(l.pool+1);
777 }
778
779 // create it.
780 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
781 truncate_seq);
782 objects[l.pool][oid] = o;
783 ob_lru.lru_insert_top(o);
784 return o;
785 }
786
787 void ObjectCacher::close_object(Object *ob)
788 {
789 ceph_assert(ceph_mutex_is_locked(lock));
790 ldout(cct, 10) << "close_object " << *ob << dendl;
791 ceph_assert(ob->can_close());
792
793 // ok!
794 ob_lru.lru_remove(ob);
795 objects[ob->oloc.pool].erase(ob->get_soid());
796 ob->set_item.remove_myself();
797 delete ob;
798 }
799
800 void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
801 const ZTracer::Trace &parent_trace)
802 {
803 ceph_assert(ceph_mutex_is_locked(lock));
804 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
805 << reads_outstanding << dendl;
806
807 ZTracer::Trace trace;
808 if (parent_trace.valid()) {
809 trace.init("", &trace_endpoint, &parent_trace);
810 trace.copy_name("bh_read " + bh->ob->get_oid().name);
811 trace.event("start");
812 }
813
814 mark_rx(bh);
815 bh->last_read_tid = ++last_read_tid;
816
817 // finisher
818 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
819 bh->start(), bh->length(), trace);
820 // go
821 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
822 bh->ob->get_oloc(), bh->start(), bh->length(),
823 bh->ob->get_snap(), &onfinish->bl,
824 bh->ob->truncate_size, bh->ob->truncate_seq,
825 op_flags, trace, onfinish);
826
827 ++reads_outstanding;
828 }
829
830 void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
831 ceph_tid_t tid, loff_t start,
832 uint64_t length, bufferlist &bl, int r,
833 bool trust_enoent)
834 {
835 ceph_assert(ceph_mutex_is_locked(lock));
836 ldout(cct, 7) << "bh_read_finish "
837 << oid
838 << " tid " << tid
839 << " " << start << "~" << length
840 << " (bl is " << bl.length() << ")"
841 << " returned " << r
842 << " outstanding reads " << reads_outstanding
843 << dendl;
844
845 if (r >= 0 && bl.length() < length) {
846 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
847 << length << " with " << length - bl.length() << " bytes of zeroes"
848 << dendl;
849 bl.append_zero(length - bl.length());
850 }
851
852 list<Context*> ls;
853 int err = 0;
854
855 if (objects[poolid].count(oid) == 0) {
856 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
857 } else {
858 Object *ob = objects[poolid][oid];
859
860 if (r == -ENOENT && !ob->complete) {
861 // wake up *all* rx waiters, or else we risk reordering
862 // identical reads. e.g.
863 // read 1~1
864 // reply to unrelated 3~1 -> !exists
865 // read 1~1 -> immediate ENOENT
866 // reply to first 1~1 -> ooo ENOENT
867 bool allzero = true;
868 for (auto p = ob->data.begin(); p != ob->data.end(); ++p) {
869 BufferHead *bh = p->second;
870 for (auto p = bh->waitfor_read.begin();
871 p != bh->waitfor_read.end();
872 ++p)
873 ls.splice(ls.end(), p->second);
874 bh->waitfor_read.clear();
875 if (!bh->is_zero() && !bh->is_rx())
876 allzero = false;
877 }
878
879 // just pass through and retry all waiters if we don't trust
880 // -ENOENT for this read
881 if (trust_enoent) {
882 ldout(cct, 7)
883 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
884 << dendl;
885 ob->complete = true;
886 ob->exists = false;
887
888 /* If all the bhs are effectively zero, get rid of them. All
889 * the waiters will be retried and get -ENOENT immediately, so
890 * it's safe to clean up the unneeded bh's now. Since we know
891 * it's safe to remove them now, do so, so they aren't hanging
892 * around waiting for more -ENOENTs from rados while the cache
893 * is being shut down.
894 *
895 * Only do this when all the bhs are rx or clean, to match the
896 * condition in _readx(). If there are any non-rx or non-clean
897 * bhs, _readx() will wait for the final result instead of
898 * returning -ENOENT immediately.
899 */
900 if (allzero) {
901 ldout(cct, 10)
902 << "bh_read_finish ENOENT and allzero, getting rid of "
903 << "bhs for " << *ob << dendl;
904 auto p = ob->data.begin();
905 while (p != ob->data.end()) {
906 BufferHead *bh = p->second;
907 // current iterator will be invalidated by bh_remove()
908 ++p;
909 bh_remove(ob, bh);
910 delete bh;
911 }
912 }
913 }
914 }
915
916 // apply to bh's!
917 loff_t opos = start;
918 while (true) {
919 auto p = ob->data_lower_bound(opos);
920 if (p == ob->data.end())
921 break;
922 if (opos >= start+(loff_t)length) {
923 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
924 << start << "+" << length << "=" << start+(loff_t)length
925 << dendl;
926 break;
927 }
928
929 BufferHead *bh = p->second;
930 ldout(cct, 20) << "checking bh " << *bh << dendl;
931
932 // finishers?
933 for (auto it = bh->waitfor_read.begin();
934 it != bh->waitfor_read.end();
935 ++it)
936 ls.splice(ls.end(), it->second);
937 bh->waitfor_read.clear();
938
939 if (bh->start() > opos) {
940 ldout(cct, 1) << "bh_read_finish skipping gap "
941 << opos << "~" << bh->start() - opos
942 << dendl;
943 opos = bh->start();
944 continue;
945 }
946
947 if (!bh->is_rx()) {
948 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
949 opos = bh->end();
950 continue;
951 }
952
953 if (bh->last_read_tid != tid) {
954 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
955 << bh->last_read_tid << " != tid " << tid
956 << ", skipping" << dendl;
957 opos = bh->end();
958 continue;
959 }
960
961 ceph_assert(opos >= bh->start());
962 ceph_assert(bh->start() == opos); // we don't merge rx bh's... yet!
963 ceph_assert(bh->length() <= start+(loff_t)length-opos);
964
965 if (bh->error < 0)
966 err = bh->error;
967
968 opos = bh->end();
969
970 if (r == -ENOENT) {
971 if (trust_enoent) {
972 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
973 bh_remove(ob, bh);
974 delete bh;
975 } else {
976 ldout(cct, 10) << "skipping unstrusted -ENOENT and will retry for "
977 << *bh << dendl;
978 }
979 continue;
980 }
981
982 if (r < 0) {
983 bh->error = r;
984 mark_error(bh);
985 } else {
986 bh->bl.substr_of(bl,
987 bh->start() - start,
988 bh->length());
989 mark_clean(bh);
990 }
991
992 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
993
994 ob->try_merge_bh(bh);
995 }
996 }
997
998 // called with lock held.
999 ldout(cct, 20) << "finishing waiters " << ls << dendl;
1000
1001 finish_contexts(cct, ls, err);
1002 retry_waiting_reads();
1003
1004 --reads_outstanding;
1005 read_cond.notify_all();
1006 }
1007
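// Gather dirty bhs of the same object that sit adjacent to 'bh' in the
// dirty/tx ordering and whose last write is older than 'cutoff', bounded by
// *max_count and *max_amount, and submit them as one scattered write.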
1008 void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
1009 int64_t *max_amount, int *max_count)
1010 {
1011 list<BufferHead*> blist;
1012
1013 int count = 0;
1014 int64_t total_len = 0;
1015 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
1016 ceph_assert(it != dirty_or_tx_bh.end());
1017 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
1018 p != dirty_or_tx_bh.end();
1019 ++p) {
1020 BufferHead *obh = *p;
1021 if (obh->ob != bh->ob)
1022 break;
1023 if (obh->is_dirty() && obh->last_write <= cutoff) {
1024 blist.push_back(obh);
1025 ++count;
1026 total_len += obh->length();
1027 if ((max_count && count > *max_count) ||
1028 (max_amount && total_len > *max_amount))
1029 break;
1030 }
1031 }
1032
1033 while (it != dirty_or_tx_bh.begin()) {
1034 --it;
1035 BufferHead *obh = *it;
1036 if (obh->ob != bh->ob)
1037 break;
1038 if (obh->is_dirty() && obh->last_write <= cutoff) {
1039 blist.push_front(obh);
1040 ++count;
1041 total_len += obh->length();
1042 if ((max_count && count > *max_count) ||
1043 (max_amount && total_len > *max_amount))
1044 break;
1045 }
1046 }
1047 if (max_count)
1048 *max_count -= count;
1049 if (max_amount)
1050 *max_amount -= total_len;
1051
1052 bh_write_scattered(blist);
1053 }
1054
1055 class ObjectCacher::C_WriteCommit : public Context {
1056 ObjectCacher *oc;
1057 int64_t poolid;
1058 sobject_t oid;
1059 vector<pair<loff_t, uint64_t> > ranges;
1060 ZTracer::Trace trace;
1061 public:
1062 ceph_tid_t tid = 0;
1063 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1064 uint64_t l, const ZTracer::Trace &trace) :
1065 oc(c), poolid(_poolid), oid(o), trace(trace) {
1066 ranges.push_back(make_pair(s, l));
1067 }
1068 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1069 vector<pair<loff_t, uint64_t> >& _ranges) :
1070 oc(c), poolid(_poolid), oid(o), tid(0) {
1071 ranges.swap(_ranges);
1072 }
1073 void finish(int r) override {
1074 oc->bh_write_commit(poolid, oid, ranges, tid, r);
1075 trace.event("finish");
1076 }
1077 };
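// Write back a batch of bhs belonging to a single object as one scattered
// write, using the newest snap context and latest write time among them;
// every bh in the batch is marked tx under the returned tid.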
1078 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1079 {
1080 ceph_assert(ceph_mutex_is_locked(lock));
1081
1082 Object *ob = blist.front()->ob;
1083 ob->get();
1084
1085 ceph::real_time last_write;
1086 SnapContext snapc;
1087 vector<pair<loff_t, uint64_t> > ranges;
1088 vector<pair<uint64_t, bufferlist> > io_vec;
1089
1090 ranges.reserve(blist.size());
1091 io_vec.reserve(blist.size());
1092
1093 uint64_t total_len = 0;
1094 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1095 BufferHead *bh = *p;
1096 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1097 ceph_assert(bh->ob == ob);
1098 ceph_assert(bh->bl.length() == bh->length());
1099 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1100
1101 int n = io_vec.size();
1102 io_vec.resize(n + 1);
1103 io_vec[n].first = bh->start();
1104 io_vec[n].second = bh->bl;
1105
1106 total_len += bh->length();
1107 if (bh->snapc.seq > snapc.seq)
1108 snapc = bh->snapc;
1109 if (bh->last_write > last_write)
1110 last_write = bh->last_write;
1111 }
1112
1113 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1114
1115 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1116 io_vec, snapc, last_write,
1117 ob->truncate_size, ob->truncate_seq,
1118 oncommit);
1119 oncommit->tid = tid;
1120 ob->last_write_tid = tid;
1121 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1122 BufferHead *bh = *p;
1123 bh->last_write_tid = tid;
1124 mark_tx(bh);
1125 }
1126
1127 if (perfcounter)
1128 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1129 }
1130
1131 void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1132 {
1133 ceph_assert(ceph_mutex_is_locked(lock));
1134 ldout(cct, 7) << "bh_write " << *bh << dendl;
1135
1136 bh->ob->get();
1137
1138 ZTracer::Trace trace;
1139 if (parent_trace.valid()) {
1140 trace.init("", &trace_endpoint, &parent_trace);
1141 trace.copy_name("bh_write " + bh->ob->get_oid().name);
1142 trace.event("start");
1143 }
1144
1145 // finishers
1146 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1147 bh->ob->get_soid(), bh->start(),
1148 bh->length(), trace);
1149 // go
1150 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1151 bh->ob->get_oloc(),
1152 bh->start(), bh->length(),
1153 bh->snapc, bh->bl, bh->last_write,
1154 bh->ob->truncate_size,
1155 bh->ob->truncate_seq,
1156 bh->journal_tid, trace, oncommit);
1157 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1158
1159 // set bh last_write_tid
1160 oncommit->tid = tid;
1161 bh->ob->last_write_tid = tid;
1162 bh->last_write_tid = tid;
1163
1164 if (perfcounter) {
1165 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1166 }
1167
1168 mark_tx(bh);
1169 }
1170
1171 void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1172 vector<pair<loff_t, uint64_t> >& ranges,
1173 ceph_tid_t tid, int r)
1174 {
1175 ceph_assert(ceph_mutex_is_locked(lock));
1176 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1177 << " ranges " << ranges << " returned " << r << dendl;
1178
1179 if (objects[poolid].count(oid) == 0) {
1180 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1181 return;
1182 }
1183
1184 Object *ob = objects[poolid][oid];
1185 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1186
1187 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1188 p != ranges.end();
1189 ++p) {
1190 loff_t start = p->first;
1191 uint64_t length = p->second;
1192 if (!ob->exists) {
1193 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1194 ob->exists = true;
1195
1196 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1197 ob->get_snap())) {
1198 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1199 "complete on " << *ob << dendl;
1200 ob->complete = false;
1201 }
1202 }
1203
1204 vector<pair<loff_t, BufferHead*>> hit;
1205 // apply to bh's!
1206 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1207 p != ob->data.end();
1208 ++p) {
1209 BufferHead *bh = p->second;
1210
1211 if (bh->start() >= start+(loff_t)length)
1212 break;
1213
1214 // make sure bh is tx
1215 if (!bh->is_tx()) {
1216 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1217 continue;
1218 }
1219
1220 // make sure bh tid matches
1221 if (bh->last_write_tid != tid) {
1222 ceph_assert(bh->last_write_tid > tid);
1223 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1224 continue;
1225 }
1226
1227 // we don't merge tx buffers. tx buffer should be within the range
1228 ceph_assert(bh->start() >= start);
1229 ceph_assert(bh->end() <= start+(loff_t)length);
1230
1231 if (r >= 0) {
1232 // ok! mark bh clean and error-free
1233 mark_clean(bh);
1234 bh->set_journal_tid(0);
1235 if (bh->get_nocache())
1236 bh_lru_rest.lru_bottouch(bh);
1237 hit.push_back(make_pair(bh->start(), bh));
1238 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1239 } else {
1240 mark_dirty(bh);
1241 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1242 << *bh << " r = " << r << " " << cpp_strerror(-r)
1243 << dendl;
1244 }
1245 }
1246
1247 for (auto& p : hit) {
1248 // p.second may have been merged and deleted in merge_left
1249 if (ob->data.count(p.first))
1250 ob->try_merge_bh(p.second);
1251 }
1252 }
1253
1254 // update last_commit.
1255 ceph_assert(ob->last_commit_tid < tid);
1256 ob->last_commit_tid = tid;
1257
1258 // waiters?
1259 list<Context*> ls;
1260 if (ob->waitfor_commit.count(tid)) {
1261 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1262 ob->waitfor_commit.erase(tid);
1263 }
1264
1265 // is the entire object set now clean and fully committed?
1266 ObjectSet *oset = ob->oset;
1267 ob->put();
1268
1269 if (flush_set_callback &&
1270 was_dirty_or_tx > 0 &&
1271 oset->dirty_or_tx == 0) { // nothing dirty/tx
1272 flush_set_callback(flush_set_callback_arg, oset);
1273 }
1274
1275 if (!ls.empty())
1276 finish_contexts(cct, ls, r);
1277 }
1278
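// Start writeback on the oldest dirty bhs until roughly 'amount' bytes have
// been put in flight; amount == 0 means flush everything currently dirty.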
1279 void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1280 {
1281 ceph_assert(trace != nullptr);
1282 ceph_assert(ceph_mutex_is_locked(lock));
1283 ceph::real_time cutoff = ceph::real_clock::now();
1284
1285 ldout(cct, 10) << "flush " << amount << dendl;
1286
1287 /*
1288 * NOTE: we aren't actually pulling things off the LRU here, just
1289 * looking at the tail item. Then we call bh_write, which moves it
1290 * to the other LRU, so that we can call
1291 * lru_dirty.lru_get_next_expire() again.
1292 */
1293 int64_t left = amount;
1294 while (amount == 0 || left > 0) {
1295 BufferHead *bh = static_cast<BufferHead*>(
1296 bh_lru_dirty.lru_get_next_expire());
1297 if (!bh) break;
1298 if (bh->last_write > cutoff) break;
1299
1300 if (scattered_write) {
1301 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1302 } else {
1303 left -= bh->length();
1304 bh_write(bh, *trace);
1305 }
1306 }
1307 }
1308
1309
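// Evict clean (or zero/error) bhs from the rest LRU until clean bytes and the
// clean bh count are back under their limits, clearing 'complete' on affected
// objects, then close idle objects beyond max_objects.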
1310 void ObjectCacher::trim()
1311 {
1312 ceph_assert(ceph_mutex_is_locked(lock));
1313 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1314 << get_stat_clean() << ", objects: max " << max_objects
1315 << " current " << ob_lru.lru_get_size() << dendl;
1316
1317 uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1318 uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
1319 while (get_stat_clean() > 0 &&
1320 ((uint64_t)get_stat_clean() > max_size ||
1321 nr_clean_bh > max_clean_bh)) {
1322 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1323 if (!bh)
1324 break;
1325
1326 ldout(cct, 10) << "trim trimming " << *bh << dendl;
1327 ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1328
1329 Object *ob = bh->ob;
1330 bh_remove(ob, bh);
1331 delete bh;
1332
1333 --nr_clean_bh;
1334
1335 if (ob->complete) {
1336 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1337 ob->complete = false;
1338 }
1339 }
1340
1341 while (ob_lru.lru_get_size() > max_objects) {
1342 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1343 if (!ob)
1344 break;
1345
1346 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1347 close_object(ob);
1348 }
1349
1350 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1351 << get_stat_clean() << ", objects: max " << max_objects
1352 << " current " << ob_lru.lru_get_size() << dendl;
1353 }
1354
1355
1356
1357 /* public */
1358
1359 bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1360 snapid_t snapid)
1361 {
1362 ceph_assert(ceph_mutex_is_locked(lock));
1363 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1364 ex_it != extents.end();
1365 ++ex_it) {
1366 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1367
1368 // get Object cache
1369 sobject_t soid(ex_it->oid, snapid);
1370 Object *o = get_object_maybe(soid, ex_it->oloc);
1371 if (!o)
1372 return false;
1373 if (!o->is_cached(ex_it->offset, ex_it->length))
1374 return false;
1375 }
1376 return true;
1377 }
1378
1379
1380 /*
1381 * returns # bytes read (if in cache). onfinish is untouched (caller
1382 * must delete it)
1383 * returns 0 if doing async read
1384 */
1385 int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1386 ZTracer::Trace *parent_trace)
1387 {
1388 ZTracer::Trace trace;
1389 if (parent_trace != nullptr) {
1390 trace.init("read", &trace_endpoint, parent_trace);
1391 trace.event("start");
1392 }
1393
1394 int r =_readx(rd, oset, onfinish, true, &trace);
1395 if (r < 0) {
1396 trace.event("finish");
1397 }
1398 return r;
1399 }
1400
1401 int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1402 bool external_call, ZTracer::Trace *trace)
1403 {
1404 ceph_assert(trace != nullptr);
1405 ceph_assert(ceph_mutex_is_locked(lock));
1406 bool success = true;
1407 int error = 0;
1408 uint64_t bytes_in_cache = 0;
1409 uint64_t bytes_not_in_cache = 0;
1410 uint64_t total_bytes_read = 0;
1411 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1412 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1413 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1414
1415 /*
1416 * WARNING: we can only meaningfully return ENOENT if the read request
1417 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1418 * zeroed buffers needs to feed single extents into readx().
1419 */
1420 ceph_assert(!oset->return_enoent || rd->extents.size() == 1);
1421
1422 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1423 ex_it != rd->extents.end();
1424 ++ex_it) {
1425 ldout(cct, 10) << "readx " << *ex_it << dendl;
1426
1427 total_bytes_read += ex_it->length;
1428
1429 // get Object cache
1430 sobject_t soid(ex_it->oid, rd->snap);
1431 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1432 ex_it->truncate_size, oset->truncate_seq);
1433 if (external_call)
1434 touch_ob(o);
1435
1436 // does not exist and no hits?
1437 if (oset->return_enoent && !o->exists) {
1438 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1439
1440 // should we worry about COW underneath us?
1441 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1442 ex_it->length, soid.snap)) {
1443 ldout(cct, 20) << "readx may copy on write" << dendl;
1444 bool wait = false;
1445 list<BufferHead*> blist;
1446 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1447 bh_it != o->data.end();
1448 ++bh_it) {
1449 BufferHead *bh = bh_it->second;
1450 if (bh->is_dirty() || bh->is_tx()) {
1451 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1452 wait = true;
1453 if (bh->is_dirty()) {
1454 if (scattered_write)
1455 blist.push_back(bh);
1456 else
1457 bh_write(bh, *trace);
1458 }
1459 }
1460 }
1461 if (scattered_write && !blist.empty())
1462 bh_write_scattered(blist);
1463 if (wait) {
1464 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1465 << " on " << *o << dendl;
1466 o->waitfor_commit[o->last_write_tid].push_back(
1467 new C_RetryRead(this,rd, oset, onfinish, *trace));
1468 // FIXME: perfcounter!
1469 return 0;
1470 }
1471 }
1472
1473 // can we return ENOENT?
1474 bool allzero = true;
1475 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1476 bh_it != o->data.end();
1477 ++bh_it) {
1478 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1479 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1480 allzero = false;
1481 break;
1482 }
1483 }
1484 if (allzero) {
1485 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1486 << dendl;
1487 delete rd;
1488 if (dontneed)
1489 bottouch_ob(o);
1490 return -ENOENT;
1491 }
1492 }
1493
1494 // map extent into bufferheads
1495 map<loff_t, BufferHead*> hits, missing, rx, errors;
1496 o->map_read(*ex_it, hits, missing, rx, errors);
1497 if (external_call) {
1498 // retry reading error buffers
1499 missing.insert(errors.begin(), errors.end());
1500 } else {
1501 // some reads had errors, fail later so completions
1502 // are cleaned up properly
1503 // TODO: make read path not call _readx for every completion
1504 hits.insert(errors.begin(), errors.end());
1505 }
1506
1507 if (!missing.empty() || !rx.empty()) {
1508 // read missing
1509 map<loff_t, BufferHead*>::iterator last = missing.end();
1510 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1511 bh_it != missing.end();
1512 ++bh_it) {
1513 uint64_t rx_bytes = static_cast<uint64_t>(
1514 stat_rx + bh_it->second->length());
1515 bytes_not_in_cache += bh_it->second->length();
1516 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1517 // cache is full with concurrent reads -- wait for rx's to complete
1518 // to constrain memory growth (especially during copy-ups)
1519 if (success) {
1520 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1521 << waitfor_read.size() << " blocked reads, "
1522 << (std::max(rx_bytes, max_size) - max_size)
1523 << " read bytes" << dendl;
1524 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1525 *trace));
1526 }
1527
1528 bh_remove(o, bh_it->second);
1529 delete bh_it->second;
1530 } else {
1531 bh_it->second->set_nocache(nocache);
1532 bh_read(bh_it->second, rd->fadvise_flags, *trace);
1533 if ((success && onfinish) || last != missing.end())
1534 last = bh_it;
1535 }
1536 success = false;
1537 }
1538
1539 // register the waiter on the last missing bh to avoid waking up too early, since reads complete in order
1540 if (last != missing.end()) {
1541 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1542 << " off " << last->first << dendl;
1543 last->second->waitfor_read[last->first].push_back(
1544 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1545
1546 }
1547
1548 // bump rx
1549 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1550 bh_it != rx.end();
1551 ++bh_it) {
1552 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1553 if (success && onfinish) {
1554 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1555 << " off " << bh_it->first << dendl;
1556 bh_it->second->waitfor_read[bh_it->first].push_back(
1557 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1558 }
1559 bytes_not_in_cache += bh_it->second->length();
1560 success = false;
1561 }
1562
1563 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1564 bh_it != hits.end(); ++bh_it)
1565 // bump in lru, so we don't lose it on a later read
1566 touch_bh(bh_it->second);
1567
1568 } else {
1569 ceph_assert(!hits.empty());
1570
1571 // make a plain list
1572 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1573 bh_it != hits.end();
1574 ++bh_it) {
1575 BufferHead *bh = bh_it->second;
1576 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1577 if (bh->is_error() && bh->error)
1578 error = bh->error;
1579 bytes_in_cache += bh->length();
1580
1581 if (bh->get_nocache() && bh->is_clean())
1582 bh_lru_rest.lru_bottouch(bh);
1583 else
1584 touch_bh(bh);
1585 // must come after touch_bh, because touch_bh resets dontneed to false
1586 if (dontneed &&
1587 ((loff_t)ex_it->offset <= bh->start() &&
1588 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1589 bh->set_dontneed(true); //if dirty
1590 if (bh->is_clean())
1591 bh_lru_rest.lru_bottouch(bh);
1592 }
1593 }
1594
1595 if (!error) {
1596 // create reverse map of buffer offset -> object for the
1597 // eventual result. this is over a single ObjectExtent, so we
1598 // know that
1599 // - the bh's are contiguous
1600 // - the buffer frags need not be (and almost certainly aren't)
1601 loff_t opos = ex_it->offset;
1602 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1603 ceph_assert(bh_it->second->start() <= opos);
1604 uint64_t bhoff = opos - bh_it->second->start();
1605 vector<pair<uint64_t,uint64_t> >::iterator f_it
1606 = ex_it->buffer_extents.begin();
1607 uint64_t foff = 0;
1608 while (1) {
1609 BufferHead *bh = bh_it->second;
1610 ceph_assert(opos == (loff_t)(bh->start() + bhoff));
1611
1612 uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
1613 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1614 << bhoff << " frag " << f_it->first << "~"
1615 << f_it->second << " +" << foff << "~" << len
1616 << dendl;
1617
1618 bufferlist bit;
1619 // put substr here first, since substr_of clobbers, and we
1620 // may get multiple bh's at this stripe_map position
1621 if (bh->is_zero()) {
1622 stripe_map[f_it->first].append_zero(len);
1623 } else {
1624 bit.substr_of(bh->bl,
1625 opos - bh->start(),
1626 len);
1627 stripe_map[f_it->first].claim_append(bit);
1628 }
1629
1630 opos += len;
1631 bhoff += len;
1632 foff += len;
1633 if (opos == bh->end()) {
1634 ++bh_it;
1635 bhoff = 0;
1636 }
1637 if (foff == f_it->second) {
1638 ++f_it;
1639 foff = 0;
1640 }
1641 if (bh_it == hits.end()) break;
1642 if (f_it == ex_it->buffer_extents.end())
1643 break;
1644 }
1645 ceph_assert(f_it == ex_it->buffer_extents.end());
1646 ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1647 }
1648
1649 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1650 bottouch_ob(o);
1651 }
1652 }
1653
1654 if (!success) {
1655 if (perfcounter && external_call) {
1656 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1657 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1658 perfcounter->inc(l_objectcacher_cache_ops_miss);
1659 }
1660 if (onfinish) {
1661 ldout(cct, 20) << "readx defer " << rd << dendl;
1662 } else {
1663 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1664 << dendl;
1665 delete rd;
1666 }
1667 return 0; // wait!
1668 }
1669 if (perfcounter && external_call) {
1670 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1671 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1672 perfcounter->inc(l_objectcacher_cache_ops_hit);
1673 }
1674
1675 // no misses... success! do the read.
1676 ldout(cct, 10) << "readx has all buffers" << dendl;
1677
1678 // ok, assemble into result buffer.
1679 uint64_t pos = 0;
1680 if (rd->bl && !error) {
1681 rd->bl->clear();
1682 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1683 i != stripe_map.end();
1684 ++i) {
1685 ceph_assert(pos == i->first);
1686 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1687 << " at " << pos << dendl;
1688 pos += i->second.length();
1689 rd->bl->claim_append(i->second);
1690 ceph_assert(rd->bl->length() == pos);
1691 }
1692 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1693 } else if (!error) {
1694 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1695 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1696 pos = i->first + i->second.length();
1697 }
1698
1699 // done with read.
1700 int ret = error ? error : pos;
1701 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1702 ceph_assert(pos <= (uint64_t) INT_MAX);
1703
1704 delete rd;
1705
1706 trim();
1707
1708 return ret;
1709 }
1710
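// Re-drive reads that were blocked waiting for cache space; stop as soon as
// one of them blocks again and requeue the rest.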
1711 void ObjectCacher::retry_waiting_reads()
1712 {
1713 list<Context *> ls;
1714 ls.swap(waitfor_read);
1715
1716 while (!ls.empty() && waitfor_read.empty()) {
1717 Context *ctx = ls.front();
1718 ls.pop_front();
1719 ctx->complete(0);
1720 }
1721 waitfor_read.splice(waitfor_read.end(), ls);
1722 }
1723
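// Apply a write to the cache: each extent is mapped onto a single dirty bh,
// the data is copied in, readers overwritten by the write are woken, and the
// caller blocks (or queues a callback) per the dirty-limit policy enforced in
// _wait_for_write.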
1724 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1725 ZTracer::Trace *parent_trace)
1726 {
1727 ceph_assert(ceph_mutex_is_locked(lock));
1728 ceph::real_time now = ceph::real_clock::now();
1729 uint64_t bytes_written = 0;
1730 uint64_t bytes_written_in_flush = 0;
1731 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1732 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1733
1734 ZTracer::Trace trace;
1735 if (parent_trace != nullptr) {
1736 trace.init("write", &trace_endpoint, parent_trace);
1737 trace.event("start");
1738 }
1739
1740 list<Context*> wait_for_reads;
1741 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1742 ex_it != wr->extents.end();
1743 ++ex_it) {
1744 // get object cache
1745 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1746 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1747 ex_it->truncate_size, oset->truncate_seq);
1748
1749 // map it all into a single bufferhead.
1750 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1751 bool missing = bh->is_missing();
1752 bh->snapc = wr->snapc;
1753
1754 // readers that need to be woken up due to an overwrite
1755 for (auto& [_, wait_for_read] : bh->waitfor_read) {
1756 wait_for_reads.splice(wait_for_reads.end(), wait_for_read);
1757 }
1758 bh->waitfor_read.clear();
1759
1760 bytes_written += ex_it->length;
1761 if (bh->is_tx()) {
1762 bytes_written_in_flush += ex_it->length;
1763 }
1764
1765 // adjust buffer pointers (ie "copy" data into my cache)
1766 // this is over a single ObjectExtent, so we know that
1767 // - there is one contiguous bh
1768 // - the buffer frags need not be (and almost certainly aren't)
1769 // note: i assume striping is monotonic... no jumps backwards, ever!
1770 loff_t opos = ex_it->offset;
1771 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1772 = ex_it->buffer_extents.begin();
1773 f_it != ex_it->buffer_extents.end();
1774 ++f_it) {
1775 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1776 << f_it->second << " into " << *bh << " at " << opos
1777 << dendl;
1778 uint64_t bhoff = opos - bh->start();
1779 ceph_assert(f_it->second <= bh->length() - bhoff);
1780
1781 // get the frag we're mapping in
1782 bufferlist frag;
1783 frag.substr_of(wr->bl, f_it->first, f_it->second);
1784
1785 // keep anything left of bhoff
1786 if (!bhoff)
1787 bh->bl.swap(frag);
1788 else
1789 bh->bl.claim_append(frag);
1790
1791 opos += f_it->second;
1792 }
1793
1794 // ok, now bh is dirty.
1795 mark_dirty(bh);
1796 if (dontneed)
1797 bh->set_dontneed(true);
1798 else if (nocache && missing)
1799 bh->set_nocache(true);
1800 else
1801 touch_bh(bh);
1802
1803 bh->last_write = now;
1804
1805 o->try_merge_bh(bh);
1806 }
1807
1808 if (perfcounter) {
1809 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1810 if (bytes_written_in_flush) {
1811 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1812 bytes_written_in_flush);
1813 }
1814 }
1815
1816 int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1817 delete wr;
1818
1819 finish_contexts(cct, wait_for_reads, 0);
1820
1821 //verify_stats();
1822 trim();
1823 return r;
1824 }
1825
1826 class ObjectCacher::C_WaitForWrite : public Context {
1827 public:
1828 C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1829 const ZTracer::Trace &trace, Context *onfinish) :
1830 m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1831 void finish(int r) override;
1832 private:
1833 ObjectCacher *m_oc;
1834 uint64_t m_len;
1835 ZTracer::Trace m_trace;
1836 Context *m_onfinish;
1837 };
1838
1839 void ObjectCacher::C_WaitForWrite::finish(int r)
1840 {
1841 std::lock_guard l(m_oc->lock);
1842 m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
1843 m_onfinish->complete(r);
1844 }
1845
1846 void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
1847 ZTracer::Trace *trace)
1848 {
1849 ceph_assert(ceph_mutex_is_locked(lock));
1850 ceph::mono_time start = ceph::mono_clock::now();
1851 int blocked = 0;
1852 // wait for writeback?
1853 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1854 // - do not wait for bytes other waiters are waiting on. this means that
1855 // threads do not wait for each other. this effectively allows the cache
1856 // size to balloon proportional to the data that is in flight.
1857
1858 uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
1859 while (get_stat_dirty() + get_stat_tx() > 0 &&
1860 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1861 max_dirty + get_stat_dirty_waiting()) ||
1862 (dirty_or_tx_bh.size() >=
1863 max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1864
1865 if (blocked == 0) {
1866 trace->event("start wait for writeback");
1867 }
1868 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1869 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1870 << max_dirty << " + dirty_waiting "
1871 << get_stat_dirty_waiting() << dendl;
1872 flusher_cond.notify_all();
1873 stat_dirty_waiting += len;
1874 ++stat_nr_dirty_waiters;
1875 std::unique_lock l{lock, std::adopt_lock};
1876 stat_cond.wait(l);
1877 l.release();
1878 stat_dirty_waiting -= len;
1879 --stat_nr_dirty_waiters;
1880 ++blocked;
1881 ldout(cct, 10) << __func__ << " woke up" << dendl;
1882 }
1883 if (blocked > 0) {
1884 trace->event("finish wait for writeback");
1885 }
1886 if (blocked && perfcounter) {
1887 perfcounter->inc(l_objectcacher_write_ops_blocked);
1888 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1889 ceph::timespan blocked = ceph::mono_clock::now() - start;
1890 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1891 }
1892 }
1893
1894 // blocking wait for write.
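// With write-back caching (max_dirty > 0 and no FADVISE_FUA) this throttles
// against the dirty limits, either inline or via the finisher.  Otherwise it
// behaves write-through: flush the extents just written and wait for commit.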
1895 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1896 ZTracer::Trace *trace, Context *onfreespace)
1897 {
1898 ceph_assert(ceph_mutex_is_locked(lock));
1899 ceph_assert(trace != nullptr);
1900 int ret = 0;
1901
1902 if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
1903 if (block_writes_upfront) {
1904 _maybe_wait_for_writeback(len, trace);
1905 if (onfreespace)
1906 onfreespace->complete(0);
1907 } else {
1908 ceph_assert(onfreespace);
1909 finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
1910 }
1911 } else {
1912 // write-thru! flush what we just wrote.
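      // when block_writes_upfront, wait here on a local cond for the flush to
      // commit and then complete onfreespace ourselves; otherwise hand
      // onfreespace directly to flush_set() as its completion.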
1913 ceph::condition_variable cond;
1914 bool done = false;
1915 Context *fin = block_writes_upfront ?
1916 new C_Cond(cond, &done, &ret) : onfreespace;
1917 ceph_assert(fin);
1918 bool flushed = flush_set(oset, wr->extents, trace, fin);
1919 ceph_assert(!flushed); // we just dirtied it, and didn't drop our lock!
1920 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1921 << " bytes" << dendl;
1922 if (block_writes_upfront) {
1923 std::unique_lock l{lock, std::adopt_lock};
1924 cond.wait(l, [&done] { return done; });
1925 l.release();
1926 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1927 if (onfreespace)
1928 onfreespace->complete(ret);
1929 }
1930 }
1931
1932 // start writeback anyway?
1933 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1934 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1935 << target_dirty << ", nudging flusher" << dendl;
1936 flusher_cond.notify_all();
1937 }
1938 return ret;
1939 }
1940
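// Background flusher thread: wakes up roughly once a second (or when nudged via
// flusher_cond), flushes dirty data back down toward target_dirty, and writes
// back dirty buffers whose last write is older than max_dirty_age.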
1941 void ObjectCacher::flusher_entry()
1942 {
1943 ldout(cct, 10) << "flusher start" << dendl;
1944 std::unique_lock l{lock};
1945 while (!flusher_stop) {
1946 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1947 get_stat_dirty();
1948 ldout(cct, 11) << "flusher "
1949 << all << " / " << max_size << ": "
1950 << get_stat_tx() << " tx, "
1951 << get_stat_rx() << " rx, "
1952 << get_stat_clean() << " clean, "
1953 << get_stat_dirty() << " dirty ("
1954 << target_dirty << " target, "
1955 << max_dirty << " max)"
1956 << dendl;
1957 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1958
1959 ZTracer::Trace trace;
1960 if (cct->_conf->osdc_blkin_trace_all) {
1961 trace.init("flusher", &trace_endpoint);
1962 trace.event("start");
1963 }
1964
1965 if (actual > 0 && (uint64_t) actual > target_dirty) {
1966 // flush some dirty pages
1967 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1968 << get_stat_dirty_waiting() << " dirty_waiting > target "
1969 << target_dirty << ", flushing some dirty bhs" << dendl;
1970 flush(&trace, actual - target_dirty);
1971 } else {
1972 // check tail of lru for old dirty items
1973 ceph::real_time cutoff = ceph::real_clock::now();
1974 cutoff -= max_dirty_age;
1975 BufferHead *bh = 0;
1976 int max = MAX_FLUSH_UNDER_LOCK;
1977 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1978 lru_get_next_expire())) != 0 &&
1979 bh->last_write <= cutoff &&
1980 max > 0) {
1981 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1982 if (scattered_write) {
1983 bh_write_adjacencies(bh, cutoff, NULL, &max);
1984 } else {
1985 bh_write(bh, trace);
1986 --max;
1987 }
1988 }
1989 if (!max) {
1990 // back off the lock to avoid starving other threads
1991 trace.event("backoff");
1992 l.unlock();
1993 l.lock();
1994 continue;
1995 }
1996 }
1997
1998 trace.event("finish");
1999 if (flusher_stop)
2000 break;
2001
2002 flusher_cond.wait_for(l, 1s);
2003 }
2004
2005 /* Wait for reads to finish. This is only possible if handling
2006 * -ENOENT made some read completions finish before their rados read
2007 * came back. If we don't wait for them, and destroy the cache, when
2008 * the rados reads do come back their callback will try to access the
2009 * no-longer-valid ObjectCacher.
2010 */
2011 read_cond.wait(l, [this] {
2012 if (reads_outstanding > 0) {
2013 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
2014 << reads_outstanding << dendl;
2015 return false;
2016 } else {
2017 return true;
2018 }
2019 });
2020 ldout(cct, 10) << "flusher finish" << dendl;
2021 }
2022
2023
2024 // -------------------------------------------------
2025
2026 bool ObjectCacher::set_is_empty(ObjectSet *oset)
2027 {
2028 ceph_assert(ceph_mutex_is_locked(lock));
2029 if (oset->objects.empty())
2030 return true;
2031
2032 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
2033 if (!(*p)->is_empty())
2034 return false;
2035
2036 return true;
2037 }
2038
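// true if the oset holds any buffered data that is neither dirty nor in flight,
// i.e. data that is purely cached.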
2039 bool ObjectCacher::set_is_cached(ObjectSet *oset)
2040 {
2041 ceph_assert(ceph_mutex_is_locked(lock));
2042 if (oset->objects.empty())
2043 return false;
2044
2045 for (xlist<Object*>::iterator p = oset->objects.begin();
2046 !p.end(); ++p) {
2047 Object *ob = *p;
2048 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
2049 q != ob->data.end();
2050 ++q) {
2051 BufferHead *bh = q->second;
2052 if (!bh->is_dirty() && !bh->is_tx())
2053 return true;
2054 }
2055 }
2056
2057 return false;
2058 }
2059
2060 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
2061 {
2062 ceph_assert(ceph_mutex_is_locked(lock));
2063 if (oset->objects.empty())
2064 return false;
2065
2066 for (xlist<Object*>::iterator i = oset->objects.begin();
2067 !i.end(); ++i) {
2068 Object *ob = *i;
2069
2070 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2071 p != ob->data.end();
2072 ++p) {
2073 BufferHead *bh = p->second;
2074 if (bh->is_dirty() || bh->is_tx())
2075 return true;
2076 }
2077 }
2078
2079 return false;
2080 }
2081
2082
2083 // purge. non-blocking. violently removes dirty buffers from cache.
2084 void ObjectCacher::purge(Object *ob)
2085 {
2086 ceph_assert(ceph_mutex_is_locked(lock));
2087 ldout(cct, 10) << "purge " << *ob << dendl;
2088
2089 ob->truncate(0);
2090 }
2091
2092
2093 // flush. non-blocking. no callback.
2094 // returns true if the range was already clean (nothing left to flush).
2095 // returns false if anything in the range was dirty or still in flight.
2096 // ranges are handled loosely: any buffer the range touches is flushed in full.
2097 bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2098 ZTracer::Trace *trace)
2099 {
2100 ceph_assert(trace != nullptr);
2101 ceph_assert(ceph_mutex_is_locked(lock));
2102 list<BufferHead*> blist;
2103 bool clean = true;
2104 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2105 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2106 p != ob->data.end();
2107 ++p) {
2108 BufferHead *bh = p->second;
2109 ldout(cct, 20) << "flush " << *bh << dendl;
2110 if (length && bh->start() > offset+length) {
2111 break;
2112 }
2113 if (bh->is_tx()) {
2114 clean = false;
2115 continue;
2116 }
2117 if (!bh->is_dirty()) {
2118 continue;
2119 }
2120
2121 if (scattered_write)
2122 blist.push_back(bh);
2123 else
2124 bh_write(bh, *trace);
2125 clean = false;
2126 }
2127 if (scattered_write && !blist.empty())
2128 bh_write_scattered(blist);
2129
2130 return clean;
2131 }
2132
2133 bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2134 Context *onfinish)
2135 {
2136 ceph_assert(ceph_mutex_is_locked(lock));
2137 if (gather->has_subs()) {
2138 gather->set_finisher(onfinish);
2139 gather->activate();
2140 return false;
2141 }
2142
2143 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2144 onfinish->complete(0);
2145 return true;
2146 }
2147
2148 // flush. non-blocking, takes callback.
2149 // returns true if already flushed
2150 bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2151 {
2152 ceph_assert(ceph_mutex_is_locked(lock));
2153 ceph_assert(onfinish != NULL);
2154 if (oset->objects.empty()) {
2155 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2156 onfinish->complete(0);
2157 return true;
2158 }
2159
2160 ldout(cct, 10) << "flush_set " << oset << dendl;
2161
2162 // we'll need to wait for all objects to flush!
2163 C_GatherBuilder gather(cct);
2164 set<Object*> waitfor_commit;
2165
2166 list<BufferHead*> blist;
2167 Object *last_ob = NULL;
2168 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2169
2170 // Buffer heads in dirty_or_tx_bh are sorted by ObjectSet/Object/offset, but
2171 // oset->objects is unsorted, so lower_bound on an arbitrary object of this
2172 // oset can land anywhere within the oset's run; hence the forward and backward scans below.
2173 BufferHead key(*oset->objects.begin());
2174 it = dirty_or_tx_bh.lower_bound(&key);
2175 p = q = it;
2176
2177 bool backwards = true;
2178 if (it != dirty_or_tx_bh.begin())
2179 --it;
2180 else
2181 backwards = false;
2182
2183 for (; p != dirty_or_tx_bh.end(); p = q) {
2184 ++q;
2185 BufferHead *bh = *p;
2186 if (bh->ob->oset != oset)
2187 break;
2188 waitfor_commit.insert(bh->ob);
2189 if (bh->is_dirty()) {
2190 if (scattered_write) {
2191 if (last_ob != bh->ob) {
2192 if (!blist.empty()) {
2193 bh_write_scattered(blist);
2194 blist.clear();
2195 }
2196 last_ob = bh->ob;
2197 }
2198 blist.push_back(bh);
2199 } else {
2200 bh_write(bh, {});
2201 }
2202 }
2203 }
2204
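  // now walk backward from just before where lower_bound landed, picking up any
  // of this oset's dirty buffer heads that sort earlier.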
2205 if (backwards) {
2206 for (p = q = it; true; p = q) {
2207 if (q != dirty_or_tx_bh.begin())
2208 --q;
2209 else
2210 backwards = false;
2211 BufferHead *bh = *p;
2212 if (bh->ob->oset != oset)
2213 break;
2214 waitfor_commit.insert(bh->ob);
2215 if (bh->is_dirty()) {
2216 if (scattered_write) {
2217 if (last_ob != bh->ob) {
2218 if (!blist.empty()) {
2219 bh_write_scattered(blist);
2220 blist.clear();
2221 }
2222 last_ob = bh->ob;
2223 }
2224 blist.push_front(bh);
2225 } else {
2226 bh_write(bh, {});
2227 }
2228 }
2229 if (!backwards)
2230 break;
2231 }
2232 }
2233
2234 if (scattered_write && !blist.empty())
2235 bh_write_scattered(blist);
2236
2237 for (set<Object*>::iterator i = waitfor_commit.begin();
2238 i != waitfor_commit.end(); ++i) {
2239 Object *ob = *i;
2240
2241 // we'll need to gather...
2242 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2243 << ob->last_write_tid << " on " << *ob << dendl;
2244 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2245 }
2246
2247 return _flush_set_finish(&gather, onfinish);
2248 }
2249
2250 // flush. non-blocking, takes callback.
2251 // returns true if already flushed
2252 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
2253 ZTracer::Trace *trace, Context *onfinish)
2254 {
2255 ceph_assert(ceph_mutex_is_locked(lock));
2256 ceph_assert(trace != nullptr);
2257 ceph_assert(onfinish != NULL);
2258 if (oset->objects.empty()) {
2259 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2260 onfinish->complete(0);
2261 return true;
2262 }
2263
2264 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2265 << " ObjectExtents" << dendl;
2266
2267 // we'll need to wait for all objects to flush!
2268 C_GatherBuilder gather(cct);
2269
2270 for (vector<ObjectExtent>::iterator p = exv.begin();
2271 p != exv.end();
2272 ++p) {
2273 ObjectExtent &ex = *p;
2274 sobject_t soid(ex.oid, CEPH_NOSNAP);
2275 if (objects[oset->poolid].count(soid) == 0)
2276 continue;
2277 Object *ob = objects[oset->poolid][soid];
2278
2279 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2280 << " " << ob << dendl;
2281
2282 if (!flush(ob, ex.offset, ex.length, trace)) {
2283 // we'll need to gather...
2284 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2285 << ob->last_write_tid << " on " << *ob << dendl;
2286 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2287 }
2288 }
2289
2290 return _flush_set_finish(&gather, onfinish);
2291 }
2292
2293 // flush all dirty data. non-blocking, takes callback.
2294 // returns true if already flushed
2295 bool ObjectCacher::flush_all(Context *onfinish)
2296 {
2297 ceph_assert(ceph_mutex_is_locked(lock));
2298 ceph_assert(onfinish != NULL);
2299
2300 ldout(cct, 10) << "flush_all " << dendl;
2301
2302 // we'll need to wait for all objects to flush!
2303 C_GatherBuilder gather(cct);
2304 set<Object*> waitfor_commit;
2305
2306 list<BufferHead*> blist;
2307 Object *last_ob = NULL;
2308 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2309 next = it = dirty_or_tx_bh.begin();
2310 while (it != dirty_or_tx_bh.end()) {
2311 ++next;
2312 BufferHead *bh = *it;
2313 waitfor_commit.insert(bh->ob);
2314
2315 if (bh->is_dirty()) {
2316 if (scattered_write) {
2317 if (last_ob != bh->ob) {
2318 if (!blist.empty()) {
2319 bh_write_scattered(blist);
2320 blist.clear();
2321 }
2322 last_ob = bh->ob;
2323 }
2324 blist.push_back(bh);
2325 } else {
2326 bh_write(bh, {});
2327 }
2328 }
2329
2330 it = next;
2331 }
2332
2333 if (scattered_write && !blist.empty())
2334 bh_write_scattered(blist);
2335
2336 for (set<Object*>::iterator i = waitfor_commit.begin();
2337 i != waitfor_commit.end();
2338 ++i) {
2339 Object *ob = *i;
2340
2341 // we'll need to gather...
2342 ldout(cct, 10) << "flush_all will wait for ack tid "
2343 << ob->last_write_tid << " on " << *ob << dendl;
2344 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2345 }
2346
2347 return _flush_set_finish(&gather, onfinish);
2348 }
2349
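// purge_set: non-blocking; violently drops every buffer for all objects in the
// oset, then fires flush_set_callback if dirty data was thrown away so the
// caller can release resources tied to it.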
2350 void ObjectCacher::purge_set(ObjectSet *oset)
2351 {
2352 ceph_assert(ceph_mutex_is_locked(lock));
2353 if (oset->objects.empty()) {
2354 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2355 return;
2356 }
2357
2358 ldout(cct, 10) << "purge_set " << oset << dendl;
2359 const bool were_dirty = oset->dirty_or_tx > 0;
2360
2361 for (xlist<Object*>::iterator i = oset->objects.begin();
2362 !i.end(); ++i) {
2363 Object *ob = *i;
2364 purge(ob);
2365 }
2366
2367 // Although we have purged rather than flushed, the caller should still
2368 // drop any resources associated with the dirty data.
2369 ceph_assert(oset->dirty_or_tx == 0);
2370 if (flush_set_callback && were_dirty) {
2371 flush_set_callback(flush_set_callback_arg, oset);
2372 }
2373 }
2374
2375
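// release: drop clean/zero/error buffers from an object and return the number of
// bytes that could not be released (dirty, tx, rx, ...).  Closes the object if
// nothing is left; otherwise clears 'complete' and re-marks it as existing.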
2376 loff_t ObjectCacher::release(Object *ob)
2377 {
2378 ceph_assert(ceph_mutex_is_locked(lock));
2379 list<BufferHead*> clean;
2380 loff_t o_unclean = 0;
2381
2382 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2383 p != ob->data.end();
2384 ++p) {
2385 BufferHead *bh = p->second;
2386 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2387 clean.push_back(bh);
2388 else
2389 o_unclean += bh->length();
2390 }
2391
2392 for (list<BufferHead*>::iterator p = clean.begin();
2393 p != clean.end();
2394 ++p) {
2395 bh_remove(ob, *p);
2396 delete *p;
2397 }
2398
2399 if (ob->can_close()) {
2400 ldout(cct, 10) << "release trimming " << *ob << dendl;
2401 close_object(ob);
2402 ceph_assert(o_unclean == 0);
2403 return 0;
2404 }
2405
2406 if (ob->complete) {
2407 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2408 ob->complete = false;
2409 }
2410 if (!ob->exists) {
2411 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2412 ob->exists = true;
2413 }
2414
2415 return o_unclean;
2416 }
2417
2418 loff_t ObjectCacher::release_set(ObjectSet *oset)
2419 {
2420 ceph_assert(ceph_mutex_is_locked(lock));
2421 // return # bytes not clean (and thus not released).
2422 loff_t unclean = 0;
2423
2424 if (oset->objects.empty()) {
2425 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2426 return 0;
2427 }
2428
2429 ldout(cct, 10) << "release_set " << oset << dendl;
2430
2431 xlist<Object*>::iterator q;
2432 for (xlist<Object*>::iterator p = oset->objects.begin();
2433 !p.end(); ) {
2434 q = p;
2435 ++q;
2436 Object *ob = *p;
2437
2438 loff_t o_unclean = release(ob);
2439 unclean += o_unclean;
2440
2441 if (o_unclean)
2442 ldout(cct, 10) << "release_set " << oset << " " << *ob
2443 << " has " << o_unclean << " bytes left"
2444 << dendl;
2445 p = q;
2446 }
2447
2448 if (unclean) {
2449 ldout(cct, 10) << "release_set " << oset
2450 << ", " << unclean << " bytes left" << dendl;
2451 }
2452
2453 return unclean;
2454 }
2455
2456
2457 uint64_t ObjectCacher::release_all()
2458 {
2459 ceph_assert(ceph_mutex_is_locked(lock));
2460 ldout(cct, 10) << "release_all" << dendl;
2461 uint64_t unclean = 0;
2462
2463 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2464 = objects.begin();
2465 while (i != objects.end()) {
2466 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2467 while (p != i->end()) {
2468 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2469 ++n;
2470
2471 Object *ob = p->second;
2472
2473 loff_t o_unclean = release(ob);
2474 unclean += o_unclean;
2475
2476 if (o_unclean)
2477 ldout(cct, 10) << "release_all " << *ob
2478 << " has " << o_unclean << " bytes left"
2479 << dendl;
2480 p = n;
2481 }
2482 ++i;
2483 }
2484
2485 if (unclean) {
2486 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2487 << dendl;
2488 }
2489
2490 return unclean;
2491 }
2492
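// clear_nonexistence: drop any cached assumption that objects in this oset do not
// exist: mark them existing and not complete again, and tell in-flight reads not
// to trust an ENOENT reply.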
2493 void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2494 {
2495 ceph_assert(ceph_mutex_is_locked(lock));
2496 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2497
2498 for (xlist<Object*>::iterator p = oset->objects.begin();
2499 !p.end(); ++p) {
2500 Object *ob = *p;
2501 if (!ob->exists) {
2502 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2503 ob->exists = true;
2504 ob->complete = false;
2505 }
2506 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2507 !q.end(); ++q) {
2508 C_ReadFinish *comp = *q;
2509 comp->distrust_enoent();
2510 }
2511 }
2512 }
2513
2514 /**
2515  * discard the object extents listed in exls from the in-memory oset,
2516  * dropping the buffered data they cover.
2517 */
2518 void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2519 {
2520 ceph_assert(ceph_mutex_is_locked(lock));
2521 bool was_dirty = oset->dirty_or_tx > 0;
2522
2523 _discard(oset, exls, nullptr);
2524 _discard_finish(oset, was_dirty, nullptr);
2525 }
2526
2527 /**
2528  * discard the object extents listed in exls from the in-memory oset,
2529  * dropping the buffered data they cover. If a bh is in TX state, the
2530  * discard waits for that write to commit before invoking on_finish.
2531 */
2532 void ObjectCacher::discard_writeback(ObjectSet *oset,
2533 const vector<ObjectExtent>& exls,
2534 Context* on_finish)
2535 {
2536 ceph_assert(ceph_mutex_is_locked(lock));
2537 bool was_dirty = oset->dirty_or_tx > 0;
2538
2539 C_GatherBuilder gather(cct);
2540 _discard(oset, exls, &gather);
2541
2542 if (gather.has_subs()) {
2543 bool flushed = was_dirty && oset->dirty_or_tx == 0;
2544 gather.set_finisher(new LambdaContext(
2545 [this, oset, flushed, on_finish](int) {
2546 ceph_assert(ceph_mutex_is_locked(lock));
2547 if (flushed && flush_set_callback)
2548 flush_set_callback(flush_set_callback_arg, oset);
2549 if (on_finish)
2550 on_finish->complete(0);
2551 }));
2552 gather.activate();
2553 return;
2554 }
2555
2556 _discard_finish(oset, was_dirty, on_finish);
2557 }
2558
2559 void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
2560 C_GatherBuilder* gather)
2561 {
2562 if (oset->objects.empty()) {
2563 ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
2564 return;
2565 }
2566
2567 ldout(cct, 10) << __func__ << " " << oset << dendl;
2568
2569 for (auto& ex : exls) {
2570 ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
2571 sobject_t soid(ex.oid, CEPH_NOSNAP);
2572 if (objects[oset->poolid].count(soid) == 0)
2573 continue;
2574 Object *ob = objects[oset->poolid][soid];
2575
2576 ob->discard(ex.offset, ex.length, gather);
2577 }
2578 }
2579
2580 void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
2581 Context* on_finish)
2582 {
2583 ceph_assert(ceph_mutex_is_locked(lock));
2584
2585 // did we truncate off dirty data?
2586 if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
2587 flush_set_callback(flush_set_callback_arg, oset);
2588 }
2589
2590 // notify that in-flight writeback has completed
2591 if (on_finish != nullptr) {
2592 on_finish->complete(0);
2593 }
2594 }
2595
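// Sanity check: recount per-state byte totals across every cached BufferHead and
// assert that they match the running stat_* counters.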
2596 void ObjectCacher::verify_stats() const
2597 {
2598 ceph_assert(ceph_mutex_is_locked(lock));
2599 ldout(cct, 10) << "verify_stats" << dendl;
2600
2601 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2602 error = 0;
2603 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2604 = objects.begin();
2605 i != objects.end();
2606 ++i) {
2607 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2608 = i->begin();
2609 p != i->end();
2610 ++p) {
2611 Object *ob = p->second;
2612 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2613 q != ob->data.end();
2614 ++q) {
2615 BufferHead *bh = q->second;
2616 switch (bh->get_state()) {
2617 case BufferHead::STATE_MISSING:
2618 missing += bh->length();
2619 break;
2620 case BufferHead::STATE_CLEAN:
2621 clean += bh->length();
2622 break;
2623 case BufferHead::STATE_ZERO:
2624 zero += bh->length();
2625 break;
2626 case BufferHead::STATE_DIRTY:
2627 dirty += bh->length();
2628 break;
2629 case BufferHead::STATE_TX:
2630 tx += bh->length();
2631 break;
2632 case BufferHead::STATE_RX:
2633 rx += bh->length();
2634 break;
2635 case BufferHead::STATE_ERROR:
2636 error += bh->length();
2637 break;
2638 default:
2639 ceph_abort();
2640 }
2641 }
2642 }
2643 }
2644
2645 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2646 << " dirty " << dirty << " missing " << missing
2647 << " error " << error << dendl;
2648 ceph_assert(clean == stat_clean);
2649 ceph_assert(rx == stat_rx);
2650 ceph_assert(tx == stat_tx);
2651 ceph_assert(dirty == stat_dirty);
2652 ceph_assert(missing == stat_missing);
2653 ceph_assert(zero == stat_zero);
2654 ceph_assert(error == stat_error);
2655 }
2656
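// Account a BufferHead's bytes under its current state; DIRTY and TX also count
// toward the per-object and per-oset dirty_or_tx totals.  If anyone is throttled
// waiting on dirty bytes, poke stat_cond so they re-check their condition.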
2657 void ObjectCacher::bh_stat_add(BufferHead *bh)
2658 {
2659 ceph_assert(ceph_mutex_is_locked(lock));
2660 switch (bh->get_state()) {
2661 case BufferHead::STATE_MISSING:
2662 stat_missing += bh->length();
2663 break;
2664 case BufferHead::STATE_CLEAN:
2665 stat_clean += bh->length();
2666 break;
2667 case BufferHead::STATE_ZERO:
2668 stat_zero += bh->length();
2669 break;
2670 case BufferHead::STATE_DIRTY:
2671 stat_dirty += bh->length();
2672 bh->ob->dirty_or_tx += bh->length();
2673 bh->ob->oset->dirty_or_tx += bh->length();
2674 break;
2675 case BufferHead::STATE_TX:
2676 stat_tx += bh->length();
2677 bh->ob->dirty_or_tx += bh->length();
2678 bh->ob->oset->dirty_or_tx += bh->length();
2679 break;
2680 case BufferHead::STATE_RX:
2681 stat_rx += bh->length();
2682 break;
2683 case BufferHead::STATE_ERROR:
2684 stat_error += bh->length();
2685 break;
2686 default:
2687 ceph_abort_msg("bh_stat_add: invalid bufferhead state");
2688 }
2689 if (get_stat_dirty_waiting() > 0)
2690 stat_cond.notify_all();
2691 }
2692
2693 void ObjectCacher::bh_stat_sub(BufferHead *bh)
2694 {
2695 ceph_assert(ceph_mutex_is_locked(lock));
2696 switch (bh->get_state()) {
2697 case BufferHead::STATE_MISSING:
2698 stat_missing -= bh->length();
2699 break;
2700 case BufferHead::STATE_CLEAN:
2701 stat_clean -= bh->length();
2702 break;
2703 case BufferHead::STATE_ZERO:
2704 stat_zero -= bh->length();
2705 break;
2706 case BufferHead::STATE_DIRTY:
2707 stat_dirty -= bh->length();
2708 bh->ob->dirty_or_tx -= bh->length();
2709 bh->ob->oset->dirty_or_tx -= bh->length();
2710 break;
2711 case BufferHead::STATE_TX:
2712 stat_tx -= bh->length();
2713 bh->ob->dirty_or_tx -= bh->length();
2714 bh->ob->oset->dirty_or_tx -= bh->length();
2715 break;
2716 case BufferHead::STATE_RX:
2717 stat_rx -= bh->length();
2718 break;
2719 case BufferHead::STATE_ERROR:
2720 stat_error -= bh->length();
2721 break;
2722 default:
2723 ceph_abort_msg("bh_stat_sub: invalid bufferhead state");
2724 }
2725 }
2726
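// Change a BufferHead's state while keeping the LRU lists, the dirty_or_tx_bh
// set and the per-state byte counters consistent: stats are subtracted under the
// old state and re-added under the new one.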
2727 void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2728 {
2729 ceph_assert(ceph_mutex_is_locked(lock));
2730 int state = bh->get_state();
2731 // move between lru lists?
2732 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2733 bh_lru_rest.lru_remove(bh);
2734 bh_lru_dirty.lru_insert_top(bh);
2735 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2736 bh_lru_dirty.lru_remove(bh);
2737 if (bh->get_dontneed())
2738 bh_lru_rest.lru_insert_bot(bh);
2739 else
2740 bh_lru_rest.lru_insert_top(bh);
2741 }
2742
2743 if ((s == BufferHead::STATE_TX ||
2744 s == BufferHead::STATE_DIRTY) &&
2745 state != BufferHead::STATE_TX &&
2746 state != BufferHead::STATE_DIRTY) {
2747 dirty_or_tx_bh.insert(bh);
2748 } else if ((state == BufferHead::STATE_TX ||
2749 state == BufferHead::STATE_DIRTY) &&
2750 s != BufferHead::STATE_TX &&
2751 s != BufferHead::STATE_DIRTY) {
2752 dirty_or_tx_bh.erase(bh);
2753 }
2754
2755 if (s != BufferHead::STATE_ERROR &&
2756 state == BufferHead::STATE_ERROR) {
2757 bh->error = 0;
2758 }
2759
2760 // set state
2761 bh_stat_sub(bh);
2762 bh->set_state(s);
2763 bh_stat_add(bh);
2764 }
2765
2766 void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2767 {
2768 ceph_assert(ceph_mutex_is_locked(lock));
2769 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2770 ob->add_bh(bh);
2771 if (bh->is_dirty()) {
2772 bh_lru_dirty.lru_insert_top(bh);
2773 dirty_or_tx_bh.insert(bh);
2774 } else {
2775 if (bh->get_dontneed())
2776 bh_lru_rest.lru_insert_bot(bh);
2777 else
2778 bh_lru_rest.lru_insert_top(bh);
2779 }
2780
2781 if (bh->is_tx()) {
2782 dirty_or_tx_bh.insert(bh);
2783 }
2784 bh_stat_add(bh);
2785 }
2786
2787 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2788 {
2789 ceph_assert(ceph_mutex_is_locked(lock));
2790 ceph_assert(bh->get_journal_tid() == 0);
2791 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2792 ob->remove_bh(bh);
2793 if (bh->is_dirty()) {
2794 bh_lru_dirty.lru_remove(bh);
2795 dirty_or_tx_bh.erase(bh);
2796 } else {
2797 bh_lru_rest.lru_remove(bh);
2798 }
2799
2800 if (bh->is_tx()) {
2801 dirty_or_tx_bh.erase(bh);
2802 }
2803 bh_stat_sub(bh);
2804 if (get_stat_dirty_waiting() > 0)
2805 stat_cond.notify_all();
2806 }
2807