ceph/src/osdc/ObjectCacher.cc (commit a77d6b318834796c945fbed137d7e262f0f8f5f0)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits.h>
5
6 #include "msg/Messenger.h"
7 #include "ObjectCacher.h"
8 #include "WritebackHandler.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11
12 #include "include/assert.h"
13
14 #define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
15                                 /// while holding the lock
16
17 using std::chrono::seconds;
18
19 /*** ObjectCacher::BufferHead ***/
20
21
22 /*** ObjectCacher::Object ***/
23
24 #define dout_subsys ceph_subsys_objectcacher
25 #undef dout_prefix
26 #define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
27
28
29
30 class ObjectCacher::C_ReadFinish : public Context {
31 ObjectCacher *oc;
32 int64_t poolid;
33 sobject_t oid;
34 loff_t start;
35 uint64_t length;
36 xlist<C_ReadFinish*>::item set_item;
37 bool trust_enoent;
38 ceph_tid_t tid;
39 ZTracer::Trace trace;
40
41 public:
42 bufferlist bl;
43 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
44 uint64_t l, const ZTracer::Trace &trace) :
45 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
46 set_item(this), trust_enoent(true),
47 tid(t), trace(trace) {
48 ob->reads.push_back(&set_item);
49 }
50
51 void finish(int r) override {
52 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
53 trace.event("finish");
54
55 // object destructor clears the list
56 if (set_item.is_on_list())
57 set_item.remove_myself();
58 }
59
60 void distrust_enoent() {
61 trust_enoent = false;
62 }
63 };
64
65 class ObjectCacher::C_RetryRead : public Context {
66 ObjectCacher *oc;
67 OSDRead *rd;
68 ObjectSet *oset;
69 Context *onfinish;
70 ZTracer::Trace trace;
71 public:
72 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
73 const ZTracer::Trace &trace)
74 : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
75 }
76 void finish(int r) override {
77 if (r >= 0) {
78 r = oc->_readx(rd, oset, onfinish, false, &trace);
79 }
80
81 if (r == 0) {
82 // read is still in-progress
83 return;
84 }
85
86 trace.event("finish");
87 if (onfinish) {
88 onfinish->complete(r);
89 }
90 }
91 };
92
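// Split 'left' in two at object offset 'off': 'left' keeps [start, off) and a
// new right-hand bh takes [off, end), inheriting state, snapc, journal tid and
// the dontneed/nocache hints.  Buffered data and any read waiters at or beyond
// 'off' are moved onto the new bh, which is returned.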
93 ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
94 loff_t off)
95 {
96 assert(oc->lock.is_locked());
97 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
98
99 // split off right
100 ObjectCacher::BufferHead *right = new BufferHead(this);
101
102 // inherit the dontneed/nocache hints so a later access cleans this up automatically
103 right->set_dontneed(left->get_dontneed());
104 right->set_nocache(left->get_nocache());
105
106 right->last_write_tid = left->last_write_tid;
107 right->last_read_tid = left->last_read_tid;
108 right->set_state(left->get_state());
109 right->snapc = left->snapc;
110 right->set_journal_tid(left->journal_tid);
111
112 loff_t newleftlen = off - left->start();
113 right->set_start(off);
114 right->set_length(left->length() - newleftlen);
115
116 // shorten left
117 oc->bh_stat_sub(left);
118 left->set_length(newleftlen);
119 oc->bh_stat_add(left);
120
121 // add right
122 oc->bh_add(this, right);
123
124 // split buffers too
125 bufferlist bl;
126 bl.claim(left->bl);
127 if (bl.length()) {
128 assert(bl.length() == (left->length() + right->length()));
129 right->bl.substr_of(bl, left->length(), right->length());
130 left->bl.substr_of(bl, 0, left->length());
131 }
132
133 // move read waiters
134 if (!left->waitfor_read.empty()) {
135 map<loff_t, list<Context*> >::iterator start_remove
136 = left->waitfor_read.begin();
137 while (start_remove != left->waitfor_read.end() &&
138 start_remove->first < right->start())
139 ++start_remove;
140 for (map<loff_t, list<Context*> >::iterator p = start_remove;
141 p != left->waitfor_read.end(); ++p) {
142 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
143 << " to right bh" << dendl;
144 right->waitfor_read[p->first].swap( p->second );
145 assert(p->second.empty());
146 }
147 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
148 }
149
150 ldout(oc->cct, 20) << "split left is " << *left << dendl;
151 ldout(oc->cct, 20) << "split right is " << *right << dendl;
152 return right;
153 }
154
155
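// Merge the adjacent 'right' bh into 'left': the two must be contiguous, in
// the same state and have mergeable journal tids.  Data is appended, read
// waiters are spliced across, and 'right' is destroyed.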
156 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
157 {
158 assert(oc->lock.is_locked());
159 assert(left->end() == right->start());
160 assert(left->get_state() == right->get_state());
161 assert(left->can_merge_journal(right));
162
163 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
164 if (left->get_journal_tid() == 0) {
165 left->set_journal_tid(right->get_journal_tid());
166 }
167 right->set_journal_tid(0);
168
169 oc->bh_remove(this, right);
170 oc->bh_stat_sub(left);
171 left->set_length(left->length() + right->length());
172 oc->bh_stat_add(left);
173
174 // data
175 left->bl.claim_append(right->bl);
176
177 // version
178 // note: this is sorta busted, but should only be used for dirty buffers
179 left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
180 left->last_write = MAX( left->last_write, right->last_write );
181
182 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
183 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
184
185 // waiters
186 for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
187 p != right->waitfor_read.end();
188 ++p)
189 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
190 p->second );
191
192 // hose right
193 delete right;
194
195 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
196 }
197
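// Opportunistically coalesce 'bh' with its immediate left/right neighbours in
// the data map when they are contiguous, share the same state and can merge
// journal tids.  Rx buffers are never merged since last_read_tid may differ.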
198 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
199 {
200 assert(oc->lock.is_locked());
201 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
202
203 // do not merge rx buffers; last_read_tid may not match
204 if (bh->is_rx())
205 return;
206
207 // to the left?
208 map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
209 assert(p->second == bh);
210 if (p != data.begin()) {
211 --p;
212 if (p->second->end() == bh->start() &&
213 p->second->get_state() == bh->get_state() &&
214 p->second->can_merge_journal(bh)) {
215 merge_left(p->second, bh);
216 bh = p->second;
217 } else {
218 ++p;
219 }
220 }
221 // to the right?
222 assert(p->second == bh);
223 ++p;
224 if (p != data.end() &&
225 p->second->start() == bh->end() &&
226 p->second->get_state() == bh->get_state() &&
227 p->second->can_merge_journal(bh))
228 merge_left(bh, p->second);
229 }
230
231 /*
232  * return true only if the entire given range [cur, cur+left) is cached
233 */
234 bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
235 {
236 assert(oc->lock.is_locked());
237 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
238 while (left > 0) {
239 if (p == data.end())
240 return false;
241
242 if (p->first <= cur) {
243 // have part of it
244 loff_t lenfromcur = MIN(p->second->end() - cur, left);
245 cur += lenfromcur;
246 left -= lenfromcur;
247 ++p;
248 continue;
249 } else if (p->first > cur) {
250 // gap
251 return false;
252 } else
253 ceph_abort();
254 }
255
256 return true;
257 }
258
259 /*
260  * return true if all of this object's cached data lies within [off, off+len]
261 */
262 bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
263 {
264 assert(oc->lock.is_locked());
265 if (data.empty())
266 return true;
267 map<loff_t, BufferHead*>::iterator first = data.begin();
268 map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
269 if (first->second->start() >= off && last->second->end() <= (off + len))
270 return true;
271 else
272 return false;
273 }
274
275 /*
276 * map a range of bytes into buffer_heads.
277 * - create missing buffer_heads as necessary.
278 */
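// On return the requested range is fully covered by bh's, partitioned into:
//   hits    - readable now (clean/dirty/tx/zero)
//   missing - newly created bh's that still need a read issued
//   rx      - bh's with a read already in flight
//   errors  - bh's whose previous read failed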
279 int ObjectCacher::Object::map_read(ObjectExtent &ex,
280 map<loff_t, BufferHead*>& hits,
281 map<loff_t, BufferHead*>& missing,
282 map<loff_t, BufferHead*>& rx,
283 map<loff_t, BufferHead*>& errors)
284 {
285 assert(oc->lock.is_locked());
286 ldout(oc->cct, 10) << "map_read " << ex.oid << " "
287 << ex.offset << "~" << ex.length << dendl;
288
289 loff_t cur = ex.offset;
290 loff_t left = ex.length;
291
292 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
293 while (left > 0) {
294 // at end?
295 if (p == data.end()) {
296 // rest is a miss.
297 BufferHead *n = new BufferHead(this);
298 n->set_start(cur);
299 n->set_length(left);
300 oc->bh_add(this, n);
301 if (complete) {
302 oc->mark_zero(n);
303 hits[cur] = n;
304 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
305 } else {
306 missing[cur] = n;
307 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
308 }
309 cur += left;
310 assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
311 break; // no more.
312 }
313
314 if (p->first <= cur) {
315 // have it (or part of it)
316 BufferHead *e = p->second;
317
318 if (e->is_clean() ||
319 e->is_dirty() ||
320 e->is_tx() ||
321 e->is_zero()) {
322 hits[cur] = e; // readable!
323 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
324 } else if (e->is_rx()) {
325 rx[cur] = e; // missing, not readable.
326 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
327 } else if (e->is_error()) {
328 errors[cur] = e;
329 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
330 } else {
331 ceph_abort();
332 }
333
334 loff_t lenfromcur = MIN(e->end() - cur, left);
335 cur += lenfromcur;
336 left -= lenfromcur;
337 ++p;
338 continue; // more?
339
340 } else if (p->first > cur) {
341 // gap.. miss
342 loff_t next = p->first;
343 BufferHead *n = new BufferHead(this);
344 loff_t len = MIN(next - cur, left);
345 n->set_start(cur);
346 n->set_length(len);
347 oc->bh_add(this,n);
348 if (complete) {
349 oc->mark_zero(n);
350 hits[cur] = n;
351 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
352 } else {
353 missing[cur] = n;
354 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
355 }
356 cur += MIN(left, n->length());
357 left -= MIN(left, n->length());
358 continue; // more?
359 } else {
360 ceph_abort();
361 }
362 }
363 return 0;
364 }
365
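// Consistency check over the bh map: each key must equal its bh's start
// offset, ranges must not overlap, and every queued read waiter must fall
// inside the bh it waits on.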
366 void ObjectCacher::Object::audit_buffers()
367 {
368 loff_t offset = 0;
369 for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
370 it != data.end(); ++it) {
371 if (it->first != it->second->start()) {
372 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
373 << " does not match bh start position: "
374 << *it->second << dendl;
375 assert(it->first == it->second->start());
376 }
377 if (it->first < offset) {
378 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
379 << " overlaps with previous bh " << *((--it)->second)
380 << dendl;
381 assert(it->first >= offset);
382 }
383 BufferHead *bh = it->second;
384 map<loff_t, list<Context*> >::const_iterator w_it;
385 for (w_it = bh->waitfor_read.begin();
386 w_it != bh->waitfor_read.end(); ++w_it) {
387 if (w_it->first < bh->start() ||
388 w_it->first >= bh->start() + bh->length()) {
389 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
390 << " is not within bh " << *bh << dendl;
391 assert(w_it->first >= bh->start());
392 assert(w_it->first < bh->start() + bh->length());
393 }
394 }
395 offset = it->first + it->second->length();
396 }
397 }
398
399 /*
400 * map a range of extents on an object's buffer cache.
401 * - combine any bh's we're writing into one
402 * - break up bufferheads that don't fall completely within the range
403  * - note: the returned bh covers just the write range; it no longer also
404  *   includes other dirty data to the left and/or right.
405 */
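// The returned bh is tagged with the supplied journal tid (via
// replace_journal_tid); writex() then copies the caller's data into it.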
406 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
407 ceph_tid_t tid)
408 {
409 assert(oc->lock.is_locked());
410 BufferHead *final = 0;
411
412 ldout(oc->cct, 10) << "map_write oex " << ex.oid
413 << " " << ex.offset << "~" << ex.length << dendl;
414
415 loff_t cur = ex.offset;
416 loff_t left = ex.length;
417
418 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
419 while (left > 0) {
420 loff_t max = left;
421
422 // at end ?
423 if (p == data.end()) {
424 if (final == NULL) {
425 final = new BufferHead(this);
426 replace_journal_tid(final, tid);
427 final->set_start( cur );
428 final->set_length( max );
429 oc->bh_add(this, final);
430 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
431 } else {
432 oc->bh_stat_sub(final);
433 final->set_length(final->length() + max);
434 oc->bh_stat_add(final);
435 }
436 left -= max;
437 cur += max;
438 continue;
439 }
440
441 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
442 //oc->verify_stats();
443
444 if (p->first <= cur) {
445 BufferHead *bh = p->second;
446 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
447
448 if (p->first < cur) {
449 assert(final == 0);
450 if (cur + max >= bh->end()) {
451 // we want right bit (one splice)
452 final = split(bh, cur); // just split it, take right half.
453 replace_journal_tid(final, tid);
454 ++p;
455 assert(p->second == final);
456 } else {
457 // we want middle bit (two splices)
458 final = split(bh, cur);
459 ++p;
460 assert(p->second == final);
461 split(final, cur+max);
462 replace_journal_tid(final, tid);
463 }
464 } else {
465 assert(p->first == cur);
466 if (bh->length() <= max) {
467 // whole bufferhead, piece of cake.
468 } else {
469 // we want left bit (one splice)
470 split(bh, cur + max); // just split
471 }
472 if (final) {
473 oc->mark_dirty(bh);
474 oc->mark_dirty(final);
475 --p; // move iterator back to final
476 assert(p->second == final);
477 replace_journal_tid(bh, tid);
478 merge_left(final, bh);
479 } else {
480 final = bh;
481 replace_journal_tid(final, tid);
482 }
483 }
484
485 // keep going.
486 loff_t lenfromcur = final->end() - cur;
487 cur += lenfromcur;
488 left -= lenfromcur;
489 ++p;
490 continue;
491 } else {
492 // gap!
493 loff_t next = p->first;
494 loff_t glen = MIN(next - cur, max);
495 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
496 if (final) {
497 oc->bh_stat_sub(final);
498 final->set_length(final->length() + glen);
499 oc->bh_stat_add(final);
500 } else {
501 final = new BufferHead(this);
502 replace_journal_tid(final, tid);
503 final->set_start( cur );
504 final->set_length( glen );
505 oc->bh_add(this, final);
506 }
507
508 cur += glen;
509 left -= glen;
510 continue; // more?
511 }
512 }
513
514 // set version
515 assert(final);
516 assert(final->get_journal_tid() == tid);
517 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
518
519 return final;
520 }
521
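// Re-tag 'bh' with journal entry 'tid'.  If the bh already carried a different
// (older) journal tid, notify the writeback handler that the old journal
// extent has been overwritten and should not expect a writeback.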
522 void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
523 ceph_tid_t tid) {
524 ceph_tid_t bh_tid = bh->get_journal_tid();
525
526 assert(tid == 0 || bh_tid <= tid);
527 if (bh_tid != 0 && bh_tid != tid) {
528 // inform journal that it should not expect a writeback from this extent
529 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
530 bh->length(), bh_tid, tid);
531 }
532 bh->set_journal_tid(tid);
533 }
534
535 void ObjectCacher::Object::truncate(loff_t s)
536 {
537 assert(oc->lock.is_locked());
538 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
539
540 while (!data.empty()) {
541 BufferHead *bh = data.rbegin()->second;
542 if (bh->end() <= s)
543 break;
544
545 // split bh at truncation point?
546 if (bh->start() < s) {
547 split(bh, s);
548 continue;
549 }
550
551 // remove bh entirely
552 assert(bh->start() >= s);
553 assert(bh->waitfor_read.empty());
554 replace_journal_tid(bh, 0);
555 oc->bh_remove(this, bh);
556 delete bh;
557 }
558 }
559
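// Drop cached bh's overlapping [off, off+len), splitting any bh that straddles
// a range boundary so only the covered portion is removed.  Discarding cached
// state also clears the object's 'complete' flag.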
560 void ObjectCacher::Object::discard(loff_t off, loff_t len)
561 {
562 assert(oc->lock.is_locked());
563 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
564 << dendl;
565
566 if (!exists) {
567 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
568 exists = true;
569 }
570 if (complete) {
571 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
572 complete = false;
573 }
574
575 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
576 while (p != data.end()) {
577 BufferHead *bh = p->second;
578 if (bh->start() >= off + len)
579 break;
580
581 // split bh at truncation point?
582 if (bh->start() < off) {
583 split(bh, off);
584 ++p;
585 continue;
586 }
587
588 assert(bh->start() >= off);
589 if (bh->end() > off + len) {
590 split(bh, off + len);
591 }
592
593 ++p;
594 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
595 assert(bh->waitfor_read.empty());
596 replace_journal_tid(bh, 0);
597 oc->bh_remove(this, bh);
598 delete bh;
599 }
600 }
601
602
603
604 /*** ObjectCacher ***/
605
606 #undef dout_prefix
607 #define dout_prefix *_dout << "objectcacher "
608
609
610 ObjectCacher::ObjectCacher(CephContext *cct_, string name,
611 WritebackHandler& wb, Mutex& l,
612 flush_set_callback_t flush_callback,
613 void *flush_callback_arg, uint64_t max_bytes,
614 uint64_t max_objects, uint64_t max_dirty,
615 uint64_t target_dirty, double max_dirty_age,
616 bool block_writes_upfront)
617 : perfcounter(NULL),
618 cct(cct_), writeback_handler(wb), name(name), lock(l),
619 max_dirty(max_dirty), target_dirty(target_dirty),
620 max_size(max_bytes), max_objects(max_objects),
621 max_dirty_age(ceph::make_timespan(max_dirty_age)),
622 block_writes_upfront(block_writes_upfront),
623 trace_endpoint("ObjectCacher"),
624 flush_set_callback(flush_callback),
625 flush_set_callback_arg(flush_callback_arg),
626 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
627 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
628 stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
629 {
630 perf_start();
631 finisher.start();
632 scattered_write = writeback_handler.can_scattered_write();
633 }
634
635 ObjectCacher::~ObjectCacher()
636 {
637 finisher.stop();
638 perf_stop();
639 // we should be empty.
640 for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
641 = objects.begin();
642 i != objects.end();
643 ++i)
644 assert(i->empty());
645 assert(bh_lru_rest.lru_get_size() == 0);
646 assert(bh_lru_dirty.lru_get_size() == 0);
647 assert(ob_lru.lru_get_size() == 0);
648 assert(dirty_or_tx_bh.empty());
649 }
650
651 void ObjectCacher::perf_start()
652 {
653 string n = "objectcacher-" + name;
654 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
655
656 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
657 "cache_ops_hit", "Hit operations");
658 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
659 "cache_ops_miss", "Miss operations");
660 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
661 "cache_bytes_hit", "Hit data");
662 plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
663 "cache_bytes_miss", "Miss data");
664 plb.add_u64_counter(l_objectcacher_data_read,
665 "data_read", "Read data");
666 plb.add_u64_counter(l_objectcacher_data_written,
667 "data_written", "Data written to cache");
668 plb.add_u64_counter(l_objectcacher_data_flushed,
669 "data_flushed", "Data flushed");
670 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
671 "data_overwritten_while_flushing",
672 "Data overwritten while flushing");
673 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
674 "Write operations, delayed due to dirty limits");
675 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
676 "write_bytes_blocked",
677 "Write data blocked on dirty limit");
678 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
679 "Time spent blocking a write due to dirty limits");
680
681 perfcounter = plb.create_perf_counters();
682 cct->get_perfcounters_collection()->add(perfcounter);
683 }
684
685 void ObjectCacher::perf_stop()
686 {
687 assert(perfcounter);
688 cct->get_perfcounters_collection()->remove(perfcounter);
689 delete perfcounter;
690 }
691
692 /* private */
693 ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
694 uint64_t object_no,
695 ObjectSet *oset,
696 object_locator_t &l,
697 uint64_t truncate_size,
698 uint64_t truncate_seq)
699 {
700 // XXX: Add handling of nspace in object_locator_t in cache
701 assert(lock.is_locked());
702 // have it?
703 if ((uint32_t)l.pool < objects.size()) {
704 if (objects[l.pool].count(oid)) {
705 Object *o = objects[l.pool][oid];
706 o->object_no = object_no;
707 o->truncate_size = truncate_size;
708 o->truncate_seq = truncate_seq;
709 return o;
710 }
711 } else {
712 objects.resize(l.pool+1);
713 }
714
715 // create it.
716 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
717 truncate_seq);
718 objects[l.pool][oid] = o;
719 ob_lru.lru_insert_top(o);
720 return o;
721 }
722
723 void ObjectCacher::close_object(Object *ob)
724 {
725 assert(lock.is_locked());
726 ldout(cct, 10) << "close_object " << *ob << dendl;
727 assert(ob->can_close());
728
729 // ok!
730 ob_lru.lru_remove(ob);
731 objects[ob->oloc.pool].erase(ob->get_soid());
732 ob->set_item.remove_myself();
733 delete ob;
734 }
735
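// Issue an async read for one bh: mark it rx, stamp it with a fresh read tid,
// and hand the range to the writeback handler with a C_ReadFinish completion
// that feeds the result back into bh_read_finish().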
736 void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
737 const ZTracer::Trace &parent_trace)
738 {
739 assert(lock.is_locked());
740 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
741 << reads_outstanding << dendl;
742
743 ZTracer::Trace trace;
744 if (parent_trace.valid()) {
745 trace.init("", &trace_endpoint, &parent_trace);
746 trace.copy_name("bh_read " + bh->ob->get_oid().name);
747 trace.event("start");
748 }
749
750 mark_rx(bh);
751 bh->last_read_tid = ++last_read_tid;
752
753 // finisher
754 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
755 bh->start(), bh->length(), trace);
756 // go
757 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
758 bh->ob->get_oloc(), bh->start(), bh->length(),
759 bh->ob->get_snap(), &onfinish->bl,
760 bh->ob->truncate_size, bh->ob->truncate_seq,
761 op_flags, trace, onfinish);
762
763 ++reads_outstanding;
764 }
765
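// Read completion (lock held).  Short reads are zero-padded; a trusted -ENOENT
// may mark the object complete and !exists.  Rx bh's in the range whose
// last_read_tid matches are filled in and marked clean (or error), and all
// collected read waiters are completed at the end.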
766 void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
767 ceph_tid_t tid, loff_t start,
768 uint64_t length, bufferlist &bl, int r,
769 bool trust_enoent)
770 {
771 assert(lock.is_locked());
772 ldout(cct, 7) << "bh_read_finish "
773 << oid
774 << " tid " << tid
775 << " " << start << "~" << length
776 << " (bl is " << bl.length() << ")"
777 << " returned " << r
778 << " outstanding reads " << reads_outstanding
779 << dendl;
780
781 if (r >= 0 && bl.length() < length) {
782 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
783 << length << " with " << length - bl.length() << " bytes of zeroes"
784 << dendl;
785 bl.append_zero(length - bl.length());
786 }
787
788 list<Context*> ls;
789 int err = 0;
790
791 if (objects[poolid].count(oid) == 0) {
792 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
793 } else {
794 Object *ob = objects[poolid][oid];
795
796 if (r == -ENOENT && !ob->complete) {
797 // wake up *all* rx waiters, or else we risk reordering
798 // identical reads. e.g.
799 // read 1~1
800 // reply to unrelated 3~1 -> !exists
801 // read 1~1 -> immediate ENOENT
802 // reply to first 1~1 -> ooo ENOENT
803 bool allzero = true;
804 for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
805 p != ob->data.end(); ++p) {
806 BufferHead *bh = p->second;
807 for (map<loff_t, list<Context*> >::iterator p
808 = bh->waitfor_read.begin();
809 p != bh->waitfor_read.end();
810 ++p)
811 ls.splice(ls.end(), p->second);
812 bh->waitfor_read.clear();
813 if (!bh->is_zero() && !bh->is_rx())
814 allzero = false;
815 }
816
817 // just pass through and retry all waiters if we don't trust
818 // -ENOENT for this read
819 if (trust_enoent) {
820 ldout(cct, 7)
821 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
822 << dendl;
823 ob->complete = true;
824 ob->exists = false;
825
826 /* If all the bhs are effectively zero, get rid of them. All
827 * the waiters will be retried and get -ENOENT immediately, so
828 * it's safe to clean up the unneeded bh's now. Since we know
829 * it's safe to remove them now, do so, so they aren't hanging
830  * around waiting for more -ENOENTs from rados while the cache
831 * is being shut down.
832 *
833 * Only do this when all the bhs are rx or clean, to match the
834 * condition in _readx(). If there are any non-rx or non-clean
835 * bhs, _readx() will wait for the final result instead of
836 * returning -ENOENT immediately.
837 */
838 if (allzero) {
839 ldout(cct, 10)
840 << "bh_read_finish ENOENT and allzero, getting rid of "
841 << "bhs for " << *ob << dendl;
842 map<loff_t, BufferHead*>::iterator p = ob->data.begin();
843 while (p != ob->data.end()) {
844 BufferHead *bh = p->second;
845 // current iterator will be invalidated by bh_remove()
846 ++p;
847 bh_remove(ob, bh);
848 delete bh;
849 }
850 }
851 }
852 }
853
854 // apply to bh's!
855 loff_t opos = start;
856 while (true) {
857 map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
858 if (p == ob->data.end())
859 break;
860 if (opos >= start+(loff_t)length) {
861 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
862 << start << "+" << length << "=" << start+(loff_t)length
863 << dendl;
864 break;
865 }
866
867 BufferHead *bh = p->second;
868 ldout(cct, 20) << "checking bh " << *bh << dendl;
869
870 // finishers?
871 for (map<loff_t, list<Context*> >::iterator it
872 = bh->waitfor_read.begin();
873 it != bh->waitfor_read.end();
874 ++it)
875 ls.splice(ls.end(), it->second);
876 bh->waitfor_read.clear();
877
878 if (bh->start() > opos) {
879 ldout(cct, 1) << "bh_read_finish skipping gap "
880 << opos << "~" << bh->start() - opos
881 << dendl;
882 opos = bh->start();
883 continue;
884 }
885
886 if (!bh->is_rx()) {
887 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
888 opos = bh->end();
889 continue;
890 }
891
892 if (bh->last_read_tid != tid) {
893 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
894 << bh->last_read_tid << " != tid " << tid
895 << ", skipping" << dendl;
896 opos = bh->end();
897 continue;
898 }
899
900 assert(opos >= bh->start());
901 assert(bh->start() == opos); // we don't merge rx bh's... yet!
902 assert(bh->length() <= start+(loff_t)length-opos);
903
904 if (bh->error < 0)
905 err = bh->error;
906
907 opos = bh->end();
908
909 if (r == -ENOENT) {
910 if (trust_enoent) {
911 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
912 bh_remove(ob, bh);
913 delete bh;
914 } else {
915 ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
916 << *bh << dendl;
917 }
918 continue;
919 }
920
921 if (r < 0) {
922 bh->error = r;
923 mark_error(bh);
924 } else {
925 bh->bl.substr_of(bl,
926 bh->start() - start,
927 bh->length());
928 mark_clean(bh);
929 }
930
931 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
932
933 ob->try_merge_bh(bh);
934 }
935 }
936
937 // called with lock held.
938 ldout(cct, 20) << "finishing waiters " << ls << dendl;
939
940 finish_contexts(cct, ls, err);
941 retry_waiting_reads();
942
943 --reads_outstanding;
944 read_cond.Signal();
945 }
946
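// Starting from 'bh', collect adjacent dirty bh's on the same object that are
// older than 'cutoff' (scanning forward then backward, bounded by *max_count
// and *max_amount) and submit them as one scattered write.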
947 void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
948 int64_t *max_amount, int *max_count)
949 {
950 list<BufferHead*> blist;
951
952 int count = 0;
953 int64_t total_len = 0;
954 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
955 assert(it != dirty_or_tx_bh.end());
956 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
957 p != dirty_or_tx_bh.end();
958 ++p) {
959 BufferHead *obh = *p;
960 if (obh->ob != bh->ob)
961 break;
962 if (obh->is_dirty() && obh->last_write <= cutoff) {
963 blist.push_back(obh);
964 ++count;
965 total_len += obh->length();
966 if ((max_count && count > *max_count) ||
967 (max_amount && total_len > *max_amount))
968 break;
969 }
970 }
971
972 while (it != dirty_or_tx_bh.begin()) {
973 --it;
974 BufferHead *obh = *it;
975 if (obh->ob != bh->ob)
976 break;
977 if (obh->is_dirty() && obh->last_write <= cutoff) {
978 blist.push_front(obh);
979 ++count;
980 total_len += obh->length();
981 if ((max_count && count > *max_count) ||
982 (max_amount && total_len > *max_amount))
983 break;
984 }
985 }
986 if (max_count)
987 *max_count -= count;
988 if (max_amount)
989 *max_amount -= total_len;
990
991 bh_write_scattered(blist);
992 }
993
994 class ObjectCacher::C_WriteCommit : public Context {
995 ObjectCacher *oc;
996 int64_t poolid;
997 sobject_t oid;
998 vector<pair<loff_t, uint64_t> > ranges;
999 ZTracer::Trace trace;
1000 public:
1001 ceph_tid_t tid = 0;
1002 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1003 uint64_t l, const ZTracer::Trace &trace) :
1004 oc(c), poolid(_poolid), oid(o), trace(trace) {
1005 ranges.push_back(make_pair(s, l));
1006 }
1007 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1008 vector<pair<loff_t, uint64_t> >& _ranges) :
1009 oc(c), poolid(_poolid), oid(o), tid(0) {
1010 ranges.swap(_ranges);
1011 }
1012 void finish(int r) override {
1013 oc->bh_write_commit(poolid, oid, ranges, tid, r);
1014 trace.event("finish");
1015 }
1016 };
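// Flush a list of bh's belonging to a single object as one scattered write:
// build the per-range io vector, use the newest snapc and last_write among
// them, then mark each bh tx with the returned write tid.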
1017 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1018 {
1019 assert(lock.is_locked());
1020
1021 Object *ob = blist.front()->ob;
1022 ob->get();
1023
1024 ceph::real_time last_write;
1025 SnapContext snapc;
1026 vector<pair<loff_t, uint64_t> > ranges;
1027 vector<pair<uint64_t, bufferlist> > io_vec;
1028
1029 ranges.reserve(blist.size());
1030 io_vec.reserve(blist.size());
1031
1032 uint64_t total_len = 0;
1033 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1034 BufferHead *bh = *p;
1035 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1036 assert(bh->ob == ob);
1037 assert(bh->bl.length() == bh->length());
1038 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1039
1040 int n = io_vec.size();
1041 io_vec.resize(n + 1);
1042 io_vec[n].first = bh->start();
1043 io_vec[n].second = bh->bl;
1044
1045 total_len += bh->length();
1046 if (bh->snapc.seq > snapc.seq)
1047 snapc = bh->snapc;
1048 if (bh->last_write > last_write)
1049 last_write = bh->last_write;
1050 }
1051
1052 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1053
1054 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1055 io_vec, snapc, last_write,
1056 ob->truncate_size, ob->truncate_seq,
1057 oncommit);
1058 oncommit->tid = tid;
1059 ob->last_write_tid = tid;
1060 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1061 BufferHead *bh = *p;
1062 bh->last_write_tid = tid;
1063 mark_tx(bh);
1064 }
1065
1066 if (perfcounter)
1067 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1068 }
1069
1070 void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1071 {
1072 assert(lock.is_locked());
1073 ldout(cct, 7) << "bh_write " << *bh << dendl;
1074
1075 bh->ob->get();
1076
1077 ZTracer::Trace trace;
1078 if (parent_trace.valid()) {
1079 trace.init("", &trace_endpoint, &parent_trace);
1080 trace.copy_name("bh_write " + bh->ob->get_oid().name);
1081 trace.event("start");
1082 }
1083
1084 // finishers
1085 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1086 bh->ob->get_soid(), bh->start(),
1087 bh->length(), trace);
1088 // go
1089 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1090 bh->ob->get_oloc(),
1091 bh->start(), bh->length(),
1092 bh->snapc, bh->bl, bh->last_write,
1093 bh->ob->truncate_size,
1094 bh->ob->truncate_seq,
1095 bh->journal_tid, trace, oncommit);
1096 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1097
1098 // set bh last_write_tid
1099 oncommit->tid = tid;
1100 bh->ob->last_write_tid = tid;
1101 bh->last_write_tid = tid;
1102
1103 if (perfcounter) {
1104 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1105 }
1106
1107 mark_tx(bh);
1108 }
1109
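// Write completion: for each committed range, mark matching tx bh's clean (or
// dirty again on error), clear their journal tids, bump last_commit_tid, and
// complete contexts waiting on this tid.  If the object set just went fully
// clean, invoke the flush_set callback.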
1110 void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1111 vector<pair<loff_t, uint64_t> >& ranges,
1112 ceph_tid_t tid, int r)
1113 {
1114 assert(lock.is_locked());
1115 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1116 << " ranges " << ranges << " returned " << r << dendl;
1117
1118 if (objects[poolid].count(oid) == 0) {
1119 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1120 return;
1121 }
1122
1123 Object *ob = objects[poolid][oid];
1124 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1125
1126 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1127 p != ranges.end();
1128 ++p) {
1129 loff_t start = p->first;
1130 uint64_t length = p->second;
1131 if (!ob->exists) {
1132 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1133 ob->exists = true;
1134
1135 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1136 ob->get_snap())) {
1137 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1138 "complete on " << *ob << dendl;
1139 ob->complete = false;
1140 }
1141 }
1142
1143 vector<pair<loff_t, BufferHead*>> hit;
1144 // apply to bh's!
1145 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1146 p != ob->data.end();
1147 ++p) {
1148 BufferHead *bh = p->second;
1149
1150 if (bh->start() > start+(loff_t)length)
1151 break;
1152
1153 if (bh->start() < start &&
1154 bh->end() > start+(loff_t)length) {
1155 ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
1156 continue;
1157 }
1158
1159 // make sure bh is tx
1160 if (!bh->is_tx()) {
1161 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1162 continue;
1163 }
1164
1165 // make sure bh tid matches
1166 if (bh->last_write_tid != tid) {
1167 assert(bh->last_write_tid > tid);
1168 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1169 continue;
1170 }
1171
1172 if (r >= 0) {
1173 // ok! mark bh clean and error-free
1174 mark_clean(bh);
1175 bh->set_journal_tid(0);
1176 if (bh->get_nocache())
1177 bh_lru_rest.lru_bottouch(bh);
1178 hit.push_back(make_pair(bh->start(), bh));
1179 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1180 } else {
1181 mark_dirty(bh);
1182 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1183 << *bh << " r = " << r << " " << cpp_strerror(-r)
1184 << dendl;
1185 }
1186 }
1187
1188 for (auto& p : hit) {
1189 // p.second may have been merged and deleted by merge_left
1190 if (ob->data.count(p.first))
1191 ob->try_merge_bh(p.second);
1192 }
1193 }
1194
1195 // update last_commit.
1196 assert(ob->last_commit_tid < tid);
1197 ob->last_commit_tid = tid;
1198
1199 // waiters?
1200 list<Context*> ls;
1201 if (ob->waitfor_commit.count(tid)) {
1202 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1203 ob->waitfor_commit.erase(tid);
1204 }
1205
1206 // is the entire object set now clean and fully committed?
1207 ObjectSet *oset = ob->oset;
1208 ob->put();
1209
1210 if (flush_set_callback &&
1211 was_dirty_or_tx > 0 &&
1212 oset->dirty_or_tx == 0) { // nothing dirty/tx
1213 flush_set_callback(flush_set_callback_arg, oset);
1214 }
1215
1216 if (!ls.empty())
1217 finish_contexts(cct, ls, r);
1218 }
1219
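// Start writeback on roughly 'amount' bytes (everything dirty if 0), oldest
// buffers first from the dirty LRU; with scattered writes enabled, adjacent
// dirty bh's on the same object are batched together.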
1220 void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1221 {
1222 assert(trace != nullptr);
1223 assert(lock.is_locked());
1224 ceph::real_time cutoff = ceph::real_clock::now();
1225
1226 ldout(cct, 10) << "flush " << amount << dendl;
1227
1228 /*
1229 * NOTE: we aren't actually pulling things off the LRU here, just
1230 * looking at the tail item. Then we call bh_write, which moves it
1231 * to the other LRU, so that we can call
1232 * lru_dirty.lru_get_next_expire() again.
1233 */
1234 int64_t left = amount;
1235 while (amount == 0 || left > 0) {
1236 BufferHead *bh = static_cast<BufferHead*>(
1237 bh_lru_dirty.lru_get_next_expire());
1238 if (!bh) break;
1239 if (bh->last_write > cutoff) break;
1240
1241 if (scattered_write) {
1242 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1243 } else {
1244 left -= bh->length();
1245 bh_write(bh, *trace);
1246 }
1247 }
1248 }
1249
1250
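// Evict clean buffers until total clean bytes fit under max_size, then close
// idle objects until the object count fits under max_objects.  Removing a
// buffer from a 'complete' object clears its complete flag.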
1251 void ObjectCacher::trim()
1252 {
1253 assert(lock.is_locked());
1254 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1255 << get_stat_clean() << ", objects: max " << max_objects
1256 << " current " << ob_lru.lru_get_size() << dendl;
1257
1258 while (get_stat_clean() > 0 && (uint64_t) get_stat_clean() > max_size) {
1259 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1260 if (!bh)
1261 break;
1262
1263 ldout(cct, 10) << "trim trimming " << *bh << dendl;
1264 assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1265
1266 Object *ob = bh->ob;
1267 bh_remove(ob, bh);
1268 delete bh;
1269
1270 if (ob->complete) {
1271 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1272 ob->complete = false;
1273 }
1274 }
1275
1276 while (ob_lru.lru_get_size() > max_objects) {
1277 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1278 if (!ob)
1279 break;
1280
1281 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1282 close_object(ob);
1283 }
1284
1285 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1286 << get_stat_clean() << ", objects: max " << max_objects
1287 << " current " << ob_lru.lru_get_size() << dendl;
1288 }
1289
1290
1291
1292 /* public */
1293
1294 bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1295 snapid_t snapid)
1296 {
1297 assert(lock.is_locked());
1298 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1299 ex_it != extents.end();
1300 ++ex_it) {
1301 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1302
1303 // get Object cache
1304 sobject_t soid(ex_it->oid, snapid);
1305 Object *o = get_object_maybe(soid, ex_it->oloc);
1306 if (!o)
1307 return false;
1308 if (!o->is_cached(ex_it->offset, ex_it->length))
1309 return false;
1310 }
1311 return true;
1312 }
1313
1314
1315 /*
1316 * returns # bytes read (if in cache). onfinish is untouched (caller
1317 * must delete it)
1318 * returns 0 if doing async read
1319 */
1320 int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1321 ZTracer::Trace *parent_trace)
1322 {
1323 ZTracer::Trace trace;
1324 if (parent_trace != nullptr) {
1325 trace.init("read", &trace_endpoint, parent_trace);
1326 trace.event("start");
1327 }
1328
1329 int r =_readx(rd, oset, onfinish, true, &trace);
1330 if (r < 0) {
1331 trace.event("finish");
1332 }
1333 return r;
1334 }
1335
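// Core read path (lock held).  Maps each extent onto bh's, issues reads for
// misses, and queues C_RetryRead waiters on in-flight buffers; only when every
// extent hits does it assemble the result into rd->bl and return the byte
// count (or a pending error).  Otherwise it returns 0 and onfinish is
// completed later.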
1336 int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1337 bool external_call, ZTracer::Trace *trace)
1338 {
1339 assert(trace != nullptr);
1340 assert(lock.is_locked());
1341 bool success = true;
1342 int error = 0;
1343 uint64_t bytes_in_cache = 0;
1344 uint64_t bytes_not_in_cache = 0;
1345 uint64_t total_bytes_read = 0;
1346 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1347 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1348 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1349
1350 /*
1351 * WARNING: we can only meaningfully return ENOENT if the read request
1352 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1353 * zeroed buffers needs to feed single extents into readx().
1354 */
1355 assert(!oset->return_enoent || rd->extents.size() == 1);
1356
1357 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1358 ex_it != rd->extents.end();
1359 ++ex_it) {
1360 ldout(cct, 10) << "readx " << *ex_it << dendl;
1361
1362 total_bytes_read += ex_it->length;
1363
1364 // get Object cache
1365 sobject_t soid(ex_it->oid, rd->snap);
1366 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1367 ex_it->truncate_size, oset->truncate_seq);
1368 if (external_call)
1369 touch_ob(o);
1370
1371 // does not exist and no hits?
1372 if (oset->return_enoent && !o->exists) {
1373 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1374
1375 // should we worry about COW underneath us?
1376 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1377 ex_it->length, soid.snap)) {
1378 ldout(cct, 20) << "readx may copy on write" << dendl;
1379 bool wait = false;
1380 list<BufferHead*> blist;
1381 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1382 bh_it != o->data.end();
1383 ++bh_it) {
1384 BufferHead *bh = bh_it->second;
1385 if (bh->is_dirty() || bh->is_tx()) {
1386 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1387 wait = true;
1388 if (bh->is_dirty()) {
1389 if (scattered_write)
1390 blist.push_back(bh);
1391 else
1392 bh_write(bh, *trace);
1393 }
1394 }
1395 }
1396 if (scattered_write && !blist.empty())
1397 bh_write_scattered(blist);
1398 if (wait) {
1399 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1400 << " on " << *o << dendl;
1401 o->waitfor_commit[o->last_write_tid].push_back(
1402 new C_RetryRead(this,rd, oset, onfinish, *trace));
1403 // FIXME: perfcounter!
1404 return 0;
1405 }
1406 }
1407
1408 // can we return ENOENT?
1409 bool allzero = true;
1410 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1411 bh_it != o->data.end();
1412 ++bh_it) {
1413 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1414 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1415 allzero = false;
1416 break;
1417 }
1418 }
1419 if (allzero) {
1420 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1421 << dendl;
1422 delete rd;
1423 if (dontneed)
1424 bottouch_ob(o);
1425 return -ENOENT;
1426 }
1427 }
1428
1429 // map extent into bufferheads
1430 map<loff_t, BufferHead*> hits, missing, rx, errors;
1431 o->map_read(*ex_it, hits, missing, rx, errors);
1432 if (external_call) {
1433 // retry reading error buffers
1434 missing.insert(errors.begin(), errors.end());
1435 } else {
1436 // some reads had errors, fail later so completions
1437 // are cleaned up properly
1438 // TODO: make read path not call _readx for every completion
1439 hits.insert(errors.begin(), errors.end());
1440 }
1441
1442 if (!missing.empty() || !rx.empty()) {
1443 // read missing
1444 map<loff_t, BufferHead*>::iterator last = missing.end();
1445 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1446 bh_it != missing.end();
1447 ++bh_it) {
1448 uint64_t rx_bytes = static_cast<uint64_t>(
1449 stat_rx + bh_it->second->length());
1450 bytes_not_in_cache += bh_it->second->length();
1451 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1452 // cache is full with concurrent reads -- wait for rx's to complete
1453 // to constrain memory growth (especially during copy-ups)
1454 if (success) {
1455 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1456 << waitfor_read.size() << " blocked reads, "
1457 << (MAX(rx_bytes, max_size) - max_size)
1458 << " read bytes" << dendl;
1459 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1460 *trace));
1461 }
1462
1463 bh_remove(o, bh_it->second);
1464 delete bh_it->second;
1465 } else {
1466 bh_it->second->set_nocache(nocache);
1467 bh_read(bh_it->second, rd->fadvise_flags, *trace);
1468 if ((success && onfinish) || last != missing.end())
1469 last = bh_it;
1470 }
1471 success = false;
1472 }
1473
1474 // add the waiter to the last missing bh to avoid waking up too early, since reads complete in order
1475 if (last != missing.end()) {
1476 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1477 << " off " << last->first << dendl;
1478 last->second->waitfor_read[last->first].push_back(
1479 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1480
1481 }
1482
1483 // bump rx
1484 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1485 bh_it != rx.end();
1486 ++bh_it) {
1487 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1488 if (success && onfinish) {
1489 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1490 << " off " << bh_it->first << dendl;
1491 bh_it->second->waitfor_read[bh_it->first].push_back(
1492 new C_RetryRead(this, rd, oset, onfinish, *trace) );
1493 }
1494 bytes_not_in_cache += bh_it->second->length();
1495 success = false;
1496 }
1497
1498 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1499 bh_it != hits.end(); ++bh_it)
1500 // bump in lru, so we don't lose it on a later read
1501 touch_bh(bh_it->second);
1502
1503 } else {
1504 assert(!hits.empty());
1505
1506 // make a plain list
1507 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1508 bh_it != hits.end();
1509 ++bh_it) {
1510 BufferHead *bh = bh_it->second;
1511 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1512 if (bh->is_error() && bh->error)
1513 error = bh->error;
1514 bytes_in_cache += bh->length();
1515
1516 if (bh->get_nocache() && bh->is_clean())
1517 bh_lru_rest.lru_bottouch(bh);
1518 else
1519 touch_bh(bh);
1520 // must come after touch_bh, because touch_bh resets dontneed to false
1521 if (dontneed &&
1522 ((loff_t)ex_it->offset <= bh->start() &&
1523 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1524 bh->set_dontneed(true); //if dirty
1525 if (bh->is_clean())
1526 bh_lru_rest.lru_bottouch(bh);
1527 }
1528 }
1529
1530 if (!error) {
1531 // create reverse map of buffer offset -> object for the
1532 // eventual result. this is over a single ObjectExtent, so we
1533 // know that
1534 // - the bh's are contiguous
1535 // - the buffer frags need not be (and almost certainly aren't)
1536 loff_t opos = ex_it->offset;
1537 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1538 assert(bh_it->second->start() <= opos);
1539 uint64_t bhoff = opos - bh_it->second->start();
1540 vector<pair<uint64_t,uint64_t> >::iterator f_it
1541 = ex_it->buffer_extents.begin();
1542 uint64_t foff = 0;
1543 while (1) {
1544 BufferHead *bh = bh_it->second;
1545 assert(opos == (loff_t)(bh->start() + bhoff));
1546
1547 uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
1548 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1549 << bhoff << " frag " << f_it->first << "~"
1550 << f_it->second << " +" << foff << "~" << len
1551 << dendl;
1552
1553 bufferlist bit;
1554 // put substr here first, since substr_of clobbers, and we
1555 // may get multiple bh's at this stripe_map position
1556 if (bh->is_zero()) {
1557 stripe_map[f_it->first].append_zero(len);
1558 } else {
1559 bit.substr_of(bh->bl,
1560 opos - bh->start(),
1561 len);
1562 stripe_map[f_it->first].claim_append(bit);
1563 }
1564
1565 opos += len;
1566 bhoff += len;
1567 foff += len;
1568 if (opos == bh->end()) {
1569 ++bh_it;
1570 bhoff = 0;
1571 }
1572 if (foff == f_it->second) {
1573 ++f_it;
1574 foff = 0;
1575 }
1576 if (bh_it == hits.end()) break;
1577 if (f_it == ex_it->buffer_extents.end())
1578 break;
1579 }
1580 assert(f_it == ex_it->buffer_extents.end());
1581 assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1582 }
1583
1584 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1585 bottouch_ob(o);
1586 }
1587 }
1588
1589 if (!success) {
1590 if (perfcounter && external_call) {
1591 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1592 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1593 perfcounter->inc(l_objectcacher_cache_ops_miss);
1594 }
1595 if (onfinish) {
1596 ldout(cct, 20) << "readx defer " << rd << dendl;
1597 } else {
1598 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1599 << dendl;
1600 delete rd;
1601 }
1602 return 0; // wait!
1603 }
1604 if (perfcounter && external_call) {
1605 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1606 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1607 perfcounter->inc(l_objectcacher_cache_ops_hit);
1608 }
1609
1610 // no misses... success! do the read.
1611 ldout(cct, 10) << "readx has all buffers" << dendl;
1612
1613 // ok, assemble into result buffer.
1614 uint64_t pos = 0;
1615 if (rd->bl && !error) {
1616 rd->bl->clear();
1617 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1618 i != stripe_map.end();
1619 ++i) {
1620 assert(pos == i->first);
1621 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1622 << " at " << pos << dendl;
1623 pos += i->second.length();
1624 rd->bl->claim_append(i->second);
1625 assert(rd->bl->length() == pos);
1626 }
1627 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1628 } else if (!error) {
1629 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1630 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1631 pos = i->first + i->second.length();
1632 }
1633
1634 // done with read.
1635 int ret = error ? error : pos;
1636 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1637 assert(pos <= (uint64_t) INT_MAX);
1638
1639 delete rd;
1640
1641 trim();
1642
1643 return ret;
1644 }
1645
1646 void ObjectCacher::retry_waiting_reads()
1647 {
1648 list<Context *> ls;
1649 ls.swap(waitfor_read);
1650
1651 while (!ls.empty() && waitfor_read.empty()) {
1652 Context *ctx = ls.front();
1653 ls.pop_front();
1654 ctx->complete(0);
1655 }
1656 waitfor_read.splice(waitfor_read.end(), ls);
1657 }
1658
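// Write path: map each extent onto a single dirty bh, copy the caller's data
// into it, then account the bytes and wait (or queue a waiter) for writeback
// space in _wait_for_write before returning.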
1659 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1660 ZTracer::Trace *parent_trace)
1661 {
1662 assert(lock.is_locked());
1663 ceph::real_time now = ceph::real_clock::now();
1664 uint64_t bytes_written = 0;
1665 uint64_t bytes_written_in_flush = 0;
1666 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1667 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1668
1669 ZTracer::Trace trace;
1670 if (parent_trace != nullptr) {
1671 trace.init("write", &trace_endpoint, parent_trace);
1672 trace.event("start");
1673 }
1674
1675 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1676 ex_it != wr->extents.end();
1677 ++ex_it) {
1678 // get object cache
1679 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1680 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1681 ex_it->truncate_size, oset->truncate_seq);
1682
1683 // map it all into a single bufferhead.
1684 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1685 bool missing = bh->is_missing();
1686 bh->snapc = wr->snapc;
1687
1688 bytes_written += ex_it->length;
1689 if (bh->is_tx()) {
1690 bytes_written_in_flush += ex_it->length;
1691 }
1692
1693 // adjust buffer pointers (ie "copy" data into my cache)
1694 // this is over a single ObjectExtent, so we know that
1695 // - there is one contiguous bh
1696 // - the buffer frags need not be (and almost certainly aren't)
1697 // note: i assume striping is monotonic... no jumps backwards, ever!
1698 loff_t opos = ex_it->offset;
1699 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1700 = ex_it->buffer_extents.begin();
1701 f_it != ex_it->buffer_extents.end();
1702 ++f_it) {
1703 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1704 << f_it->second << " into " << *bh << " at " << opos
1705 << dendl;
1706 uint64_t bhoff = opos - bh->start();
1707 assert(f_it->second <= bh->length() - bhoff);
1708
1709 // get the frag we're mapping in
1710 bufferlist frag;
1711 frag.substr_of(wr->bl,
1712 f_it->first, f_it->second);
1713
1714 // keep anything left of bhoff
1715 bufferlist newbl;
1716 if (bhoff)
1717 newbl.substr_of(bh->bl, 0, bhoff);
1718 newbl.claim_append(frag);
1719 bh->bl.swap(newbl);
1720
1721 opos += f_it->second;
1722 }
1723
1724 // ok, now bh is dirty.
1725 mark_dirty(bh);
1726 if (dontneed)
1727 bh->set_dontneed(true);
1728 else if (nocache && missing)
1729 bh->set_nocache(true);
1730 else
1731 touch_bh(bh);
1732
1733 bh->last_write = now;
1734
1735 o->try_merge_bh(bh);
1736 }
1737
1738 if (perfcounter) {
1739 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1740 if (bytes_written_in_flush) {
1741 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1742 bytes_written_in_flush);
1743 }
1744 }
1745
1746 int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1747 delete wr;
1748
1749 //verify_stats();
1750 trim();
1751 return r;
1752 }
1753
1754 class ObjectCacher::C_WaitForWrite : public Context {
1755 public:
1756 C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1757 const ZTracer::Trace &trace, Context *onfinish) :
1758 m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1759 void finish(int r) override;
1760 private:
1761 ObjectCacher *m_oc;
1762 uint64_t m_len;
1763 ZTracer::Trace m_trace;
1764 Context *m_onfinish;
1765 };
1766
1767 void ObjectCacher::C_WaitForWrite::finish(int r)
1768 {
1769 Mutex::Locker l(m_oc->lock);
1770 m_oc->maybe_wait_for_writeback(m_len, &m_trace);
1771 m_onfinish->complete(r);
1772 }
1773
1774 void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
1775 ZTracer::Trace *trace)
1776 {
1777 assert(lock.is_locked());
1778 ceph::mono_time start = ceph::mono_clock::now();
1779 int blocked = 0;
1780 // wait for writeback?
1781 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1782 // - do not wait for bytes other waiters are waiting on. this means that
1783 // threads do not wait for each other. this effectively allows the cache
1784 // size to balloon proportional to the data that is in flight.
1785 while (get_stat_dirty() + get_stat_tx() > 0 &&
1786 (uint64_t) (get_stat_dirty() + get_stat_tx()) >=
1787 max_dirty + get_stat_dirty_waiting()) {
1788 if (blocked == 0) {
1789 trace->event("start wait for writeback");
1790 }
1791 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1792 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1793 << max_dirty << " + dirty_waiting "
1794 << get_stat_dirty_waiting() << dendl;
1795 flusher_cond.Signal();
1796 stat_dirty_waiting += len;
1797 stat_cond.Wait(lock);
1798 stat_dirty_waiting -= len;
1799 ++blocked;
1800 ldout(cct, 10) << __func__ << " woke up" << dendl;
1801 }
1802 if (blocked > 0) {
1803 trace->event("finish wait for writeback");
1804 }
1805 if (blocked && perfcounter) {
1806 perfcounter->inc(l_objectcacher_write_ops_blocked);
1807 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1808 ceph::timespan blocked = ceph::mono_clock::now() - start;
1809 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1810 }
1811 }
1812
1813 // blocking wait for write.
1814 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1815 ZTracer::Trace *trace, Context *onfreespace)
1816 {
1817 assert(lock.is_locked());
1818 assert(trace != nullptr);
1819 int ret = 0;
1820
1821 if (max_dirty > 0) {
1822 if (block_writes_upfront) {
1823 maybe_wait_for_writeback(len, trace);
1824 if (onfreespace)
1825 onfreespace->complete(0);
1826 } else {
1827 assert(onfreespace);
1828 finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
1829 }
1830 } else {
1831 // write-thru! flush what we just wrote.
1832 Cond cond;
1833 bool done = false;
1834 Context *fin = block_writes_upfront ?
1835 new C_Cond(&cond, &done, &ret) : onfreespace;
1836 assert(fin);
1837 bool flushed = flush_set(oset, wr->extents, trace, fin);
1838 assert(!flushed); // we just dirtied it, and didn't drop our lock!
1839 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1840 << " bytes" << dendl;
1841 if (block_writes_upfront) {
1842 while (!done)
1843 cond.Wait(lock);
1844 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1845 if (onfreespace)
1846 onfreespace->complete(ret);
1847 }
1848 }
1849
1850 // start writeback anyway?
1851 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1852 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1853 << target_dirty << ", nudging flusher" << dendl;
1854 flusher_cond.Signal();
1855 }
1856 return ret;
1857 }
1858
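// Flusher thread: wakes about once a second, writes back dirty data above
// target_dirty, then ages out dirty bh's older than max_dirty_age, flushing at
// most MAX_FLUSH_UNDER_LOCK bh's before briefly dropping the lock.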
1859 void ObjectCacher::flusher_entry()
1860 {
1861 ldout(cct, 10) << "flusher start" << dendl;
1862 lock.Lock();
1863 while (!flusher_stop) {
1864 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1865 get_stat_dirty();
1866 ldout(cct, 11) << "flusher "
1867 << all << " / " << max_size << ": "
1868 << get_stat_tx() << " tx, "
1869 << get_stat_rx() << " rx, "
1870 << get_stat_clean() << " clean, "
1871 << get_stat_dirty() << " dirty ("
1872 << target_dirty << " target, "
1873 << max_dirty << " max)"
1874 << dendl;
1875 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1876
1877 ZTracer::Trace trace;
1878 if (cct->_conf->osdc_blkin_trace_all) {
1879 trace.init("flusher", &trace_endpoint);
1880 trace.event("start");
1881 }
1882
1883 if (actual > 0 && (uint64_t) actual > target_dirty) {
1884 // flush some dirty pages
1885 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1886 << get_stat_dirty_waiting() << " dirty_waiting > target "
1887 << target_dirty << ", flushing some dirty bhs" << dendl;
1888 flush(&trace, actual - target_dirty);
1889 } else {
1890 // check tail of lru for old dirty items
1891 ceph::real_time cutoff = ceph::real_clock::now();
1892 cutoff -= max_dirty_age;
1893 BufferHead *bh = nullptr;
1894 int max = MAX_FLUSH_UNDER_LOCK;
1895 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1896 lru_get_next_expire())) != nullptr &&
1897 bh->last_write <= cutoff &&
1898 max > 0) {
1899 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1900 if (scattered_write) {
1901 bh_write_adjacencies(bh, cutoff, NULL, &max);
1902 } else {
1903 bh_write(bh, trace);
1904 --max;
1905 }
1906 }
1907 if (!max) {
1908 // back off the lock to avoid starving other threads
1909 trace.event("backoff");
1910 lock.Unlock();
1911 lock.Lock();
1912 continue;
1913 }
1914 }
1915
1916 trace.event("finish");
1917 if (flusher_stop)
1918 break;
1919
1920 flusher_cond.WaitInterval(lock, seconds(1));
1921 }
1922
1923 /* Wait for reads to finish. Outstanding reads are only possible here if
1924 * handling -ENOENT completed some read contexts before their rados reads
1925 * came back. If we destroyed the cache without waiting for them, those
1926 * callbacks would try to access the no-longer-valid ObjectCacher when the
1927 * rados reads do come back.
1928 */
1929 while (reads_outstanding > 0) {
1930 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1931 << reads_outstanding << dendl;
1932 read_cond.Wait(lock);
1933 }
1934
1935 lock.Unlock();
1936 ldout(cct, 10) << "flusher finish" << dendl;
1937 }
1938
1939
1940 // -------------------------------------------------
1941
1942 bool ObjectCacher::set_is_empty(ObjectSet *oset)
1943 {
1944 assert(lock.is_locked());
1945 if (oset->objects.empty())
1946 return true;
1947
1948 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
1949 if (!(*p)->is_empty())
1950 return false;
1951
1952 return true;
1953 }
1954
1955 bool ObjectCacher::set_is_cached(ObjectSet *oset)
1956 {
1957 assert(lock.is_locked());
1958 if (oset->objects.empty())
1959 return false;
1960
1961 for (xlist<Object*>::iterator p = oset->objects.begin();
1962 !p.end(); ++p) {
1963 Object *ob = *p;
1964 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
1965 q != ob->data.end();
1966 ++q) {
1967 BufferHead *bh = q->second;
1968 if (!bh->is_dirty() && !bh->is_tx())
1969 return true;
1970 }
1971 }
1972
1973 return false;
1974 }
1975
1976 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
1977 {
1978 assert(lock.is_locked());
1979 if (oset->objects.empty())
1980 return false;
1981
1982 for (xlist<Object*>::iterator i = oset->objects.begin();
1983 !i.end(); ++i) {
1984 Object *ob = *i;
1985
1986 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
1987 p != ob->data.end();
1988 ++p) {
1989 BufferHead *bh = p->second;
1990 if (bh->is_dirty() || bh->is_tx())
1991 return true;
1992 }
1993 }
1994
1995 return false;
1996 }
1997
1998
1999 // purge. non-blocking. violently removes dirty buffers from cache.
2000 void ObjectCacher::purge(Object *ob)
2001 {
2002 assert(lock.is_locked());
2003 ldout(cct, 10) << "purge " << *ob << dendl;
2004
2005 ob->truncate(0);
2006 }
2007
2008
2009 // flush. non-blocking. no callback.
2010 // returns true if the range is already clean (nothing dirty or in flight).
2011 // returns false if writeback was started or is still in progress.
2012 // sloppy about the range: flushes any buffer the range touches.
2013 bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2014 ZTracer::Trace *trace)
2015 {
2016 assert(trace != nullptr);
2017 assert(lock.is_locked());
2018 list<BufferHead*> blist;
2019 bool clean = true;
2020 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2021 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2022 p != ob->data.end();
2023 ++p) {
2024 BufferHead *bh = p->second;
2025 ldout(cct, 20) << "flush " << *bh << dendl;
2026 if (length && bh->start() > offset+length) {
2027 break;
2028 }
2029 if (bh->is_tx()) {
2030 clean = false;
2031 continue;
2032 }
2033 if (!bh->is_dirty()) {
2034 continue;
2035 }
2036
2037 if (scattered_write)
2038 blist.push_back(bh);
2039 else
2040 bh_write(bh, *trace);
2041 clean = false;
2042 }
2043 if (scattered_write && !blist.empty())
2044 bh_write_scattered(blist);
2045
2046 return clean;
2047 }
2048
2049 bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2050 Context *onfinish)
2051 {
2052 assert(lock.is_locked());
2053 if (gather->has_subs()) {
2054 gather->set_finisher(onfinish);
2055 gather->activate();
2056 return false;
2057 }
2058
2059 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2060 onfinish->complete(0);
2061 return true;
2062 }
2063
2064 // flush. non-blocking, takes callback.
2065 // returns true if already flushed
2066 bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2067 {
2068 assert(lock.is_locked());
2069 assert(onfinish != NULL);
2070 if (oset->objects.empty()) {
2071 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2072 onfinish->complete(0);
2073 return true;
2074 }
2075
2076 ldout(cct, 10) << "flush_set " << oset << dendl;
2077
2078 // we'll need to wait for all objects to flush!
2079 C_GatherBuilder gather(cct);
2080 set<Object*> waitfor_commit;
2081
2082 list<BufferHead*> blist;
2083 Object *last_ob = NULL;
2084 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2085
2086 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2087 // order, but oset->objects is unsorted, so the lower_bound for the key we
2088 // build below may land anywhere within this ObjectSet's run of buffer heads.
2089 BufferHead key(*oset->objects.begin());
2090 it = dirty_or_tx_bh.lower_bound(&key);
2091 p = q = it;
2092
2093 bool backwards = true;
2094 if (it != dirty_or_tx_bh.begin())
2095 --it;
2096 else
2097 backwards = false;
2098
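  // scan forward from the starting point; stop at the first buffer head
  // that belongs to a different ObjectSet.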
2099 for (; p != dirty_or_tx_bh.end(); p = q) {
2100 ++q;
2101 BufferHead *bh = *p;
2102 if (bh->ob->oset != oset)
2103 break;
2104 waitfor_commit.insert(bh->ob);
2105 if (bh->is_dirty()) {
2106 if (scattered_write) {
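	// batch scattered writes per object: flush the accumulated list
	// whenever we move on to a different object.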
2107 if (last_ob != bh->ob) {
2108 if (!blist.empty()) {
2109 bh_write_scattered(blist);
2110 blist.clear();
2111 }
2112 last_ob = bh->ob;
2113 }
2114 blist.push_back(bh);
2115 } else {
2116 bh_write(bh, {});
2117 }
2118 }
2119 }
2120
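  // now scan backward from just before the starting point to pick up any of
  // this ObjectSet's buffer heads that sort before the initial key.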
2121 if (backwards) {
2122 for (p = q = it; true; p = q) {
2123 if (q != dirty_or_tx_bh.begin())
2124 --q;
2125 else
2126 backwards = false;
2127 BufferHead *bh = *p;
2128 if (bh->ob->oset != oset)
2129 break;
2130 waitfor_commit.insert(bh->ob);
2131 if (bh->is_dirty()) {
2132 if (scattered_write) {
2133 if (last_ob != bh->ob) {
2134 if (!blist.empty()) {
2135 bh_write_scattered(blist);
2136 blist.clear();
2137 }
2138 last_ob = bh->ob;
2139 }
2140 blist.push_front(bh);
2141 } else {
2142 bh_write(bh, {});
2143 }
2144 }
2145 if (!backwards)
2146 break;
2147 }
2148 }
2149
2150 if (scattered_write && !blist.empty())
2151 bh_write_scattered(blist);
2152
2153 for (set<Object*>::iterator i = waitfor_commit.begin();
2154 i != waitfor_commit.end(); ++i) {
2155 Object *ob = *i;
2156
2157 // we'll need to gather...
2158 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2159 << ob->last_write_tid << " on " << *ob << dendl;
2160 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2161 }
2162
2163 return _flush_set_finish(&gather, onfinish);
2164 }
2165
2166 // flush. non-blocking, takes callback.
2167 // returns true if already flushed
2168 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
2169 ZTracer::Trace *trace, Context *onfinish)
2170 {
2171 assert(lock.is_locked());
2172 assert(trace != nullptr);
2173 assert(onfinish != NULL);
2174 if (oset->objects.empty()) {
2175 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2176 onfinish->complete(0);
2177 return true;
2178 }
2179
2180 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2181 << " ObjectExtents" << dendl;
2182
2183 // we'll need to wait for all objects to flush!
2184 C_GatherBuilder gather(cct);
2185
2186 for (vector<ObjectExtent>::iterator p = exv.begin();
2187 p != exv.end();
2188 ++p) {
2189 ObjectExtent &ex = *p;
2190 sobject_t soid(ex.oid, CEPH_NOSNAP);
2191 if (objects[oset->poolid].count(soid) == 0)
2192 continue;
2193 Object *ob = objects[oset->poolid][soid];
2194
2195 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2196 << " " << ob << dendl;
2197
2198 if (!flush(ob, ex.offset, ex.length, trace)) {
2199 // we'll need to gather...
2200 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2201 << ob->last_write_tid << " on " << *ob << dendl;
2202 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2203 }
2204 }
2205
2206 return _flush_set_finish(&gather, onfinish);
2207 }
2208
2209 // flush all dirty data. non-blocking, takes callback.
2210 // returns true if already flushed
2211 bool ObjectCacher::flush_all(Context *onfinish)
2212 {
2213 assert(lock.is_locked());
2214 assert(onfinish != NULL);
2215
2216 ldout(cct, 10) << "flush_all " << dendl;
2217
2218 // we'll need to wait for all objects to flush!
2219 C_GatherBuilder gather(cct);
2220 set<Object*> waitfor_commit;
2221
2222 list<BufferHead*> blist;
2223 Object *last_ob = NULL;
2224 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2225 next = it = dirty_or_tx_bh.begin();
2226 while (it != dirty_or_tx_bh.end()) {
2227 ++next;
2228 BufferHead *bh = *it;
2229 waitfor_commit.insert(bh->ob);
2230
2231 if (bh->is_dirty()) {
2232 if (scattered_write) {
2233 if (last_ob != bh->ob) {
2234 if (!blist.empty()) {
2235 bh_write_scattered(blist);
2236 blist.clear();
2237 }
2238 last_ob = bh->ob;
2239 }
2240 blist.push_back(bh);
2241 } else {
2242 bh_write(bh, {});
2243 }
2244 }
2245
2246 it = next;
2247 }
2248
2249 if (scattered_write && !blist.empty())
2250 bh_write_scattered(blist);
2251
2252 for (set<Object*>::iterator i = waitfor_commit.begin();
2253 i != waitfor_commit.end();
2254 ++i) {
2255 Object *ob = *i;
2256
2257 // we'll need to gather...
2258 ldout(cct, 10) << "flush_all will wait for ack tid "
2259 << ob->last_write_tid << " on " << *ob << dendl;
2260 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2261 }
2262
2263 return _flush_set_finish(&gather, onfinish);
2264 }
2265
2266 void ObjectCacher::purge_set(ObjectSet *oset)
2267 {
2268 assert(lock.is_locked());
2269 if (oset->objects.empty()) {
2270 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2271 return;
2272 }
2273
2274 ldout(cct, 10) << "purge_set " << oset << dendl;
2275 const bool were_dirty = oset->dirty_or_tx > 0;
2276
2277 for (xlist<Object*>::iterator i = oset->objects.begin();
2278 !i.end(); ++i) {
2279 Object *ob = *i;
2280 purge(ob);
2281 }
2282
2283 // Although we have purged rather than flushed, the caller should still
2284 // drop any resources associated with the dirty data.
2285 assert(oset->dirty_or_tx == 0);
2286 if (flush_set_callback && were_dirty) {
2287 flush_set_callback(flush_set_callback_arg, oset);
2288 }
2289 }
2290
2291
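// release: drop any clean, zero, or error buffers from the object and close
// it entirely if nothing remains; returns the number of bytes that could not
// be released (dirty, tx, rx, etc.).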
2292 loff_t ObjectCacher::release(Object *ob)
2293 {
2294 assert(lock.is_locked());
2295 list<BufferHead*> clean;
2296 loff_t o_unclean = 0;
2297
2298 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2299 p != ob->data.end();
2300 ++p) {
2301 BufferHead *bh = p->second;
2302 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2303 clean.push_back(bh);
2304 else
2305 o_unclean += bh->length();
2306 }
2307
2308 for (list<BufferHead*>::iterator p = clean.begin();
2309 p != clean.end();
2310 ++p) {
2311 bh_remove(ob, *p);
2312 delete *p;
2313 }
2314
2315 if (ob->can_close()) {
2316 ldout(cct, 10) << "release trimming " << *ob << dendl;
2317 close_object(ob);
2318 assert(o_unclean == 0);
2319 return 0;
2320 }
2321
2322 if (ob->complete) {
2323 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2324 ob->complete = false;
2325 }
2326 if (!ob->exists) {
2327 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2328 ob->exists = true;
2329 }
2330
2331 return o_unclean;
2332 }
2333
2334 loff_t ObjectCacher::release_set(ObjectSet *oset)
2335 {
2336 assert(lock.is_locked());
2337 // return # bytes not clean (and thus not released).
2338 loff_t unclean = 0;
2339
2340 if (oset->objects.empty()) {
2341 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2342 return 0;
2343 }
2344
2345 ldout(cct, 10) << "release_set " << oset << dendl;
2346
2347 xlist<Object*>::iterator q;
2348 for (xlist<Object*>::iterator p = oset->objects.begin();
2349 !p.end(); ) {
2350 q = p;
2351 ++q;
2352 Object *ob = *p;
2353
2354 loff_t o_unclean = release(ob);
2355 unclean += o_unclean;
2356
2357 if (o_unclean)
2358 ldout(cct, 10) << "release_set " << oset << " " << *ob
2359 << " has " << o_unclean << " bytes left"
2360 << dendl;
2361 p = q;
2362 }
2363
2364 if (unclean) {
2365 ldout(cct, 10) << "release_set " << oset
2366 << ", " << unclean << " bytes left" << dendl;
2367 }
2368
2369 return unclean;
2370 }
2371
2372
2373 uint64_t ObjectCacher::release_all()
2374 {
2375 assert(lock.is_locked());
2376 ldout(cct, 10) << "release_all" << dendl;
2377 uint64_t unclean = 0;
2378
2379 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2380 = objects.begin();
2381 while (i != objects.end()) {
2382 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2383 while (p != i->end()) {
2384 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2385 ++n;
2386
2387 Object *ob = p->second;
2388
2389 loff_t o_unclean = release(ob);
2390 unclean += o_unclean;
2391
2392 if (o_unclean)
2393 ldout(cct, 10) << "release_all " << *ob
2394 << " has " << o_unclean << " bytes left"
2395 << dendl;
2396 p = n;
2397 }
2398 ++i;
2399 }
2400
2401 if (unclean) {
2402 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2403 << dendl;
2404 }
2405
2406 return unclean;
2407 }
2408
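// forget any cached non-existence for this ObjectSet: mark each object as
// existing (and not complete), and tell in-flight reads not to trust an
// ENOENT result.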
2409 void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2410 {
2411 assert(lock.is_locked());
2412 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2413
2414 for (xlist<Object*>::iterator p = oset->objects.begin();
2415 !p.end(); ++p) {
2416 Object *ob = *p;
2417 if (!ob->exists) {
2418 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2419 ob->exists = true;
2420 ob->complete = false;
2421 }
2422 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2423 !q.end(); ++q) {
2424 C_ReadFinish *comp = *q;
2425 comp->distrust_enoent();
2426 }
2427 }
2428 }
2429
2430 /**
2431 * discard the object extents in exls from the in-memory oset, dropping
2432 * any cached buffers (dirty or not) that those extents cover.
2433 */
2434 void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2435 {
2436 assert(lock.is_locked());
2437 if (oset->objects.empty()) {
2438 ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
2439 return;
2440 }
2441
2442 ldout(cct, 10) << "discard_set " << oset << dendl;
2443
2444 bool were_dirty = oset->dirty_or_tx > 0;
2445
2446 for (vector<ObjectExtent>::const_iterator p = exls.begin();
2447 p != exls.end();
2448 ++p) {
2449 ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
2450 const ObjectExtent &ex = *p;
2451 sobject_t soid(ex.oid, CEPH_NOSNAP);
2452 if (objects[oset->poolid].count(soid) == 0)
2453 continue;
2454 Object *ob = objects[oset->poolid][soid];
2455
2456 ob->discard(ex.offset, ex.length);
2457 }
2458
2459 // did we truncate off dirty data?
2460 if (flush_set_callback &&
2461 were_dirty && oset->dirty_or_tx == 0)
2462 flush_set_callback(flush_set_callback_arg, oset);
2463 }
2464
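// recount per-state byte totals from every cached buffer head and assert
// that they match the incrementally maintained stat_* counters.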
2465 void ObjectCacher::verify_stats() const
2466 {
2467 assert(lock.is_locked());
2468 ldout(cct, 10) << "verify_stats" << dendl;
2469
2470 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2471 error = 0;
2472 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2473 = objects.begin();
2474 i != objects.end();
2475 ++i) {
2476 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2477 = i->begin();
2478 p != i->end();
2479 ++p) {
2480 Object *ob = p->second;
2481 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2482 q != ob->data.end();
2483 ++q) {
2484 BufferHead *bh = q->second;
2485 switch (bh->get_state()) {
2486 case BufferHead::STATE_MISSING:
2487 missing += bh->length();
2488 break;
2489 case BufferHead::STATE_CLEAN:
2490 clean += bh->length();
2491 break;
2492 case BufferHead::STATE_ZERO:
2493 zero += bh->length();
2494 break;
2495 case BufferHead::STATE_DIRTY:
2496 dirty += bh->length();
2497 break;
2498 case BufferHead::STATE_TX:
2499 tx += bh->length();
2500 break;
2501 case BufferHead::STATE_RX:
2502 rx += bh->length();
2503 break;
2504 case BufferHead::STATE_ERROR:
2505 error += bh->length();
2506 break;
2507 default:
2508 ceph_abort();
2509 }
2510 }
2511 }
2512 }
2513
2514 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2515 << " dirty " << dirty << " missing " << missing
2516 << " error " << error << dendl;
2517 assert(clean == stat_clean);
2518 assert(rx == stat_rx);
2519 assert(tx == stat_tx);
2520 assert(dirty == stat_dirty);
2521 assert(missing == stat_missing);
2522 assert(zero == stat_zero);
2523 assert(error == stat_error);
2524 }
2525
2526 void ObjectCacher::bh_stat_add(BufferHead *bh)
2527 {
2528 assert(lock.is_locked());
2529 switch (bh->get_state()) {
2530 case BufferHead::STATE_MISSING:
2531 stat_missing += bh->length();
2532 break;
2533 case BufferHead::STATE_CLEAN:
2534 stat_clean += bh->length();
2535 break;
2536 case BufferHead::STATE_ZERO:
2537 stat_zero += bh->length();
2538 break;
2539 case BufferHead::STATE_DIRTY:
2540 stat_dirty += bh->length();
2541 bh->ob->dirty_or_tx += bh->length();
2542 bh->ob->oset->dirty_or_tx += bh->length();
2543 break;
2544 case BufferHead::STATE_TX:
2545 stat_tx += bh->length();
2546 bh->ob->dirty_or_tx += bh->length();
2547 bh->ob->oset->dirty_or_tx += bh->length();
2548 break;
2549 case BufferHead::STATE_RX:
2550 stat_rx += bh->length();
2551 break;
2552 case BufferHead::STATE_ERROR:
2553 stat_error += bh->length();
2554 break;
2555 default:
2556 assert(0 == "bh_stat_add: invalid bufferhead state");
2557 }
2558 if (get_stat_dirty_waiting() > 0)
2559 stat_cond.Signal();
2560 }
2561
2562 void ObjectCacher::bh_stat_sub(BufferHead *bh)
2563 {
2564 assert(lock.is_locked());
2565 switch (bh->get_state()) {
2566 case BufferHead::STATE_MISSING:
2567 stat_missing -= bh->length();
2568 break;
2569 case BufferHead::STATE_CLEAN:
2570 stat_clean -= bh->length();
2571 break;
2572 case BufferHead::STATE_ZERO:
2573 stat_zero -= bh->length();
2574 break;
2575 case BufferHead::STATE_DIRTY:
2576 stat_dirty -= bh->length();
2577 bh->ob->dirty_or_tx -= bh->length();
2578 bh->ob->oset->dirty_or_tx -= bh->length();
2579 break;
2580 case BufferHead::STATE_TX:
2581 stat_tx -= bh->length();
2582 bh->ob->dirty_or_tx -= bh->length();
2583 bh->ob->oset->dirty_or_tx -= bh->length();
2584 break;
2585 case BufferHead::STATE_RX:
2586 stat_rx -= bh->length();
2587 break;
2588 case BufferHead::STATE_ERROR:
2589 stat_error -= bh->length();
2590 break;
2591 default:
2592 assert(0 == "bh_stat_sub: invalid bufferhead state");
2593 }
2594 }
2595
2596 void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2597 {
2598 assert(lock.is_locked());
2599 int state = bh->get_state();
2600 // move between lru lists?
2601 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2602 bh_lru_rest.lru_remove(bh);
2603 bh_lru_dirty.lru_insert_top(bh);
2604 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2605 bh_lru_dirty.lru_remove(bh);
2606 if (bh->get_dontneed())
2607 bh_lru_rest.lru_insert_bot(bh);
2608 else
2609 bh_lru_rest.lru_insert_top(bh);
2610 }
2611
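  // keep dirty_or_tx_bh in sync: insert the bh when it enters a dirty or tx
  // state from neither, erase it when it leaves both.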
2612 if ((s == BufferHead::STATE_TX ||
2613 s == BufferHead::STATE_DIRTY) &&
2614 state != BufferHead::STATE_TX &&
2615 state != BufferHead::STATE_DIRTY) {
2616 dirty_or_tx_bh.insert(bh);
2617 } else if ((state == BufferHead::STATE_TX ||
2618 state == BufferHead::STATE_DIRTY) &&
2619 s != BufferHead::STATE_TX &&
2620 s != BufferHead::STATE_DIRTY) {
2621 dirty_or_tx_bh.erase(bh);
2622 }
2623
2624 if (s != BufferHead::STATE_ERROR &&
2625 state == BufferHead::STATE_ERROR) {
2626 bh->error = 0;
2627 }
2628
2629 // set state
2630 bh_stat_sub(bh);
2631 bh->set_state(s);
2632 bh_stat_add(bh);
2633 }
2634
2635 void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2636 {
2637 assert(lock.is_locked());
2638 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2639 ob->add_bh(bh);
2640 if (bh->is_dirty()) {
2641 bh_lru_dirty.lru_insert_top(bh);
2642 dirty_or_tx_bh.insert(bh);
2643 } else {
2644 if (bh->get_dontneed())
2645 bh_lru_rest.lru_insert_bot(bh);
2646 else
2647 bh_lru_rest.lru_insert_top(bh);
2648 }
2649
2650 if (bh->is_tx()) {
2651 dirty_or_tx_bh.insert(bh);
2652 }
2653 bh_stat_add(bh);
2654 }
2655
2656 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2657 {
2658 assert(lock.is_locked());
2659 assert(bh->get_journal_tid() == 0);
2660 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2661 ob->remove_bh(bh);
2662 if (bh->is_dirty()) {
2663 bh_lru_dirty.lru_remove(bh);
2664 dirty_or_tx_bh.erase(bh);
2665 } else {
2666 bh_lru_rest.lru_remove(bh);
2667 }
2668
2669 if (bh->is_tx()) {
2670 dirty_or_tx_bh.erase(bh);
2671 }
2672 bh_stat_sub(bh);
2673 if (get_stat_dirty_waiting() > 0)
2674 stat_cond.Signal();
2675 }
2676