1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <limits.h>
5
6#include "msg/Messenger.h"
7#include "ObjectCacher.h"
8#include "WritebackHandler.h"
9#include "common/errno.h"
10#include "common/perf_counters.h"
11
12#include "include/assert.h"
13
14#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
15#define BUFFER_MEMORY_WEIGHT 12 // memory usage of BufferHead, count in (1<<n)
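// A BufferHead is accounted as costing roughly (1 << BUFFER_MEMORY_WEIGHT) bytes:
// trim() caps clean bhs at max_size >> BUFFER_MEMORY_WEIGHT and
// maybe_wait_for_writeback() caps dirty/tx bhs at max_dirty >> BUFFER_MEMORY_WEIGHT.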
16
17using std::chrono::seconds;
18 /// while holding the lock
19
20/*** ObjectCacher::BufferHead ***/
21
22
23/*** ObjectCacher::Object ***/
24
25#define dout_subsys ceph_subsys_objectcacher
26#undef dout_prefix
27#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
28
29
30
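// Completion context for a single bh read issued by bh_read(). On finish it feeds
// the returned data to bh_read_finish() and unlinks itself from the object's list
// of in-flight reads; distrust_enoent() clears trust_enoent so a racing -ENOENT
// reply is not taken as proof that the object does not exist.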
31class ObjectCacher::C_ReadFinish : public Context {
32 ObjectCacher *oc;
33 int64_t poolid;
34 sobject_t oid;
35 loff_t start;
36 uint64_t length;
37 xlist<C_ReadFinish*>::item set_item;
38 bool trust_enoent;
39 ceph_tid_t tid;
40 ZTracer::Trace trace;
41
42public:
43 bufferlist bl;
44 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
45 uint64_t l, const ZTracer::Trace &trace) :
46 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
47 set_item(this), trust_enoent(true),
48 tid(t), trace(trace) {
49 ob->reads.push_back(&set_item);
50 }
51
52 void finish(int r) override {
53 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
54 trace.event("finish");
55
56 // object destructor clears the list
57 if (set_item.is_on_list())
58 set_item.remove_myself();
59 }
60
61 void distrust_enoent() {
62 trust_enoent = false;
63 }
64};
65
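// Re-runs a deferred _readx() once the buffers it was waiting on become available;
// if the retried read is still in progress it simply returns, otherwise it completes
// onfinish with the result.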
66class ObjectCacher::C_RetryRead : public Context {
67 ObjectCacher *oc;
68 OSDRead *rd;
69 ObjectSet *oset;
70 Context *onfinish;
71 ZTracer::Trace trace;
72public:
73 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
74 const ZTracer::Trace &trace)
75 : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
76 }
77 void finish(int r) override {
78 if (r >= 0) {
79 r = oc->_readx(rd, oset, onfinish, false, &trace);
80 }
81
82 if (r == 0) {
83 // read is still in-progress
84 return;
85 }
86
87 trace.event("finish");
88 if (onfinish) {
89 onfinish->complete(r);
90 }
91 }
92};
93
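// Split 'left' at absolute offset 'off'. The tail [off, left->end()) moves into a
// new BufferHead that inherits state, snapc, journal tid and the dontneed/nocache
// flags; read waiters at or beyond 'off' move to the new right-hand bh.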
94ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
95 loff_t off)
96{
97 assert(oc->lock.is_locked());
98 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
99
100 // split off right
101 ObjectCacher::BufferHead *right = new BufferHead(this);
102
103 // inherit dontneed/nocache; a later access (touch_bh) will clear them again.
104 right->set_dontneed(left->get_dontneed());
105 right->set_nocache(left->get_nocache());
106
107 right->last_write_tid = left->last_write_tid;
108 right->last_read_tid = left->last_read_tid;
109 right->set_state(left->get_state());
110 right->snapc = left->snapc;
111 right->set_journal_tid(left->journal_tid);
112
113 loff_t newleftlen = off - left->start();
114 right->set_start(off);
115 right->set_length(left->length() - newleftlen);
116
117 // shorten left
118 oc->bh_stat_sub(left);
119 left->set_length(newleftlen);
120 oc->bh_stat_add(left);
121
122 // add right
123 oc->bh_add(this, right);
124
125 // split buffers too
126 bufferlist bl;
127 bl.claim(left->bl);
128 if (bl.length()) {
129 assert(bl.length() == (left->length() + right->length()));
130 right->bl.substr_of(bl, left->length(), right->length());
131 left->bl.substr_of(bl, 0, left->length());
132 }
133
134 // move read waiters
135 if (!left->waitfor_read.empty()) {
136 map<loff_t, list<Context*> >::iterator start_remove
137 = left->waitfor_read.begin();
138 while (start_remove != left->waitfor_read.end() &&
139 start_remove->first < right->start())
140 ++start_remove;
141 for (map<loff_t, list<Context*> >::iterator p = start_remove;
142 p != left->waitfor_read.end(); ++p) {
143 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
144 << " to right bh" << dendl;
145 right->waitfor_read[p->first].swap( p->second );
146 assert(p->second.empty());
147 }
148 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
149 }
150
151 ldout(oc->cct, 20) << "split left is " << *left << dendl;
152 ldout(oc->cct, 20) << "split right is " << *right << dendl;
153 return right;
154}
155
156
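// Absorb 'right' into 'left': extend left's extent, append right's data and read
// waiters, keep the newer last_write_tid/last_write, then delete right.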
157void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
158{
159 assert(oc->lock.is_locked());
160
161 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
162 if (left->get_journal_tid() == 0) {
163 left->set_journal_tid(right->get_journal_tid());
164 }
165 right->set_journal_tid(0);
166
167 oc->bh_remove(this, right);
168 oc->bh_stat_sub(left);
169 left->set_length(left->length() + right->length());
170 oc->bh_stat_add(left);
171
172 // data
173 left->bl.claim_append(right->bl);
174
175 // version
176 // note: this is sorta busted, but should only be used for dirty buffers
177 left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
178 left->last_write = MAX( left->last_write, right->last_write );
179
180 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
181 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
182
183 // waiters
184 for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
185 p != right->waitfor_read.end();
186 ++p)
187 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
188 p->second );
189
190 // hose right
191 delete right;
192
193 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
194}
195
196bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
197{
198 if (left->end() != right->start() ||
199 left->get_state() != right->get_state() ||
200 !left->can_merge_journal(right))
201 return false;
202 if (left->is_tx() && left->last_write_tid != right->last_write_tid)
203 return false;
204 return true;
205}
206
207void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
208{
209 assert(oc->lock.is_locked());
210 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
211
212 // do not merge rx buffers; last_read_tid may not match
213 if (bh->is_rx())
214 return;
215
216 // to the left?
217 map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
218 assert(p->second == bh);
219 if (p != data.begin()) {
220 --p;
221 if (can_merge_bh(p->second, bh)) {
222 merge_left(p->second, bh);
223 bh = p->second;
224 } else {
225 ++p;
226 }
227 }
228 // to the right?
229 assert(p->second == bh);
230 ++p;
231 if (p != data.end() && can_merge_bh(bh, p->second))
232 merge_left(bh, p->second);
233}
234
235/*
236 * count bytes we have cached in given range
237 */
238bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
239{
240 assert(oc->lock.is_locked());
241 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
242 while (left > 0) {
243 if (p == data.end())
244 return false;
245
246 if (p->first <= cur) {
247 // have part of it
248 loff_t lenfromcur = MIN(p->second->end() - cur, left);
249 cur += lenfromcur;
250 left -= lenfromcur;
251 ++p;
252 continue;
253 } else if (p->first > cur) {
254 // gap
255 return false;
256 } else
257 ceph_abort();
258 }
259
260 return true;
261}
262
263/*
264 * all cached data in this range[off, off+len]
265 */
266bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
267{
268 assert(oc->lock.is_locked());
269 if (data.empty())
270 return true;
271 map<loff_t, BufferHead*>::iterator first = data.begin();
272 map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
273 if (first->second->start() >= off && last->second->end() <= (off + len))
274 return true;
275 else
276 return false;
277}
278
279/*
280 * map a range of bytes into buffer_heads.
281 * - create missing buffer_heads as necessary.
282 */
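// On return: 'hits' holds readable bhs (clean/dirty/tx/zero), 'missing' holds newly
// created bhs that still need an OSD read, 'rx' holds bhs whose read is already in
// flight, and 'errors' holds bhs whose previous read failed.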
283int ObjectCacher::Object::map_read(ObjectExtent &ex,
284 map<loff_t, BufferHead*>& hits,
285 map<loff_t, BufferHead*>& missing,
286 map<loff_t, BufferHead*>& rx,
287 map<loff_t, BufferHead*>& errors)
288{
289 assert(oc->lock.is_locked());
290 ldout(oc->cct, 10) << "map_read " << ex.oid << " "
291 << ex.offset << "~" << ex.length << dendl;
292
293 loff_t cur = ex.offset;
294 loff_t left = ex.length;
295
296 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
297 while (left > 0) {
298 // at end?
299 if (p == data.end()) {
300 // rest is a miss.
301 BufferHead *n = new BufferHead(this);
302 n->set_start(cur);
303 n->set_length(left);
304 oc->bh_add(this, n);
305 if (complete) {
306 oc->mark_zero(n);
307 hits[cur] = n;
308 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
309 } else {
310 missing[cur] = n;
311 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
312 }
313 cur += left;
314 assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
315 break; // no more.
316 }
317
318 if (p->first <= cur) {
319 // have it (or part of it)
320 BufferHead *e = p->second;
321
322 if (e->is_clean() ||
323 e->is_dirty() ||
324 e->is_tx() ||
325 e->is_zero()) {
326 hits[cur] = e; // readable!
327 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
328 } else if (e->is_rx()) {
329 rx[cur] = e; // missing, not readable.
330 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
331 } else if (e->is_error()) {
332 errors[cur] = e;
333 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
334 } else {
335 ceph_abort();
336 }
337
338 loff_t lenfromcur = MIN(e->end() - cur, left);
339 cur += lenfromcur;
340 left -= lenfromcur;
341 ++p;
342 continue; // more?
343
344 } else if (p->first > cur) {
345 // gap.. miss
346 loff_t next = p->first;
347 BufferHead *n = new BufferHead(this);
348 loff_t len = MIN(next - cur, left);
349 n->set_start(cur);
350 n->set_length(len);
351 oc->bh_add(this,n);
352 if (complete) {
353 oc->mark_zero(n);
354 hits[cur] = n;
355 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
356 } else {
357 missing[cur] = n;
358 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
359 }
360 cur += MIN(left, n->length());
361 left -= MIN(left, n->length());
362 continue; // more?
363 } else {
364 ceph_abort();
365 }
366 }
367 return 0;
368}
369
370void ObjectCacher::Object::audit_buffers()
371{
372 loff_t offset = 0;
373 for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
374 it != data.end(); ++it) {
375 if (it->first != it->second->start()) {
376 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
377 << " does not match bh start position: "
378 << *it->second << dendl;
379 assert(it->first == it->second->start());
380 }
381 if (it->first < offset) {
382 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
383 << " overlaps with previous bh " << *((--it)->second)
384 << dendl;
385 assert(it->first >= offset);
386 }
387 BufferHead *bh = it->second;
388 map<loff_t, list<Context*> >::const_iterator w_it;
389 for (w_it = bh->waitfor_read.begin();
390 w_it != bh->waitfor_read.end(); ++w_it) {
391 if (w_it->first < bh->start() ||
392 w_it->first >= bh->start() + bh->length()) {
393 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
394 << " is not within bh " << *bh << dendl;
395 assert(w_it->first >= bh->start());
396 assert(w_it->first < bh->start() + bh->length());
397 }
398 }
399 offset = it->first + it->second->length();
400 }
401}
402
403/*
404 * map a range of extents on an object's buffer cache.
405 * - combine any bh's we're writing into one
406 * - break up bufferheads that don't fall completely within the range
407 * //no! - return a bh that includes the write. may also include
408 * other dirty data to left and/or right.
409 */
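// Returns a single bh covering [ex.offset, ex.offset+ex.length): existing bhs are
// split at the range boundaries and merged in, gaps are filled, and the bh's journal
// tid is switched to 'tid' via replace_journal_tid(). The caller (writex) copies the
// data into it and marks it dirty.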
410ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
411 ceph_tid_t tid)
412{
413 assert(oc->lock.is_locked());
414 BufferHead *final = 0;
415
416 ldout(oc->cct, 10) << "map_write oex " << ex.oid
417 << " " << ex.offset << "~" << ex.length << dendl;
418
419 loff_t cur = ex.offset;
420 loff_t left = ex.length;
421
422 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
423 while (left > 0) {
424 loff_t max = left;
425
426 // at end ?
427 if (p == data.end()) {
428 if (final == NULL) {
429 final = new BufferHead(this);
430 replace_journal_tid(final, tid);
431 final->set_start( cur );
432 final->set_length( max );
433 oc->bh_add(this, final);
434 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
435 } else {
436 oc->bh_stat_sub(final);
437 final->set_length(final->length() + max);
438 oc->bh_stat_add(final);
439 }
440 left -= max;
441 cur += max;
442 continue;
443 }
444
445 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
446 //oc->verify_stats();
447
448 if (p->first <= cur) {
449 BufferHead *bh = p->second;
450 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
451
452 if (p->first < cur) {
453 assert(final == 0);
454 if (cur + max >= bh->end()) {
455 // we want right bit (one splice)
456 final = split(bh, cur); // just split it, take right half.
457 replace_journal_tid(final, tid);
458 ++p;
459 assert(p->second == final);
460 } else {
461 // we want middle bit (two splices)
462 final = split(bh, cur);
463 ++p;
464 assert(p->second == final);
465 split(final, cur+max);
466 replace_journal_tid(final, tid);
467 }
468 } else {
469 assert(p->first == cur);
470 if (bh->length() <= max) {
471 // whole bufferhead, piece of cake.
472 } else {
473 // we want left bit (one splice)
474 split(bh, cur + max); // just split
475 }
476 if (final) {
477 oc->mark_dirty(bh);
478 oc->mark_dirty(final);
479 --p; // move iterator back to final
480 assert(p->second == final);
481 replace_journal_tid(bh, tid);
482 merge_left(final, bh);
483 } else {
484 final = bh;
485 replace_journal_tid(final, tid);
486 }
487 }
488
489 // keep going.
490 loff_t lenfromcur = final->end() - cur;
491 cur += lenfromcur;
492 left -= lenfromcur;
493 ++p;
494 continue;
495 } else {
496 // gap!
497 loff_t next = p->first;
498 loff_t glen = MIN(next - cur, max);
499 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
500 if (final) {
501 oc->bh_stat_sub(final);
502 final->set_length(final->length() + glen);
503 oc->bh_stat_add(final);
504 } else {
505 final = new BufferHead(this);
506 replace_journal_tid(final, tid);
507 final->set_start( cur );
508 final->set_length( glen );
509 oc->bh_add(this, final);
510 }
511
512 cur += glen;
513 left -= glen;
514 continue; // more?
515 }
516 }
517
518 // set version
519 assert(final);
520 assert(final->get_journal_tid() == tid);
521 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
522
523 return final;
524}
525
526void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
527 ceph_tid_t tid) {
528 ceph_tid_t bh_tid = bh->get_journal_tid();
529
530 assert(tid == 0 || bh_tid <= tid);
531 if (bh_tid != 0 && bh_tid != tid) {
532 // inform journal that it should not expect a writeback from this extent
533 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
534 bh->length(), bh_tid, tid);
535 }
536 bh->set_journal_tid(tid);
537}
538
539void ObjectCacher::Object::truncate(loff_t s)
540{
541 assert(oc->lock.is_locked());
542 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
543
544 while (!data.empty()) {
545 BufferHead *bh = data.rbegin()->second;
546 if (bh->end() <= s)
547 break;
548
549 // split bh at truncation point?
550 if (bh->start() < s) {
551 split(bh, s);
552 continue;
553 }
554
555 // remove bh entirely
556 assert(bh->start() >= s);
557 assert(bh->waitfor_read.empty());
558 replace_journal_tid(bh, 0);
559 oc->bh_remove(this, bh);
560 delete bh;
561 }
562}
563
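// Drop cached data in [off, off+len). Clean and dirty bhs are removed outright; tx
// bhs are removed but a waiter is registered on 'commit_gather' so the caller can
// wait for the in-flight writeback; rx bhs are kept but zeroed, and last_read_tid is
// bumped so the in-flight read reply is ignored.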
564void ObjectCacher::Object::discard(loff_t off, loff_t len,
565 C_GatherBuilder* commit_gather)
566{
567 assert(oc->lock.is_locked());
568 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
569 << dendl;
570
571 if (!exists) {
572 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
573 exists = true;
574 }
575 if (complete) {
576 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
577 complete = false;
578 }
579
580 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
581 while (p != data.end()) {
582 BufferHead *bh = p->second;
583 if (bh->start() >= off + len)
584 break;
585
586 // split bh at truncation point?
587 if (bh->start() < off) {
588 split(bh, off);
589 ++p;
590 continue;
591 }
592
593 assert(bh->start() >= off);
594 if (bh->end() > off + len) {
595 split(bh, off + len);
596 }
597
598 ++p;
599 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
600 replace_journal_tid(bh, 0);
601
602 if (bh->is_tx() && commit_gather != nullptr) {
603 // wait for the writeback to commit
604 waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
605 } else if (bh->is_rx()) {
606 // cannot remove bh with in-flight read, but we can ensure the
607 // read won't overwrite the discard
608 bh->last_read_tid = ++oc->last_read_tid;
609 bh->bl.clear();
610 bh->set_nocache(true);
611 oc->mark_zero(bh);
612 // rx bhs in the range are kept but marked zero; the stale read reply is ignored
613 continue;
614 } else {
615 assert(bh->waitfor_read.empty());
616 }
617
618 oc->bh_remove(this, bh);
619 delete bh;
620 }
621}
622
623
624
625/*** ObjectCacher ***/
626
627#undef dout_prefix
628#define dout_prefix *_dout << "objectcacher "
629
630
631ObjectCacher::ObjectCacher(CephContext *cct_, string name,
632 WritebackHandler& wb, Mutex& l,
633 flush_set_callback_t flush_callback,
634 void *flush_callback_arg, uint64_t max_bytes,
635 uint64_t max_objects, uint64_t max_dirty,
636 uint64_t target_dirty, double max_dirty_age,
637 bool block_writes_upfront)
638 : perfcounter(NULL),
639 cct(cct_), writeback_handler(wb), name(name), lock(l),
640 max_dirty(max_dirty), target_dirty(target_dirty),
641 max_size(max_bytes), max_objects(max_objects),
642 max_dirty_age(ceph::make_timespan(max_dirty_age)),
643 block_writes_upfront(block_writes_upfront),
644 trace_endpoint("ObjectCacher"),
645 flush_set_callback(flush_callback),
646 flush_set_callback_arg(flush_callback_arg),
647 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
648 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
649 stat_missing(0), stat_error(0), stat_dirty_waiting(0),
650 stat_nr_dirty_waiters(0), reads_outstanding(0)
651{
652 perf_start();
653 finisher.start();
654 scattered_write = writeback_handler.can_scattered_write();
655}
656
657ObjectCacher::~ObjectCacher()
658{
659 finisher.stop();
660 perf_stop();
661 // we should be empty.
662 for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
663 = objects.begin();
664 i != objects.end();
665 ++i)
666 assert(i->empty());
667 assert(bh_lru_rest.lru_get_size() == 0);
668 assert(bh_lru_dirty.lru_get_size() == 0);
669 assert(ob_lru.lru_get_size() == 0);
670 assert(dirty_or_tx_bh.empty());
671}
672
673void ObjectCacher::perf_start()
674{
675 string n = "objectcacher-" + name;
676 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
677
678 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
679 "cache_ops_hit", "Hit operations");
680 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
681 "cache_ops_miss", "Miss operations");
682 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
683 "cache_bytes_hit", "Hit data");
684 plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
685 "cache_bytes_miss", "Miss data");
686 plb.add_u64_counter(l_objectcacher_data_read,
687 "data_read", "Read data");
688 plb.add_u64_counter(l_objectcacher_data_written,
689 "data_written", "Data written to cache");
690 plb.add_u64_counter(l_objectcacher_data_flushed,
691 "data_flushed", "Data flushed");
692 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
693 "data_overwritten_while_flushing",
694 "Data overwritten while flushing");
695 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
696 "Write operations, delayed due to dirty limits");
697 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
698 "write_bytes_blocked",
699 "Write data blocked on dirty limit");
700 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
701 "Time spent blocking a write due to dirty limits");
702
703 perfcounter = plb.create_perf_counters();
704 cct->get_perfcounters_collection()->add(perfcounter);
705}
706
707void ObjectCacher::perf_stop()
708{
709 assert(perfcounter);
710 cct->get_perfcounters_collection()->remove(perfcounter);
711 delete perfcounter;
712}
713
714/* private */
715ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
716 uint64_t object_no,
717 ObjectSet *oset,
718 object_locator_t &l,
719 uint64_t truncate_size,
720 uint64_t truncate_seq)
721{
722 // XXX: Add handling of nspace in object_locator_t in cache
723 assert(lock.is_locked());
724 // have it?
725 if ((uint32_t)l.pool < objects.size()) {
726 if (objects[l.pool].count(oid)) {
727 Object *o = objects[l.pool][oid];
728 o->object_no = object_no;
729 o->truncate_size = truncate_size;
730 o->truncate_seq = truncate_seq;
731 return o;
732 }
733 } else {
734 objects.resize(l.pool+1);
735 }
736
737 // create it.
738 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
739 truncate_seq);
740 objects[l.pool][oid] = o;
741 ob_lru.lru_insert_top(o);
742 return o;
743}
744
745void ObjectCacher::close_object(Object *ob)
746{
747 assert(lock.is_locked());
748 ldout(cct, 10) << "close_object " << *ob << dendl;
749 assert(ob->can_close());
750
751 // ok!
752 ob_lru.lru_remove(ob);
753 objects[ob->oloc.pool].erase(ob->get_soid());
754 ob->set_item.remove_myself();
755 delete ob;
756}
757
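// Issue an async OSD read for one bh: mark it rx, bump last_read_tid so stale replies
// can be detected, and hand the result back through C_ReadFinish -> bh_read_finish().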
758void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
759 const ZTracer::Trace &parent_trace)
760{
761 assert(lock.is_locked());
762 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
763 << reads_outstanding << dendl;
764
765 ZTracer::Trace trace;
766 if (parent_trace.valid()) {
767 trace.init("", &trace_endpoint, &parent_trace);
768 trace.copy_name("bh_read " + bh->ob->get_oid().name);
769 trace.event("start");
770 }
771
772 mark_rx(bh);
773 bh->last_read_tid = ++last_read_tid;
774
775 // finisher
776 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
777 bh->start(), bh->length(), trace);
778 // go
779 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
780 bh->ob->get_oloc(), bh->start(), bh->length(),
781 bh->ob->get_snap(), &onfinish->bl,
782 bh->ob->truncate_size, bh->ob->truncate_seq,
783 op_flags, trace, onfinish);
784
785 ++reads_outstanding;
786}
787
788void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
789 ceph_tid_t tid, loff_t start,
790 uint64_t length, bufferlist &bl, int r,
791 bool trust_enoent)
792{
793 assert(lock.is_locked());
794 ldout(cct, 7) << "bh_read_finish "
795 << oid
796 << " tid " << tid
797 << " " << start << "~" << length
798 << " (bl is " << bl.length() << ")"
799 << " returned " << r
800 << " outstanding reads " << reads_outstanding
801 << dendl;
802
803 if (r >= 0 && bl.length() < length) {
804 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
805 << length << " with " << length - bl.length() << " bytes of zeroes"
806 << dendl;
807 bl.append_zero(length - bl.length());
808 }
809
810 list<Context*> ls;
811 int err = 0;
812
813 if (objects[poolid].count(oid) == 0) {
814 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
815 } else {
816 Object *ob = objects[poolid][oid];
817
818 if (r == -ENOENT && !ob->complete) {
819 // wake up *all* rx waiters, or else we risk reordering
820 // identical reads. e.g.
821 // read 1~1
822 // reply to unrelated 3~1 -> !exists
823 // read 1~1 -> immediate ENOENT
824 // reply to first 1~1 -> ooo ENOENT
825 bool allzero = true;
826 for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
827 p != ob->data.end(); ++p) {
828 BufferHead *bh = p->second;
829 for (map<loff_t, list<Context*> >::iterator p
830 = bh->waitfor_read.begin();
831 p != bh->waitfor_read.end();
832 ++p)
833 ls.splice(ls.end(), p->second);
834 bh->waitfor_read.clear();
835 if (!bh->is_zero() && !bh->is_rx())
836 allzero = false;
837 }
838
839 // just pass through and retry all waiters if we don't trust
840 // -ENOENT for this read
841 if (trust_enoent) {
842 ldout(cct, 7)
843 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
844 << dendl;
845 ob->complete = true;
846 ob->exists = false;
847
848 /* If all the bhs are effectively zero, get rid of them. All
849 * the waiters will be retried and get -ENOENT immediately, so
850 * it's safe to clean up the unneeded bh's now. Since we know
851 * it's safe to remove them now, do so, so they aren't hanging
852 * around waiting for more -ENOENTs from rados while the cache
853 * is being shut down.
854 *
855 * Only do this when all the bhs are rx or clean, to match the
856 * condition in _readx(). If there are any non-rx or non-clean
857 * bhs, _readx() will wait for the final result instead of
858 * returning -ENOENT immediately.
859 */
860 if (allzero) {
861 ldout(cct, 10)
862 << "bh_read_finish ENOENT and allzero, getting rid of "
863 << "bhs for " << *ob << dendl;
864 map<loff_t, BufferHead*>::iterator p = ob->data.begin();
865 while (p != ob->data.end()) {
866 BufferHead *bh = p->second;
867 // current iterator will be invalidated by bh_remove()
868 ++p;
869 bh_remove(ob, bh);
870 delete bh;
871 }
872 }
873 }
874 }
875
876 // apply to bh's!
877 loff_t opos = start;
878 while (true) {
879 map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
880 if (p == ob->data.end())
881 break;
882 if (opos >= start+(loff_t)length) {
883 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
884 << start << "+" << length << "=" << start+(loff_t)length
885 << dendl;
886 break;
887 }
888
889 BufferHead *bh = p->second;
890 ldout(cct, 20) << "checking bh " << *bh << dendl;
891
892 // finishers?
893 for (map<loff_t, list<Context*> >::iterator it
894 = bh->waitfor_read.begin();
895 it != bh->waitfor_read.end();
896 ++it)
897 ls.splice(ls.end(), it->second);
898 bh->waitfor_read.clear();
899
900 if (bh->start() > opos) {
901 ldout(cct, 1) << "bh_read_finish skipping gap "
902 << opos << "~" << bh->start() - opos
903 << dendl;
904 opos = bh->start();
905 continue;
906 }
907
908 if (!bh->is_rx()) {
909 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
910 opos = bh->end();
911 continue;
912 }
913
914 if (bh->last_read_tid != tid) {
915 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
916 << bh->last_read_tid << " != tid " << tid
917 << ", skipping" << dendl;
918 opos = bh->end();
919 continue;
920 }
921
922 assert(opos >= bh->start());
923 assert(bh->start() == opos); // we don't merge rx bh's... yet!
924 assert(bh->length() <= start+(loff_t)length-opos);
925
926 if (bh->error < 0)
927 err = bh->error;
928
929 opos = bh->end();
930
931 if (r == -ENOENT) {
932 if (trust_enoent) {
933 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
934 bh_remove(ob, bh);
935 delete bh;
936 } else {
937 ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
938 << *bh << dendl;
939 }
940 continue;
941 }
942
943 if (r < 0) {
944 bh->error = r;
945 mark_error(bh);
946 } else {
947 bh->bl.substr_of(bl,
948 bh->start() - start,
949 bh->length());
950 mark_clean(bh);
951 }
952
953 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
954
955 ob->try_merge_bh(bh);
956 }
957 }
958
959 // called with lock held.
960 ldout(cct, 20) << "finishing waiters " << ls << dendl;
961
962 finish_contexts(cct, ls, err);
963 retry_waiting_reads();
964
965 --reads_outstanding;
966 read_cond.Signal();
967}
968
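// Starting from 'bh', gather neighbouring dirty bhs of the same object (forward, then
// backward, in dirty_or_tx_bh order) whose last_write is at or before 'cutoff',
// stopping at the byte/count budgets, and submit them as one scattered write.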
969void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
970 int64_t *max_amount, int *max_count)
971{
972 list<BufferHead*> blist;
973
974 int count = 0;
975 int64_t total_len = 0;
976 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
977 assert(it != dirty_or_tx_bh.end());
978 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
979 p != dirty_or_tx_bh.end();
980 ++p) {
981 BufferHead *obh = *p;
982 if (obh->ob != bh->ob)
983 break;
984 if (obh->is_dirty() && obh->last_write <= cutoff) {
985 blist.push_back(obh);
986 ++count;
987 total_len += obh->length();
988 if ((max_count && count > *max_count) ||
989 (max_amount && total_len > *max_amount))
990 break;
991 }
992 }
993
994 while (it != dirty_or_tx_bh.begin()) {
995 --it;
996 BufferHead *obh = *it;
997 if (obh->ob != bh->ob)
998 break;
999 if (obh->is_dirty() && obh->last_write <= cutoff) {
1000 blist.push_front(obh);
1001 ++count;
1002 total_len += obh->length();
1003 if ((max_count && count > *max_count) ||
1004 (max_amount && total_len > *max_amount))
1005 break;
1006 }
1007 }
1008 if (max_count)
1009 *max_count -= count;
1010 if (max_amount)
1011 *max_amount -= total_len;
1012
1013 bh_write_scattered(blist);
1014}
1015
1016class ObjectCacher::C_WriteCommit : public Context {
1017 ObjectCacher *oc;
1018 int64_t poolid;
1019 sobject_t oid;
1020 vector<pair<loff_t, uint64_t> > ranges;
1021 ZTracer::Trace trace;
1022public:
1023 ceph_tid_t tid = 0;
1024 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1025 uint64_t l, const ZTracer::Trace &trace) :
1026 oc(c), poolid(_poolid), oid(o), trace(trace) {
1027 ranges.push_back(make_pair(s, l));
1028 }
1029 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1030 vector<pair<loff_t, uint64_t> >& _ranges) :
1031 oc(c), poolid(_poolid), oid(o), tid(0) {
1032 ranges.swap(_ranges);
1033 }
1034 void finish(int r) override {
1035 oc->bh_write_commit(poolid, oid, ranges, tid, r);
1036 trace.event("finish");
1037 }
1038};
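// Flush a list of dirty bhs that all belong to one object with a single scattered
// writeback call: the highest snapc and latest last_write among them are used, and
// every bh gets the returned tid and is moved to tx state.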
1039void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1040{
1041 assert(lock.is_locked());
1042
1043 Object *ob = blist.front()->ob;
1044 ob->get();
1045
1046 ceph::real_time last_write;
1047 SnapContext snapc;
1048 vector<pair<loff_t, uint64_t> > ranges;
1049 vector<pair<uint64_t, bufferlist> > io_vec;
1050
1051 ranges.reserve(blist.size());
1052 io_vec.reserve(blist.size());
1053
1054 uint64_t total_len = 0;
1055 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1056 BufferHead *bh = *p;
1057 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1058 assert(bh->ob == ob);
1059 assert(bh->bl.length() == bh->length());
1060 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1061
1062 int n = io_vec.size();
1063 io_vec.resize(n + 1);
1064 io_vec[n].first = bh->start();
1065 io_vec[n].second = bh->bl;
1066
1067 total_len += bh->length();
1068 if (bh->snapc.seq > snapc.seq)
1069 snapc = bh->snapc;
1070 if (bh->last_write > last_write)
1071 last_write = bh->last_write;
1072 }
1073
1074 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1075
1076 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1077 io_vec, snapc, last_write,
1078 ob->truncate_size, ob->truncate_seq,
1079 oncommit);
1080 oncommit->tid = tid;
1081 ob->last_write_tid = tid;
1082 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1083 BufferHead *bh = *p;
1084 bh->last_write_tid = tid;
1085 mark_tx(bh);
1086 }
1087
1088 if (perfcounter)
1089 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1090}
1091
1092void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1093{
1094 assert(lock.is_locked());
1095 ldout(cct, 7) << "bh_write " << *bh << dendl;
1096
1097 bh->ob->get();
1098
1099 ZTracer::Trace trace;
1100 if (parent_trace.valid()) {
1101 trace.init("", &trace_endpoint, &parent_trace);
1102 trace.copy_name("bh_write " + bh->ob->get_oid().name);
1103 trace.event("start");
1104 }
1105
1106 // finishers
1107 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1108 bh->ob->get_soid(), bh->start(),
1109 bh->length(), trace);
1110 // go
1111 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1112 bh->ob->get_oloc(),
1113 bh->start(), bh->length(),
1114 bh->snapc, bh->bl, bh->last_write,
1115 bh->ob->truncate_size,
1116 bh->ob->truncate_seq,
1117 bh->journal_tid, trace, oncommit);
1118 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1119
1120 // set bh last_write_tid
1121 oncommit->tid = tid;
1122 bh->ob->last_write_tid = tid;
1123 bh->last_write_tid = tid;
1124
1125 if (perfcounter) {
1126 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1127 }
1128
1129 mark_tx(bh);
1130}
1131
1132void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1133 vector<pair<loff_t, uint64_t> >& ranges,
1134 ceph_tid_t tid, int r)
1135{
1136 assert(lock.is_locked());
1137 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1138 << " ranges " << ranges << " returned " << r << dendl;
1139
1140 if (objects[poolid].count(oid) == 0) {
1141 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1142 return;
1143 }
1144
1145 Object *ob = objects[poolid][oid];
1146 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1147
1148 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1149 p != ranges.end();
1150 ++p) {
1151 loff_t start = p->first;
1152 uint64_t length = p->second;
1153 if (!ob->exists) {
1154 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1155 ob->exists = true;
1156
1157 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1158 ob->get_snap())) {
1159 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1160 "complete on " << *ob << dendl;
1161 ob->complete = false;
1162 }
1163 }
1164
1165 vector<pair<loff_t, BufferHead*>> hit;
1166 // apply to bh's!
1167 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1168 p != ob->data.end();
1169 ++p) {
1170 BufferHead *bh = p->second;
1171
1172 if (bh->start() >= start+(loff_t)length)
1173 break;
1174
1175 // make sure bh is tx
1176 if (!bh->is_tx()) {
1177 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1178 continue;
1179 }
1180
1181 // make sure bh tid matches
1182 if (bh->last_write_tid != tid) {
1183 assert(bh->last_write_tid > tid);
1184 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1185 continue;
1186 }
1187
1188 // we don't merge tx buffers. tx buffer should be within the range
1189 assert(bh->start() >= start);
1190 assert(bh->end() <= start+(loff_t)length);
1191
1192 if (r >= 0) {
1193 // ok! mark bh clean and error-free
1194 mark_clean(bh);
1195 bh->set_journal_tid(0);
1196 if (bh->get_nocache())
1197 bh_lru_rest.lru_bottouch(bh);
1198 hit.push_back(make_pair(bh->start(), bh));
1199 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1200 } else {
1201 mark_dirty(bh);
1202 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1203 << *bh << " r = " << r << " " << cpp_strerror(-r)
1204 << dendl;
1205 }
1206 }
1207
1208 for (auto& p : hit) {
1209 // p.second may have been merged and deleted in merge_left
1210 if (ob->data.count(p.first))
1211 ob->try_merge_bh(p.second);
1212 }
1213 }
1214
1215 // update last_commit.
1216 assert(ob->last_commit_tid < tid);
1217 ob->last_commit_tid = tid;
1218
1219 // waiters?
1220 list<Context*> ls;
1221 if (ob->waitfor_commit.count(tid)) {
1222 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1223 ob->waitfor_commit.erase(tid);
1224 }
1225
1226 // is the entire object set now clean and fully committed?
1227 ObjectSet *oset = ob->oset;
1228 ob->put();
1229
1230 if (flush_set_callback &&
1231 was_dirty_or_tx > 0 &&
1232 oset->dirty_or_tx == 0) { // nothing dirty/tx
1233 flush_set_callback(flush_set_callback_arg, oset);
1234 }
1235
1236 if (!ls.empty())
1237 finish_contexts(cct, ls, r);
1238}
1239
1240void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1241{
1242 assert(trace != nullptr);
1243 assert(lock.is_locked());
1244 ceph::real_time cutoff = ceph::real_clock::now();
1245
1246 ldout(cct, 10) << "flush " << amount << dendl;
1247
1248 /*
1249 * NOTE: we aren't actually pulling things off the LRU here, just
1250 * looking at the tail item. Then we call bh_write, which moves it
1251 * to the other LRU, so that we can call
1252 * lru_dirty.lru_get_next_expire() again.
1253 */
1254 int64_t left = amount;
1255 while (amount == 0 || left > 0) {
1256 BufferHead *bh = static_cast<BufferHead*>(
1257 bh_lru_dirty.lru_get_next_expire());
1258 if (!bh) break;
1259 if (bh->last_write > cutoff) break;
1260
1261 if (scattered_write) {
1262 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1263 } else {
1264 left -= bh->length();
1265 bh_write(bh, *trace);
1266 }
1267 }
1268}
1269
1270
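// Evict clean bhs from the cold end of bh_lru_rest until both the clean-byte limit
// (max_size) and the clean-bh count limit (max_size >> BUFFER_MEMORY_WEIGHT) are
// satisfied, then expire whole objects from ob_lru down to max_objects.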
1271void ObjectCacher::trim()
1272{
1273 assert(lock.is_locked());
1274 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1275 << get_stat_clean() << ", objects: max " << max_objects
1276 << " current " << ob_lru.lru_get_size() << dendl;
1277
1278 uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1279 uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
1280 while (get_stat_clean() > 0 &&
1281 ((uint64_t)get_stat_clean() > max_size ||
1282 nr_clean_bh > max_clean_bh)) {
1283 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1284 if (!bh)
1285 break;
1286
1287 ldout(cct, 10) << "trim trimming " << *bh << dendl;
1288 assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1289
1290 Object *ob = bh->ob;
1291 bh_remove(ob, bh);
1292 delete bh;
1293
1294 --nr_clean_bh;
1295
1296 if (ob->complete) {
1297 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1298 ob->complete = false;
1299 }
1300 }
1301
1302 while (ob_lru.lru_get_size() > max_objects) {
1303 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1304 if (!ob)
1305 break;
1306
1307 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1308 close_object(ob);
1309 }
1310
1311 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1312 << get_stat_clean() << ", objects: max " << max_objects
1313 << " current " << ob_lru.lru_get_size() << dendl;
1314}
1315
1316
1317
1318/* public */
1319
1320bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1321 snapid_t snapid)
1322{
1323 assert(lock.is_locked());
1324 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1325 ex_it != extents.end();
1326 ++ex_it) {
1327 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1328
1329 // get Object cache
1330 sobject_t soid(ex_it->oid, snapid);
1331 Object *o = get_object_maybe(soid, ex_it->oloc);
1332 if (!o)
1333 return false;
1334 if (!o->is_cached(ex_it->offset, ex_it->length))
1335 return false;
1336 }
1337 return true;
1338}
1339
1340
1341/*
1342 * returns # bytes read (if in cache). onfinish is untouched (caller
1343 * must delete it)
1344 * returns 0 if doing async read
1345 */
1346int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1347 ZTracer::Trace *parent_trace)
1348{
1349 ZTracer::Trace trace;
1350 if (parent_trace != nullptr) {
1351 trace.init("read", &trace_endpoint, parent_trace);
1352 trace.event("start");
1353 }
1354
1355 int r = _readx(rd, oset, onfinish, true, &trace);
1356 if (r < 0) {
1357 trace.event("finish");
1358 }
1359 return r;
1360}
1361
1362int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1363 bool external_call, ZTracer::Trace *trace)
1364{
1365 assert(trace != nullptr);
1366 assert(lock.is_locked());
1367 bool success = true;
1368 int error = 0;
1369 uint64_t bytes_in_cache = 0;
1370 uint64_t bytes_not_in_cache = 0;
1371 uint64_t total_bytes_read = 0;
1372 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1373 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1374 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1375
1376 /*
1377 * WARNING: we can only meaningfully return ENOENT if the read request
1378 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1379 * zeroed buffers needs to feed single extents into readx().
1380 */
1381 assert(!oset->return_enoent || rd->extents.size() == 1);
1382
1383 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1384 ex_it != rd->extents.end();
1385 ++ex_it) {
1386 ldout(cct, 10) << "readx " << *ex_it << dendl;
1387
1388 total_bytes_read += ex_it->length;
1389
1390 // get Object cache
1391 sobject_t soid(ex_it->oid, rd->snap);
1392 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1393 ex_it->truncate_size, oset->truncate_seq);
1394 if (external_call)
1395 touch_ob(o);
1396
1397 // does not exist and no hits?
1398 if (oset->return_enoent && !o->exists) {
1399 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1400
1401 // should we worry about COW underneath us?
1402 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1403 ex_it->length, soid.snap)) {
1404 ldout(cct, 20) << "readx may copy on write" << dendl;
1405 bool wait = false;
1406 list<BufferHead*> blist;
1407 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1408 bh_it != o->data.end();
1409 ++bh_it) {
1410 BufferHead *bh = bh_it->second;
1411 if (bh->is_dirty() || bh->is_tx()) {
1412 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1413 wait = true;
1414 if (bh->is_dirty()) {
1415 if (scattered_write)
1416 blist.push_back(bh);
1417 else
1418 bh_write(bh, *trace);
1419 }
1420 }
1421 }
1422 if (scattered_write && !blist.empty())
1423 bh_write_scattered(blist);
1424 if (wait) {
1425 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1426 << " on " << *o << dendl;
1427 o->waitfor_commit[o->last_write_tid].push_back(
1428 new C_RetryRead(this, rd, oset, onfinish, *trace));
1429 // FIXME: perfcounter!
1430 return 0;
1431 }
1432 }
1433
1434 // can we return ENOENT?
1435 bool allzero = true;
1436 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1437 bh_it != o->data.end();
1438 ++bh_it) {
1439 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1440 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1441 allzero = false;
1442 break;
1443 }
1444 }
1445 if (allzero) {
1446 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1447 << dendl;
1448 delete rd;
1449 if (dontneed)
1450 bottouch_ob(o);
1451 return -ENOENT;
1452 }
1453 }
1454
1455 // map extent into bufferheads
1456 map<loff_t, BufferHead*> hits, missing, rx, errors;
1457 o->map_read(*ex_it, hits, missing, rx, errors);
1458 if (external_call) {
1459 // retry reading error buffers
1460 missing.insert(errors.begin(), errors.end());
1461 } else {
1462 // some reads had errors, fail later so completions
1463 // are cleaned up properly
1464 // TODO: make read path not call _readx for every completion
1465 hits.insert(errors.begin(), errors.end());
1466 }
1467
1468 if (!missing.empty() || !rx.empty()) {
1469 // read missing
1470 map<loff_t, BufferHead*>::iterator last = missing.end();
1471 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1472 bh_it != missing.end();
1473 ++bh_it) {
1474 uint64_t rx_bytes = static_cast<uint64_t>(
1475 stat_rx + bh_it->second->length());
1476 bytes_not_in_cache += bh_it->second->length();
1477 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1478 // cache is full with concurrent reads -- wait for rx's to complete
1479 // to constrain memory growth (especially during copy-ups)
1480 if (success) {
1481 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1482 << waitfor_read.size() << " blocked reads, "
1483 << (MAX(rx_bytes, max_size) - max_size)
1484 << " read bytes" << dendl;
1485 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1486 *trace));
1487 }
1488
1489 bh_remove(o, bh_it->second);
1490 delete bh_it->second;
1491 } else {
1492 bh_it->second->set_nocache(nocache);
1493 bh_read(bh_it->second, rd->fadvise_flags, *trace);
1494 if ((success && onfinish) || last != missing.end())
1495 last = bh_it;
1496 }
1497 success = false;
1498 }
1499
1500 // add the waiter to the last missed bh to avoid waking up early, since reads complete in order
1501 if (last != missing.end()) {
1502 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1503 << " off " << last->first << dendl;
1504 last->second->waitfor_read[last->first].push_back(
1505 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1506
1507 }
1508
1509 // bump rx
1510 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1511 bh_it != rx.end();
1512 ++bh_it) {
1513 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1514 if (success && onfinish) {
1515 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1516 << " off " << bh_it->first << dendl;
1517 bh_it->second->waitfor_read[bh_it->first].push_back(
1518 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1519 }
1520 bytes_not_in_cache += bh_it->second->length();
1521 success = false;
1522 }
1523
1524 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1525 bh_it != hits.end(); ++bh_it)
1526 // bump in lru, so we don't lose it before it is read again later
1527 touch_bh(bh_it->second);
1528
1529 } else {
1530 assert(!hits.empty());
1531
1532 // make a plain list
1533 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1534 bh_it != hits.end();
1535 ++bh_it) {
1536 BufferHead *bh = bh_it->second;
1537 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1538 if (bh->is_error() && bh->error)
1539 error = bh->error;
1540 bytes_in_cache += bh->length();
1541
1542 if (bh->get_nocache() && bh->is_clean())
1543 bh_lru_rest.lru_bottouch(bh);
1544 else
1545 touch_bh(bh);
1546 // must come after touch_bh, because touch_bh sets dontneed to false
1547 if (dontneed &&
1548 ((loff_t)ex_it->offset <= bh->start() &&
1549 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1550 bh->set_dontneed(true); //if dirty
1551 if (bh->is_clean())
1552 bh_lru_rest.lru_bottouch(bh);
1553 }
1554 }
1555
1556 if (!error) {
1557 // create reverse map of buffer offset -> object for the
1558 // eventual result. this is over a single ObjectExtent, so we
1559 // know that
1560 // - the bh's are contiguous
1561 // - the buffer frags need not be (and almost certainly aren't)
1562 loff_t opos = ex_it->offset;
1563 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1564 assert(bh_it->second->start() <= opos);
1565 uint64_t bhoff = opos - bh_it->second->start();
1566 vector<pair<uint64_t,uint64_t> >::iterator f_it
1567 = ex_it->buffer_extents.begin();
1568 uint64_t foff = 0;
1569 while (1) {
1570 BufferHead *bh = bh_it->second;
1571 assert(opos == (loff_t)(bh->start() + bhoff));
1572
1573 uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
1574 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1575 << bhoff << " frag " << f_it->first << "~"
1576 << f_it->second << " +" << foff << "~" << len
1577 << dendl;
1578
1579 bufferlist bit;
1580 // put substr here first, since substr_of clobbers, and we
1581 // may get multiple bh's at this stripe_map position
1582 if (bh->is_zero()) {
1583 stripe_map[f_it->first].append_zero(len);
1584 } else {
1585 bit.substr_of(bh->bl,
1586 opos - bh->start(),
1587 len);
1588 stripe_map[f_it->first].claim_append(bit);
1589 }
1590
1591 opos += len;
1592 bhoff += len;
1593 foff += len;
1594 if (opos == bh->end()) {
1595 ++bh_it;
1596 bhoff = 0;
1597 }
1598 if (foff == f_it->second) {
1599 ++f_it;
1600 foff = 0;
1601 }
1602 if (bh_it == hits.end()) break;
1603 if (f_it == ex_it->buffer_extents.end())
1604 break;
1605 }
1606 assert(f_it == ex_it->buffer_extents.end());
1607 assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1608 }
1609
1610 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1611 bottouch_ob(o);
1612 }
1613 }
1614
1615 if (!success) {
1616 if (perfcounter && external_call) {
1617 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1618 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1619 perfcounter->inc(l_objectcacher_cache_ops_miss);
1620 }
1621 if (onfinish) {
1622 ldout(cct, 20) << "readx defer " << rd << dendl;
1623 } else {
1624 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1625 << dendl;
1626 delete rd;
1627 }
1628 return 0; // wait!
1629 }
1630 if (perfcounter && external_call) {
1631 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1632 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1633 perfcounter->inc(l_objectcacher_cache_ops_hit);
1634 }
1635
1636 // no misses... success! do the read.
1637 ldout(cct, 10) << "readx has all buffers" << dendl;
1638
1639 // ok, assemble into result buffer.
1640 uint64_t pos = 0;
1641 if (rd->bl && !error) {
1642 rd->bl->clear();
1643 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1644 i != stripe_map.end();
1645 ++i) {
1646 assert(pos == i->first);
1647 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1648 << " at " << pos << dendl;
1649 pos += i->second.length();
1650 rd->bl->claim_append(i->second);
1651 assert(rd->bl->length() == pos);
1652 }
1653 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1654 } else if (!error) {
1655 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1656 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1657 pos = i->first + i->second.length();
1658 }
1659
1660 // done with read.
1661 int ret = error ? error : pos;
1662 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1663 assert(pos <= (uint64_t) INT_MAX);
1664
1665 delete rd;
1666
1667 trim();
1668
1669 return ret;
1670}
1671
1672void ObjectCacher::retry_waiting_reads()
1673{
1674 list<Context *> ls;
1675 ls.swap(waitfor_read);
1676
1677 while (!ls.empty() && waitfor_read.empty()) {
1678 Context *ctx = ls.front();
1679 ls.pop_front();
1680 ctx->complete(0);
1681 }
1682 waitfor_read.splice(waitfor_read.end(), ls);
1683}
1684
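// Write path: map each ObjectExtent onto a single dirty bh via map_write(), copy the
// caller's data into it, then apply the dirty-limit throttle in _wait_for_write()
// before returning.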
1685int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1686 ZTracer::Trace *parent_trace)
1687{
1688 assert(lock.is_locked());
1689 ceph::real_time now = ceph::real_clock::now();
1690 uint64_t bytes_written = 0;
1691 uint64_t bytes_written_in_flush = 0;
1692 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1693 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1694
1695 ZTracer::Trace trace;
1696 if (parent_trace != nullptr) {
1697 trace.init("write", &trace_endpoint, parent_trace);
1698 trace.event("start");
1699 }
1700
1701 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1702 ex_it != wr->extents.end();
1703 ++ex_it) {
1704 // get object cache
1705 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1706 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1707 ex_it->truncate_size, oset->truncate_seq);
1708
1709 // map it all into a single bufferhead.
1710 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1711 bool missing = bh->is_missing();
1712 bh->snapc = wr->snapc;
1713
1714 bytes_written += ex_it->length;
1715 if (bh->is_tx()) {
1716 bytes_written_in_flush += ex_it->length;
1717 }
1718
1719 // adjust buffer pointers (ie "copy" data into my cache)
1720 // this is over a single ObjectExtent, so we know that
1721 // - there is one contiguous bh
1722 // - the buffer frags need not be (and almost certainly aren't)
1723 // note: i assume striping is monotonic... no jumps backwards, ever!
1724 loff_t opos = ex_it->offset;
1725 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1726 = ex_it->buffer_extents.begin();
1727 f_it != ex_it->buffer_extents.end();
1728 ++f_it) {
1729 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1730 << f_it->second << " into " << *bh << " at " << opos
1731 << dendl;
1732 uint64_t bhoff = bh->start() - opos;
1733 assert(f_it->second <= bh->length() - bhoff);
1734
1735 // get the frag we're mapping in
1736 bufferlist frag;
1737 frag.substr_of(wr->bl,
1738 f_it->first, f_it->second);
1739
1740 // keep anything left of bhoff
1741 bufferlist newbl;
1742 if (bhoff)
1743 newbl.substr_of(bh->bl, 0, bhoff);
1744 newbl.claim_append(frag);
1745 bh->bl.swap(newbl);
1746
1747 opos += f_it->second;
1748 }
1749
1750 // ok, now bh is dirty.
1751 mark_dirty(bh);
1752 if (dontneed)
1753 bh->set_dontneed(true);
1754 else if (nocache && missing)
1755 bh->set_nocache(true);
1756 else
1757 touch_bh(bh);
1758
1759 bh->last_write = now;
1760
1761 o->try_merge_bh(bh);
1762 }
1763
1764 if (perfcounter) {
1765 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1766 if (bytes_written_in_flush) {
1767 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1768 bytes_written_in_flush);
1769 }
1770 }
1771
1772 int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1773 delete wr;
1774
1775 //verify_stats();
1776 trim();
1777 return r;
1778}
1779
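// Used when block_writes_upfront is false: the dirty-limit wait is performed on the
// finisher thread (maybe_wait_for_writeback) and only then is the caller's
// onfreespace context completed.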
1780class ObjectCacher::C_WaitForWrite : public Context {
1781public:
1782 C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1783 const ZTracer::Trace &trace, Context *onfinish) :
1784 m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1785 void finish(int r) override;
1786private:
1787 ObjectCacher *m_oc;
1788 uint64_t m_len;
1789 ZTracer::Trace m_trace;
1790 Context *m_onfinish;
1791};
1792
1793void ObjectCacher::C_WaitForWrite::finish(int r)
1794{
1795 Mutex::Locker l(m_oc->lock);
1796 m_oc->maybe_wait_for_writeback(m_len, &m_trace);
1797 m_onfinish->complete(r);
1798}
1799
1800void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
1801 ZTracer::Trace *trace)
1802{
1803 assert(lock.is_locked());
1804 ceph::mono_time start = ceph::mono_clock::now();
1805 int blocked = 0;
1806 // wait for writeback?
1807 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1808 // - do not wait for bytes other waiters are waiting on. this means that
1809 // threads do not wait for each other. this effectively allows the cache
1810 // size to balloon proportional to the data that is in flight.
1811
1812 uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
1813 while (get_stat_dirty() + get_stat_tx() > 0 &&
1814 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1815 max_dirty + get_stat_dirty_waiting()) ||
1816 (dirty_or_tx_bh.size() >=
1817 max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1818
1819 if (blocked == 0) {
1820 trace->event("start wait for writeback");
1821 }
1822 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1823 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1824 << max_dirty << " + dirty_waiting "
1825 << get_stat_dirty_waiting() << dendl;
1826 flusher_cond.Signal();
1827 stat_dirty_waiting += len;
1828 ++stat_nr_dirty_waiters;
1829 stat_cond.Wait(lock);
1830 stat_dirty_waiting -= len;
1831 --stat_nr_dirty_waiters;
1832 ++blocked;
1833 ldout(cct, 10) << __func__ << " woke up" << dendl;
1834 }
1835 if (blocked > 0) {
1836 trace->event("finish wait for writeback");
1837 }
1838 if (blocked && perfcounter) {
1839 perfcounter->inc(l_objectcacher_write_ops_blocked);
1840 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1841 ceph::timespan blocked = ceph::mono_clock::now() - start;
1842 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1843 }
1844}
1845
1846// blocking wait for write.
1847int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
31f18b77 1848 ZTracer::Trace *trace, Context *onfreespace)
7c673cae
FG
1849{
1850 assert(lock.is_locked());
31f18b77 1851 assert(trace != nullptr);
7c673cae
FG
1852 int ret = 0;
1853
1854 if (max_dirty > 0) {
1855 if (block_writes_upfront) {
31f18b77 1856 maybe_wait_for_writeback(len, trace);
7c673cae
FG
1857 if (onfreespace)
1858 onfreespace->complete(0);
1859 } else {
1860 assert(onfreespace);
31f18b77 1861 finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
7c673cae
FG
1862 }
1863 } else {
1864 // write-thru! flush what we just wrote.
1865 Cond cond;
1866 bool done = false;
1867 Context *fin = block_writes_upfront ?
1868 new C_Cond(&cond, &done, &ret) : onfreespace;
1869 assert(fin);
31f18b77 1870 bool flushed = flush_set(oset, wr->extents, trace, fin);
7c673cae
FG
1871 assert(!flushed); // we just dirtied it, and didn't drop our lock!
1872 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1873 << " bytes" << dendl;
1874 if (block_writes_upfront) {
1875 while (!done)
1876 cond.Wait(lock);
1877 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1878 if (onfreespace)
1879 onfreespace->complete(ret);
1880 }
1881 }
1882
1883 // start writeback anyway?
1884 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1885 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1886 << target_dirty << ", nudging flusher" << dendl;
1887 flusher_cond.Signal();
1888 }
1889 return ret;
1890}
1891
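// Body of the background flusher thread: roughly once a second (or when
// flusher_cond is signalled) it either flushes enough dirty data to drop
// back under target_dirty, or writes out buffer heads older than
// max_dirty_age, at most MAX_FLUSH_UNDER_LOCK at a time before yielding
// the lock. On shutdown it waits for any outstanding reads to complete
// before returning.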
1892void ObjectCacher::flusher_entry()
1893{
1894 ldout(cct, 10) << "flusher start" << dendl;
1895 lock.Lock();
1896 while (!flusher_stop) {
1897 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1898 get_stat_dirty();
1899 ldout(cct, 11) << "flusher "
1900 << all << " / " << max_size << ": "
1901 << get_stat_tx() << " tx, "
1902 << get_stat_rx() << " rx, "
1903 << get_stat_clean() << " clean, "
1904 << get_stat_dirty() << " dirty ("
1905 << target_dirty << " target, "
1906 << max_dirty << " max)"
1907 << dendl;
1908 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
31f18b77
FG
1909
1910 ZTracer::Trace trace;
1911 if (cct->_conf->osdc_blkin_trace_all) {
1912 trace.init("flusher", &trace_endpoint);
1913 trace.event("start");
1914 }
1915
7c673cae
FG
1916 if (actual > 0 && (uint64_t) actual > target_dirty) {
1917 // flush some dirty pages
1918 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1919 << get_stat_dirty_waiting() << " dirty_waiting > target "
1920 << target_dirty << ", flushing some dirty bhs" << dendl;
31f18b77 1921 flush(&trace, actual - target_dirty);
7c673cae
FG
1922 } else {
1923 // check tail of lru for old dirty items
1924 ceph::real_time cutoff = ceph::real_clock::now();
1925 cutoff -= max_dirty_age;
1926 BufferHead *bh = 0;
1927 int max = MAX_FLUSH_UNDER_LOCK;
1928 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1929 lru_get_next_expire())) != 0 &&
1930 bh->last_write <= cutoff &&
1931 max > 0) {
1932 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1933 if (scattered_write) {
1934 bh_write_adjacencies(bh, cutoff, NULL, &max);
1935 } else {
31f18b77 1936 bh_write(bh, trace);
7c673cae
FG
1937 --max;
1938 }
1939 }
1940 if (!max) {
1941 // back off the lock to avoid starving other threads
31f18b77 1942 trace.event("backoff");
7c673cae
FG
1943 lock.Unlock();
1944 lock.Lock();
1945 continue;
1946 }
1947 }
31f18b77
FG
1948
1949 trace.event("finish");
7c673cae
FG
1950 if (flusher_stop)
1951 break;
1952
1953 flusher_cond.WaitInterval(lock, seconds(1));
1954 }
1955
1956 /* Wait for reads to finish. This is only possible if handling
1957 * -ENOENT made some read completions finish before their rados read
1958 * came back. If we don't wait for them, and destroy the cache, when
1959 * the rados reads do come back their callback will try to access the
1960 * no-longer-valid ObjectCacher.
1961 */
1962 while (reads_outstanding > 0) {
1963 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1964 << reads_outstanding << dendl;
1965 read_cond.Wait(lock);
1966 }
1967
1968 lock.Unlock();
1969 ldout(cct, 10) << "flusher finish" << dendl;
1970}
1971
1972
1973// -------------------------------------------------
1974
1975bool ObjectCacher::set_is_empty(ObjectSet *oset)
1976{
1977 assert(lock.is_locked());
1978 if (oset->objects.empty())
1979 return true;
1980
1981 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
1982 if (!(*p)->is_empty())
1983 return false;
1984
1985 return true;
1986}
1987
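// true if the set holds any buffered data that is neither dirty nor tx,
// i.e. at least one buffer that could be served straight from the cache.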
1988bool ObjectCacher::set_is_cached(ObjectSet *oset)
1989{
1990 assert(lock.is_locked());
1991 if (oset->objects.empty())
1992 return false;
1993
1994 for (xlist<Object*>::iterator p = oset->objects.begin();
1995 !p.end(); ++p) {
1996 Object *ob = *p;
1997 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
1998 q != ob->data.end();
1999 ++q) {
2000 BufferHead *bh = q->second;
2001 if (!bh->is_dirty() && !bh->is_tx())
2002 return true;
2003 }
2004 }
2005
2006 return false;
2007}
2008
2009bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
2010{
2011 assert(lock.is_locked());
2012 if (oset->objects.empty())
2013 return false;
2014
2015 for (xlist<Object*>::iterator i = oset->objects.begin();
2016 !i.end(); ++i) {
2017 Object *ob = *i;
2018
2019 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2020 p != ob->data.end();
2021 ++p) {
2022 BufferHead *bh = p->second;
2023 if (bh->is_dirty() || bh->is_tx())
2024 return true;
2025 }
2026 }
2027
2028 return false;
2029}
2030
2031
2032// purge. non-blocking. violently removes dirty buffers from cache.
2033void ObjectCacher::purge(Object *ob)
2034{
2035 assert(lock.is_locked());
2036 ldout(cct, 10) << "purge " << *ob << dendl;
2037
2038 ob->truncate(0);
2039}
2040
2041
2042// flush. non-blocking. no callback.
2043// true if clean, already flushed.
2044// false if we wrote something.
2045// be sloppy about the ranges: flush, in full, any buffer the range touches
31f18b77
FG
2046bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2047 ZTracer::Trace *trace)
7c673cae 2048{
31f18b77 2049 assert(trace != nullptr);
7c673cae
FG
2050 assert(lock.is_locked());
2051 list<BufferHead*> blist;
2052 bool clean = true;
2053 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2054 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2055 p != ob->data.end();
2056 ++p) {
2057 BufferHead *bh = p->second;
2058 ldout(cct, 20) << "flush " << *bh << dendl;
2059 if (length && bh->start() > offset+length) {
2060 break;
2061 }
2062 if (bh->is_tx()) {
2063 clean = false;
2064 continue;
2065 }
2066 if (!bh->is_dirty()) {
2067 continue;
2068 }
2069
2070 if (scattered_write)
2071 blist.push_back(bh);
2072 else
31f18b77 2073 bh_write(bh, *trace);
7c673cae
FG
2074 clean = false;
2075 }
2076 if (scattered_write && !blist.empty())
2077 bh_write_scattered(blist);
2078
2079 return clean;
2080}
2081
2082bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2083 Context *onfinish)
2084{
2085 assert(lock.is_locked());
2086 if (gather->has_subs()) {
2087 gather->set_finisher(onfinish);
2088 gather->activate();
2089 return false;
2090 }
2091
2092 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2093 onfinish->complete(0);
2094 return true;
2095}
2096
2097// flush. non-blocking, takes callback.
2098// returns true if already flushed
2099bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2100{
2101 assert(lock.is_locked());
2102 assert(onfinish != NULL);
2103 if (oset->objects.empty()) {
2104 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2105 onfinish->complete(0);
2106 return true;
2107 }
2108
2109 ldout(cct, 10) << "flush_set " << oset << dendl;
2110
2111 // we'll need to wait for all objects to flush!
2112 C_GatherBuilder gather(cct);
2113 set<Object*> waitfor_commit;
2114
2115 list<BufferHead*> blist;
2116 Object *last_ob = NULL;
2117 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2118
2119 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2120 // order. But items in oset->objects are not sorted. So the iterator can
2121 // point to any buffer head in the ObjectSet
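 // Scan forward from the lower bound, then backward from just before it,
 // so every dirty/tx bh belonging to this oset is visited regardless of
 // which of its objects the probe key happened to land on.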
2122 BufferHead key(*oset->objects.begin());
2123 it = dirty_or_tx_bh.lower_bound(&key);
2124 p = q = it;
2125
2126 bool backwards = true;
2127 if (it != dirty_or_tx_bh.begin())
2128 --it;
2129 else
2130 backwards = false;
2131
2132 for (; p != dirty_or_tx_bh.end(); p = q) {
2133 ++q;
2134 BufferHead *bh = *p;
2135 if (bh->ob->oset != oset)
2136 break;
2137 waitfor_commit.insert(bh->ob);
2138 if (bh->is_dirty()) {
2139 if (scattered_write) {
2140 if (last_ob != bh->ob) {
2141 if (!blist.empty()) {
2142 bh_write_scattered(blist);
2143 blist.clear();
2144 }
2145 last_ob = bh->ob;
2146 }
2147 blist.push_back(bh);
2148 } else {
31f18b77 2149 bh_write(bh, {});
7c673cae
FG
2150 }
2151 }
2152 }
2153
2154 if (backwards) {
2155 for (p = q = it; true; p = q) {
2156 if (q != dirty_or_tx_bh.begin())
2157 --q;
2158 else
2159 backwards = false;
2160 BufferHead *bh = *p;
2161 if (bh->ob->oset != oset)
2162 break;
2163 waitfor_commit.insert(bh->ob);
2164 if (bh->is_dirty()) {
2165 if (scattered_write) {
2166 if (last_ob != bh->ob) {
2167 if (!blist.empty()) {
2168 bh_write_scattered(blist);
2169 blist.clear();
2170 }
2171 last_ob = bh->ob;
2172 }
2173 blist.push_front(bh);
2174 } else {
31f18b77 2175 bh_write(bh, {});
7c673cae
FG
2176 }
2177 }
2178 if (!backwards)
2179 break;
2180 }
2181 }
2182
2183 if (scattered_write && !blist.empty())
2184 bh_write_scattered(blist);
2185
2186 for (set<Object*>::iterator i = waitfor_commit.begin();
2187 i != waitfor_commit.end(); ++i) {
2188 Object *ob = *i;
2189
2190 // we'll need to gather...
2191 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2192 << ob->last_write_tid << " on " << *ob << dendl;
2193 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2194 }
2195
2196 return _flush_set_finish(&gather, onfinish);
2197}
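// A minimal caller-side sketch (illustrative, not part of this file):
// to block until a set is flushed, the same Cond/C_Cond pattern used in
// _wait_for_write() above can be wrapped around flush_set(). 'oc' and
// 'oset' are hypothetical names, and the caller is assumed to already
// hold the mutex that flush_set() asserts via lock.is_locked():
//
//   Cond cond;
//   bool done = false;
//   int ret = 0;
//   oc->flush_set(oset, new C_Cond(&cond, &done, &ret));
//   while (!done)
//     cond.Wait(oc->lock);  // C_Cond sets 'done' and signals, whether the
//                           // set was already clean or flushed via gather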
2198
2199// flush. non-blocking, takes callback.
2200// returns true if already flushed
2201bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
31f18b77 2202 ZTracer::Trace *trace, Context *onfinish)
7c673cae
FG
2203{
2204 assert(lock.is_locked());
31f18b77 2205 assert(trace != nullptr);
7c673cae
FG
2206 assert(onfinish != NULL);
2207 if (oset->objects.empty()) {
2208 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2209 onfinish->complete(0);
2210 return true;
2211 }
2212
2213 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2214 << " ObjectExtents" << dendl;
2215
2216 // we'll need to wait for all objects to flush!
2217 C_GatherBuilder gather(cct);
2218
2219 for (vector<ObjectExtent>::iterator p = exv.begin();
2220 p != exv.end();
2221 ++p) {
2222 ObjectExtent &ex = *p;
2223 sobject_t soid(ex.oid, CEPH_NOSNAP);
2224 if (objects[oset->poolid].count(soid) == 0)
2225 continue;
2226 Object *ob = objects[oset->poolid][soid];
2227
2228 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2229 << " " << ob << dendl;
2230
31f18b77 2231 if (!flush(ob, ex.offset, ex.length, trace)) {
7c673cae
FG
2232 // we'll need to gather...
2233 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2234 << ob->last_write_tid << " on " << *ob << dendl;
2235 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2236 }
2237 }
2238
2239 return _flush_set_finish(&gather, onfinish);
2240}
2241
2242// flush all dirty data. non-blocking, takes callback.
2243// returns true if already flushed
2244bool ObjectCacher::flush_all(Context *onfinish)
2245{
2246 assert(lock.is_locked());
2247 assert(onfinish != NULL);
2248
2249 ldout(cct, 10) << "flush_all " << dendl;
2250
2251 // we'll need to wait for all objects to flush!
2252 C_GatherBuilder gather(cct);
2253 set<Object*> waitfor_commit;
2254
2255 list<BufferHead*> blist;
2256 Object *last_ob = NULL;
2257 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2258 next = it = dirty_or_tx_bh.begin();
2259 while (it != dirty_or_tx_bh.end()) {
2260 ++next;
2261 BufferHead *bh = *it;
2262 waitfor_commit.insert(bh->ob);
2263
2264 if (bh->is_dirty()) {
2265 if (scattered_write) {
2266 if (last_ob != bh->ob) {
2267 if (!blist.empty()) {
2268 bh_write_scattered(blist);
2269 blist.clear();
2270 }
2271 last_ob = bh->ob;
2272 }
2273 blist.push_back(bh);
2274 } else {
31f18b77 2275 bh_write(bh, {});
7c673cae
FG
2276 }
2277 }
2278
2279 it = next;
2280 }
2281
2282 if (scattered_write && !blist.empty())
2283 bh_write_scattered(blist);
2284
2285 for (set<Object*>::iterator i = waitfor_commit.begin();
2286 i != waitfor_commit.end();
2287 ++i) {
2288 Object *ob = *i;
2289
2290 // we'll need to gather...
2291 ldout(cct, 10) << "flush_all will wait for ack tid "
2292 << ob->last_write_tid << " on " << *ob << dendl;
2293 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2294 }
2295
2296 return _flush_set_finish(&gather, onfinish);
2297}
2298
2299void ObjectCacher::purge_set(ObjectSet *oset)
2300{
2301 assert(lock.is_locked());
2302 if (oset->objects.empty()) {
2303 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2304 return;
2305 }
2306
2307 ldout(cct, 10) << "purge_set " << oset << dendl;
2308 const bool were_dirty = oset->dirty_or_tx > 0;
2309
2310 for (xlist<Object*>::iterator i = oset->objects.begin();
2311 !i.end(); ++i) {
2312 Object *ob = *i;
2313 purge(ob);
2314 }
2315
2316 // Although we have purged rather than flushed, the caller should still
2317 // drop any resources associated with dirty data.
2318 assert(oset->dirty_or_tx == 0);
2319 if (flush_set_callback && were_dirty) {
2320 flush_set_callback(flush_set_callback_arg, oset);
2321 }
2322}
2323
2324
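// Drop clean, zero and error buffers from this object; returns the number
// of bytes that could not be released (anything dirty, tx, rx or otherwise
// not clean). If nothing unclean remains and the object can be closed, it
// is removed from the cache entirely.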
2325loff_t ObjectCacher::release(Object *ob)
2326{
2327 assert(lock.is_locked());
2328 list<BufferHead*> clean;
2329 loff_t o_unclean = 0;
2330
2331 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2332 p != ob->data.end();
2333 ++p) {
2334 BufferHead *bh = p->second;
2335 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2336 clean.push_back(bh);
2337 else
2338 o_unclean += bh->length();
2339 }
2340
2341 for (list<BufferHead*>::iterator p = clean.begin();
2342 p != clean.end();
2343 ++p) {
2344 bh_remove(ob, *p);
2345 delete *p;
2346 }
2347
2348 if (ob->can_close()) {
2349 ldout(cct, 10) << "release trimming " << *ob << dendl;
2350 close_object(ob);
2351 assert(o_unclean == 0);
2352 return 0;
2353 }
2354
2355 if (ob->complete) {
2356 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2357 ob->complete = false;
2358 }
2359 if (!ob->exists) {
2360 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2361 ob->exists = true;
2362 }
2363
2364 return o_unclean;
2365}
2366
2367loff_t ObjectCacher::release_set(ObjectSet *oset)
2368{
2369 assert(lock.is_locked());
2370 // return # bytes not clean (and thus not released).
2371 loff_t unclean = 0;
2372
2373 if (oset->objects.empty()) {
2374 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2375 return 0;
2376 }
2377
2378 ldout(cct, 10) << "release_set " << oset << dendl;
2379
2380 xlist<Object*>::iterator q;
2381 for (xlist<Object*>::iterator p = oset->objects.begin();
2382 !p.end(); ) {
2383 q = p;
2384 ++q;
2385 Object *ob = *p;
2386
2387 loff_t o_unclean = release(ob);
2388 unclean += o_unclean;
2389
2390 if (o_unclean)
2391 ldout(cct, 10) << "release_set " << oset << " " << *ob
2392 << " has " << o_unclean << " bytes left"
2393 << dendl;
2394 p = q;
2395 }
2396
2397 if (unclean) {
2398 ldout(cct, 10) << "release_set " << oset
2399 << ", " << unclean << " bytes left" << dendl;
2400 }
2401
2402 return unclean;
2403}
2404
2405
2406uint64_t ObjectCacher::release_all()
2407{
2408 assert(lock.is_locked());
2409 ldout(cct, 10) << "release_all" << dendl;
2410 uint64_t unclean = 0;
2411
2412 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2413 = objects.begin();
2414 while (i != objects.end()) {
2415 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2416 while (p != i->end()) {
2417 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2418 ++n;
2419
2420 Object *ob = p->second;
2421
2422 loff_t o_unclean = release(ob);
2423 unclean += o_unclean;
2424
2425 if (o_unclean)
2426 ldout(cct, 10) << "release_all " << *ob
2427 << " has " << o_unclean << " bytes left"
2428 << dendl;
2429 p = n;
2430 }
2431 ++i;
2432 }
2433
2434 if (unclean) {
2435 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2436 << dendl;
2437 }
2438
2439 return unclean;
2440}
2441
2442void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2443{
2444 assert(lock.is_locked());
2445 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2446
2447 for (xlist<Object*>::iterator p = oset->objects.begin();
2448 !p.end(); ++p) {
2449 Object *ob = *p;
2450 if (!ob->exists) {
2451 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2452 ob->exists = true;
2453 ob->complete = false;
2454 }
2455 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2456 !q.end(); ++q) {
2457 C_ReadFinish *comp = *q;
2458 comp->distrust_enoent();
2459 }
2460 }
2461}
2462
2463/**
2464 * discard object extents from an ObjectSet by removing the objects in
2465 * exls from the in-memory oset.
2466 */
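// Unlike flush_set(), dirty buffers are simply dropped rather than written
// back; if that removed the last dirty/tx data, flush_set_callback is
// invoked so the caller can release resources tied to it. Use
// discard_writeback() below when in-flight writeback must be waited for.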
2467void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2468{
2469 assert(lock.is_locked());
28e407b8
AA
2470 bool was_dirty = oset->dirty_or_tx > 0;
2471
2472 _discard(oset, exls, nullptr);
2473 _discard_finish(oset, was_dirty, nullptr);
2474}
2475
2476/**
2477 * discard object extents from an ObjectSet by removing the objects in
2478 * exls from the in-memory oset. If the bh is in TX state, the discard
2479 * will wait for the write to commit prior to invoking on_finish.
2480 */
2481void ObjectCacher::discard_writeback(ObjectSet *oset,
2482 const vector<ObjectExtent>& exls,
2483 Context* on_finish)
2484{
2485 assert(lock.is_locked());
2486 bool was_dirty = oset->dirty_or_tx > 0;
2487
2488 C_GatherBuilder gather(cct);
2489 _discard(oset, exls, &gather);
2490
2491 if (gather.has_subs()) {
2492 bool flushed = was_dirty && oset->dirty_or_tx == 0;
2493 gather.set_finisher(new FunctionContext(
2494 [this, oset, flushed, on_finish](int) {
2495 assert(lock.is_locked());
2496 if (flushed && flush_set_callback)
2497 flush_set_callback(flush_set_callback_arg, oset);
2498 if (on_finish)
2499 on_finish->complete(0);
2500 }));
2501 gather.activate();
7c673cae
FG
2502 return;
2503 }
2504
28e407b8
AA
2505 _discard_finish(oset, was_dirty, on_finish);
2506}
2507
2508void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
2509 C_GatherBuilder* gather)
2510{
2511 if (oset->objects.empty()) {
2512 ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
2513 return;
2514 }
7c673cae 2515
28e407b8 2516 ldout(cct, 10) << __func__ << " " << oset << dendl;
7c673cae 2517
28e407b8
AA
2518 for (auto& ex : exls) {
2519 ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
7c673cae
FG
2520 sobject_t soid(ex.oid, CEPH_NOSNAP);
2521 if (objects[oset->poolid].count(soid) == 0)
2522 continue;
2523 Object *ob = objects[oset->poolid][soid];
2524
28e407b8 2525 ob->discard(ex.offset, ex.length, gather);
7c673cae 2526 }
28e407b8
AA
2527}
2528
2529void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
2530 Context* on_finish)
2531{
2532 assert(lock.is_locked());
7c673cae
FG
2533
2534 // did we truncate off dirty data?
28e407b8 2535 if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
7c673cae 2536 flush_set_callback(flush_set_callback_arg, oset);
28e407b8
AA
2537 }
2538
2539 // notify that in-flight writeback has completed
2540 if (on_finish != nullptr) {
2541 on_finish->complete(0);
2542 }
7c673cae
FG
2543}
2544
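// Debugging aid: recount the per-state byte totals by walking every buffer
// head and assert that they match the incrementally maintained stat_*
// counters (the call in the write path above is left commented out).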
2545void ObjectCacher::verify_stats() const
2546{
2547 assert(lock.is_locked());
2548 ldout(cct, 10) << "verify_stats" << dendl;
2549
2550 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2551 error = 0;
2552 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2553 = objects.begin();
2554 i != objects.end();
2555 ++i) {
2556 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2557 = i->begin();
2558 p != i->end();
2559 ++p) {
2560 Object *ob = p->second;
2561 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2562 q != ob->data.end();
2563 ++q) {
2564 BufferHead *bh = q->second;
2565 switch (bh->get_state()) {
2566 case BufferHead::STATE_MISSING:
2567 missing += bh->length();
2568 break;
2569 case BufferHead::STATE_CLEAN:
2570 clean += bh->length();
2571 break;
2572 case BufferHead::STATE_ZERO:
2573 zero += bh->length();
2574 break;
2575 case BufferHead::STATE_DIRTY:
2576 dirty += bh->length();
2577 break;
2578 case BufferHead::STATE_TX:
2579 tx += bh->length();
2580 break;
2581 case BufferHead::STATE_RX:
2582 rx += bh->length();
2583 break;
2584 case BufferHead::STATE_ERROR:
2585 error += bh->length();
2586 break;
2587 default:
2588 ceph_abort();
2589 }
2590 }
2591 }
2592 }
2593
2594 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2595 << " dirty " << dirty << " missing " << missing
2596 << " error " << error << dendl;
2597 assert(clean == stat_clean);
2598 assert(rx == stat_rx);
2599 assert(tx == stat_tx);
2600 assert(dirty == stat_dirty);
2601 assert(missing == stat_missing);
2602 assert(zero == stat_zero);
2603 assert(error == stat_error);
2604}
2605
2606void ObjectCacher::bh_stat_add(BufferHead *bh)
2607{
2608 assert(lock.is_locked());
2609 switch (bh->get_state()) {
2610 case BufferHead::STATE_MISSING:
2611 stat_missing += bh->length();
2612 break;
2613 case BufferHead::STATE_CLEAN:
2614 stat_clean += bh->length();
2615 break;
2616 case BufferHead::STATE_ZERO:
2617 stat_zero += bh->length();
2618 break;
2619 case BufferHead::STATE_DIRTY:
2620 stat_dirty += bh->length();
2621 bh->ob->dirty_or_tx += bh->length();
2622 bh->ob->oset->dirty_or_tx += bh->length();
2623 break;
2624 case BufferHead::STATE_TX:
2625 stat_tx += bh->length();
2626 bh->ob->dirty_or_tx += bh->length();
2627 bh->ob->oset->dirty_or_tx += bh->length();
2628 break;
2629 case BufferHead::STATE_RX:
2630 stat_rx += bh->length();
2631 break;
2632 case BufferHead::STATE_ERROR:
2633 stat_error += bh->length();
2634 break;
2635 default:
2636 assert(0 == "bh_stat_add: invalid bufferhead state");
2637 }
2638 if (get_stat_dirty_waiting() > 0)
2639 stat_cond.Signal();
2640}
2641
2642void ObjectCacher::bh_stat_sub(BufferHead *bh)
2643{
2644 assert(lock.is_locked());
2645 switch (bh->get_state()) {
2646 case BufferHead::STATE_MISSING:
2647 stat_missing -= bh->length();
2648 break;
2649 case BufferHead::STATE_CLEAN:
2650 stat_clean -= bh->length();
2651 break;
2652 case BufferHead::STATE_ZERO:
2653 stat_zero -= bh->length();
2654 break;
2655 case BufferHead::STATE_DIRTY:
2656 stat_dirty -= bh->length();
2657 bh->ob->dirty_or_tx -= bh->length();
2658 bh->ob->oset->dirty_or_tx -= bh->length();
2659 break;
2660 case BufferHead::STATE_TX:
2661 stat_tx -= bh->length();
2662 bh->ob->dirty_or_tx -= bh->length();
2663 bh->ob->oset->dirty_or_tx -= bh->length();
2664 break;
2665 case BufferHead::STATE_RX:
2666 stat_rx -= bh->length();
2667 break;
2668 case BufferHead::STATE_ERROR:
2669 stat_error -= bh->length();
2670 break;
2671 default:
2672 assert(0 == "bh_stat_sub: invalid bufferhead state");
2673 }
2674}
2675
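// Move a buffer head to a new state, keeping the dirty/rest LRUs, the
// dirty_or_tx_bh set and the per-state byte counters consistent.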
2676void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2677{
2678 assert(lock.is_locked());
2679 int state = bh->get_state();
2680 // move between lru lists?
2681 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2682 bh_lru_rest.lru_remove(bh);
2683 bh_lru_dirty.lru_insert_top(bh);
2684 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2685 bh_lru_dirty.lru_remove(bh);
2686 if (bh->get_dontneed())
2687 bh_lru_rest.lru_insert_bot(bh);
2688 else
2689 bh_lru_rest.lru_insert_top(bh);
2690 }
2691
2692 if ((s == BufferHead::STATE_TX ||
2693 s == BufferHead::STATE_DIRTY) &&
2694 state != BufferHead::STATE_TX &&
2695 state != BufferHead::STATE_DIRTY) {
2696 dirty_or_tx_bh.insert(bh);
2697 } else if ((state == BufferHead::STATE_TX ||
2698 state == BufferHead::STATE_DIRTY) &&
2699 s != BufferHead::STATE_TX &&
2700 s != BufferHead::STATE_DIRTY) {
2701 dirty_or_tx_bh.erase(bh);
2702 }
2703
2704 if (s != BufferHead::STATE_ERROR &&
2705 state == BufferHead::STATE_ERROR) {
2706 bh->error = 0;
2707 }
2708
2709 // set state
2710 bh_stat_sub(bh);
2711 bh->set_state(s);
2712 bh_stat_add(bh);
2713}
2714
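// Register a new buffer head with its object and with the cache-wide LRU
// lists, the dirty_or_tx_bh set and the stats counters.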
2715void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2716{
2717 assert(lock.is_locked());
2718 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2719 ob->add_bh(bh);
2720 if (bh->is_dirty()) {
2721 bh_lru_dirty.lru_insert_top(bh);
2722 dirty_or_tx_bh.insert(bh);
2723 } else {
2724 if (bh->get_dontneed())
2725 bh_lru_rest.lru_insert_bot(bh);
2726 else
2727 bh_lru_rest.lru_insert_top(bh);
2728 }
2729
2730 if (bh->is_tx()) {
2731 dirty_or_tx_bh.insert(bh);
2732 }
2733 bh_stat_add(bh);
2734}
2735
2736void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2737{
2738 assert(lock.is_locked());
2739 assert(bh->get_journal_tid() == 0);
2740 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2741 ob->remove_bh(bh);
2742 if (bh->is_dirty()) {
2743 bh_lru_dirty.lru_remove(bh);
2744 dirty_or_tx_bh.erase(bh);
2745 } else {
2746 bh_lru_rest.lru_remove(bh);
2747 }
2748
2749 if (bh->is_tx()) {
2750 dirty_or_tx_bh.erase(bh);
2751 }
2752 bh_stat_sub(bh);
2753 if (get_stat_dirty_waiting() > 0)
2754 stat_cond.Signal();
2755}
2756