1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <limits.h>
5
6#include "msg/Messenger.h"
7#include "ObjectCacher.h"
8#include "WritebackHandler.h"
9#include "common/errno.h"
10#include "common/perf_counters.h"
11
 12#include "include/ceph_assert.h"
13
14#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
 15#define BUFFER_MEMORY_WEIGHT CEPH_PAGE_SHIFT // memory usage of BufferHead, count in (1<<n)
16
17using std::chrono::seconds;
18 /// while holding the lock
19
20/*** ObjectCacher::BufferHead ***/
21
22
23/*** ObjectCacher::Object ***/
24
25#define dout_subsys ceph_subsys_objectcacher
26#undef dout_prefix
27#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
28
29
30
31class ObjectCacher::C_ReadFinish : public Context {
32 ObjectCacher *oc;
33 int64_t poolid;
34 sobject_t oid;
35 loff_t start;
36 uint64_t length;
37 xlist<C_ReadFinish*>::item set_item;
38 bool trust_enoent;
39 ceph_tid_t tid;
 40  ZTracer::Trace trace;
41
42public:
43 bufferlist bl;
44 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
 45               uint64_t l, const ZTracer::Trace &trace) :
46 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
47 set_item(this), trust_enoent(true),
 48    tid(t), trace(trace) {
49 ob->reads.push_back(&set_item);
50 }
51
52 void finish(int r) override {
53 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
 54    trace.event("finish");
55
56 // object destructor clears the list
57 if (set_item.is_on_list())
58 set_item.remove_myself();
59 }
60
61 void distrust_enoent() {
62 trust_enoent = false;
63 }
64};
65
66class ObjectCacher::C_RetryRead : public Context {
67 ObjectCacher *oc;
68 OSDRead *rd;
69 ObjectSet *oset;
70 Context *onfinish;
 71  ZTracer::Trace trace;
 72public:
73 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
74 const ZTracer::Trace &trace)
75 : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
76 }
 77  void finish(int r) override {
78 if (r >= 0) {
79 r = oc->_readx(rd, oset, onfinish, false, &trace);
80 }
81
82 if (r == 0) {
83 // read is still in-progress
84 return;
85 }
86
87 trace.event("finish");
88 if (onfinish) {
89 onfinish->complete(r);
90 }
91 }
92};
93
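// Split 'left' at object-relative offset 'off': left keeps [start, off)
// and a newly allocated right bh takes [off, start+length), inheriting
// state, snapc and tids so both halves stay consistent. Illustrative
// example (hypothetical offsets): splitting a bh covering [0, 8192) at
// 4096 shrinks left to [0, 4096) and returns right = [4096, 8192).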
94ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
95 loff_t off)
96{
 97  ceph_assert(ceph_mutex_is_locked(oc->lock));
98 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
99
100 // split off right
101 ObjectCacher::BufferHead *right = new BufferHead(this);
102
 103  // inherit dontneed/nocache; if the right half is accessed later, these flags let it be cleaned up automatically.
104 right->set_dontneed(left->get_dontneed());
105 right->set_nocache(left->get_nocache());
106
107 right->last_write_tid = left->last_write_tid;
108 right->last_read_tid = left->last_read_tid;
109 right->set_state(left->get_state());
110 right->snapc = left->snapc;
111 right->set_journal_tid(left->journal_tid);
112
113 loff_t newleftlen = off - left->start();
114 right->set_start(off);
115 right->set_length(left->length() - newleftlen);
116
117 // shorten left
118 oc->bh_stat_sub(left);
119 left->set_length(newleftlen);
120 oc->bh_stat_add(left);
121
122 // add right
123 oc->bh_add(this, right);
124
125 // split buffers too
126 bufferlist bl;
127 bl.claim(left->bl);
128 if (bl.length()) {
 129    ceph_assert(bl.length() == (left->length() + right->length()));
130 right->bl.substr_of(bl, left->length(), right->length());
131 left->bl.substr_of(bl, 0, left->length());
132 }
133
134 // move read waiters
135 if (!left->waitfor_read.empty()) {
136 map<loff_t, list<Context*> >::iterator start_remove
137 = left->waitfor_read.begin();
138 while (start_remove != left->waitfor_read.end() &&
139 start_remove->first < right->start())
140 ++start_remove;
141 for (map<loff_t, list<Context*> >::iterator p = start_remove;
142 p != left->waitfor_read.end(); ++p) {
143 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
144 << " to right bh" << dendl;
145 right->waitfor_read[p->first].swap( p->second );
 146      ceph_assert(p->second.empty());
147 }
148 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
149 }
150
151 ldout(oc->cct, 20) << "split left is " << *left << dendl;
152 ldout(oc->cct, 20) << "split right is " << *right << dendl;
153 return right;
154}
155
156
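// Coalesce the adjacent bh 'right' into 'left': data is appended, the
// newer write tid/time wins, read waiters are spliced across, and
// 'right' is freed. Callers are expected to have checked adjacency and
// compatibility first (see can_merge_bh() below).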
157void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
158{
 159  ceph_assert(ceph_mutex_is_locked(oc->lock));
160
161 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
162 if (left->get_journal_tid() == 0) {
163 left->set_journal_tid(right->get_journal_tid());
164 }
165 right->set_journal_tid(0);
166
167 oc->bh_remove(this, right);
168 oc->bh_stat_sub(left);
169 left->set_length(left->length() + right->length());
170 oc->bh_stat_add(left);
171
172 // data
173 left->bl.claim_append(right->bl);
174
175 // version
176 // note: this is sorta busted, but should only be used for dirty buffers
177 left->last_write_tid = std::max( left->last_write_tid, right->last_write_tid );
178 left->last_write = std::max( left->last_write, right->last_write );
179
180 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
181 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
182
183 // waiters
184 for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
185 p != right->waitfor_read.end();
186 ++p)
187 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
188 p->second );
189
190 // hose right
191 delete right;
192
193 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
194}
195
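// Merge criteria: the two bh's must be byte-adjacent, in the same cache
// state, have compatible journal tids, and (for in-flight tx buffers)
// belong to the same writeback tid.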
196bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
197{
198 if (left->end() != right->start() ||
199 left->get_state() != right->get_state() ||
200 !left->can_merge_journal(right))
201 return false;
202 if (left->is_tx() && left->last_write_tid != right->last_write_tid)
203 return false;
204 return true;
205}
206
207void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
208{
 209  ceph_assert(ceph_mutex_is_locked(oc->lock));
210 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
211
212 // do not merge rx buffers; last_read_tid may not match
213 if (bh->is_rx())
214 return;
215
216 // to the left?
217 map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
 218  ceph_assert(p->second == bh);
219 if (p != data.begin()) {
220 --p;
 221    if (can_merge_bh(p->second, bh)) {
222 merge_left(p->second, bh);
223 bh = p->second;
224 } else {
225 ++p;
226 }
227 }
228 // to the right?
 229  ceph_assert(p->second == bh);
 230  ++p;
 231  if (p != data.end() && can_merge_bh(bh, p->second))
 232    merge_left(bh, p->second);
233
234 maybe_rebuild_buffer(bh);
235}
236
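// Splits and merges can leave a bh's bufferlist fragmented across many
// small buffers; if the wasted space exceeds half the buffered data and
// at least one cache page, rebuild it into a single contiguous buffer.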
237void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
238{
239 auto& bl = bh->bl;
240 if (bl.get_num_buffers() <= 1)
241 return;
242
243 auto wasted = bl.get_wasted_space();
244 if (wasted * 2 > bl.length() &&
245 wasted > (1U << BUFFER_MEMORY_WEIGHT))
246 bl.rebuild();
247}
248
249/*
250 * count bytes we have cached in given range
251 */
252bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
253{
 254  ceph_assert(ceph_mutex_is_locked(oc->lock));
255 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
256 while (left > 0) {
257 if (p == data.end())
258 return false;
259
260 if (p->first <= cur) {
261 // have part of it
 262      loff_t lenfromcur = std::min(p->second->end() - cur, left);
263 cur += lenfromcur;
264 left -= lenfromcur;
265 ++p;
266 continue;
267 } else if (p->first > cur) {
268 // gap
269 return false;
270 } else
271 ceph_abort();
272 }
273
274 return true;
275}
276
277/*
278 * all cached data in this range[off, off+len]
279 */
280bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
281{
 282  ceph_assert(ceph_mutex_is_locked(oc->lock));
283 if (data.empty())
284 return true;
285 map<loff_t, BufferHead*>::iterator first = data.begin();
286 map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
287 if (first->second->start() >= off && last->second->end() <= (off + len))
288 return true;
289 else
290 return false;
291}
292
293/*
294 * map a range of bytes into buffer_heads.
295 * - create missing buffer_heads as necessary.
296 */
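// Result buckets, keyed by object-relative offset: 'hits' are readable
// now (clean/dirty/tx/zero), 'missing' are newly created bh's that still
// need a read, 'rx' already have reads in flight, and 'errors' hold a
// stored read error. Hypothetical example: with only a clean bh cached
// at [0, 4096), a map_read of 0~8192 puts that bh in hits and adds a new
// 4096-byte bh at offset 4096 to missing (or to hits, zero-filled, if
// the object is known to be complete).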
297int ObjectCacher::Object::map_read(ObjectExtent &ex,
298 map<loff_t, BufferHead*>& hits,
299 map<loff_t, BufferHead*>& missing,
300 map<loff_t, BufferHead*>& rx,
301 map<loff_t, BufferHead*>& errors)
302{
 303  ceph_assert(ceph_mutex_is_locked(oc->lock));
304 ldout(oc->cct, 10) << "map_read " << ex.oid << " "
305 << ex.offset << "~" << ex.length << dendl;
306
307 loff_t cur = ex.offset;
308 loff_t left = ex.length;
309
310 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
311 while (left > 0) {
312 // at end?
313 if (p == data.end()) {
314 // rest is a miss.
315 BufferHead *n = new BufferHead(this);
316 n->set_start(cur);
317 n->set_length(left);
318 oc->bh_add(this, n);
319 if (complete) {
320 oc->mark_zero(n);
321 hits[cur] = n;
322 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
323 } else {
324 missing[cur] = n;
325 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
326 }
327 cur += left;
 328      ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
329 break; // no more.
330 }
 331
332 if (p->first <= cur) {
333 // have it (or part of it)
334 BufferHead *e = p->second;
335
336 if (e->is_clean() ||
337 e->is_dirty() ||
338 e->is_tx() ||
339 e->is_zero()) {
340 hits[cur] = e; // readable!
341 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
342 } else if (e->is_rx()) {
343 rx[cur] = e; // missing, not readable.
344 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
345 } else if (e->is_error()) {
346 errors[cur] = e;
347 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
348 } else {
349 ceph_abort();
350 }
 351
 352      loff_t lenfromcur = std::min(e->end() - cur, left);
353 cur += lenfromcur;
354 left -= lenfromcur;
355 ++p;
356 continue; // more?
 357
358 } else if (p->first > cur) {
359 // gap.. miss
360 loff_t next = p->first;
361 BufferHead *n = new BufferHead(this);
 362      loff_t len = std::min(next - cur, left);
363 n->set_start(cur);
364 n->set_length(len);
365 oc->bh_add(this,n);
366 if (complete) {
367 oc->mark_zero(n);
368 hits[cur] = n;
369 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
370 } else {
371 missing[cur] = n;
372 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
373 }
374 cur += std::min(left, n->length());
375 left -= std::min(left, n->length());
376 continue; // more?
377 } else {
378 ceph_abort();
379 }
380 }
381 return 0;
382}
383
384void ObjectCacher::Object::audit_buffers()
385{
386 loff_t offset = 0;
387 for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
388 it != data.end(); ++it) {
389 if (it->first != it->second->start()) {
390 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
391 << " does not match bh start position: "
392 << *it->second << dendl;
 393      ceph_assert(it->first == it->second->start());
394 }
395 if (it->first < offset) {
396 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
397 << " overlaps with previous bh " << *((--it)->second)
398 << dendl;
 399      ceph_assert(it->first >= offset);
400 }
401 BufferHead *bh = it->second;
402 map<loff_t, list<Context*> >::const_iterator w_it;
403 for (w_it = bh->waitfor_read.begin();
404 w_it != bh->waitfor_read.end(); ++w_it) {
405 if (w_it->first < bh->start() ||
406 w_it->first >= bh->start() + bh->length()) {
407 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
408 << " is not within bh " << *bh << dendl;
409 ceph_assert(w_it->first >= bh->start());
410 ceph_assert(w_it->first < bh->start() + bh->length());
411 }
412 }
413 offset = it->first + it->second->length();
414 }
415}
416
417/*
418 * map a range of extents on an object's buffer cache.
419 * - combine any bh's we're writing into one
420 * - break up bufferheads that don't fall completely within the range
421 * //no! - return a bh that includes the write. may also include
422 * other dirty data to left and/or right.
423 */
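// Net effect: map_write() returns one bh ('final') covering exactly
// [ex.offset, ex.offset+ex.length); existing bh's straddling the range
// are split at the boundaries, covered bh's and gaps are absorbed into
// 'final', and the caller then fills final->bl and marks it dirty.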
424ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
 425                                                          ceph_tid_t tid)
 426{
 427  ceph_assert(ceph_mutex_is_locked(oc->lock));
428 BufferHead *final = 0;
429
430 ldout(oc->cct, 10) << "map_write oex " << ex.oid
431 << " " << ex.offset << "~" << ex.length << dendl;
432
433 loff_t cur = ex.offset;
434 loff_t left = ex.length;
435
436 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
437 while (left > 0) {
438 loff_t max = left;
439
440 // at end ?
441 if (p == data.end()) {
442 if (final == NULL) {
443 final = new BufferHead(this);
444 replace_journal_tid(final, tid);
445 final->set_start( cur );
446 final->set_length( max );
447 oc->bh_add(this, final);
448 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
449 } else {
450 oc->bh_stat_sub(final);
451 final->set_length(final->length() + max);
452 oc->bh_stat_add(final);
453 }
454 left -= max;
455 cur += max;
456 continue;
457 }
458
459 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
460 //oc->verify_stats();
461
462 if (p->first <= cur) {
463 BufferHead *bh = p->second;
464 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
465
466 if (p->first < cur) {
 467        ceph_assert(final == 0);
468 if (cur + max >= bh->end()) {
469 // we want right bit (one splice)
470 final = split(bh, cur); // just split it, take right half.
 471          maybe_rebuild_buffer(bh);
472 replace_journal_tid(final, tid);
473 ++p;
 474          ceph_assert(p->second == final);
475 } else {
476 // we want middle bit (two splices)
477 final = split(bh, cur);
 478          maybe_rebuild_buffer(bh);
 479          ++p;
 480          ceph_assert(p->second == final);
481 auto right = split(final, cur+max);
482 maybe_rebuild_buffer(right);
483 replace_journal_tid(final, tid);
484 }
485 } else {
 486        ceph_assert(p->first == cur);
487 if (bh->length() <= max) {
488 // whole bufferhead, piece of cake.
489 } else {
490 // we want left bit (one splice)
491 auto right = split(bh, cur + max); // just split
492 maybe_rebuild_buffer(right);
493 }
494 if (final) {
495 oc->mark_dirty(bh);
496 oc->mark_dirty(final);
497 --p; // move iterator back to final
 498          ceph_assert(p->second == final);
499 replace_journal_tid(bh, tid);
500 merge_left(final, bh);
501 } else {
502 final = bh;
503 replace_journal_tid(final, tid);
504 }
505 }
506
507 // keep going.
508 loff_t lenfromcur = final->end() - cur;
509 cur += lenfromcur;
510 left -= lenfromcur;
511 ++p;
512 continue;
513 } else {
514 // gap!
515 loff_t next = p->first;
 516      loff_t glen = std::min(next - cur, max);
517 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
518 if (final) {
519 oc->bh_stat_sub(final);
520 final->set_length(final->length() + glen);
521 oc->bh_stat_add(final);
522 } else {
523 final = new BufferHead(this);
524 replace_journal_tid(final, tid);
525 final->set_start( cur );
526 final->set_length( glen );
527 oc->bh_add(this, final);
528 }
529
530 cur += glen;
531 left -= glen;
532 continue; // more?
533 }
534 }
535
536 // set version
537 ceph_assert(final);
538 ceph_assert(final->get_journal_tid() == tid);
539 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
540
541 return final;
542}
543
544void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
545 ceph_tid_t tid) {
546 ceph_tid_t bh_tid = bh->get_journal_tid();
547
 548  ceph_assert(tid == 0 || bh_tid <= tid);
549 if (bh_tid != 0 && bh_tid != tid) {
550 // inform journal that it should not expect a writeback from this extent
551 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
552 bh->length(), bh_tid, tid);
553 }
554 bh->set_journal_tid(tid);
555}
556
557void ObjectCacher::Object::truncate(loff_t s)
558{
 559  ceph_assert(ceph_mutex_is_locked(oc->lock));
560 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
561
562 while (!data.empty()) {
563 BufferHead *bh = data.rbegin()->second;
564 if (bh->end() <= s)
565 break;
566
567 // split bh at truncation point?
568 if (bh->start() < s) {
569 split(bh, s);
 570      maybe_rebuild_buffer(bh);
571 continue;
572 }
573
574 // remove bh entirely
575 ceph_assert(bh->start() >= s);
576 ceph_assert(bh->waitfor_read.empty());
577 replace_journal_tid(bh, 0);
578 oc->bh_remove(this, bh);
579 delete bh;
580 }
581}
582
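// Drop cached data overlapping [off, off+len). Buffers under writeback
// (tx) are removed, but when a commit_gather is supplied the caller gets
// a completion that fires once the in-flight write commits; bh's with
// reads in flight (rx) cannot be freed, so they are zeroed and re-tagged
// instead of removed.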
583void ObjectCacher::Object::discard(loff_t off, loff_t len,
584 C_GatherBuilder* commit_gather)
 585{
 586  ceph_assert(ceph_mutex_is_locked(oc->lock));
587 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
588 << dendl;
589
590 if (!exists) {
591 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
592 exists = true;
593 }
594 if (complete) {
595 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
596 complete = false;
597 }
598
599 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
600 while (p != data.end()) {
601 BufferHead *bh = p->second;
602 if (bh->start() >= off + len)
603 break;
604
605 // split bh at truncation point?
606 if (bh->start() < off) {
607 split(bh, off);
 608      maybe_rebuild_buffer(bh);
609 ++p;
610 continue;
611 }
612
 613    ceph_assert(bh->start() >= off);
 614    if (bh->end() > off + len) {
615 auto right = split(bh, off + len);
616 maybe_rebuild_buffer(right);
617 }
618
619 ++p;
620 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
 621    replace_journal_tid(bh, 0);
622
623 if (bh->is_tx() && commit_gather != nullptr) {
624 // wait for the writeback to commit
625 waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
626 } else if (bh->is_rx()) {
627 // cannot remove bh with in-flight read, but we can ensure the
628 // read won't overwrite the discard
629 bh->last_read_tid = ++oc->last_read_tid;
630 bh->bl.clear();
631 bh->set_nocache(true);
632 oc->mark_zero(bh);
633 // we should mark all Rx bh to zero
634 continue;
635 } else {
 636      ceph_assert(bh->waitfor_read.empty());
637 }
638
639 oc->bh_remove(this, bh);
640 delete bh;
641 }
642}
643
644
645
646/*** ObjectCacher ***/
647
648#undef dout_prefix
649#define dout_prefix *_dout << "objectcacher "
650
651
652ObjectCacher::ObjectCacher(CephContext *cct_, string name,
 653                           WritebackHandler& wb, ceph::mutex& l,
654 flush_set_callback_t flush_callback,
655 void *flush_callback_arg, uint64_t max_bytes,
656 uint64_t max_objects, uint64_t max_dirty,
657 uint64_t target_dirty, double max_dirty_age,
658 bool block_writes_upfront)
659 : perfcounter(NULL),
660 cct(cct_), writeback_handler(wb), name(name), lock(l),
661 max_dirty(max_dirty), target_dirty(target_dirty),
662 max_size(max_bytes), max_objects(max_objects),
663 max_dirty_age(ceph::make_timespan(max_dirty_age)),
664 block_writes_upfront(block_writes_upfront),
 665    trace_endpoint("ObjectCacher"),
666 flush_set_callback(flush_callback),
667 flush_set_callback_arg(flush_callback_arg),
668 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
669 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
670 stat_missing(0), stat_error(0), stat_dirty_waiting(0),
671 stat_nr_dirty_waiters(0), reads_outstanding(0)
672{
673 perf_start();
674 finisher.start();
675 scattered_write = writeback_handler.can_scattered_write();
676}
677
678ObjectCacher::~ObjectCacher()
679{
680 finisher.stop();
681 perf_stop();
682 // we should be empty.
683 for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
684 = objects.begin();
685 i != objects.end();
686 ++i)
687 ceph_assert(i->empty());
688 ceph_assert(bh_lru_rest.lru_get_size() == 0);
689 ceph_assert(bh_lru_dirty.lru_get_size() == 0);
690 ceph_assert(ob_lru.lru_get_size() == 0);
691 ceph_assert(dirty_or_tx_bh.empty());
692}
693
694void ObjectCacher::perf_start()
695{
696 string n = "objectcacher-" + name;
697 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
698
699 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
700 "cache_ops_hit", "Hit operations");
701 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
702 "cache_ops_miss", "Miss operations");
703 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
 704                      "cache_bytes_hit", "Hit data", NULL, 0, unit_t(UNIT_BYTES));
 705  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
 706                      "cache_bytes_miss", "Miss data", NULL, 0, unit_t(UNIT_BYTES));
707 plb.add_u64_counter(l_objectcacher_data_read,
708 "data_read", "Read data");
709 plb.add_u64_counter(l_objectcacher_data_written,
710 "data_written", "Data written to cache");
711 plb.add_u64_counter(l_objectcacher_data_flushed,
712 "data_flushed", "Data flushed");
713 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
714 "data_overwritten_while_flushing",
715 "Data overwritten while flushing");
716 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
717 "Write operations, delayed due to dirty limits");
718 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
719 "write_bytes_blocked",
 720                      "Write data blocked on dirty limit", NULL, 0, unit_t(UNIT_BYTES));
721 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
722 "Time spent blocking a write due to dirty limits");
723
724 perfcounter = plb.create_perf_counters();
725 cct->get_perfcounters_collection()->add(perfcounter);
726}
727
728void ObjectCacher::perf_stop()
729{
 730  ceph_assert(perfcounter);
731 cct->get_perfcounters_collection()->remove(perfcounter);
732 delete perfcounter;
733}
734
735/* private */
736ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
737 uint64_t object_no,
738 ObjectSet *oset,
739 object_locator_t &l,
740 uint64_t truncate_size,
741 uint64_t truncate_seq)
742{
743 // XXX: Add handling of nspace in object_locator_t in cache
 744  ceph_assert(ceph_mutex_is_locked(lock));
745 // have it?
746 if ((uint32_t)l.pool < objects.size()) {
747 if (objects[l.pool].count(oid)) {
748 Object *o = objects[l.pool][oid];
749 o->object_no = object_no;
750 o->truncate_size = truncate_size;
751 o->truncate_seq = truncate_seq;
752 return o;
753 }
754 } else {
755 objects.resize(l.pool+1);
756 }
757
758 // create it.
759 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
760 truncate_seq);
761 objects[l.pool][oid] = o;
762 ob_lru.lru_insert_top(o);
763 return o;
764}
765
766void ObjectCacher::close_object(Object *ob)
767{
 768  ceph_assert(ceph_mutex_is_locked(lock));
 769  ldout(cct, 10) << "close_object " << *ob << dendl;
 770  ceph_assert(ob->can_close());
771
772 // ok!
773 ob_lru.lru_remove(ob);
774 objects[ob->oloc.pool].erase(ob->get_soid());
775 ob->set_item.remove_myself();
776 delete ob;
777}
778
779void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
780 const ZTracer::Trace &parent_trace)
 781{
 782  ceph_assert(ceph_mutex_is_locked(lock));
783 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
784 << reads_outstanding << dendl;
785
786 ZTracer::Trace trace;
787 if (parent_trace.valid()) {
788 trace.init("", &trace_endpoint, &parent_trace);
789 trace.copy_name("bh_read " + bh->ob->get_oid().name);
790 trace.event("start");
791 }
792
793 mark_rx(bh);
794 bh->last_read_tid = ++last_read_tid;
795
796 // finisher
797 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
 798                                            bh->start(), bh->length(), trace);
799 // go
800 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
801 bh->ob->get_oloc(), bh->start(), bh->length(),
802 bh->ob->get_snap(), &onfinish->bl,
803 bh->ob->truncate_size, bh->ob->truncate_seq,
 804                         op_flags, trace, onfinish);
805
806 ++reads_outstanding;
807}
808
809void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
810 ceph_tid_t tid, loff_t start,
811 uint64_t length, bufferlist &bl, int r,
812 bool trust_enoent)
813{
 814  ceph_assert(ceph_mutex_is_locked(lock));
815 ldout(cct, 7) << "bh_read_finish "
816 << oid
817 << " tid " << tid
818 << " " << start << "~" << length
819 << " (bl is " << bl.length() << ")"
820 << " returned " << r
821 << " outstanding reads " << reads_outstanding
822 << dendl;
823
824 if (r >= 0 && bl.length() < length) {
825 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
826 << length << " with " << length - bl.length() << " bytes of zeroes"
827 << dendl;
828 bl.append_zero(length - bl.length());
829 }
830
831 list<Context*> ls;
832 int err = 0;
833
834 if (objects[poolid].count(oid) == 0) {
835 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
836 } else {
837 Object *ob = objects[poolid][oid];
838
839 if (r == -ENOENT && !ob->complete) {
840 // wake up *all* rx waiters, or else we risk reordering
841 // identical reads. e.g.
842 // read 1~1
843 // reply to unrelated 3~1 -> !exists
844 // read 1~1 -> immediate ENOENT
845 // reply to first 1~1 -> ooo ENOENT
846 bool allzero = true;
847 for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
848 p != ob->data.end(); ++p) {
849 BufferHead *bh = p->second;
850 for (map<loff_t, list<Context*> >::iterator p
851 = bh->waitfor_read.begin();
852 p != bh->waitfor_read.end();
853 ++p)
854 ls.splice(ls.end(), p->second);
855 bh->waitfor_read.clear();
856 if (!bh->is_zero() && !bh->is_rx())
857 allzero = false;
858 }
859
860 // just pass through and retry all waiters if we don't trust
861 // -ENOENT for this read
862 if (trust_enoent) {
863 ldout(cct, 7)
864 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
865 << dendl;
866 ob->complete = true;
867 ob->exists = false;
868
869 /* If all the bhs are effectively zero, get rid of them. All
870 * the waiters will be retried and get -ENOENT immediately, so
871 * it's safe to clean up the unneeded bh's now. Since we know
872 * it's safe to remove them now, do so, so they aren't hanging
 873       * around waiting for more -ENOENTs from rados while the cache
874 * is being shut down.
875 *
876 * Only do this when all the bhs are rx or clean, to match the
877 * condition in _readx(). If there are any non-rx or non-clean
878 * bhs, _readx() will wait for the final result instead of
879 * returning -ENOENT immediately.
880 */
881 if (allzero) {
882 ldout(cct, 10)
883 << "bh_read_finish ENOENT and allzero, getting rid of "
884 << "bhs for " << *ob << dendl;
885 map<loff_t, BufferHead*>::iterator p = ob->data.begin();
886 while (p != ob->data.end()) {
887 BufferHead *bh = p->second;
888 // current iterator will be invalidated by bh_remove()
889 ++p;
890 bh_remove(ob, bh);
891 delete bh;
892 }
893 }
894 }
895 }
896
897 // apply to bh's!
898 loff_t opos = start;
899 while (true) {
900 map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
901 if (p == ob->data.end())
902 break;
903 if (opos >= start+(loff_t)length) {
904 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
905 << start << "+" << length << "=" << start+(loff_t)length
906 << dendl;
907 break;
908 }
909
910 BufferHead *bh = p->second;
911 ldout(cct, 20) << "checking bh " << *bh << dendl;
912
913 // finishers?
914 for (map<loff_t, list<Context*> >::iterator it
915 = bh->waitfor_read.begin();
916 it != bh->waitfor_read.end();
917 ++it)
918 ls.splice(ls.end(), it->second);
919 bh->waitfor_read.clear();
920
921 if (bh->start() > opos) {
922 ldout(cct, 1) << "bh_read_finish skipping gap "
923 << opos << "~" << bh->start() - opos
924 << dendl;
925 opos = bh->start();
926 continue;
927 }
928
929 if (!bh->is_rx()) {
930 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
931 opos = bh->end();
932 continue;
933 }
934
935 if (bh->last_read_tid != tid) {
936 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
937 << bh->last_read_tid << " != tid " << tid
938 << ", skipping" << dendl;
939 opos = bh->end();
940 continue;
941 }
942
943 ceph_assert(opos >= bh->start());
944 ceph_assert(bh->start() == opos); // we don't merge rx bh's... yet!
945 ceph_assert(bh->length() <= start+(loff_t)length-opos);
946
947 if (bh->error < 0)
948 err = bh->error;
949
950 opos = bh->end();
951
952 if (r == -ENOENT) {
953 if (trust_enoent) {
954 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
955 bh_remove(ob, bh);
956 delete bh;
957 } else {
 958          ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
959 << *bh << dendl;
960 }
961 continue;
962 }
963
964 if (r < 0) {
965 bh->error = r;
966 mark_error(bh);
967 } else {
968 bh->bl.substr_of(bl,
969 bh->start() - start,
970 bh->length());
971 mark_clean(bh);
972 }
973
974 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
975
976 ob->try_merge_bh(bh);
977 }
978 }
979
980 // called with lock held.
981 ldout(cct, 20) << "finishing waiters " << ls << dendl;
982
983 finish_contexts(cct, ls, err);
984 retry_waiting_reads();
985
986 --reads_outstanding;
 987  read_cond.notify_all();
988}
989
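// Starting from 'bh', walk dirty_or_tx_bh in both directions and collect
// dirty neighbours on the same object that are old enough (last_write <=
// cutoff), stopping at the optional byte/count budgets, then flush them
// together with one scattered write.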
990void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
991 int64_t *max_amount, int *max_count)
992{
993 list<BufferHead*> blist;
994
995 int count = 0;
996 int64_t total_len = 0;
997 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
 998  ceph_assert(it != dirty_or_tx_bh.end());
999 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
1000 p != dirty_or_tx_bh.end();
1001 ++p) {
1002 BufferHead *obh = *p;
1003 if (obh->ob != bh->ob)
1004 break;
1005 if (obh->is_dirty() && obh->last_write <= cutoff) {
1006 blist.push_back(obh);
1007 ++count;
1008 total_len += obh->length();
1009 if ((max_count && count > *max_count) ||
1010 (max_amount && total_len > *max_amount))
1011 break;
1012 }
1013 }
1014
1015 while (it != dirty_or_tx_bh.begin()) {
1016 --it;
1017 BufferHead *obh = *it;
1018 if (obh->ob != bh->ob)
1019 break;
1020 if (obh->is_dirty() && obh->last_write <= cutoff) {
1021 blist.push_front(obh);
1022 ++count;
1023 total_len += obh->length();
1024 if ((max_count && count > *max_count) ||
1025 (max_amount && total_len > *max_amount))
1026 break;
1027 }
1028 }
1029 if (max_count)
1030 *max_count -= count;
1031 if (max_amount)
1032 *max_amount -= total_len;
1033
1034 bh_write_scattered(blist);
1035}
1036
1037class ObjectCacher::C_WriteCommit : public Context {
1038 ObjectCacher *oc;
1039 int64_t poolid;
1040 sobject_t oid;
1041 vector<pair<loff_t, uint64_t> > ranges;
 1042  ZTracer::Trace trace;
 1043public:
 1044  ceph_tid_t tid = 0;
 1045  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1046 uint64_t l, const ZTracer::Trace &trace) :
1047 oc(c), poolid(_poolid), oid(o), trace(trace) {
1048 ranges.push_back(make_pair(s, l));
1049 }
1050 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1051 vector<pair<loff_t, uint64_t> >& _ranges) :
1052 oc(c), poolid(_poolid), oid(o), tid(0) {
1053 ranges.swap(_ranges);
1054 }
1055 void finish(int r) override {
1056 oc->bh_write_commit(poolid, oid, ranges, tid, r);
 1057    trace.event("finish");
1058 }
1059};
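// Issue a single writeback_handler.write() covering every bh in 'blist'
// (all belonging to one object): the buffers form the io_vec, the newest
// snap context and write time win, and each bh is marked tx with the
// returned tid so bh_write_commit() can match it later.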
1060void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1061{
 1062  ceph_assert(ceph_mutex_is_locked(lock));
1063
1064 Object *ob = blist.front()->ob;
1065 ob->get();
1066
1067 ceph::real_time last_write;
1068 SnapContext snapc;
1069 vector<pair<loff_t, uint64_t> > ranges;
1070 vector<pair<uint64_t, bufferlist> > io_vec;
1071
1072 ranges.reserve(blist.size());
1073 io_vec.reserve(blist.size());
1074
1075 uint64_t total_len = 0;
1076 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1077 BufferHead *bh = *p;
1078 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1079 ceph_assert(bh->ob == ob);
1080 ceph_assert(bh->bl.length() == bh->length());
1081 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1082
1083 int n = io_vec.size();
1084 io_vec.resize(n + 1);
1085 io_vec[n].first = bh->start();
1086 io_vec[n].second = bh->bl;
1087
1088 total_len += bh->length();
1089 if (bh->snapc.seq > snapc.seq)
1090 snapc = bh->snapc;
1091 if (bh->last_write > last_write)
1092 last_write = bh->last_write;
1093 }
1094
1095 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1096
1097 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1098 io_vec, snapc, last_write,
1099 ob->truncate_size, ob->truncate_seq,
1100 oncommit);
1101 oncommit->tid = tid;
1102 ob->last_write_tid = tid;
1103 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1104 BufferHead *bh = *p;
1105 bh->last_write_tid = tid;
1106 mark_tx(bh);
1107 }
1108
1109 if (perfcounter)
1110 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1111}
1112
 1113void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
 1114{
 1115  ceph_assert(ceph_mutex_is_locked(lock));
1116 ldout(cct, 7) << "bh_write " << *bh << dendl;
1117
1118 bh->ob->get();
1119
1120 ZTracer::Trace trace;
1121 if (parent_trace.valid()) {
1122 trace.init("", &trace_endpoint, &parent_trace);
1123 trace.copy_name("bh_write " + bh->ob->get_oid().name);
1124 trace.event("start");
1125 }
1126
1127 // finishers
1128 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1129 bh->ob->get_soid(), bh->start(),
 1130                                              bh->length(), trace);
1131 // go
1132 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1133 bh->ob->get_oloc(),
1134 bh->start(), bh->length(),
1135 bh->snapc, bh->bl, bh->last_write,
1136 bh->ob->truncate_size,
1137 bh->ob->truncate_seq,
 1138                                           bh->journal_tid, trace, oncommit);
1139 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1140
1141 // set bh last_write_tid
1142 oncommit->tid = tid;
1143 bh->ob->last_write_tid = tid;
1144 bh->last_write_tid = tid;
1145
1146 if (perfcounter) {
1147 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1148 }
1149
1150 mark_tx(bh);
1151}
1152
1153void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1154 vector<pair<loff_t, uint64_t> >& ranges,
1155 ceph_tid_t tid, int r)
1156{
 1157  ceph_assert(ceph_mutex_is_locked(lock));
1158 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1159 << " ranges " << ranges << " returned " << r << dendl;
1160
1161 if (objects[poolid].count(oid) == 0) {
1162 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1163 return;
1164 }
1165
1166 Object *ob = objects[poolid][oid];
1167 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1168
1169 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1170 p != ranges.end();
1171 ++p) {
1172 loff_t start = p->first;
1173 uint64_t length = p->second;
1174 if (!ob->exists) {
1175 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1176 ob->exists = true;
1177
1178 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1179 ob->get_snap())) {
1180 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1181 "complete on " << *ob << dendl;
1182 ob->complete = false;
1183 }
1184 }
1185
1186 vector<pair<loff_t, BufferHead*>> hit;
1187 // apply to bh's!
1188 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1189 p != ob->data.end();
1190 ++p) {
1191 BufferHead *bh = p->second;
1192
 1193      if (bh->start() >= start+(loff_t)length)
1194 break;
1195
1196 // make sure bh is tx
1197 if (!bh->is_tx()) {
1198 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1199 continue;
1200 }
1201
1202 // make sure bh tid matches
1203 if (bh->last_write_tid != tid) {
 1204        ceph_assert(bh->last_write_tid > tid);
1205 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1206 continue;
1207 }
1208
 1209      // we don't merge tx buffers. tx buffer should be within the range
 1210      ceph_assert(bh->start() >= start);
 1211      ceph_assert(bh->end() <= start+(loff_t)length);
 1212
1213 if (r >= 0) {
1214 // ok! mark bh clean and error-free
1215 mark_clean(bh);
1216 bh->set_journal_tid(0);
1217 if (bh->get_nocache())
1218 bh_lru_rest.lru_bottouch(bh);
1219 hit.push_back(make_pair(bh->start(), bh));
1220 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1221 } else {
1222 mark_dirty(bh);
1223 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1224 << *bh << " r = " << r << " " << cpp_strerror(-r)
1225 << dendl;
1226 }
1227 }
1228
1229 for (auto& p : hit) {
 1230      // p.second may already have been merged and deleted by merge_left
1231 if (ob->data.count(p.first))
1232 ob->try_merge_bh(p.second);
1233 }
1234 }
1235
1236 // update last_commit.
 1237  ceph_assert(ob->last_commit_tid < tid);
1238 ob->last_commit_tid = tid;
1239
1240 // waiters?
1241 list<Context*> ls;
1242 if (ob->waitfor_commit.count(tid)) {
1243 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1244 ob->waitfor_commit.erase(tid);
1245 }
1246
1247 // is the entire object set now clean and fully committed?
1248 ObjectSet *oset = ob->oset;
1249 ob->put();
1250
1251 if (flush_set_callback &&
1252 was_dirty_or_tx > 0 &&
1253 oset->dirty_or_tx == 0) { // nothing dirty/tx
1254 flush_set_callback(flush_set_callback_arg, oset);
1255 }
1256
1257 if (!ls.empty())
1258 finish_contexts(cct, ls, r);
1259}
1260
 1261void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
 1262{
 1263  ceph_assert(trace != nullptr);
 1264  ceph_assert(ceph_mutex_is_locked(lock));
1265 ceph::real_time cutoff = ceph::real_clock::now();
1266
1267 ldout(cct, 10) << "flush " << amount << dendl;
1268
1269 /*
1270 * NOTE: we aren't actually pulling things off the LRU here, just
1271 * looking at the tail item. Then we call bh_write, which moves it
1272 * to the other LRU, so that we can call
1273 * lru_dirty.lru_get_next_expire() again.
1274 */
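  // e.g. flush(trace, 0) keeps writing back until no sufficiently old
  // dirty bh remains, while a positive 'amount' stops once roughly that
  // many bytes have been queued (whole buffer heads at a time).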
1275 int64_t left = amount;
1276 while (amount == 0 || left > 0) {
1277 BufferHead *bh = static_cast<BufferHead*>(
1278 bh_lru_dirty.lru_get_next_expire());
1279 if (!bh) break;
1280 if (bh->last_write > cutoff) break;
1281
1282 if (scattered_write) {
1283 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1284 } else {
1285 left -= bh->length();
 1286      bh_write(bh, *trace);
 1287    }
 1288  }
1289}
1290
1291
1292void ObjectCacher::trim()
1293{
 1294  ceph_assert(ceph_mutex_is_locked(lock));
1295 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1296 << get_stat_clean() << ", objects: max " << max_objects
1297 << " current " << ob_lru.lru_get_size() << dendl;
1298
1299 uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1300 uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
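  // Two limits are enforced here: total clean bytes must stay under
  // max_size, and the number of unpinned clean buffer heads must stay
  // under max_size >> BUFFER_MEMORY_WEIGHT, so per-bh overhead cannot
  // dominate when the cache holds many tiny buffers.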
1301 while (get_stat_clean() > 0 &&
1302 ((uint64_t)get_stat_clean() > max_size ||
1303 nr_clean_bh > max_clean_bh)) {
1304 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1305 if (!bh)
1306 break;
1307
1308 ldout(cct, 10) << "trim trimming " << *bh << dendl;
 1309    ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1310
1311 Object *ob = bh->ob;
1312 bh_remove(ob, bh);
1313 delete bh;
1314
1315 --nr_clean_bh;
1316
1317 if (ob->complete) {
1318 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1319 ob->complete = false;
1320 }
1321 }
1322
1323 while (ob_lru.lru_get_size() > max_objects) {
1324 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1325 if (!ob)
1326 break;
1327
1328 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1329 close_object(ob);
1330 }
1331
1332 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1333 << get_stat_clean() << ", objects: max " << max_objects
1334 << " current " << ob_lru.lru_get_size() << dendl;
1335}
1336
1337
1338
1339/* public */
1340
1341bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1342 snapid_t snapid)
1343{
 1344  ceph_assert(ceph_mutex_is_locked(lock));
1345 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1346 ex_it != extents.end();
1347 ++ex_it) {
1348 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1349
1350 // get Object cache
1351 sobject_t soid(ex_it->oid, snapid);
1352 Object *o = get_object_maybe(soid, ex_it->oloc);
1353 if (!o)
1354 return false;
1355 if (!o->is_cached(ex_it->offset, ex_it->length))
1356 return false;
1357 }
1358 return true;
1359}
1360
1361
1362/*
1363 * returns # bytes read (if in cache). onfinish is untouched (caller
1364 * must delete it)
1365 * returns 0 if doing async read
1366 */
1367int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1368 ZTracer::Trace *parent_trace)
7c673cae 1369{
1370 ZTracer::Trace trace;
1371 if (parent_trace != nullptr) {
1372 trace.init("read", &trace_endpoint, parent_trace);
1373 trace.event("start");
1374 }
1375
1376 int r =_readx(rd, oset, onfinish, true, &trace);
1377 if (r < 0) {
1378 trace.event("finish");
1379 }
1380 return r;
1381}
1382
1383int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
 1384                         bool external_call, ZTracer::Trace *trace)
 1385{
 1386  ceph_assert(trace != nullptr);
 1387  ceph_assert(ceph_mutex_is_locked(lock));
1388 bool success = true;
1389 int error = 0;
1390 uint64_t bytes_in_cache = 0;
1391 uint64_t bytes_not_in_cache = 0;
1392 uint64_t total_bytes_read = 0;
1393 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1394 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1395 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1396
1397 /*
1398 * WARNING: we can only meaningfully return ENOENT if the read request
1399 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1400 * zeroed buffers needs to feed single extents into readx().
1401 */
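  // In other words, multi-extent reads always see missing objects as
  // zero-filled data; only single-extent reads with oset->return_enoent
  // set can observe -ENOENT, which the assert below enforces.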
 1402  ceph_assert(!oset->return_enoent || rd->extents.size() == 1);
1403
1404 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1405 ex_it != rd->extents.end();
1406 ++ex_it) {
1407 ldout(cct, 10) << "readx " << *ex_it << dendl;
1408
1409 total_bytes_read += ex_it->length;
1410
1411 // get Object cache
1412 sobject_t soid(ex_it->oid, rd->snap);
1413 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1414 ex_it->truncate_size, oset->truncate_seq);
1415 if (external_call)
1416 touch_ob(o);
1417
1418 // does not exist and no hits?
1419 if (oset->return_enoent && !o->exists) {
1420 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1421
1422 // should we worry about COW underneath us?
1423 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1424 ex_it->length, soid.snap)) {
1425 ldout(cct, 20) << "readx may copy on write" << dendl;
1426 bool wait = false;
1427 list<BufferHead*> blist;
1428 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1429 bh_it != o->data.end();
1430 ++bh_it) {
1431 BufferHead *bh = bh_it->second;
1432 if (bh->is_dirty() || bh->is_tx()) {
1433 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1434 wait = true;
1435 if (bh->is_dirty()) {
1436 if (scattered_write)
1437 blist.push_back(bh);
1438 else
 1439              bh_write(bh, *trace);
1440 }
1441 }
1442 }
1443 if (scattered_write && !blist.empty())
1444 bh_write_scattered(blist);
1445 if (wait) {
1446 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1447 << " on " << *o << dendl;
1448 o->waitfor_commit[o->last_write_tid].push_back(
 1449          new C_RetryRead(this,rd, oset, onfinish, *trace));
1450 // FIXME: perfcounter!
1451 return 0;
1452 }
1453 }
1454
1455 // can we return ENOENT?
1456 bool allzero = true;
1457 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1458 bh_it != o->data.end();
1459 ++bh_it) {
1460 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1461 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1462 allzero = false;
1463 break;
1464 }
1465 }
1466 if (allzero) {
1467 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1468 << dendl;
1469 delete rd;
1470 if (dontneed)
1471 bottouch_ob(o);
1472 return -ENOENT;
1473 }
1474 }
1475
1476 // map extent into bufferheads
1477 map<loff_t, BufferHead*> hits, missing, rx, errors;
1478 o->map_read(*ex_it, hits, missing, rx, errors);
1479 if (external_call) {
1480 // retry reading error buffers
1481 missing.insert(errors.begin(), errors.end());
1482 } else {
1483 // some reads had errors, fail later so completions
1484 // are cleaned up properly
1485 // TODO: make read path not call _readx for every completion
1486 hits.insert(errors.begin(), errors.end());
1487 }
1488
1489 if (!missing.empty() || !rx.empty()) {
1490 // read missing
1491 map<loff_t, BufferHead*>::iterator last = missing.end();
1492 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1493 bh_it != missing.end();
1494 ++bh_it) {
1495 uint64_t rx_bytes = static_cast<uint64_t>(
1496 stat_rx + bh_it->second->length());
1497 bytes_not_in_cache += bh_it->second->length();
1498 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1499 // cache is full with concurrent reads -- wait for rx's to complete
1500 // to constrain memory growth (especially during copy-ups)
1501 if (success) {
1502 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1503 << waitfor_read.size() << " blocked reads, "
 1504                           << (std::max(rx_bytes, max_size) - max_size)
 1505                           << " read bytes" << dendl;
1506 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1507 *trace));
1508 }
1509
1510 bh_remove(o, bh_it->second);
1511 delete bh_it->second;
1512 } else {
1513 bh_it->second->set_nocache(nocache);
 1514          bh_read(bh_it->second, rd->fadvise_flags, *trace);
1515 if ((success && onfinish) || last != missing.end())
1516 last = bh_it;
1517 }
1518 success = false;
1519 }
1520
 1521      // add the waiter to the last missing bh so we don't wake up too early; reads complete in order
1522 if (last != missing.end()) {
1523 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1524 << " off " << last->first << dendl;
1525 last->second->waitfor_read[last->first].push_back(
 1526          new C_RetryRead(this, rd, oset, onfinish, *trace) );
1527
1528 }
1529
1530 // bump rx
1531 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1532 bh_it != rx.end();
1533 ++bh_it) {
1534 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1535 if (success && onfinish) {
1536 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1537 << " off " << bh_it->first << dendl;
1538 bh_it->second->waitfor_read[bh_it->first].push_back(
 1539            new C_RetryRead(this, rd, oset, onfinish, *trace) );
1540 }
1541 bytes_not_in_cache += bh_it->second->length();
1542 success = false;
1543 }
1544
1545 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1546 bh_it != hits.end(); ++bh_it)
 1547        // bump in LRU so we don't lose it when it is read later
1548 touch_bh(bh_it->second);
1549
1550 } else {
 1551      ceph_assert(!hits.empty());
1552
1553 // make a plain list
1554 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1555 bh_it != hits.end();
1556 ++bh_it) {
1557 BufferHead *bh = bh_it->second;
1558 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1559 if (bh->is_error() && bh->error)
1560 error = bh->error;
1561 bytes_in_cache += bh->length();
1562
1563 if (bh->get_nocache() && bh->is_clean())
1564 bh_lru_rest.lru_bottouch(bh);
1565 else
1566 touch_bh(bh);
 1567        // must come after touch_bh because touch_bh resets dontneed to false
1568 if (dontneed &&
1569 ((loff_t)ex_it->offset <= bh->start() &&
1570 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1571 bh->set_dontneed(true); //if dirty
1572 if (bh->is_clean())
1573 bh_lru_rest.lru_bottouch(bh);
1574 }
1575 }
1576
1577 if (!error) {
1578 // create reverse map of buffer offset -> object for the
1579 // eventual result. this is over a single ObjectExtent, so we
1580 // know that
1581 // - the bh's are contiguous
1582 // - the buffer frags need not be (and almost certainly aren't)
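      // The loop below walks the hit bh's and the destination buffer_extents
      // in lock-step, copying min(bytes left in this frag, bytes left in this
      // bh) at a time, so the two sequences need not line up with each other.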
1583 loff_t opos = ex_it->offset;
1584 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
 1585      ceph_assert(bh_it->second->start() <= opos);
1586 uint64_t bhoff = opos - bh_it->second->start();
1587 vector<pair<uint64_t,uint64_t> >::iterator f_it
1588 = ex_it->buffer_extents.begin();
1589 uint64_t foff = 0;
1590 while (1) {
1591 BufferHead *bh = bh_it->second;
 1592        ceph_assert(opos == (loff_t)(bh->start() + bhoff));
 1593
 1594        uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
1595 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1596 << bhoff << " frag " << f_it->first << "~"
1597 << f_it->second << " +" << foff << "~" << len
1598 << dendl;
1599
1600 bufferlist bit;
1601 // put substr here first, since substr_of clobbers, and we
1602 // may get multiple bh's at this stripe_map position
1603 if (bh->is_zero()) {
1604 stripe_map[f_it->first].append_zero(len);
1605 } else {
1606 bit.substr_of(bh->bl,
1607 opos - bh->start(),
1608 len);
1609 stripe_map[f_it->first].claim_append(bit);
1610 }
1611
1612 opos += len;
1613 bhoff += len;
1614 foff += len;
1615 if (opos == bh->end()) {
1616 ++bh_it;
1617 bhoff = 0;
1618 }
1619 if (foff == f_it->second) {
1620 ++f_it;
1621 foff = 0;
1622 }
1623 if (bh_it == hits.end()) break;
1624 if (f_it == ex_it->buffer_extents.end())
1625 break;
1626 }
1627 ceph_assert(f_it == ex_it->buffer_extents.end());
1628 ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1629 }
1630
1631 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1632 bottouch_ob(o);
1633 }
1634 }
1635
1636 if (!success) {
1637 if (perfcounter && external_call) {
1638 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1639 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1640 perfcounter->inc(l_objectcacher_cache_ops_miss);
1641 }
1642 if (onfinish) {
1643 ldout(cct, 20) << "readx defer " << rd << dendl;
1644 } else {
1645 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1646 << dendl;
1647 delete rd;
1648 }
1649 return 0; // wait!
1650 }
1651 if (perfcounter && external_call) {
1652 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1653 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1654 perfcounter->inc(l_objectcacher_cache_ops_hit);
1655 }
1656
1657 // no misses... success! do the read.
1658 ldout(cct, 10) << "readx has all buffers" << dendl;
1659
1660 // ok, assemble into result buffer.
1661 uint64_t pos = 0;
1662 if (rd->bl && !error) {
1663 rd->bl->clear();
1664 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1665 i != stripe_map.end();
1666 ++i) {
 1667      ceph_assert(pos == i->first);
1668 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1669 << " at " << pos << dendl;
1670 pos += i->second.length();
1671 rd->bl->claim_append(i->second);
 1672      ceph_assert(rd->bl->length() == pos);
1673 }
1674 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1675 } else if (!error) {
1676 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1677 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1678 pos = i->first + i->second.length();
1679 }
1680
1681 // done with read.
1682 int ret = error ? error : pos;
1683 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
 1684  ceph_assert(pos <= (uint64_t) INT_MAX);
1685
1686 delete rd;
1687
1688 trim();
1689
1690 return ret;
1691}
1692
1693void ObjectCacher::retry_waiting_reads()
1694{
1695 list<Context *> ls;
1696 ls.swap(waitfor_read);
1697
1698 while (!ls.empty() && waitfor_read.empty()) {
1699 Context *ctx = ls.front();
1700 ls.pop_front();
1701 ctx->complete(0);
1702 }
1703 waitfor_read.splice(waitfor_read.end(), ls);
1704}
1705
1706int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1707 ZTracer::Trace *parent_trace)
 1708{
 1709  ceph_assert(ceph_mutex_is_locked(lock));
1710 ceph::real_time now = ceph::real_clock::now();
1711 uint64_t bytes_written = 0;
1712 uint64_t bytes_written_in_flush = 0;
1713 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1714 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1715
1716 ZTracer::Trace trace;
1717 if (parent_trace != nullptr) {
1718 trace.init("write", &trace_endpoint, parent_trace);
1719 trace.event("start");
1720 }
1721
1722 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1723 ex_it != wr->extents.end();
1724 ++ex_it) {
1725 // get object cache
1726 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1727 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1728 ex_it->truncate_size, oset->truncate_seq);
1729
1730 // map it all into a single bufferhead.
1731 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1732 bool missing = bh->is_missing();
1733 bh->snapc = wr->snapc;
 1734
1735 bytes_written += ex_it->length;
1736 if (bh->is_tx()) {
1737 bytes_written_in_flush += ex_it->length;
1738 }
1739
1740 // adjust buffer pointers (ie "copy" data into my cache)
1741 // this is over a single ObjectExtent, so we know that
1742 // - there is one contiguous bh
1743 // - the buffer frags need not be (and almost certainly aren't)
1744 // note: i assume striping is monotonic... no jumps backwards, ever!
1745 loff_t opos = ex_it->offset;
1746 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1747 = ex_it->buffer_extents.begin();
1748 f_it != ex_it->buffer_extents.end();
1749 ++f_it) {
1750 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1751 << f_it->second << " into " << *bh << " at " << opos
1752 << dendl;
 1753      uint64_t bhoff = opos - bh->start();
 1754      ceph_assert(f_it->second <= bh->length() - bhoff);
1755
1756 // get the frag we're mapping in
1757 bufferlist frag;
 1758      frag.substr_of(wr->bl, f_it->first, f_it->second);
1759
1760 // keep anything left of bhoff
1761 if (!bhoff)
1762 bh->bl.swap(frag);
1763 else
1764 bh->bl.claim_append(frag);
1765
1766 opos += f_it->second;
1767 }
1768
1769 // ok, now bh is dirty.
1770 mark_dirty(bh);
1771 if (dontneed)
1772 bh->set_dontneed(true);
1773 else if (nocache && missing)
1774 bh->set_nocache(true);
1775 else
1776 touch_bh(bh);
1777
1778 bh->last_write = now;
1779
1780 o->try_merge_bh(bh);
1781 }
1782
1783 if (perfcounter) {
1784 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1785 if (bytes_written_in_flush) {
1786 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1787 bytes_written_in_flush);
1788 }
1789 }
1790
31f18b77 1791 int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
7c673cae
FG
1792 delete wr;
1793
1794 //verify_stats();
1795 trim();
1796 return r;
1797}
1798
1799class ObjectCacher::C_WaitForWrite : public Context {
1800public:
31f18b77
FG
1801 C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1802 const ZTracer::Trace &trace, Context *onfinish) :
1803 m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
7c673cae
FG
1804 void finish(int r) override;
1805private:
1806 ObjectCacher *m_oc;
1807 uint64_t m_len;
31f18b77 1808 ZTracer::Trace m_trace;
7c673cae
FG
1809 Context *m_onfinish;
1810};
1811
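// Runs in the finisher thread (queued by _wait_for_write when
// block_writes_upfront is false): do the throttling wait here instead of
// blocking the writer, then let the caller's completion run.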
1812void ObjectCacher::C_WaitForWrite::finish(int r)
1813{
11fdf7f2 1814 std::lock_guard l(m_oc->lock);
9f95a23c 1815 m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
7c673cae
FG
1816 m_onfinish->complete(r);
1817}
1818
9f95a23c
TL
1819void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
1820 ZTracer::Trace *trace)
7c673cae 1821{
9f95a23c 1822 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
1823 ceph::mono_time start = ceph::mono_clock::now();
1824 int blocked = 0;
1825 // wait for writeback?
1826 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1827 // - do not wait for bytes other waiters are waiting on. this means that
1828 // threads do not wait for each other. this effectively allows the cache
1829 // size to balloon in proportion to the data that is in flight.
3efd9988
FG
1830
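  // max_dirty is a byte limit; shifting by BUFFER_MEMORY_WEIGHT (the page
  // shift) turns it into an approximate cap on the number of dirty/tx
  // BufferHeads, checked alongside the byte total below.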
1831 uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
7c673cae 1832 while (get_stat_dirty() + get_stat_tx() > 0 &&
3efd9988
FG
1833 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1834 max_dirty + get_stat_dirty_waiting()) ||
1835 (dirty_or_tx_bh.size() >=
1836 max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1837
31f18b77
FG
1838 if (blocked == 0) {
1839 trace->event("start wait for writeback");
1840 }
7c673cae
FG
1841 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1842 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1843 << max_dirty << " + dirty_waiting "
1844 << get_stat_dirty_waiting() << dendl;
9f95a23c 1845 flusher_cond.notify_all();
7c673cae 1846 stat_dirty_waiting += len;
3efd9988 1847 ++stat_nr_dirty_waiters;
9f95a23c
TL
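      // lock is already held by the caller; adopt it so the condvar can
      // unlock/relock it during the wait, then release ownership so it is
      // not unlocked again when 'l' goes out of scope.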
1848 std::unique_lock l{lock, std::adopt_lock};
1849 stat_cond.wait(l);
1850 l.release();
7c673cae 1851 stat_dirty_waiting -= len;
3efd9988 1852 --stat_nr_dirty_waiters;
7c673cae
FG
1853 ++blocked;
1854 ldout(cct, 10) << __func__ << " woke up" << dendl;
1855 }
31f18b77
FG
1856 if (blocked > 0) {
1857 trace->event("finish wait for writeback");
1858 }
7c673cae
FG
1859 if (blocked && perfcounter) {
1860 perfcounter->inc(l_objectcacher_write_ops_blocked);
1861 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1862 ceph::timespan blocked = ceph::mono_clock::now() - start;
1863 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1864 }
1865}
1866
1867// blocking wait for write.
1868int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
31f18b77 1869 ZTracer::Trace *trace, Context *onfreespace)
7c673cae 1870{
9f95a23c 1871 ceph_assert(ceph_mutex_is_locked(lock));
11fdf7f2 1872 ceph_assert(trace != nullptr);
7c673cae
FG
1873 int ret = 0;
1874
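  // Take the writeback path unless the cache is write-through (max_dirty == 0)
  // or the write requests FUA semantics; those cases flush the extents
  // synchronously below.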
11fdf7f2 1875 if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
7c673cae 1876 if (block_writes_upfront) {
9f95a23c 1877 _maybe_wait_for_writeback(len, trace);
7c673cae
FG
1878 if (onfreespace)
1879 onfreespace->complete(0);
1880 } else {
11fdf7f2 1881 ceph_assert(onfreespace);
31f18b77 1882 finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
7c673cae
FG
1883 }
1884 } else {
1885 // write-thru! flush what we just wrote.
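    // With block_writes_upfront we wait right here for the flush to commit;
    // otherwise onfreespace is handed to the flush and completed when it
    // commits.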
9f95a23c 1886 ceph::condition_variable cond;
7c673cae
FG
1887 bool done = false;
1888 Context *fin = block_writes_upfront ?
9f95a23c 1889 new C_Cond(cond, &done, &ret) : onfreespace;
11fdf7f2 1890 ceph_assert(fin);
31f18b77 1891 bool flushed = flush_set(oset, wr->extents, trace, fin);
11fdf7f2 1892 ceph_assert(!flushed); // we just dirtied it, and didn't drop our lock!
7c673cae
FG
1893 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1894 << " bytes" << dendl;
1895 if (block_writes_upfront) {
9f95a23c
TL
1896 std::unique_lock l{lock, std::adopt_lock};
1897 cond.wait(l, [&done] { return done; });
1898 l.release();
7c673cae
FG
1899 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1900 if (onfreespace)
1901 onfreespace->complete(ret);
1902 }
1903 }
1904
1905 // start writeback anyway?
1906 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1907 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1908 << target_dirty << ", nudging flusher" << dendl;
9f95a23c 1909 flusher_cond.notify_all();
7c673cae
FG
1910 }
1911 return ret;
1912}
1913
1914void ObjectCacher::flusher_entry()
1915{
1916 ldout(cct, 10) << "flusher start" << dendl;
9f95a23c 1917 std::unique_lock l{lock};
7c673cae
FG
1918 while (!flusher_stop) {
1919 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1920 get_stat_dirty();
1921 ldout(cct, 11) << "flusher "
1922 << all << " / " << max_size << ": "
1923 << get_stat_tx() << " tx, "
1924 << get_stat_rx() << " rx, "
1925 << get_stat_clean() << " clean, "
1926 << get_stat_dirty() << " dirty ("
1927 << target_dirty << " target, "
1928 << max_dirty << " max)"
1929 << dendl;
1930 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
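    // dirty_waiting counts bytes that blocked writers are waiting to add, so
    // including it makes the flusher free space before those writers are
    // admitted.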
31f18b77
FG
1931
1932 ZTracer::Trace trace;
1933 if (cct->_conf->osdc_blkin_trace_all) {
1934 trace.init("flusher", &trace_endpoint);
1935 trace.event("start");
1936 }
1937
7c673cae
FG
1938 if (actual > 0 && (uint64_t) actual > target_dirty) {
1939 // flush some dirty pages
1940 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1941 << get_stat_dirty_waiting() << " dirty_waiting > target "
1942 << target_dirty << ", flushing some dirty bhs" << dendl;
31f18b77 1943 flush(&trace, actual - target_dirty);
7c673cae
FG
1944 } else {
1945 // check tail of lru for old dirty items
1946 ceph::real_time cutoff = ceph::real_clock::now();
1947 cutoff -= max_dirty_age;
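      // anything last written before (now - max_dirty_age) is considered aged
      // and gets flushed even though we are under the dirty target.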
1948 BufferHead *bh = 0;
1949 int max = MAX_FLUSH_UNDER_LOCK;
1950 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1951 lru_get_next_expire())) != 0 &&
1952 bh->last_write <= cutoff &&
1953 max > 0) {
1954 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1955 if (scattered_write) {
1956 bh_write_adjacencies(bh, cutoff, NULL, &max);
1957 } else {
31f18b77 1958 bh_write(bh, trace);
7c673cae
FG
1959 --max;
1960 }
1961 }
1962 if (!max) {
1963 // back off the lock to avoid starving other threads
31f18b77 1964 trace.event("backoff");
9f95a23c
TL
1965 l.unlock();
1966 l.lock();
7c673cae
FG
1967 continue;
1968 }
1969 }
31f18b77
FG
1970
1971 trace.event("finish");
7c673cae
FG
1972 if (flusher_stop)
1973 break;
1974
9f95a23c 1975 flusher_cond.wait_for(l, 1s);
7c673cae
FG
1976 }
1977
1978 /* Wait for reads to finish. This is only possible if handling
1979 * -ENOENT made some read completions finish before their rados read
1980 * came back. If we don't wait for them, and destroy the cache, when
1981 * the rados reads do come back their callback will try to access the
1982 * no-longer-valid ObjectCacher.
1983 */
9f95a23c
TL
1984 read_cond.wait(l, [this] {
1985 if (reads_outstanding > 0) {
1986 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1987 << reads_outstanding << dendl;
1988 return false;
1989 } else {
1990 return true;
1991 }
1992 });
7c673cae
FG
1993 ldout(cct, 10) << "flusher finish" << dendl;
1994}
1995
1996
1997// -------------------------------------------------
1998
1999bool ObjectCacher::set_is_empty(ObjectSet *oset)
2000{
9f95a23c 2001 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2002 if (oset->objects.empty())
2003 return true;
2004
2005 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
2006 if (!(*p)->is_empty())
2007 return false;
2008
2009 return true;
2010}
2011
2012bool ObjectCacher::set_is_cached(ObjectSet *oset)
2013{
9f95a23c 2014 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2015 if (oset->objects.empty())
2016 return false;
2017
2018 for (xlist<Object*>::iterator p = oset->objects.begin();
2019 !p.end(); ++p) {
2020 Object *ob = *p;
2021 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
2022 q != ob->data.end();
2023 ++q) {
2024 BufferHead *bh = q->second;
2025 if (!bh->is_dirty() && !bh->is_tx())
2026 return true;
2027 }
2028 }
2029
2030 return false;
2031}
2032
2033bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
2034{
9f95a23c 2035 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2036 if (oset->objects.empty())
2037 return false;
2038
2039 for (xlist<Object*>::iterator i = oset->objects.begin();
2040 !i.end(); ++i) {
2041 Object *ob = *i;
2042
2043 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2044 p != ob->data.end();
2045 ++p) {
2046 BufferHead *bh = p->second;
2047 if (bh->is_dirty() || bh->is_tx())
2048 return true;
2049 }
2050 }
2051
2052 return false;
2053}
2054
2055
2056// purge. non-blocking. violently removes dirty buffers from cache.
2057void ObjectCacher::purge(Object *ob)
2058{
9f95a23c 2059 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2060 ldout(cct, 10) << "purge " << *ob << dendl;
2061
2062 ob->truncate(0);
2063}
2064
2065
2066// flush. non-blocking. no callback.
2067 // returns true if everything in the range was already clean/flushed.
2068 // returns false if something was dirty or still in flight (we may have just started writing it).
2069 // be sloppy about the range boundaries and flush any buffer the range touches
31f18b77
FG
2070bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2071 ZTracer::Trace *trace)
7c673cae 2072{
11fdf7f2 2073 ceph_assert(trace != nullptr);
9f95a23c 2074 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2075 list<BufferHead*> blist;
2076 bool clean = true;
2077 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2078 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2079 p != ob->data.end();
2080 ++p) {
2081 BufferHead *bh = p->second;
2082 ldout(cct, 20) << "flush " << *bh << dendl;
2083 if (length && bh->start() > offset+length) {
2084 break;
2085 }
2086 if (bh->is_tx()) {
2087 clean = false;
2088 continue;
2089 }
2090 if (!bh->is_dirty()) {
2091 continue;
2092 }
2093
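    // with scattered_write, collect the object's dirty bhs and issue them as a
    // single scattered write after the scan; otherwise write each bh as we go.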
2094 if (scattered_write)
2095 blist.push_back(bh);
2096 else
31f18b77 2097 bh_write(bh, *trace);
7c673cae
FG
2098 clean = false;
2099 }
2100 if (scattered_write && !blist.empty())
2101 bh_write_scattered(blist);
2102
2103 return clean;
2104}
2105
2106bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2107 Context *onfinish)
2108{
9f95a23c 2109 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2110 if (gather->has_subs()) {
2111 gather->set_finisher(onfinish);
2112 gather->activate();
2113 return false;
2114 }
2115
2116 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2117 onfinish->complete(0);
2118 return true;
2119}
2120
2121// flush. non-blocking, takes callback.
2122// returns true if already flushed
2123bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2124{
9f95a23c 2125 ceph_assert(ceph_mutex_is_locked(lock));
11fdf7f2 2126 ceph_assert(onfinish != NULL);
7c673cae
FG
2127 if (oset->objects.empty()) {
2128 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2129 onfinish->complete(0);
2130 return true;
2131 }
2132
2133 ldout(cct, 10) << "flush_set " << oset << dendl;
2134
2135 // we'll need to wait for all objects to flush!
2136 C_GatherBuilder gather(cct);
2137 set<Object*> waitfor_commit;
2138
2139 list<BufferHead*> blist;
2140 Object *last_ob = NULL;
2141 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2142
2143 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2144 // order, but items in oset->objects are not sorted, so the lower_bound
2145 // iterator can point at any buffer head in the ObjectSet; scan both ways.
2146 BufferHead key(*oset->objects.begin());
2147 it = dirty_or_tx_bh.lower_bound(&key);
2148 p = q = it;
2149
2150 bool backwards = true;
2151 if (it != dirty_or_tx_bh.begin())
2152 --it;
2153 else
2154 backwards = false;
2155
2156 for (; p != dirty_or_tx_bh.end(); p = q) {
2157 ++q;
2158 BufferHead *bh = *p;
2159 if (bh->ob->oset != oset)
2160 break;
2161 waitfor_commit.insert(bh->ob);
2162 if (bh->is_dirty()) {
2163 if (scattered_write) {
2164 if (last_ob != bh->ob) {
2165 if (!blist.empty()) {
2166 bh_write_scattered(blist);
2167 blist.clear();
2168 }
2169 last_ob = bh->ob;
2170 }
2171 blist.push_back(bh);
2172 } else {
31f18b77 2173 bh_write(bh, {});
7c673cae
FG
2174 }
2175 }
2176 }
2177
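  // the forward scan started from an arbitrary buffer head of this ObjectSet,
  // so dirty/tx buffer heads of the set may also sit before the starting
  // point; walk backwards to pick those up.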
2178 if (backwards) {
2179 for (p = q = it; true; p = q) {
2180 if (q != dirty_or_tx_bh.begin())
2181 --q;
2182 else
2183 backwards = false;
2184 BufferHead *bh = *p;
2185 if (bh->ob->oset != oset)
2186 break;
2187 waitfor_commit.insert(bh->ob);
2188 if (bh->is_dirty()) {
2189 if (scattered_write) {
2190 if (last_ob != bh->ob) {
2191 if (!blist.empty()) {
2192 bh_write_scattered(blist);
2193 blist.clear();
2194 }
2195 last_ob = bh->ob;
2196 }
2197 blist.push_front(bh);
2198 } else {
31f18b77 2199 bh_write(bh, {});
7c673cae
FG
2200 }
2201 }
2202 if (!backwards)
2203 break;
2204 }
2205 }
2206
2207 if (scattered_write && !blist.empty())
2208 bh_write_scattered(blist);
2209
2210 for (set<Object*>::iterator i = waitfor_commit.begin();
2211 i != waitfor_commit.end(); ++i) {
2212 Object *ob = *i;
2213
2214 // we'll need to gather...
2215 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2216 << ob->last_write_tid << " on " << *ob << dendl;
2217 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2218 }
2219
2220 return _flush_set_finish(&gather, onfinish);
2221}
2222
2223// flush. non-blocking, takes callback.
2224// returns true if already flushed
2225bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
31f18b77 2226 ZTracer::Trace *trace, Context *onfinish)
7c673cae 2227{
9f95a23c 2228 ceph_assert(ceph_mutex_is_locked(lock));
11fdf7f2
TL
2229 ceph_assert(trace != nullptr);
2230 ceph_assert(onfinish != NULL);
7c673cae
FG
2231 if (oset->objects.empty()) {
2232 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2233 onfinish->complete(0);
2234 return true;
2235 }
2236
2237 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2238 << " ObjectExtents" << dendl;
2239
2240 // we'll need to wait for all objects to flush!
2241 C_GatherBuilder gather(cct);
2242
2243 for (vector<ObjectExtent>::iterator p = exv.begin();
2244 p != exv.end();
2245 ++p) {
2246 ObjectExtent &ex = *p;
2247 sobject_t soid(ex.oid, CEPH_NOSNAP);
2248 if (objects[oset->poolid].count(soid) == 0)
2249 continue;
2250 Object *ob = objects[oset->poolid][soid];
2251
2252 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2253 << " " << ob << dendl;
2254
31f18b77 2255 if (!flush(ob, ex.offset, ex.length, trace)) {
7c673cae
FG
2256 // we'll need to gather...
2257 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2258 << ob->last_write_tid << " on " << *ob << dendl;
2259 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2260 }
2261 }
2262
2263 return _flush_set_finish(&gather, onfinish);
2264}
2265
2266// flush all dirty data. non-blocking, takes callback.
2267// returns true if already flushed
2268bool ObjectCacher::flush_all(Context *onfinish)
2269{
9f95a23c 2270 ceph_assert(ceph_mutex_is_locked(lock));
11fdf7f2 2271 ceph_assert(onfinish != NULL);
7c673cae
FG
2272
2273 ldout(cct, 10) << "flush_all " << dendl;
2274
2275 // we'll need to wait for all objects to flush!
2276 C_GatherBuilder gather(cct);
2277 set<Object*> waitfor_commit;
2278
2279 list<BufferHead*> blist;
2280 Object *last_ob = NULL;
2281 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2282 next = it = dirty_or_tx_bh.begin();
2283 while (it != dirty_or_tx_bh.end()) {
2284 ++next;
2285 BufferHead *bh = *it;
2286 waitfor_commit.insert(bh->ob);
2287
2288 if (bh->is_dirty()) {
2289 if (scattered_write) {
2290 if (last_ob != bh->ob) {
2291 if (!blist.empty()) {
2292 bh_write_scattered(blist);
2293 blist.clear();
2294 }
2295 last_ob = bh->ob;
2296 }
2297 blist.push_back(bh);
2298 } else {
31f18b77 2299 bh_write(bh, {});
7c673cae
FG
2300 }
2301 }
2302
2303 it = next;
2304 }
2305
2306 if (scattered_write && !blist.empty())
2307 bh_write_scattered(blist);
2308
2309 for (set<Object*>::iterator i = waitfor_commit.begin();
2310 i != waitfor_commit.end();
2311 ++i) {
2312 Object *ob = *i;
2313
2314 // we'll need to gather...
2315 ldout(cct, 10) << "flush_all will wait for ack tid "
2316 << ob->last_write_tid << " on " << *ob << dendl;
2317 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2318 }
2319
2320 return _flush_set_finish(&gather, onfinish);
2321}
2322
2323void ObjectCacher::purge_set(ObjectSet *oset)
2324{
9f95a23c 2325 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2326 if (oset->objects.empty()) {
2327 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2328 return;
2329 }
2330
2331 ldout(cct, 10) << "purge_set " << oset << dendl;
2332 const bool were_dirty = oset->dirty_or_tx > 0;
2333
2334 for (xlist<Object*>::iterator i = oset->objects.begin();
2335 !i.end(); ++i) {
2336 Object *ob = *i;
2337 purge(ob);
2338 }
2339
2340 // Although we have purged rather than flushed, the caller should still
2341 // drop any resources associated with the dirty data.
11fdf7f2 2342 ceph_assert(oset->dirty_or_tx == 0);
7c673cae
FG
2343 if (flush_set_callback && were_dirty) {
2344 flush_set_callback(flush_set_callback_arg, oset);
2345 }
2346}
2347
2348
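// Drop every clean/zero/error buffer from the object and return the number of
// bytes that could not be released (dirty, tx, rx or missing).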
2349loff_t ObjectCacher::release(Object *ob)
2350{
9f95a23c 2351 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2352 list<BufferHead*> clean;
2353 loff_t o_unclean = 0;
2354
2355 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2356 p != ob->data.end();
2357 ++p) {
2358 BufferHead *bh = p->second;
2359 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2360 clean.push_back(bh);
2361 else
2362 o_unclean += bh->length();
2363 }
2364
2365 for (list<BufferHead*>::iterator p = clean.begin();
2366 p != clean.end();
2367 ++p) {
2368 bh_remove(ob, *p);
2369 delete *p;
2370 }
2371
2372 if (ob->can_close()) {
2373 ldout(cct, 10) << "release trimming " << *ob << dendl;
2374 close_object(ob);
11fdf7f2 2375 ceph_assert(o_unclean == 0);
7c673cae
FG
2376 return 0;
2377 }
2378
2379 if (ob->complete) {
2380 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2381 ob->complete = false;
2382 }
2383 if (!ob->exists) {
2384 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2385 ob->exists = true;
2386 }
2387
2388 return o_unclean;
2389}
2390
2391loff_t ObjectCacher::release_set(ObjectSet *oset)
2392{
9f95a23c 2393 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2394 // return # bytes not clean (and thus not released).
2395 loff_t unclean = 0;
2396
2397 if (oset->objects.empty()) {
2398 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2399 return 0;
2400 }
2401
2402 ldout(cct, 10) << "release_set " << oset << dendl;
2403
2404 xlist<Object*>::iterator q;
2405 for (xlist<Object*>::iterator p = oset->objects.begin();
2406 !p.end(); ) {
2407 q = p;
2408 ++q;
2409 Object *ob = *p;
2410
2411 loff_t o_unclean = release(ob);
2412 unclean += o_unclean;
2413
2414 if (o_unclean)
2415 ldout(cct, 10) << "release_set " << oset << " " << *ob
2416 << " has " << o_unclean << " bytes left"
2417 << dendl;
2418 p = q;
2419 }
2420
2421 if (unclean) {
2422 ldout(cct, 10) << "release_set " << oset
2423 << ", " << unclean << " bytes left" << dendl;
2424 }
2425
2426 return unclean;
2427}
2428
2429
2430uint64_t ObjectCacher::release_all()
2431{
9f95a23c 2432 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2433 ldout(cct, 10) << "release_all" << dendl;
2434 uint64_t unclean = 0;
2435
2436 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2437 = objects.begin();
2438 while (i != objects.end()) {
2439 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2440 while (p != i->end()) {
2441 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2442 ++n;
2443
2444 Object *ob = p->second;
2445
2446 loff_t o_unclean = release(ob);
2447 unclean += o_unclean;
2448
2449 if (o_unclean)
2450 ldout(cct, 10) << "release_all " << *ob
2451 << " has " << o_unclean << " bytes left"
2452 << dendl;
2453 p = n;
2454 }
2455 ++i;
2456 }
2457
2458 if (unclean) {
2459 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2460 << dendl;
2461 }
2462
2463 return unclean;
2464}
2465
2466void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2467{
9f95a23c 2468 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2469 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2470
2471 for (xlist<Object*>::iterator p = oset->objects.begin();
2472 !p.end(); ++p) {
2473 Object *ob = *p;
2474 if (!ob->exists) {
2475 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2476 ob->exists = true;
2477 ob->complete = false;
2478 }
2479 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2480 !q.end(); ++q) {
2481 C_ReadFinish *comp = *q;
2482 comp->distrust_enoent();
2483 }
2484 }
2485}
2486
2487/**
2488 * discard object extents from an ObjectSet by dropping the buffers that
2489 * back the extents in exls from the in-memory oset.
2490 */
2491void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2492{
9f95a23c 2493 ceph_assert(ceph_mutex_is_locked(lock));
28e407b8
AA
2494 bool was_dirty = oset->dirty_or_tx > 0;
2495
2496 _discard(oset, exls, nullptr);
2497 _discard_finish(oset, was_dirty, nullptr);
2498}
2499
2500/**
2501 * discard object extents from an ObjectSet by dropping the buffers that
2502 * back the extents in exls from the in-memory oset. If a buffer is in TX
2503 * state, the discard waits for the write to commit prior to invoking on_finish.
2504 */
2505void ObjectCacher::discard_writeback(ObjectSet *oset,
2506 const vector<ObjectExtent>& exls,
2507 Context* on_finish)
2508{
9f95a23c 2509 ceph_assert(ceph_mutex_is_locked(lock));
28e407b8
AA
2510 bool was_dirty = oset->dirty_or_tx > 0;
2511
2512 C_GatherBuilder gather(cct);
2513 _discard(oset, exls, &gather);
2514
2515 if (gather.has_subs()) {
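    // capture 'flushed' now: this discard may have just dropped the last
    // dirty/tx bytes, and the flush callback must fire only after the
    // gathered in-flight writebacks commit.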
2516 bool flushed = was_dirty && oset->dirty_or_tx == 0;
9f95a23c 2517 gather.set_finisher(new LambdaContext(
28e407b8 2518 [this, oset, flushed, on_finish](int) {
9f95a23c 2519 ceph_assert(ceph_mutex_is_locked(lock));
28e407b8
AA
2520 if (flushed && flush_set_callback)
2521 flush_set_callback(flush_set_callback_arg, oset);
2522 if (on_finish)
2523 on_finish->complete(0);
2524 }));
2525 gather.activate();
7c673cae
FG
2526 return;
2527 }
2528
28e407b8
AA
2529 _discard_finish(oset, was_dirty, on_finish);
2530}
2531
2532void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
2533 C_GatherBuilder* gather)
2534{
2535 if (oset->objects.empty()) {
2536 ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
2537 return;
2538 }
7c673cae 2539
28e407b8 2540 ldout(cct, 10) << __func__ << " " << oset << dendl;
7c673cae 2541
28e407b8
AA
2542 for (auto& ex : exls) {
2543 ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
7c673cae
FG
2544 sobject_t soid(ex.oid, CEPH_NOSNAP);
2545 if (objects[oset->poolid].count(soid) == 0)
2546 continue;
2547 Object *ob = objects[oset->poolid][soid];
2548
28e407b8 2549 ob->discard(ex.offset, ex.length, gather);
7c673cae 2550 }
28e407b8
AA
2551}
2552
2553void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
2554 Context* on_finish)
2555{
9f95a23c 2556 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2557
2558 // did we truncate off dirty data?
28e407b8 2559 if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
7c673cae 2560 flush_set_callback(flush_set_callback_arg, oset);
28e407b8
AA
2561 }
2562
2563 // notify that in-flight writeback has completed
2564 if (on_finish != nullptr) {
2565 on_finish->complete(0);
2566 }
7c673cae
FG
2567}
2568
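// Debug check: recount bytes per BufferHead state from scratch and assert the
// totals match the incrementally maintained stat_* counters.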
2569void ObjectCacher::verify_stats() const
2570{
9f95a23c 2571 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2572 ldout(cct, 10) << "verify_stats" << dendl;
2573
2574 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2575 error = 0;
2576 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2577 = objects.begin();
2578 i != objects.end();
2579 ++i) {
2580 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2581 = i->begin();
2582 p != i->end();
2583 ++p) {
2584 Object *ob = p->second;
2585 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2586 q != ob->data.end();
2587 ++q) {
2588 BufferHead *bh = q->second;
2589 switch (bh->get_state()) {
2590 case BufferHead::STATE_MISSING:
2591 missing += bh->length();
2592 break;
2593 case BufferHead::STATE_CLEAN:
2594 clean += bh->length();
2595 break;
2596 case BufferHead::STATE_ZERO:
2597 zero += bh->length();
2598 break;
2599 case BufferHead::STATE_DIRTY:
2600 dirty += bh->length();
2601 break;
2602 case BufferHead::STATE_TX:
2603 tx += bh->length();
2604 break;
2605 case BufferHead::STATE_RX:
2606 rx += bh->length();
2607 break;
2608 case BufferHead::STATE_ERROR:
2609 error += bh->length();
2610 break;
2611 default:
2612 ceph_abort();
2613 }
2614 }
2615 }
2616 }
2617
2618 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2619 << " dirty " << dirty << " missing " << missing
2620 << " error " << error << dendl;
11fdf7f2
TL
2621 ceph_assert(clean == stat_clean);
2622 ceph_assert(rx == stat_rx);
2623 ceph_assert(tx == stat_tx);
2624 ceph_assert(dirty == stat_dirty);
2625 ceph_assert(missing == stat_missing);
2626 ceph_assert(zero == stat_zero);
2627 ceph_assert(error == stat_error);
7c673cae
FG
2628}
2629
2630void ObjectCacher::bh_stat_add(BufferHead *bh)
2631{
9f95a23c 2632 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2633 switch (bh->get_state()) {
2634 case BufferHead::STATE_MISSING:
2635 stat_missing += bh->length();
2636 break;
2637 case BufferHead::STATE_CLEAN:
2638 stat_clean += bh->length();
2639 break;
2640 case BufferHead::STATE_ZERO:
2641 stat_zero += bh->length();
2642 break;
2643 case BufferHead::STATE_DIRTY:
2644 stat_dirty += bh->length();
2645 bh->ob->dirty_or_tx += bh->length();
2646 bh->ob->oset->dirty_or_tx += bh->length();
2647 break;
2648 case BufferHead::STATE_TX:
2649 stat_tx += bh->length();
2650 bh->ob->dirty_or_tx += bh->length();
2651 bh->ob->oset->dirty_or_tx += bh->length();
2652 break;
2653 case BufferHead::STATE_RX:
2654 stat_rx += bh->length();
2655 break;
2656 case BufferHead::STATE_ERROR:
2657 stat_error += bh->length();
2658 break;
2659 default:
11fdf7f2 2660 ceph_abort_msg("bh_stat_add: invalid bufferhead state");
7c673cae
FG
2661 }
2662 if (get_stat_dirty_waiting() > 0)
9f95a23c 2663 stat_cond.notify_all();
7c673cae
FG
2664}
2665
2666void ObjectCacher::bh_stat_sub(BufferHead *bh)
2667{
9f95a23c 2668 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2669 switch (bh->get_state()) {
2670 case BufferHead::STATE_MISSING:
2671 stat_missing -= bh->length();
2672 break;
2673 case BufferHead::STATE_CLEAN:
2674 stat_clean -= bh->length();
2675 break;
2676 case BufferHead::STATE_ZERO:
2677 stat_zero -= bh->length();
2678 break;
2679 case BufferHead::STATE_DIRTY:
2680 stat_dirty -= bh->length();
2681 bh->ob->dirty_or_tx -= bh->length();
2682 bh->ob->oset->dirty_or_tx -= bh->length();
2683 break;
2684 case BufferHead::STATE_TX:
2685 stat_tx -= bh->length();
2686 bh->ob->dirty_or_tx -= bh->length();
2687 bh->ob->oset->dirty_or_tx -= bh->length();
2688 break;
2689 case BufferHead::STATE_RX:
2690 stat_rx -= bh->length();
2691 break;
2692 case BufferHead::STATE_ERROR:
2693 stat_error -= bh->length();
2694 break;
2695 default:
11fdf7f2 2696 ceph_abort_msg("bh_stat_sub: invalid bufferhead state");
7c673cae
FG
2697 }
2698}
2699
2700void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2701{
9f95a23c 2702 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2703 int state = bh->get_state();
2704 // move between lru lists?
2705 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2706 bh_lru_rest.lru_remove(bh);
2707 bh_lru_dirty.lru_insert_top(bh);
2708 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2709 bh_lru_dirty.lru_remove(bh);
2710 if (bh->get_dontneed())
2711 bh_lru_rest.lru_insert_bot(bh);
2712 else
2713 bh_lru_rest.lru_insert_top(bh);
2714 }
2715
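  // dirty_or_tx_bh tracks every buffer head that is dirty or in flight (tx):
  // insert when entering either state, erase when leaving both.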
2716 if ((s == BufferHead::STATE_TX ||
2717 s == BufferHead::STATE_DIRTY) &&
2718 state != BufferHead::STATE_TX &&
2719 state != BufferHead::STATE_DIRTY) {
2720 dirty_or_tx_bh.insert(bh);
2721 } else if ((state == BufferHead::STATE_TX ||
2722 state == BufferHead::STATE_DIRTY) &&
2723 s != BufferHead::STATE_TX &&
2724 s != BufferHead::STATE_DIRTY) {
2725 dirty_or_tx_bh.erase(bh);
2726 }
2727
2728 if (s != BufferHead::STATE_ERROR &&
2729 state == BufferHead::STATE_ERROR) {
2730 bh->error = 0;
2731 }
2732
2733 // set state
2734 bh_stat_sub(bh);
2735 bh->set_state(s);
2736 bh_stat_add(bh);
2737}
2738
2739void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2740{
9f95a23c 2741 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
2742 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2743 ob->add_bh(bh);
2744 if (bh->is_dirty()) {
2745 bh_lru_dirty.lru_insert_top(bh);
2746 dirty_or_tx_bh.insert(bh);
2747 } else {
2748 if (bh->get_dontneed())
2749 bh_lru_rest.lru_insert_bot(bh);
2750 else
2751 bh_lru_rest.lru_insert_top(bh);
2752 }
2753
2754 if (bh->is_tx()) {
2755 dirty_or_tx_bh.insert(bh);
2756 }
2757 bh_stat_add(bh);
2758}
2759
2760void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2761{
9f95a23c 2762 ceph_assert(ceph_mutex_is_locked(lock));
11fdf7f2 2763 ceph_assert(bh->get_journal_tid() == 0);
7c673cae
FG
2764 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2765 ob->remove_bh(bh);
2766 if (bh->is_dirty()) {
2767 bh_lru_dirty.lru_remove(bh);
2768 dirty_or_tx_bh.erase(bh);
2769 } else {
2770 bh_lru_rest.lru_remove(bh);
2771 }
2772
2773 if (bh->is_tx()) {
2774 dirty_or_tx_bh.erase(bh);
2775 }
2776 bh_stat_sub(bh);
2777 if (get_stat_dirty_waiting() > 0)
9f95a23c 2778 stat_cond.notify_all();
7c673cae
FG
2779}
2780