// ceph/src/osdc/ObjectCacher.cc (ceph v12.2.3)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits.h>
5
6 #include "msg/Messenger.h"
7 #include "ObjectCacher.h"
8 #include "WritebackHandler.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11
12 #include "include/assert.h"
13
14 #define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on while holding the lock
15 #define BUFFER_MEMORY_WEIGHT 12 // memory usage of BufferHead, count in (1<<n)
16
17 using std::chrono::seconds;
18
19
20 /*** ObjectCacher::BufferHead ***/
21
22
23 /*** ObjectCacher::Object ***/
24
25 #define dout_subsys ceph_subsys_objectcacher
26 #undef dout_prefix
27 #define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
28
29
30
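// C_ReadFinish: completion for a single OSD read issued by bh_read().  When
// the read returns, finish() hands the data and return code to
// bh_read_finish() and removes itself from the owning Object's list of
// in-flight reads.  The trust_enoent flag records whether an -ENOENT reply
// may be taken to mean the object really does not exist.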
31 class ObjectCacher::C_ReadFinish : public Context {
32 ObjectCacher *oc;
33 int64_t poolid;
34 sobject_t oid;
35 loff_t start;
36 uint64_t length;
37 xlist<C_ReadFinish*>::item set_item;
38 bool trust_enoent;
39 ceph_tid_t tid;
40 ZTracer::Trace trace;
41
42 public:
43 bufferlist bl;
44 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
45 uint64_t l, const ZTracer::Trace &trace) :
46 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
47 set_item(this), trust_enoent(true),
48 tid(t), trace(trace) {
49 ob->reads.push_back(&set_item);
50 }
51
52 void finish(int r) override {
53 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
54 trace.event("finish");
55
56 // object destructor clears the list
57 if (set_item.is_on_list())
58 set_item.remove_myself();
59 }
60
61 void distrust_enoent() {
62 trust_enoent = false;
63 }
64 };
65
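// C_RetryRead: queued while a read is blocked (e.g. waiting on writeback or
// on another rx buffer).  When invoked it re-runs _readx(); if the read is
// still in progress it simply returns, otherwise it completes onfinish with
// the final result.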
66 class ObjectCacher::C_RetryRead : public Context {
67 ObjectCacher *oc;
68 OSDRead *rd;
69 ObjectSet *oset;
70 Context *onfinish;
71 ZTracer::Trace trace;
72 public:
73 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
74 const ZTracer::Trace &trace)
75 : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
76 }
77 void finish(int r) override {
78 if (r >= 0) {
79 r = oc->_readx(rd, oset, onfinish, false, &trace);
80 }
81
82 if (r == 0) {
83 // read is still in-progress
84 return;
85 }
86
87 trace.event("finish");
88 if (onfinish) {
89 onfinish->complete(r);
90 }
91 }
92 };
93
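// split: cut `left` at absolute object offset `off`.  A new right-hand
// BufferHead is created that inherits the state, tids, snap context and the
// dontneed/nocache hints; the buffer data is divided at the same point, and
// any read waiters at or beyond `off` are moved to the new right half.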
94 ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
95 loff_t off)
96 {
97 assert(oc->lock.is_locked());
98 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
99
100 // split off right
101 ObjectCacher::BufferHead *right = new BufferHead(this);
102
103 // inherit the dontneed/nocache hints; if the buffer is accessed later, these are cleared automatically.
104 right->set_dontneed(left->get_dontneed());
105 right->set_nocache(left->get_nocache());
106
107 right->last_write_tid = left->last_write_tid;
108 right->last_read_tid = left->last_read_tid;
109 right->set_state(left->get_state());
110 right->snapc = left->snapc;
111 right->set_journal_tid(left->journal_tid);
112
113 loff_t newleftlen = off - left->start();
114 right->set_start(off);
115 right->set_length(left->length() - newleftlen);
116
117 // shorten left
118 oc->bh_stat_sub(left);
119 left->set_length(newleftlen);
120 oc->bh_stat_add(left);
121
122 // add right
123 oc->bh_add(this, right);
124
125 // split buffers too
126 bufferlist bl;
127 bl.claim(left->bl);
128 if (bl.length()) {
129 assert(bl.length() == (left->length() + right->length()));
130 right->bl.substr_of(bl, left->length(), right->length());
131 left->bl.substr_of(bl, 0, left->length());
132 }
133
134 // move read waiters
135 if (!left->waitfor_read.empty()) {
136 map<loff_t, list<Context*> >::iterator start_remove
137 = left->waitfor_read.begin();
138 while (start_remove != left->waitfor_read.end() &&
139 start_remove->first < right->start())
140 ++start_remove;
141 for (map<loff_t, list<Context*> >::iterator p = start_remove;
142 p != left->waitfor_read.end(); ++p) {
143 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
144 << " to right bh" << dendl;
145 right->waitfor_read[p->first].swap( p->second );
146 assert(p->second.empty());
147 }
148 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
149 }
150
151 ldout(oc->cct, 20) << "split left is " << *left << dendl;
152 ldout(oc->cct, 20) << "split right is " << *right << dendl;
153 return right;
154 }
155
156
157 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
158 {
159 assert(oc->lock.is_locked());
160
161 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
162 if (left->get_journal_tid() == 0) {
163 left->set_journal_tid(right->get_journal_tid());
164 }
165 right->set_journal_tid(0);
166
167 oc->bh_remove(this, right);
168 oc->bh_stat_sub(left);
169 left->set_length(left->length() + right->length());
170 oc->bh_stat_add(left);
171
172 // data
173 left->bl.claim_append(right->bl);
174
175 // version
176 // note: taking the max here is somewhat imprecise, but this should only be used for dirty buffers
177 left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
178 left->last_write = MAX( left->last_write, right->last_write );
179
180 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
181 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
182
183 // waiters
184 for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
185 p != right->waitfor_read.end();
186 ++p)
187 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
188 p->second );
189
190 // hose right
191 delete right;
192
193 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
194 }
195
196 bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
197 {
198 if (left->end() != right->start() ||
199 left->get_state() != right->get_state() ||
200 !left->can_merge_journal(right))
201 return false;
202 if (left->is_tx() && left->last_write_tid != right->last_write_tid)
203 return false;
204 return true;
205 }
206
207 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
208 {
209 assert(oc->lock.is_locked());
210 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
211
212 // do not merge rx buffers; last_read_tid may not match
213 if (bh->is_rx())
214 return;
215
216 // to the left?
217 map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
218 assert(p->second == bh);
219 if (p != data.begin()) {
220 --p;
221 if (can_merge_bh(p->second, bh)) {
222 merge_left(p->second, bh);
223 bh = p->second;
224 } else {
225 ++p;
226 }
227 }
228 // to the right?
229 assert(p->second == bh);
230 ++p;
231 if (p != data.end() && can_merge_bh(bh, p->second))
232 merge_left(bh, p->second);
233 }
234
235 /*
236 * return true if the entire given range is present in the cache
237 */
238 bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
239 {
240 assert(oc->lock.is_locked());
241 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
242 while (left > 0) {
243 if (p == data.end())
244 return false;
245
246 if (p->first <= cur) {
247 // have part of it
248 loff_t lenfromcur = MIN(p->second->end() - cur, left);
249 cur += lenfromcur;
250 left -= lenfromcur;
251 ++p;
252 continue;
253 } else if (p->first > cur) {
254 // gap
255 return false;
256 } else
257 ceph_abort();
258 }
259
260 return true;
261 }
262
263 /*
264 * return true if all of this object's cached data lies within [off, off+len]
265 */
266 bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
267 {
268 assert(oc->lock.is_locked());
269 if (data.empty())
270 return true;
271 map<loff_t, BufferHead*>::iterator first = data.begin();
272 map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
273 if (first->second->start() >= off && last->second->end() <= (off + len))
274 return true;
275 else
276 return false;
277 }
278
279 /*
280 * map a range of bytes into buffer_heads.
281 * - create missing buffer_heads as necessary.
282 */
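// Output maps are keyed by object offset:
//   hits    - readable buffers (clean, dirty, tx or zero)
//   missing - newly created buffers that still need an OSD read
//   rx      - buffers whose reads are already in flight
//   errors  - buffers left in the error state by a previous failed read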
283 int ObjectCacher::Object::map_read(ObjectExtent &ex,
284 map<loff_t, BufferHead*>& hits,
285 map<loff_t, BufferHead*>& missing,
286 map<loff_t, BufferHead*>& rx,
287 map<loff_t, BufferHead*>& errors)
288 {
289 assert(oc->lock.is_locked());
290 ldout(oc->cct, 10) << "map_read " << ex.oid << " "
291 << ex.offset << "~" << ex.length << dendl;
292
293 loff_t cur = ex.offset;
294 loff_t left = ex.length;
295
296 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
297 while (left > 0) {
298 // at end?
299 if (p == data.end()) {
300 // rest is a miss.
301 BufferHead *n = new BufferHead(this);
302 n->set_start(cur);
303 n->set_length(left);
304 oc->bh_add(this, n);
305 if (complete) {
306 oc->mark_zero(n);
307 hits[cur] = n;
308 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
309 } else {
310 missing[cur] = n;
311 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
312 }
313 cur += left;
314 assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
315 break; // no more.
316 }
317
318 if (p->first <= cur) {
319 // have it (or part of it)
320 BufferHead *e = p->second;
321
322 if (e->is_clean() ||
323 e->is_dirty() ||
324 e->is_tx() ||
325 e->is_zero()) {
326 hits[cur] = e; // readable!
327 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
328 } else if (e->is_rx()) {
329 rx[cur] = e; // missing, not readable.
330 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
331 } else if (e->is_error()) {
332 errors[cur] = e;
333 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
334 } else {
335 ceph_abort();
336 }
337
338 loff_t lenfromcur = MIN(e->end() - cur, left);
339 cur += lenfromcur;
340 left -= lenfromcur;
341 ++p;
342 continue; // more?
343
344 } else if (p->first > cur) {
345 // gap.. miss
346 loff_t next = p->first;
347 BufferHead *n = new BufferHead(this);
348 loff_t len = MIN(next - cur, left);
349 n->set_start(cur);
350 n->set_length(len);
351 oc->bh_add(this,n);
352 if (complete) {
353 oc->mark_zero(n);
354 hits[cur] = n;
355 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
356 } else {
357 missing[cur] = n;
358 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
359 }
360 cur += MIN(left, n->length());
361 left -= MIN(left, n->length());
362 continue; // more?
363 } else {
364 ceph_abort();
365 }
366 }
367 return 0;
368 }
369
370 void ObjectCacher::Object::audit_buffers()
371 {
372 loff_t offset = 0;
373 for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
374 it != data.end(); ++it) {
375 if (it->first != it->second->start()) {
376 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
377 << " does not match bh start position: "
378 << *it->second << dendl;
379 assert(it->first == it->second->start());
380 }
381 if (it->first < offset) {
382 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
383 << " overlaps with previous bh " << *((--it)->second)
384 << dendl;
385 assert(it->first >= offset);
386 }
387 BufferHead *bh = it->second;
388 map<loff_t, list<Context*> >::const_iterator w_it;
389 for (w_it = bh->waitfor_read.begin();
390 w_it != bh->waitfor_read.end(); ++w_it) {
391 if (w_it->first < bh->start() ||
392 w_it->first >= bh->start() + bh->length()) {
393 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
394 << " is not within bh " << *bh << dendl;
395 assert(w_it->first >= bh->start());
396 assert(w_it->first < bh->start() + bh->length());
397 }
398 }
399 offset = it->first + it->second->length();
400 }
401 }
402
403 /*
404 * map a range of extents on an object's buffer cache.
405 * - combine any bh's we're writing into one
406 * - break up bufferheads that don't fall completely within the range
407 * - return a bh covering exactly the write range; it does NOT also absorb
408 * other dirty data to the left and/or right of that range.
409 */
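// The returned bh covers exactly [ex.offset, ex.offset + ex.length): existing
// buffers straddling the boundaries are split, gaps are filled with new
// buffers, and pieces inside the range are merged into `final` where
// possible.  replace_journal_tid() re-tags each affected bh with `tid`.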
410 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
411 ceph_tid_t tid)
412 {
413 assert(oc->lock.is_locked());
414 BufferHead *final = 0;
415
416 ldout(oc->cct, 10) << "map_write oex " << ex.oid
417 << " " << ex.offset << "~" << ex.length << dendl;
418
419 loff_t cur = ex.offset;
420 loff_t left = ex.length;
421
422 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
423 while (left > 0) {
424 loff_t max = left;
425
426 // at end ?
427 if (p == data.end()) {
428 if (final == NULL) {
429 final = new BufferHead(this);
430 replace_journal_tid(final, tid);
431 final->set_start( cur );
432 final->set_length( max );
433 oc->bh_add(this, final);
434 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
435 } else {
436 oc->bh_stat_sub(final);
437 final->set_length(final->length() + max);
438 oc->bh_stat_add(final);
439 }
440 left -= max;
441 cur += max;
442 continue;
443 }
444
445 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
446 //oc->verify_stats();
447
448 if (p->first <= cur) {
449 BufferHead *bh = p->second;
450 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
451
452 if (p->first < cur) {
453 assert(final == 0);
454 if (cur + max >= bh->end()) {
455 // we want right bit (one splice)
456 final = split(bh, cur); // just split it, take right half.
457 replace_journal_tid(final, tid);
458 ++p;
459 assert(p->second == final);
460 } else {
461 // we want middle bit (two splices)
462 final = split(bh, cur);
463 ++p;
464 assert(p->second == final);
465 split(final, cur+max);
466 replace_journal_tid(final, tid);
467 }
468 } else {
469 assert(p->first == cur);
470 if (bh->length() <= max) {
471 // whole bufferhead, piece of cake.
472 } else {
473 // we want left bit (one splice)
474 split(bh, cur + max); // just split
475 }
476 if (final) {
477 oc->mark_dirty(bh);
478 oc->mark_dirty(final);
479 --p; // move iterator back to final
480 assert(p->second == final);
481 replace_journal_tid(bh, tid);
482 merge_left(final, bh);
483 } else {
484 final = bh;
485 replace_journal_tid(final, tid);
486 }
487 }
488
489 // keep going.
490 loff_t lenfromcur = final->end() - cur;
491 cur += lenfromcur;
492 left -= lenfromcur;
493 ++p;
494 continue;
495 } else {
496 // gap!
497 loff_t next = p->first;
498 loff_t glen = MIN(next - cur, max);
499 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
500 if (final) {
501 oc->bh_stat_sub(final);
502 final->set_length(final->length() + glen);
503 oc->bh_stat_add(final);
504 } else {
505 final = new BufferHead(this);
506 replace_journal_tid(final, tid);
507 final->set_start( cur );
508 final->set_length( glen );
509 oc->bh_add(this, final);
510 }
511
512 cur += glen;
513 left -= glen;
514 continue; // more?
515 }
516 }
517
518 // set version
519 assert(final);
520 assert(final->get_journal_tid() == tid);
521 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
522
523 return final;
524 }
525
526 void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
527 ceph_tid_t tid) {
528 ceph_tid_t bh_tid = bh->get_journal_tid();
529
530 assert(tid == 0 || bh_tid <= tid);
531 if (bh_tid != 0 && bh_tid != tid) {
532 // inform journal that it should not expect a writeback from this extent
533 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
534 bh->length(), bh_tid, tid);
535 }
536 bh->set_journal_tid(tid);
537 }
538
539 void ObjectCacher::Object::truncate(loff_t s)
540 {
541 assert(oc->lock.is_locked());
542 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
543
544 while (!data.empty()) {
545 BufferHead *bh = data.rbegin()->second;
546 if (bh->end() <= s)
547 break;
548
549 // split bh at truncation point?
550 if (bh->start() < s) {
551 split(bh, s);
552 continue;
553 }
554
555 // remove bh entirely
556 assert(bh->start() >= s);
557 assert(bh->waitfor_read.empty());
558 replace_journal_tid(bh, 0);
559 oc->bh_remove(this, bh);
560 delete bh;
561 }
562 }
563
564 void ObjectCacher::Object::discard(loff_t off, loff_t len)
565 {
566 assert(oc->lock.is_locked());
567 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
568 << dendl;
569
570 if (!exists) {
571 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
572 exists = true;
573 }
574 if (complete) {
575 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
576 complete = false;
577 }
578
579 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
580 while (p != data.end()) {
581 BufferHead *bh = p->second;
582 if (bh->start() >= off + len)
583 break;
584
585 // split bh at truncation point?
586 if (bh->start() < off) {
587 split(bh, off);
588 ++p;
589 continue;
590 }
591
592 assert(bh->start() >= off);
593 if (bh->end() > off + len) {
594 split(bh, off + len);
595 }
596
597 ++p;
598 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
599 assert(bh->waitfor_read.empty());
600 replace_journal_tid(bh, 0);
601 oc->bh_remove(this, bh);
602 delete bh;
603 }
604 }
605
606
607
608 /*** ObjectCacher ***/
609
610 #undef dout_prefix
611 #define dout_prefix *_dout << "objectcacher "
612
613
614 ObjectCacher::ObjectCacher(CephContext *cct_, string name,
615 WritebackHandler& wb, Mutex& l,
616 flush_set_callback_t flush_callback,
617 void *flush_callback_arg, uint64_t max_bytes,
618 uint64_t max_objects, uint64_t max_dirty,
619 uint64_t target_dirty, double max_dirty_age,
620 bool block_writes_upfront)
621 : perfcounter(NULL),
622 cct(cct_), writeback_handler(wb), name(name), lock(l),
623 max_dirty(max_dirty), target_dirty(target_dirty),
624 max_size(max_bytes), max_objects(max_objects),
625 max_dirty_age(ceph::make_timespan(max_dirty_age)),
626 block_writes_upfront(block_writes_upfront),
627 trace_endpoint("ObjectCacher"),
628 flush_set_callback(flush_callback),
629 flush_set_callback_arg(flush_callback_arg),
630 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
631 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
632 stat_missing(0), stat_error(0), stat_dirty_waiting(0),
633 stat_nr_dirty_waiters(0), reads_outstanding(0)
634 {
635 perf_start();
636 finisher.start();
637 scattered_write = writeback_handler.can_scattered_write();
638 }
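// A hypothetical construction sketch (the client name, callback and limit
// values below are made up; the parameter order follows the constructor
// above):
//
//   Mutex cache_lock("myclient.cache_lock");
//   MyWritebackHandler wb;                      // some WritebackHandler subclass
//   ObjectCacher oc(cct, "myclient", wb, cache_lock,
//                   my_flush_cb, my_flush_cb_arg,
//                   64 << 20,   // max cache bytes
//                   1024,       // max objects
//                   32 << 20,   // max dirty bytes
//                   16 << 20,   // target dirty bytes
//                   5.0,        // max dirty age (seconds)
//                   true);      // block writes upfront
//   oc.start();                 // spawn the flusher thread (see ObjectCacher.h)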
639
640 ObjectCacher::~ObjectCacher()
641 {
642 finisher.stop();
643 perf_stop();
644 // we should be empty.
645 for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
646 = objects.begin();
647 i != objects.end();
648 ++i)
649 assert(i->empty());
650 assert(bh_lru_rest.lru_get_size() == 0);
651 assert(bh_lru_dirty.lru_get_size() == 0);
652 assert(ob_lru.lru_get_size() == 0);
653 assert(dirty_or_tx_bh.empty());
654 }
655
656 void ObjectCacher::perf_start()
657 {
658 string n = "objectcacher-" + name;
659 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
660
661 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
662 "cache_ops_hit", "Hit operations");
663 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
664 "cache_ops_miss", "Miss operations");
665 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
666 "cache_bytes_hit", "Hit data");
667 plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
668 "cache_bytes_miss", "Miss data");
669 plb.add_u64_counter(l_objectcacher_data_read,
670 "data_read", "Read data");
671 plb.add_u64_counter(l_objectcacher_data_written,
672 "data_written", "Data written to cache");
673 plb.add_u64_counter(l_objectcacher_data_flushed,
674 "data_flushed", "Data flushed");
675 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
676 "data_overwritten_while_flushing",
677 "Data overwritten while flushing");
678 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
679 "Write operations, delayed due to dirty limits");
680 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
681 "write_bytes_blocked",
682 "Write data blocked on dirty limit");
683 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
684 "Time spent blocking a write due to dirty limits");
685
686 perfcounter = plb.create_perf_counters();
687 cct->get_perfcounters_collection()->add(perfcounter);
688 }
689
690 void ObjectCacher::perf_stop()
691 {
692 assert(perfcounter);
693 cct->get_perfcounters_collection()->remove(perfcounter);
694 delete perfcounter;
695 }
696
697 /* private */
698 ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
699 uint64_t object_no,
700 ObjectSet *oset,
701 object_locator_t &l,
702 uint64_t truncate_size,
703 uint64_t truncate_seq)
704 {
705 // XXX: Add handling of nspace in object_locator_t in cache
706 assert(lock.is_locked());
707 // have it?
708 if ((uint32_t)l.pool < objects.size()) {
709 if (objects[l.pool].count(oid)) {
710 Object *o = objects[l.pool][oid];
711 o->object_no = object_no;
712 o->truncate_size = truncate_size;
713 o->truncate_seq = truncate_seq;
714 return o;
715 }
716 } else {
717 objects.resize(l.pool+1);
718 }
719
720 // create it.
721 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
722 truncate_seq);
723 objects[l.pool][oid] = o;
724 ob_lru.lru_insert_top(o);
725 return o;
726 }
727
728 void ObjectCacher::close_object(Object *ob)
729 {
730 assert(lock.is_locked());
731 ldout(cct, 10) << "close_object " << *ob << dendl;
732 assert(ob->can_close());
733
734 // ok!
735 ob_lru.lru_remove(ob);
736 objects[ob->oloc.pool].erase(ob->get_soid());
737 ob->set_item.remove_myself();
738 delete ob;
739 }
740
741 void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
742 const ZTracer::Trace &parent_trace)
743 {
744 assert(lock.is_locked());
745 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
746 << reads_outstanding << dendl;
747
748 ZTracer::Trace trace;
749 if (parent_trace.valid()) {
750 trace.init("", &trace_endpoint, &parent_trace);
751 trace.copy_name("bh_read " + bh->ob->get_oid().name);
752 trace.event("start");
753 }
754
755 mark_rx(bh);
756 bh->last_read_tid = ++last_read_tid;
757
758 // finisher
759 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
760 bh->start(), bh->length(), trace);
761 // go
762 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
763 bh->ob->get_oloc(), bh->start(), bh->length(),
764 bh->ob->get_snap(), &onfinish->bl,
765 bh->ob->truncate_size, bh->ob->truncate_seq,
766 op_flags, trace, onfinish);
767
768 ++reads_outstanding;
769 }
770
771 void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
772 ceph_tid_t tid, loff_t start,
773 uint64_t length, bufferlist &bl, int r,
774 bool trust_enoent)
775 {
776 assert(lock.is_locked());
777 ldout(cct, 7) << "bh_read_finish "
778 << oid
779 << " tid " << tid
780 << " " << start << "~" << length
781 << " (bl is " << bl.length() << ")"
782 << " returned " << r
783 << " outstanding reads " << reads_outstanding
784 << dendl;
785
786 if (r >= 0 && bl.length() < length) {
787 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
788 << length << " with " << length - bl.length() << " bytes of zeroes"
789 << dendl;
790 bl.append_zero(length - bl.length());
791 }
792
793 list<Context*> ls;
794 int err = 0;
795
796 if (objects[poolid].count(oid) == 0) {
797 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
798 } else {
799 Object *ob = objects[poolid][oid];
800
801 if (r == -ENOENT && !ob->complete) {
802 // wake up *all* rx waiters, or else we risk reordering
803 // identical reads. e.g.
804 // read 1~1
805 // reply to unrelated 3~1 -> !exists
806 // read 1~1 -> immediate ENOENT
807 // reply to first 1~1 -> ooo ENOENT
808 bool allzero = true;
809 for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
810 p != ob->data.end(); ++p) {
811 BufferHead *bh = p->second;
812 for (map<loff_t, list<Context*> >::iterator p
813 = bh->waitfor_read.begin();
814 p != bh->waitfor_read.end();
815 ++p)
816 ls.splice(ls.end(), p->second);
817 bh->waitfor_read.clear();
818 if (!bh->is_zero() && !bh->is_rx())
819 allzero = false;
820 }
821
822 // just pass through and retry all waiters if we don't trust
823 // -ENOENT for this read
824 if (trust_enoent) {
825 ldout(cct, 7)
826 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
827 << dendl;
828 ob->complete = true;
829 ob->exists = false;
830
831 /* If all the bhs are effectively zero, get rid of them. All
832 * the waiters will be retried and get -ENOENT immediately, so
833 * it's safe to clean up the unneeded bh's now. Since we know
834 * it's safe to remove them now, do so, so they aren't hanging
835 * around waiting for more -ENOENTs from rados while the cache
836 * is being shut down.
837 *
838 * Only do this when all the bhs are rx or clean, to match the
839 * condition in _readx(). If there are any non-rx or non-clean
840 * bhs, _readx() will wait for the final result instead of
841 * returning -ENOENT immediately.
842 */
843 if (allzero) {
844 ldout(cct, 10)
845 << "bh_read_finish ENOENT and allzero, getting rid of "
846 << "bhs for " << *ob << dendl;
847 map<loff_t, BufferHead*>::iterator p = ob->data.begin();
848 while (p != ob->data.end()) {
849 BufferHead *bh = p->second;
850 // current iterator will be invalidated by bh_remove()
851 ++p;
852 bh_remove(ob, bh);
853 delete bh;
854 }
855 }
856 }
857 }
858
859 // apply to bh's!
860 loff_t opos = start;
861 while (true) {
862 map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
863 if (p == ob->data.end())
864 break;
865 if (opos >= start+(loff_t)length) {
866 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
867 << start << "+" << length << "=" << start+(loff_t)length
868 << dendl;
869 break;
870 }
871
872 BufferHead *bh = p->second;
873 ldout(cct, 20) << "checking bh " << *bh << dendl;
874
875 // finishers?
876 for (map<loff_t, list<Context*> >::iterator it
877 = bh->waitfor_read.begin();
878 it != bh->waitfor_read.end();
879 ++it)
880 ls.splice(ls.end(), it->second);
881 bh->waitfor_read.clear();
882
883 if (bh->start() > opos) {
884 ldout(cct, 1) << "bh_read_finish skipping gap "
885 << opos << "~" << bh->start() - opos
886 << dendl;
887 opos = bh->start();
888 continue;
889 }
890
891 if (!bh->is_rx()) {
892 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
893 opos = bh->end();
894 continue;
895 }
896
897 if (bh->last_read_tid != tid) {
898 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
899 << bh->last_read_tid << " != tid " << tid
900 << ", skipping" << dendl;
901 opos = bh->end();
902 continue;
903 }
904
905 assert(opos >= bh->start());
906 assert(bh->start() == opos); // we don't merge rx bh's... yet!
907 assert(bh->length() <= start+(loff_t)length-opos);
908
909 if (bh->error < 0)
910 err = bh->error;
911
912 opos = bh->end();
913
914 if (r == -ENOENT) {
915 if (trust_enoent) {
916 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
917 bh_remove(ob, bh);
918 delete bh;
919 } else {
920 ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
921 << *bh << dendl;
922 }
923 continue;
924 }
925
926 if (r < 0) {
927 bh->error = r;
928 mark_error(bh);
929 } else {
930 bh->bl.substr_of(bl,
931 bh->start() - start,
932 bh->length());
933 mark_clean(bh);
934 }
935
936 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
937
938 ob->try_merge_bh(bh);
939 }
940 }
941
942 // called with lock held.
943 ldout(cct, 20) << "finishing waiters " << ls << dendl;
944
945 finish_contexts(cct, ls, err);
946 retry_waiting_reads();
947
948 --reads_outstanding;
949 read_cond.Signal();
950 }
951
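// bh_write_adjacencies: starting from `bh`, gather dirty buffers of the same
// object (scanning forward, then backward, through dirty_or_tx_bh) whose
// last_write is at or before `cutoff`, stopping once *max_count buffers or
// *max_amount bytes have been collected, and submit them as one scattered
// write.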
952 void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
953 int64_t *max_amount, int *max_count)
954 {
955 list<BufferHead*> blist;
956
957 int count = 0;
958 int64_t total_len = 0;
959 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
960 assert(it != dirty_or_tx_bh.end());
961 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
962 p != dirty_or_tx_bh.end();
963 ++p) {
964 BufferHead *obh = *p;
965 if (obh->ob != bh->ob)
966 break;
967 if (obh->is_dirty() && obh->last_write <= cutoff) {
968 blist.push_back(obh);
969 ++count;
970 total_len += obh->length();
971 if ((max_count && count > *max_count) ||
972 (max_amount && total_len > *max_amount))
973 break;
974 }
975 }
976
977 while (it != dirty_or_tx_bh.begin()) {
978 --it;
979 BufferHead *obh = *it;
980 if (obh->ob != bh->ob)
981 break;
982 if (obh->is_dirty() && obh->last_write <= cutoff) {
983 blist.push_front(obh);
984 ++count;
985 total_len += obh->length();
986 if ((max_count && count > *max_count) ||
987 (max_amount && total_len > *max_amount))
988 break;
989 }
990 }
991 if (max_count)
992 *max_count -= count;
993 if (max_amount)
994 *max_amount -= total_len;
995
996 bh_write_scattered(blist);
997 }
998
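// C_WriteCommit: completion for a writeback.  It records the object ranges
// that were written (a single range for bh_write(), several for a scattered
// write) and, once the OSD acks, calls bh_write_commit() with the tid that
// was assigned when the write was submitted.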
999 class ObjectCacher::C_WriteCommit : public Context {
1000 ObjectCacher *oc;
1001 int64_t poolid;
1002 sobject_t oid;
1003 vector<pair<loff_t, uint64_t> > ranges;
1004 ZTracer::Trace trace;
1005 public:
1006 ceph_tid_t tid = 0;
1007 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1008 uint64_t l, const ZTracer::Trace &trace) :
1009 oc(c), poolid(_poolid), oid(o), trace(trace) {
1010 ranges.push_back(make_pair(s, l));
1011 }
1012 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1013 vector<pair<loff_t, uint64_t> >& _ranges) :
1014 oc(c), poolid(_poolid), oid(o), tid(0) {
1015 ranges.swap(_ranges);
1016 }
1017 void finish(int r) override {
1018 oc->bh_write_commit(poolid, oid, ranges, tid, r);
1019 trace.event("finish");
1020 }
1021 };
1022 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1023 {
1024 assert(lock.is_locked());
1025
1026 Object *ob = blist.front()->ob;
1027 ob->get();
1028
1029 ceph::real_time last_write;
1030 SnapContext snapc;
1031 vector<pair<loff_t, uint64_t> > ranges;
1032 vector<pair<uint64_t, bufferlist> > io_vec;
1033
1034 ranges.reserve(blist.size());
1035 io_vec.reserve(blist.size());
1036
1037 uint64_t total_len = 0;
1038 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1039 BufferHead *bh = *p;
1040 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1041 assert(bh->ob == ob);
1042 assert(bh->bl.length() == bh->length());
1043 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1044
1045 int n = io_vec.size();
1046 io_vec.resize(n + 1);
1047 io_vec[n].first = bh->start();
1048 io_vec[n].second = bh->bl;
1049
1050 total_len += bh->length();
1051 if (bh->snapc.seq > snapc.seq)
1052 snapc = bh->snapc;
1053 if (bh->last_write > last_write)
1054 last_write = bh->last_write;
1055 }
1056
1057 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1058
1059 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1060 io_vec, snapc, last_write,
1061 ob->truncate_size, ob->truncate_seq,
1062 oncommit);
1063 oncommit->tid = tid;
1064 ob->last_write_tid = tid;
1065 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1066 BufferHead *bh = *p;
1067 bh->last_write_tid = tid;
1068 mark_tx(bh);
1069 }
1070
1071 if (perfcounter)
1072 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1073 }
1074
1075 void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1076 {
1077 assert(lock.is_locked());
1078 ldout(cct, 7) << "bh_write " << *bh << dendl;
1079
1080 bh->ob->get();
1081
1082 ZTracer::Trace trace;
1083 if (parent_trace.valid()) {
1084 trace.init("", &trace_endpoint, &parent_trace);
1085 trace.copy_name("bh_write " + bh->ob->get_oid().name);
1086 trace.event("start");
1087 }
1088
1089 // finishers
1090 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1091 bh->ob->get_soid(), bh->start(),
1092 bh->length(), trace);
1093 // go
1094 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1095 bh->ob->get_oloc(),
1096 bh->start(), bh->length(),
1097 bh->snapc, bh->bl, bh->last_write,
1098 bh->ob->truncate_size,
1099 bh->ob->truncate_seq,
1100 bh->journal_tid, trace, oncommit);
1101 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1102
1103 // set bh last_write_tid
1104 oncommit->tid = tid;
1105 bh->ob->last_write_tid = tid;
1106 bh->last_write_tid = tid;
1107
1108 if (perfcounter) {
1109 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1110 }
1111
1112 mark_tx(bh);
1113 }
1114
1115 void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1116 vector<pair<loff_t, uint64_t> >& ranges,
1117 ceph_tid_t tid, int r)
1118 {
1119 assert(lock.is_locked());
1120 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1121 << " ranges " << ranges << " returned " << r << dendl;
1122
1123 if (objects[poolid].count(oid) == 0) {
1124 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1125 return;
1126 }
1127
1128 Object *ob = objects[poolid][oid];
1129 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1130
1131 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1132 p != ranges.end();
1133 ++p) {
1134 loff_t start = p->first;
1135 uint64_t length = p->second;
1136 if (!ob->exists) {
1137 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1138 ob->exists = true;
1139
1140 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1141 ob->get_snap())) {
1142 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1143 "complete on " << *ob << dendl;
1144 ob->complete = false;
1145 }
1146 }
1147
1148 vector<pair<loff_t, BufferHead*>> hit;
1149 // apply to bh's!
1150 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1151 p != ob->data.end();
1152 ++p) {
1153 BufferHead *bh = p->second;
1154
1155 if (bh->start() >= start+(loff_t)length)
1156 break;
1157
1158 // make sure bh is tx
1159 if (!bh->is_tx()) {
1160 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1161 continue;
1162 }
1163
1164 // make sure bh tid matches
1165 if (bh->last_write_tid != tid) {
1166 assert(bh->last_write_tid > tid);
1167 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1168 continue;
1169 }
1170
1171 // we don't merge tx buffers. tx buffer should be within the range
1172 assert(bh->start() >= start);
1173 assert(bh->end() <= start+(loff_t)length);
1174
1175 if (r >= 0) {
1176 // ok! mark bh clean and error-free
1177 mark_clean(bh);
1178 bh->set_journal_tid(0);
1179 if (bh->get_nocache())
1180 bh_lru_rest.lru_bottouch(bh);
1181 hit.push_back(make_pair(bh->start(), bh));
1182 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1183 } else {
1184 mark_dirty(bh);
1185 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1186 << *bh << " r = " << r << " " << cpp_strerror(-r)
1187 << dendl;
1188 }
1189 }
1190
1191 for (auto& p : hit) {
1192 // p.second may have been merged into a neighbour and deleted by merge_left
1193 if (ob->data.count(p.first))
1194 ob->try_merge_bh(p.second);
1195 }
1196 }
1197
1198 // update last_commit.
1199 assert(ob->last_commit_tid < tid);
1200 ob->last_commit_tid = tid;
1201
1202 // waiters?
1203 list<Context*> ls;
1204 if (ob->waitfor_commit.count(tid)) {
1205 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1206 ob->waitfor_commit.erase(tid);
1207 }
1208
1209 // is the entire object set now clean and fully committed?
1210 ObjectSet *oset = ob->oset;
1211 ob->put();
1212
1213 if (flush_set_callback &&
1214 was_dirty_or_tx > 0 &&
1215 oset->dirty_or_tx == 0) { // nothing dirty/tx
1216 flush_set_callback(flush_set_callback_arg, oset);
1217 }
1218
1219 if (!ls.empty())
1220 finish_contexts(cct, ls, r);
1221 }
1222
1223 void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1224 {
1225 assert(trace != nullptr);
1226 assert(lock.is_locked());
1227 ceph::real_time cutoff = ceph::real_clock::now();
1228
1229 ldout(cct, 10) << "flush " << amount << dendl;
1230
1231 /*
1232 * NOTE: we aren't actually pulling things off the LRU here, just
1233 * looking at the tail item. Then we call bh_write, which moves it
1234 * to the other LRU, so that we can call
1235 * lru_dirty.lru_get_next_expire() again.
1236 */
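// amount == 0 means "flush every dirty bh written before cutoff"; otherwise
// stop once roughly `amount` bytes have been queued for writeback.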
1237 int64_t left = amount;
1238 while (amount == 0 || left > 0) {
1239 BufferHead *bh = static_cast<BufferHead*>(
1240 bh_lru_dirty.lru_get_next_expire());
1241 if (!bh) break;
1242 if (bh->last_write > cutoff) break;
1243
1244 if (scattered_write) {
1245 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1246 } else {
1247 left -= bh->length();
1248 bh_write(bh, *trace);
1249 }
1250 }
1251 }
1252
1253
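// trim: evict clean/zero/error buffers from the cold end of bh_lru_rest while
// either the clean byte count exceeds max_size or the number of clean bhs
// exceeds max_size >> BUFFER_MEMORY_WEIGHT, then close LRU objects until the
// object count is back under max_objects.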
1254 void ObjectCacher::trim()
1255 {
1256 assert(lock.is_locked());
1257 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1258 << get_stat_clean() << ", objects: max " << max_objects
1259 << " current " << ob_lru.lru_get_size() << dendl;
1260
1261 uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1262 uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
1263 while (get_stat_clean() > 0 &&
1264 ((uint64_t)get_stat_clean() > max_size ||
1265 nr_clean_bh > max_clean_bh)) {
1266 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1267 if (!bh)
1268 break;
1269
1270 ldout(cct, 10) << "trim trimming " << *bh << dendl;
1271 assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1272
1273 Object *ob = bh->ob;
1274 bh_remove(ob, bh);
1275 delete bh;
1276
1277 --nr_clean_bh;
1278
1279 if (ob->complete) {
1280 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1281 ob->complete = false;
1282 }
1283 }
1284
1285 while (ob_lru.lru_get_size() > max_objects) {
1286 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1287 if (!ob)
1288 break;
1289
1290 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1291 close_object(ob);
1292 }
1293
1294 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1295 << get_stat_clean() << ", objects: max " << max_objects
1296 << " current " << ob_lru.lru_get_size() << dendl;
1297 }
1298
1299
1300
1301 /* public */
1302
1303 bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1304 snapid_t snapid)
1305 {
1306 assert(lock.is_locked());
1307 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1308 ex_it != extents.end();
1309 ++ex_it) {
1310 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1311
1312 // get Object cache
1313 sobject_t soid(ex_it->oid, snapid);
1314 Object *o = get_object_maybe(soid, ex_it->oloc);
1315 if (!o)
1316 return false;
1317 if (!o->is_cached(ex_it->offset, ex_it->length))
1318 return false;
1319 }
1320 return true;
1321 }
1322
1323
1324 /*
1325 * returns # bytes read (if in cache). onfinish is untouched (caller
1326 * must delete it)
1327 * returns 0 if doing async read
1328 */
1329 int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1330 ZTracer::Trace *parent_trace)
1331 {
1332 ZTracer::Trace trace;
1333 if (parent_trace != nullptr) {
1334 trace.init("read", &trace_endpoint, parent_trace);
1335 trace.event("start");
1336 }
1337
1338 int r =_readx(rd, oset, onfinish, true, &trace);
1339 if (r < 0) {
1340 trace.event("finish");
1341 }
1342 return r;
1343 }
1344
1345 int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1346 bool external_call, ZTracer::Trace *trace)
1347 {
1348 assert(trace != nullptr);
1349 assert(lock.is_locked());
1350 bool success = true;
1351 int error = 0;
1352 uint64_t bytes_in_cache = 0;
1353 uint64_t bytes_not_in_cache = 0;
1354 uint64_t total_bytes_read = 0;
1355 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1356 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1357 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1358
1359 /*
1360 * WARNING: we can only meaningfully return ENOENT if the read request
1361 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1362 * zeroed buffers needs to feed single extents into readx().
1363 */
1364 assert(!oset->return_enoent || rd->extents.size() == 1);
1365
1366 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1367 ex_it != rd->extents.end();
1368 ++ex_it) {
1369 ldout(cct, 10) << "readx " << *ex_it << dendl;
1370
1371 total_bytes_read += ex_it->length;
1372
1373 // get Object cache
1374 sobject_t soid(ex_it->oid, rd->snap);
1375 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1376 ex_it->truncate_size, oset->truncate_seq);
1377 if (external_call)
1378 touch_ob(o);
1379
1380 // does not exist and no hits?
1381 if (oset->return_enoent && !o->exists) {
1382 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1383
1384 // should we worry about COW underneath us?
1385 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1386 ex_it->length, soid.snap)) {
1387 ldout(cct, 20) << "readx may copy on write" << dendl;
1388 bool wait = false;
1389 list<BufferHead*> blist;
1390 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1391 bh_it != o->data.end();
1392 ++bh_it) {
1393 BufferHead *bh = bh_it->second;
1394 if (bh->is_dirty() || bh->is_tx()) {
1395 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1396 wait = true;
1397 if (bh->is_dirty()) {
1398 if (scattered_write)
1399 blist.push_back(bh);
1400 else
1401 bh_write(bh, *trace);
1402 }
1403 }
1404 }
1405 if (scattered_write && !blist.empty())
1406 bh_write_scattered(blist);
1407 if (wait) {
1408 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1409 << " on " << *o << dendl;
1410 o->waitfor_commit[o->last_write_tid].push_back(
1411 new C_RetryRead(this,rd, oset, onfinish, *trace));
1412 // FIXME: perfcounter!
1413 return 0;
1414 }
1415 }
1416
1417 // can we return ENOENT?
1418 bool allzero = true;
1419 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1420 bh_it != o->data.end();
1421 ++bh_it) {
1422 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1423 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1424 allzero = false;
1425 break;
1426 }
1427 }
1428 if (allzero) {
1429 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1430 << dendl;
1431 delete rd;
1432 if (dontneed)
1433 bottouch_ob(o);
1434 return -ENOENT;
1435 }
1436 }
1437
1438 // map extent into bufferheads
1439 map<loff_t, BufferHead*> hits, missing, rx, errors;
1440 o->map_read(*ex_it, hits, missing, rx, errors);
1441 if (external_call) {
1442 // retry reading error buffers
1443 missing.insert(errors.begin(), errors.end());
1444 } else {
1445 // some reads had errors, fail later so completions
1446 // are cleaned up properly
1447 // TODO: make read path not call _readx for every completion
1448 hits.insert(errors.begin(), errors.end());
1449 }
1450
1451 if (!missing.empty() || !rx.empty()) {
1452 // read missing
1453 map<loff_t, BufferHead*>::iterator last = missing.end();
1454 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1455 bh_it != missing.end();
1456 ++bh_it) {
1457 uint64_t rx_bytes = static_cast<uint64_t>(
1458 stat_rx + bh_it->second->length());
1459 bytes_not_in_cache += bh_it->second->length();
1460 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1461 // cache is full with concurrent reads -- wait for rx's to complete
1462 // to constrain memory growth (especially during copy-ups)
1463 if (success) {
1464 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1465 << waitfor_read.size() << " blocked reads, "
1466 << (MAX(rx_bytes, max_size) - max_size)
1467 << " read bytes" << dendl;
1468 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1469 *trace));
1470 }
1471
1472 bh_remove(o, bh_it->second);
1473 delete bh_it->second;
1474 } else {
1475 bh_it->second->set_nocache(nocache);
1476 bh_read(bh_it->second, rd->fadvise_flags, *trace);
1477 if ((success && onfinish) || last != missing.end())
1478 last = bh_it;
1479 }
1480 success = false;
1481 }
1482
1483 // add the waiter to the last missing bh to avoid waking up too early, since reads complete in order
1484 if (last != missing.end()) {
1485 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1486 << " off " << last->first << dendl;
1487 last->second->waitfor_read[last->first].push_back(
1488 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1489
1490 }
1491
1492 // bump rx
1493 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1494 bh_it != rx.end();
1495 ++bh_it) {
1496 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1497 if (success && onfinish) {
1498 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1499 << " off " << bh_it->first << dendl;
1500 bh_it->second->waitfor_read[bh_it->first].push_back(
1501 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1502 }
1503 bytes_not_in_cache += bh_it->second->length();
1504 success = false;
1505 }
1506
1507 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1508 bh_it != hits.end(); ++bh_it)
1509 // bump in lru, so we don't lose it before a later read
1510 touch_bh(bh_it->second);
1511
1512 } else {
1513 assert(!hits.empty());
1514
1515 // make a plain list
1516 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1517 bh_it != hits.end();
1518 ++bh_it) {
1519 BufferHead *bh = bh_it->second;
1520 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1521 if (bh->is_error() && bh->error)
1522 error = bh->error;
1523 bytes_in_cache += bh->length();
1524
1525 if (bh->get_nocache() && bh->is_clean())
1526 bh_lru_rest.lru_bottouch(bh);
1527 else
1528 touch_bh(bh);
1529 // must come after touch_bh, because touch_bh sets dontneed to false
1530 if (dontneed &&
1531 ((loff_t)ex_it->offset <= bh->start() &&
1532 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1533 bh->set_dontneed(true); //if dirty
1534 if (bh->is_clean())
1535 bh_lru_rest.lru_bottouch(bh);
1536 }
1537 }
1538
1539 if (!error) {
1540 // create reverse map of buffer offset -> object for the
1541 // eventual result. this is over a single ObjectExtent, so we
1542 // know that
1543 // - the bh's are contiguous
1544 // - the buffer frags need not be (and almost certainly aren't)
1545 loff_t opos = ex_it->offset;
1546 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1547 assert(bh_it->second->start() <= opos);
1548 uint64_t bhoff = opos - bh_it->second->start();
1549 vector<pair<uint64_t,uint64_t> >::iterator f_it
1550 = ex_it->buffer_extents.begin();
1551 uint64_t foff = 0;
1552 while (1) {
1553 BufferHead *bh = bh_it->second;
1554 assert(opos == (loff_t)(bh->start() + bhoff));
1555
1556 uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
1557 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1558 << bhoff << " frag " << f_it->first << "~"
1559 << f_it->second << " +" << foff << "~" << len
1560 << dendl;
1561
1562 bufferlist bit;
1563 // put substr here first, since substr_of clobbers, and we
1564 // may get multiple bh's at this stripe_map position
1565 if (bh->is_zero()) {
1566 stripe_map[f_it->first].append_zero(len);
1567 } else {
1568 bit.substr_of(bh->bl,
1569 opos - bh->start(),
1570 len);
1571 stripe_map[f_it->first].claim_append(bit);
1572 }
1573
1574 opos += len;
1575 bhoff += len;
1576 foff += len;
1577 if (opos == bh->end()) {
1578 ++bh_it;
1579 bhoff = 0;
1580 }
1581 if (foff == f_it->second) {
1582 ++f_it;
1583 foff = 0;
1584 }
1585 if (bh_it == hits.end()) break;
1586 if (f_it == ex_it->buffer_extents.end())
1587 break;
1588 }
1589 assert(f_it == ex_it->buffer_extents.end());
1590 assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1591 }
1592
1593 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1594 bottouch_ob(o);
1595 }
1596 }
1597
1598 if (!success) {
1599 if (perfcounter && external_call) {
1600 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1601 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1602 perfcounter->inc(l_objectcacher_cache_ops_miss);
1603 }
1604 if (onfinish) {
1605 ldout(cct, 20) << "readx defer " << rd << dendl;
1606 } else {
1607 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1608 << dendl;
1609 delete rd;
1610 }
1611 return 0; // wait!
1612 }
1613 if (perfcounter && external_call) {
1614 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1615 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1616 perfcounter->inc(l_objectcacher_cache_ops_hit);
1617 }
1618
1619 // no misses... success! do the read.
1620 ldout(cct, 10) << "readx has all buffers" << dendl;
1621
1622 // ok, assemble into result buffer.
1623 uint64_t pos = 0;
1624 if (rd->bl && !error) {
1625 rd->bl->clear();
1626 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1627 i != stripe_map.end();
1628 ++i) {
1629 assert(pos == i->first);
1630 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1631 << " at " << pos << dendl;
1632 pos += i->second.length();
1633 rd->bl->claim_append(i->second);
1634 assert(rd->bl->length() == pos);
1635 }
1636 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1637 } else if (!error) {
1638 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1639 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1640 pos = i->first + i->second.length();
1641 }
1642
1643 // done with read.
1644 int ret = error ? error : pos;
1645 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1646 assert(pos <= (uint64_t) INT_MAX);
1647
1648 delete rd;
1649
1650 trim();
1651
1652 return ret;
1653 }
1654
1655 void ObjectCacher::retry_waiting_reads()
1656 {
1657 list<Context *> ls;
1658 ls.swap(waitfor_read);
1659
1660 while (!ls.empty() && waitfor_read.empty()) {
1661 Context *ctx = ls.front();
1662 ls.pop_front();
1663 ctx->complete(0);
1664 }
1665 waitfor_read.splice(waitfor_read.end(), ls);
1666 }
1667
1668 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1669 ZTracer::Trace *parent_trace)
1670 {
1671 assert(lock.is_locked());
1672 ceph::real_time now = ceph::real_clock::now();
1673 uint64_t bytes_written = 0;
1674 uint64_t bytes_written_in_flush = 0;
1675 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1676 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1677
1678 ZTracer::Trace trace;
1679 if (parent_trace != nullptr) {
1680 trace.init("write", &trace_endpoint, parent_trace);
1681 trace.event("start");
1682 }
1683
1684 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1685 ex_it != wr->extents.end();
1686 ++ex_it) {
1687 // get object cache
1688 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1689 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1690 ex_it->truncate_size, oset->truncate_seq);
1691
1692 // map it all into a single bufferhead.
1693 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1694 bool missing = bh->is_missing();
1695 bh->snapc = wr->snapc;
1696
1697 bytes_written += ex_it->length;
1698 if (bh->is_tx()) {
1699 bytes_written_in_flush += ex_it->length;
1700 }
1701
1702 // adjust buffer pointers (ie "copy" data into my cache)
1703 // this is over a single ObjectExtent, so we know that
1704 // - there is one contiguous bh
1705 // - the buffer frags need not be (and almost certainly aren't)
1706 // note: i assume striping is monotonic... no jumps backwards, ever!
1707 loff_t opos = ex_it->offset;
1708 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1709 = ex_it->buffer_extents.begin();
1710 f_it != ex_it->buffer_extents.end();
1711 ++f_it) {
1712 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1713 << f_it->second << " into " << *bh << " at " << opos
1714 << dendl;
1715 uint64_t bhoff = bh->start() - opos;
1716 assert(f_it->second <= bh->length() - bhoff);
1717
1718 // get the frag we're mapping in
1719 bufferlist frag;
1720 frag.substr_of(wr->bl,
1721 f_it->first, f_it->second);
1722
1723 // keep anything left of bhoff
1724 bufferlist newbl;
1725 if (bhoff)
1726 newbl.substr_of(bh->bl, 0, bhoff);
1727 newbl.claim_append(frag);
1728 bh->bl.swap(newbl);
1729
1730 opos += f_it->second;
1731 }
1732
1733 // ok, now bh is dirty.
1734 mark_dirty(bh);
1735 if (dontneed)
1736 bh->set_dontneed(true);
1737 else if (nocache && missing)
1738 bh->set_nocache(true);
1739 else
1740 touch_bh(bh);
1741
1742 bh->last_write = now;
1743
1744 o->try_merge_bh(bh);
1745 }
1746
1747 if (perfcounter) {
1748 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1749 if (bytes_written_in_flush) {
1750 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1751 bytes_written_in_flush);
1752 }
1753 }
1754
1755 int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1756 delete wr;
1757
1758 //verify_stats();
1759 trim();
1760 return r;
1761 }
1762
1763 class ObjectCacher::C_WaitForWrite : public Context {
1764 public:
1765 C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1766 const ZTracer::Trace &trace, Context *onfinish) :
1767 m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1768 void finish(int r) override;
1769 private:
1770 ObjectCacher *m_oc;
1771 uint64_t m_len;
1772 ZTracer::Trace m_trace;
1773 Context *m_onfinish;
1774 };
1775
1776 void ObjectCacher::C_WaitForWrite::finish(int r)
1777 {
1778 Mutex::Locker l(m_oc->lock);
1779 m_oc->maybe_wait_for_writeback(m_len, &m_trace);
1780 m_onfinish->complete(r);
1781 }
1782
1783 void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
1784 ZTracer::Trace *trace)
1785 {
1786 assert(lock.is_locked());
1787 ceph::mono_time start = ceph::mono_clock::now();
1788 int blocked = 0;
1789 // wait for writeback?
1790 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1791 // - do not wait for bytes other waiters are waiting on. this means that
1792 // threads do not wait for each other. this effectively allows the cache
1793 // size to balloon proportional to the data that is in flight.
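// Two limits are applied below: dirty+tx bytes against max_dirty (plus the
// bytes other waiters are already waiting on), and the number of dirty/tx
// buffer heads against max_dirty >> BUFFER_MEMORY_WEIGHT (plus the number of
// other waiters).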
1794
1795 uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
1796 while (get_stat_dirty() + get_stat_tx() > 0 &&
1797 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1798 max_dirty + get_stat_dirty_waiting()) ||
1799 (dirty_or_tx_bh.size() >=
1800 max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1801
1802 if (blocked == 0) {
1803 trace->event("start wait for writeback");
1804 }
1805 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1806 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1807 << max_dirty << " + dirty_waiting "
1808 << get_stat_dirty_waiting() << dendl;
1809 flusher_cond.Signal();
1810 stat_dirty_waiting += len;
1811 ++stat_nr_dirty_waiters;
1812 stat_cond.Wait(lock);
1813 stat_dirty_waiting -= len;
1814 --stat_nr_dirty_waiters;
1815 ++blocked;
1816 ldout(cct, 10) << __func__ << " woke up" << dendl;
1817 }
1818 if (blocked > 0) {
1819 trace->event("finish wait for writeback");
1820 }
1821 if (blocked && perfcounter) {
1822 perfcounter->inc(l_objectcacher_write_ops_blocked);
1823 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1824 ceph::timespan blocked = ceph::mono_clock::now() - start;
1825 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1826 }
1827 }
1828
1829 // blocking wait for write.
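// Three paths, depending on configuration:
//  - max_dirty > 0, block_writes_upfront:  wait here until dirty+tx drops
//    below the limit, then complete onfreespace(0) immediately.
//  - max_dirty > 0, !block_writes_upfront: queue a C_WaitForWrite on the
//    finisher so the wait (and onfreespace) happens asynchronously.
//  - max_dirty == 0: write-through; flush the extents just written and, if
//    blocking, wait for the flush to commit before returning.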
1830 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1831 ZTracer::Trace *trace, Context *onfreespace)
1832 {
1833 assert(lock.is_locked());
1834 assert(trace != nullptr);
1835 int ret = 0;
1836
1837 if (max_dirty > 0) {
1838 if (block_writes_upfront) {
1839 maybe_wait_for_writeback(len, trace);
1840 if (onfreespace)
1841 onfreespace->complete(0);
1842 } else {
1843 assert(onfreespace);
1844 finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
1845 }
1846 } else {
1847 // write-thru! flush what we just wrote.
1848 Cond cond;
1849 bool done = false;
1850 Context *fin = block_writes_upfront ?
1851 new C_Cond(&cond, &done, &ret) : onfreespace;
1852 assert(fin);
1853 bool flushed = flush_set(oset, wr->extents, trace, fin);
1854 assert(!flushed); // we just dirtied it, and didn't drop our lock!
1855 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1856 << " bytes" << dendl;
1857 if (block_writes_upfront) {
1858 while (!done)
1859 cond.Wait(lock);
1860 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1861 if (onfreespace)
1862 onfreespace->complete(ret);
1863 }
1864 }
1865
1866 // start writeback anyway?
1867 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1868 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1869 << target_dirty << ", nudging flusher" << dendl;
1870 flusher_cond.Signal();
1871 }
1872 return ret;
1873 }
1874
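// flusher_entry: body of the background flusher thread.  Each pass it either
// flushes enough dirty data to get back down to target_dirty, or writes back
// bhs older than max_dirty_age (at most MAX_FLUSH_UNDER_LOCK per pass before
// briefly dropping the lock), then sleeps up to one second on flusher_cond.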
1875 void ObjectCacher::flusher_entry()
1876 {
1877 ldout(cct, 10) << "flusher start" << dendl;
1878 lock.Lock();
1879 while (!flusher_stop) {
1880 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1881 get_stat_dirty();
1882 ldout(cct, 11) << "flusher "
1883 << all << " / " << max_size << ": "
1884 << get_stat_tx() << " tx, "
1885 << get_stat_rx() << " rx, "
1886 << get_stat_clean() << " clean, "
1887 << get_stat_dirty() << " dirty ("
1888 << target_dirty << " target, "
1889 << max_dirty << " max)"
1890 << dendl;
1891 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1892
1893 ZTracer::Trace trace;
1894 if (cct->_conf->osdc_blkin_trace_all) {
1895 trace.init("flusher", &trace_endpoint);
1896 trace.event("start");
1897 }
1898
1899 if (actual > 0 && (uint64_t) actual > target_dirty) {
1900 // flush some dirty pages
1901 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1902 << get_stat_dirty_waiting() << " dirty_waiting > target "
1903 << target_dirty << ", flushing some dirty bhs" << dendl;
1904 flush(&trace, actual - target_dirty);
1905 } else {
1906 // check tail of lru for old dirty items
1907 ceph::real_time cutoff = ceph::real_clock::now();
1908 cutoff -= max_dirty_age;
1909 BufferHead *bh = 0;
1910 int max = MAX_FLUSH_UNDER_LOCK;
1911 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1912 lru_get_next_expire())) != 0 &&
1913 bh->last_write <= cutoff &&
1914 max > 0) {
1915 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1916 if (scattered_write) {
1917 bh_write_adjacencies(bh, cutoff, NULL, &max);
1918 } else {
1919 bh_write(bh, trace);
1920 --max;
1921 }
1922 }
1923 if (!max) {
1924 // back off the lock to avoid starving other threads
1925 trace.event("backoff");
1926 lock.Unlock();
1927 lock.Lock();
1928 continue;
1929 }
1930 }
1931
1932 trace.event("finish");
1933 if (flusher_stop)
1934 break;
1935
1936 flusher_cond.WaitInterval(lock, seconds(1));
1937 }
1938
1939 /* Wait for reads to finish. This is only possible if handling
1940 * -ENOENT made some read completions finish before their rados read
1941 * came back. If we don't wait for them, and destroy the cache, when
1942 * the rados reads do come back their callback will try to access the
1943 * no-longer-valid ObjectCacher.
1944 */
1945 while (reads_outstanding > 0) {
1946 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1947 << reads_outstanding << dendl;
1948 read_cond.Wait(lock);
1949 }
1950
1951 lock.Unlock();
1952 ldout(cct, 10) << "flusher finish" << dendl;
1953 }
1954
1955
1956 // -------------------------------------------------
1957
1958 bool ObjectCacher::set_is_empty(ObjectSet *oset)
1959 {
1960 assert(lock.is_locked());
1961 if (oset->objects.empty())
1962 return true;
1963
1964 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
1965 if (!(*p)->is_empty())
1966 return false;
1967
1968 return true;
1969 }
1970
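// set_is_cached: true if the set holds at least one buffer that is neither
// dirty nor tx, i.e. some purely cached data exists.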
1971 bool ObjectCacher::set_is_cached(ObjectSet *oset)
1972 {
1973 assert(lock.is_locked());
1974 if (oset->objects.empty())
1975 return false;
1976
1977 for (xlist<Object*>::iterator p = oset->objects.begin();
1978 !p.end(); ++p) {
1979 Object *ob = *p;
1980 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
1981 q != ob->data.end();
1982 ++q) {
1983 BufferHead *bh = q->second;
1984 if (!bh->is_dirty() && !bh->is_tx())
1985 return true;
1986 }
1987 }
1988
1989 return false;
1990 }
1991
1992 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
1993 {
1994 assert(lock.is_locked());
1995 if (oset->objects.empty())
1996 return false;
1997
1998 for (xlist<Object*>::iterator i = oset->objects.begin();
1999 !i.end(); ++i) {
2000 Object *ob = *i;
2001
2002 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2003 p != ob->data.end();
2004 ++p) {
2005 BufferHead *bh = p->second;
2006 if (bh->is_dirty() || bh->is_tx())
2007 return true;
2008 }
2009 }
2010
2011 return false;
2012 }
2013
2014
2015 // purge. non-blocking. violently removes dirty buffers from cache.
2016 void ObjectCacher::purge(Object *ob)
2017 {
2018 assert(lock.is_locked());
2019 ldout(cct, 10) << "purge " << *ob << dendl;
2020
2021 ob->truncate(0);
2022 }
2023
2024
2025 // flush. non-blocking. no callback.
2026 // true if clean, already flushed.
2027 // false if we wrote something.
2028 // be sloppy about the ranges: flush any buffer the range touches
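// (note, reflecting the loop below: a buffer already in tx also forces a
// false return, even though nothing new is submitted for it here)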
2029 bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2030 ZTracer::Trace *trace)
2031 {
2032 assert(trace != nullptr);
2033 assert(lock.is_locked());
2034 list<BufferHead*> blist;
2035 bool clean = true;
2036 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2037 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2038 p != ob->data.end();
2039 ++p) {
2040 BufferHead *bh = p->second;
2041 ldout(cct, 20) << "flush " << *bh << dendl;
2042 if (length && bh->start() > offset+length) {
2043 break;
2044 }
2045 if (bh->is_tx()) {
2046 clean = false;
2047 continue;
2048 }
2049 if (!bh->is_dirty()) {
2050 continue;
2051 }
2052
2053 if (scattered_write)
2054 blist.push_back(bh);
2055 else
2056 bh_write(bh, *trace);
2057 clean = false;
2058 }
2059 if (scattered_write && !blist.empty())
2060 bh_write_scattered(blist);
2061
2062 return clean;
2063 }
2064
2065 bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2066 Context *onfinish)
2067 {
2068 assert(lock.is_locked());
2069 if (gather->has_subs()) {
2070 gather->set_finisher(onfinish);
2071 gather->activate();
2072 return false;
2073 }
2074
2075 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2076 onfinish->complete(0);
2077 return true;
2078 }
2079
2080 // flush. non-blocking, takes callback.
2081 // returns true if already flushed
2082 bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2083 {
2084 assert(lock.is_locked());
2085 assert(onfinish != NULL);
2086 if (oset->objects.empty()) {
2087 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2088 onfinish->complete(0);
2089 return true;
2090 }
2091
2092 ldout(cct, 10) << "flush_set " << oset << dendl;
2093
2094 // we'll need to wait for all objects to flush!
2095 C_GatherBuilder gather(cct);
2096 set<Object*> waitfor_commit;
2097
2098 list<BufferHead*> blist;
2099 Object *last_ob = NULL;
2100 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2101
2102 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2103 // order. But items in oset->objects are not sorted. So the iterator can
2104 // point to any buffer head in the ObjectSet
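// The scan below therefore walks forward from the lower_bound position until
// it leaves this oset, then (if possible) walks backward from just before the
// starting point, since the object used to build the key is not necessarily
// the first object of the set in ptr_lt order.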
2105 BufferHead key(*oset->objects.begin());
2106 it = dirty_or_tx_bh.lower_bound(&key);
2107 p = q = it;
2108
2109 bool backwards = true;
2110 if (it != dirty_or_tx_bh.begin())
2111 --it;
2112 else
2113 backwards = false;
2114
2115 for (; p != dirty_or_tx_bh.end(); p = q) {
2116 ++q;
2117 BufferHead *bh = *p;
2118 if (bh->ob->oset != oset)
2119 break;
2120 waitfor_commit.insert(bh->ob);
2121 if (bh->is_dirty()) {
2122 if (scattered_write) {
2123 if (last_ob != bh->ob) {
2124 if (!blist.empty()) {
2125 bh_write_scattered(blist);
2126 blist.clear();
2127 }
2128 last_ob = bh->ob;
2129 }
2130 blist.push_back(bh);
2131 } else {
2132 bh_write(bh, {});
2133 }
2134 }
2135 }
2136
2137 if (backwards) {
2138 for (p = q = it; true; p = q) {
2139 if (q != dirty_or_tx_bh.begin())
2140 --q;
2141 else
2142 backwards = false;
2143 BufferHead *bh = *p;
2144 if (bh->ob->oset != oset)
2145 break;
2146 waitfor_commit.insert(bh->ob);
2147 if (bh->is_dirty()) {
2148 if (scattered_write) {
2149 if (last_ob != bh->ob) {
2150 if (!blist.empty()) {
2151 bh_write_scattered(blist);
2152 blist.clear();
2153 }
2154 last_ob = bh->ob;
2155 }
2156 blist.push_front(bh);
2157 } else {
2158 bh_write(bh, {});
2159 }
2160 }
2161 if (!backwards)
2162 break;
2163 }
2164 }
2165
2166 if (scattered_write && !blist.empty())
2167 bh_write_scattered(blist);
2168
2169 for (set<Object*>::iterator i = waitfor_commit.begin();
2170 i != waitfor_commit.end(); ++i) {
2171 Object *ob = *i;
2172
2173 // we'll need to gather...
2174 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2175 << ob->last_write_tid << " on " << *ob << dendl;
2176 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2177 }
2178
2179 return _flush_set_finish(&gather, onfinish);
2180 }
2181
2182 // flush. non-blocking, takes callback.
2183 // returns true if already flushed
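// Each extent whose object cannot be flushed cleanly registers a gather
// sub-Context on that object's waitfor_commit list for its last_write_tid.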
2184 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
2185 ZTracer::Trace *trace, Context *onfinish)
2186 {
2187 assert(lock.is_locked());
2188 assert(trace != nullptr);
2189 assert(onfinish != NULL);
2190 if (oset->objects.empty()) {
2191 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2192 onfinish->complete(0);
2193 return true;
2194 }
2195
2196 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2197 << " ObjectExtents" << dendl;
2198
2199 // we'll need to wait for all objects to flush!
2200 C_GatherBuilder gather(cct);
2201
2202 for (vector<ObjectExtent>::iterator p = exv.begin();
2203 p != exv.end();
2204 ++p) {
2205 ObjectExtent &ex = *p;
2206 sobject_t soid(ex.oid, CEPH_NOSNAP);
2207 if (objects[oset->poolid].count(soid) == 0)
2208 continue;
2209 Object *ob = objects[oset->poolid][soid];
2210
2211 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2212 << " " << ob << dendl;
2213
2214 if (!flush(ob, ex.offset, ex.length, trace)) {
2215 // we'll need to gather...
2216 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2217 << ob->last_write_tid << " on " << *ob << dendl;
2218 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2219 }
2220 }
2221
2222 return _flush_set_finish(&gather, onfinish);
2223 }
2224
2225 // flush all dirty data. non-blocking, takes callback.
2226 // returns true if already flushed
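// Same structure as flush_set(), but walks the entire dirty_or_tx_bh set
// rather than restricting itself to a single ObjectSet.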
2227 bool ObjectCacher::flush_all(Context *onfinish)
2228 {
2229 assert(lock.is_locked());
2230 assert(onfinish != NULL);
2231
2232 ldout(cct, 10) << "flush_all " << dendl;
2233
2234 // we'll need to wait for all objects to flush!
2235 C_GatherBuilder gather(cct);
2236 set<Object*> waitfor_commit;
2237
2238 list<BufferHead*> blist;
2239 Object *last_ob = NULL;
2240 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2241 next = it = dirty_or_tx_bh.begin();
2242 while (it != dirty_or_tx_bh.end()) {
2243 ++next;
2244 BufferHead *bh = *it;
2245 waitfor_commit.insert(bh->ob);
2246
2247 if (bh->is_dirty()) {
2248 if (scattered_write) {
2249 if (last_ob != bh->ob) {
2250 if (!blist.empty()) {
2251 bh_write_scattered(blist);
2252 blist.clear();
2253 }
2254 last_ob = bh->ob;
2255 }
2256 blist.push_back(bh);
2257 } else {
2258 bh_write(bh, {});
2259 }
2260 }
2261
2262 it = next;
2263 }
2264
2265 if (scattered_write && !blist.empty())
2266 bh_write_scattered(blist);
2267
2268 for (set<Object*>::iterator i = waitfor_commit.begin();
2269 i != waitfor_commit.end();
2270 ++i) {
2271 Object *ob = *i;
2272
2273 // we'll need to gather...
2274 ldout(cct, 10) << "flush_all will wait for ack tid "
2275 << ob->last_write_tid << " on " << *ob << dendl;
2276 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2277 }
2278
2279 return _flush_set_finish(&gather, onfinish);
2280 }
2281
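// purge_set: violently drop every buffer of every object in the set.  Even
// though nothing is flushed, flush_set_callback is still invoked if the set
// held dirty or tx data, so the caller can release resources tied to it.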
2282 void ObjectCacher::purge_set(ObjectSet *oset)
2283 {
2284 assert(lock.is_locked());
2285 if (oset->objects.empty()) {
2286 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2287 return;
2288 }
2289
2290 ldout(cct, 10) << "purge_set " << oset << dendl;
2291 const bool were_dirty = oset->dirty_or_tx > 0;
2292
2293 for (xlist<Object*>::iterator i = oset->objects.begin();
2294 !i.end(); ++i) {
2295 Object *ob = *i;
2296 purge(ob);
2297 }
2298
2299 // Although we have purged rather than flushed, caller should still
2300 // drop any resources associated with dirty data.
2301 assert(oset->dirty_or_tx == 0);
2302 if (flush_set_callback && were_dirty) {
2303 flush_set_callback(flush_set_callback_arg, oset);
2304 }
2305 }
2306
2307
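// release: drop clean/zero/error buffers from an object and return the number
// of bytes that could not be released.  If nothing is left the object is
// closed; otherwise its cached 'complete' and '!exists' knowledge is
// invalidated, since the remaining buffers no longer describe the whole object.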
2308 loff_t ObjectCacher::release(Object *ob)
2309 {
2310 assert(lock.is_locked());
2311 list<BufferHead*> clean;
2312 loff_t o_unclean = 0;
2313
2314 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2315 p != ob->data.end();
2316 ++p) {
2317 BufferHead *bh = p->second;
2318 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2319 clean.push_back(bh);
2320 else
2321 o_unclean += bh->length();
2322 }
2323
2324 for (list<BufferHead*>::iterator p = clean.begin();
2325 p != clean.end();
2326 ++p) {
2327 bh_remove(ob, *p);
2328 delete *p;
2329 }
2330
2331 if (ob->can_close()) {
2332 ldout(cct, 10) << "release trimming " << *ob << dendl;
2333 close_object(ob);
2334 assert(o_unclean == 0);
2335 return 0;
2336 }
2337
2338 if (ob->complete) {
2339 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2340 ob->complete = false;
2341 }
2342 if (!ob->exists) {
2343 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2344 ob->exists = true;
2345 }
2346
2347 return o_unclean;
2348 }
2349
2350 loff_t ObjectCacher::release_set(ObjectSet *oset)
2351 {
2352 assert(lock.is_locked());
2353 // return # bytes not clean (and thus not released).
2354 loff_t unclean = 0;
2355
2356 if (oset->objects.empty()) {
2357 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2358 return 0;
2359 }
2360
2361 ldout(cct, 10) << "release_set " << oset << dendl;
2362
2363 xlist<Object*>::iterator q;
2364 for (xlist<Object*>::iterator p = oset->objects.begin();
2365 !p.end(); ) {
2366 q = p;
2367 ++q;
2368 Object *ob = *p;
2369
2370 loff_t o_unclean = release(ob);
2371 unclean += o_unclean;
2372
2373 if (o_unclean)
2374 ldout(cct, 10) << "release_set " << oset << " " << *ob
2375 << " has " << o_unclean << " bytes left"
2376 << dendl;
2377 p = q;
2378 }
2379
2380 if (unclean) {
2381 ldout(cct, 10) << "release_set " << oset
2382 << ", " << unclean << " bytes left" << dendl;
2383 }
2384
2385 return unclean;
2386 }
2387
2388
2389 uint64_t ObjectCacher::release_all()
2390 {
2391 assert(lock.is_locked());
2392 ldout(cct, 10) << "release_all" << dendl;
2393 uint64_t unclean = 0;
2394
2395 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2396 = objects.begin();
2397 while (i != objects.end()) {
2398 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2399 while (p != i->end()) {
2400 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2401 ++n;
2402
2403 Object *ob = p->second;
2404
2405 loff_t o_unclean = release(ob);
2406 unclean += o_unclean;
2407
2408 if (o_unclean)
2409 ldout(cct, 10) << "release_all " << *ob
2410 << " has " << o_unclean << " bytes left"
2411 << dendl;
2412 p = n;
2413 }
2414 ++i;
2415 }
2416
2417 if (unclean) {
2418 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2419 << dendl;
2420 }
2421
2422 return unclean;
2423 }
2424
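// clear_nonexistence: forget any cached "this object does not exist" knowledge
// for the set, and tell in-flight reads to stop trusting -ENOENT.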
2425 void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2426 {
2427 assert(lock.is_locked());
2428 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2429
2430 for (xlist<Object*>::iterator p = oset->objects.begin();
2431 !p.end(); ++p) {
2432 Object *ob = *p;
2433 if (!ob->exists) {
2434 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2435 ob->exists = true;
2436 ob->complete = false;
2437 }
2438 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2439 !q.end(); ++q) {
2440 C_ReadFinish *comp = *q;
2441 comp->distrust_enoent();
2442 }
2443 }
2444 }
2445
2446 /**
2447  * discard object extents from an ObjectSet by dropping the cached buffers
2448  * covering the extents in exls from the in-memory oset.
2449  */
2450 void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2451 {
2452 assert(lock.is_locked());
2453 if (oset->objects.empty()) {
2454 ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
2455 return;
2456 }
2457
2458 ldout(cct, 10) << "discard_set " << oset << dendl;
2459
2460 bool were_dirty = oset->dirty_or_tx > 0;
2461
2462 for (vector<ObjectExtent>::const_iterator p = exls.begin();
2463 p != exls.end();
2464 ++p) {
2465 ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
2466 const ObjectExtent &ex = *p;
2467 sobject_t soid(ex.oid, CEPH_NOSNAP);
2468 if (objects[oset->poolid].count(soid) == 0)
2469 continue;
2470 Object *ob = objects[oset->poolid][soid];
2471
2472 ob->discard(ex.offset, ex.length);
2473 }
2474
2475 // did we truncate off dirty data?
2476 if (flush_set_callback &&
2477 were_dirty && oset->dirty_or_tx == 0)
2478 flush_set_callback(flush_set_callback_arg, oset);
2479 }
2480
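// verify_stats: recompute the per-state byte totals by walking every
// BufferHead and assert that they match the incrementally maintained stat_*
// counters.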
2481 void ObjectCacher::verify_stats() const
2482 {
2483 assert(lock.is_locked());
2484 ldout(cct, 10) << "verify_stats" << dendl;
2485
2486 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2487 error = 0;
2488 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2489 = objects.begin();
2490 i != objects.end();
2491 ++i) {
2492 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2493 = i->begin();
2494 p != i->end();
2495 ++p) {
2496 Object *ob = p->second;
2497 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2498 q != ob->data.end();
2499 ++q) {
2500 BufferHead *bh = q->second;
2501 switch (bh->get_state()) {
2502 case BufferHead::STATE_MISSING:
2503 missing += bh->length();
2504 break;
2505 case BufferHead::STATE_CLEAN:
2506 clean += bh->length();
2507 break;
2508 case BufferHead::STATE_ZERO:
2509 zero += bh->length();
2510 break;
2511 case BufferHead::STATE_DIRTY:
2512 dirty += bh->length();
2513 break;
2514 case BufferHead::STATE_TX:
2515 tx += bh->length();
2516 break;
2517 case BufferHead::STATE_RX:
2518 rx += bh->length();
2519 break;
2520 case BufferHead::STATE_ERROR:
2521 error += bh->length();
2522 break;
2523 default:
2524 ceph_abort();
2525 }
2526 }
2527 }
2528 }
2529
2530 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2531 << " dirty " << dirty << " missing " << missing
2532 << " error " << error << dendl;
2533 assert(clean == stat_clean);
2534 assert(rx == stat_rx);
2535 assert(tx == stat_tx);
2536 assert(dirty == stat_dirty);
2537 assert(missing == stat_missing);
2538 assert(zero == stat_zero);
2539 assert(error == stat_error);
2540 }
2541
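// bh_stat_add/bh_stat_sub: keep the global per-state byte counters (and the
// per-object / per-set dirty_or_tx totals) in sync with a buffer head's
// current state.  bh_stat_add also signals stat_cond when writers are blocked
// in maybe_wait_for_writeback(), so they can re-check the thresholds.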
2542 void ObjectCacher::bh_stat_add(BufferHead *bh)
2543 {
2544 assert(lock.is_locked());
2545 switch (bh->get_state()) {
2546 case BufferHead::STATE_MISSING:
2547 stat_missing += bh->length();
2548 break;
2549 case BufferHead::STATE_CLEAN:
2550 stat_clean += bh->length();
2551 break;
2552 case BufferHead::STATE_ZERO:
2553 stat_zero += bh->length();
2554 break;
2555 case BufferHead::STATE_DIRTY:
2556 stat_dirty += bh->length();
2557 bh->ob->dirty_or_tx += bh->length();
2558 bh->ob->oset->dirty_or_tx += bh->length();
2559 break;
2560 case BufferHead::STATE_TX:
2561 stat_tx += bh->length();
2562 bh->ob->dirty_or_tx += bh->length();
2563 bh->ob->oset->dirty_or_tx += bh->length();
2564 break;
2565 case BufferHead::STATE_RX:
2566 stat_rx += bh->length();
2567 break;
2568 case BufferHead::STATE_ERROR:
2569 stat_error += bh->length();
2570 break;
2571 default:
2572 assert(0 == "bh_stat_add: invalid bufferhead state");
2573 }
2574 if (get_stat_dirty_waiting() > 0)
2575 stat_cond.Signal();
2576 }
2577
2578 void ObjectCacher::bh_stat_sub(BufferHead *bh)
2579 {
2580 assert(lock.is_locked());
2581 switch (bh->get_state()) {
2582 case BufferHead::STATE_MISSING:
2583 stat_missing -= bh->length();
2584 break;
2585 case BufferHead::STATE_CLEAN:
2586 stat_clean -= bh->length();
2587 break;
2588 case BufferHead::STATE_ZERO:
2589 stat_zero -= bh->length();
2590 break;
2591 case BufferHead::STATE_DIRTY:
2592 stat_dirty -= bh->length();
2593 bh->ob->dirty_or_tx -= bh->length();
2594 bh->ob->oset->dirty_or_tx -= bh->length();
2595 break;
2596 case BufferHead::STATE_TX:
2597 stat_tx -= bh->length();
2598 bh->ob->dirty_or_tx -= bh->length();
2599 bh->ob->oset->dirty_or_tx -= bh->length();
2600 break;
2601 case BufferHead::STATE_RX:
2602 stat_rx -= bh->length();
2603 break;
2604 case BufferHead::STATE_ERROR:
2605 stat_error -= bh->length();
2606 break;
2607 default:
2608 assert(0 == "bh_stat_sub: invalid bufferhead state");
2609 }
2610 }
2611
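// bh_set_state: move the buffer head between the dirty and rest LRUs and in
// or out of dirty_or_tx_bh as its state changes, clear any stale error code
// when leaving STATE_ERROR, and re-account its bytes under the new state.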
2612 void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2613 {
2614 assert(lock.is_locked());
2615 int state = bh->get_state();
2616 // move between lru lists?
2617 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2618 bh_lru_rest.lru_remove(bh);
2619 bh_lru_dirty.lru_insert_top(bh);
2620 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2621 bh_lru_dirty.lru_remove(bh);
2622 if (bh->get_dontneed())
2623 bh_lru_rest.lru_insert_bot(bh);
2624 else
2625 bh_lru_rest.lru_insert_top(bh);
2626 }
2627
2628 if ((s == BufferHead::STATE_TX ||
2629 s == BufferHead::STATE_DIRTY) &&
2630 state != BufferHead::STATE_TX &&
2631 state != BufferHead::STATE_DIRTY) {
2632 dirty_or_tx_bh.insert(bh);
2633 } else if ((state == BufferHead::STATE_TX ||
2634 state == BufferHead::STATE_DIRTY) &&
2635 s != BufferHead::STATE_TX &&
2636 s != BufferHead::STATE_DIRTY) {
2637 dirty_or_tx_bh.erase(bh);
2638 }
2639
2640 if (s != BufferHead::STATE_ERROR &&
2641 state == BufferHead::STATE_ERROR) {
2642 bh->error = 0;
2643 }
2644
2645 // set state
2646 bh_stat_sub(bh);
2647 bh->set_state(s);
2648 bh_stat_add(bh);
2649 }
2650
2651 void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2652 {
2653 assert(lock.is_locked());
2654 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2655 ob->add_bh(bh);
2656 if (bh->is_dirty()) {
2657 bh_lru_dirty.lru_insert_top(bh);
2658 dirty_or_tx_bh.insert(bh);
2659 } else {
2660 if (bh->get_dontneed())
2661 bh_lru_rest.lru_insert_bot(bh);
2662 else
2663 bh_lru_rest.lru_insert_top(bh);
2664 }
2665
2666 if (bh->is_tx()) {
2667 dirty_or_tx_bh.insert(bh);
2668 }
2669 bh_stat_add(bh);
2670 }
2671
2672 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2673 {
2674 assert(lock.is_locked());
2675 assert(bh->get_journal_tid() == 0);
2676 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2677 ob->remove_bh(bh);
2678 if (bh->is_dirty()) {
2679 bh_lru_dirty.lru_remove(bh);
2680 dirty_or_tx_bh.erase(bh);
2681 } else {
2682 bh_lru_rest.lru_remove(bh);
2683 }
2684
2685 if (bh->is_tx()) {
2686 dirty_or_tx_bh.erase(bh);
2687 }
2688 bh_stat_sub(bh);
2689 if (get_stat_dirty_waiting() > 0)
2690 stat_cond.Signal();
2691 }
2692