1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits.h>
5
6 #include "msg/Messenger.h"
7 #include "ObjectCacher.h"
8 #include "WritebackHandler.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11
12 #include "include/assert.h"
13
14 #define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
15                                 ///< while holding the lock
16
17 using std::chrono::seconds;
18
19 /*** ObjectCacher::BufferHead ***/
20
21
22 /*** ObjectCacher::Object ***/
23
24 #define dout_subsys ceph_subsys_objectcacher
25 #undef dout_prefix
26 #define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
27
28
29
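// Completion context for a single bh_read(): on finish it feeds the reply
// into bh_read_finish().  distrust_enoent() lets the cacher flag an
// in-flight read whose -ENOENT reply can no longer be trusted.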
30 class ObjectCacher::C_ReadFinish : public Context {
31 ObjectCacher *oc;
32 int64_t poolid;
33 sobject_t oid;
34 loff_t start;
35 uint64_t length;
36 xlist<C_ReadFinish*>::item set_item;
37 bool trust_enoent;
38 ceph_tid_t tid;
39
40 public:
41 bufferlist bl;
42 C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
43 uint64_t l) :
44 oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
45 set_item(this), trust_enoent(true),
46 tid(t) {
47 ob->reads.push_back(&set_item);
48 }
49
50 void finish(int r) override {
51 oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
52
53 // object destructor clears the list
54 if (set_item.is_on_list())
55 set_item.remove_myself();
56 }
57
58 void distrust_enoent() {
59 trust_enoent = false;
60 }
61 };
62
63 class ObjectCacher::C_RetryRead : public Context {
64 ObjectCacher *oc;
65 OSDRead *rd;
66 ObjectSet *oset;
67 Context *onfinish;
68 public:
69 C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c)
70 : oc(_oc), rd(r), oset(os), onfinish(c) {}
71 void finish(int r) override {
72 if (r < 0) {
73 if (onfinish)
74 onfinish->complete(r);
75 return;
76 }
77 int ret = oc->_readx(rd, oset, onfinish, false);
78 if (ret != 0 && onfinish) {
79 onfinish->complete(ret);
80 }
81 }
82 };
83
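// Split 'left' at byte offset 'off': 'left' keeps [start, off) and a newly
// allocated bh covering [off, end) is returned.  Buffer data and pending
// read waiters are partitioned between the two halves.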
84 ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
85 loff_t off)
86 {
87 assert(oc->lock.is_locked());
88 ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
89
90 // split off right
91 ObjectCacher::BufferHead *right = new BufferHead(this);
92
93 // inherit dontneed/nocache flags; a later access will clear them again
94 right->set_dontneed(left->get_dontneed());
95 right->set_nocache(left->get_nocache());
96
97 right->last_write_tid = left->last_write_tid;
98 right->last_read_tid = left->last_read_tid;
99 right->set_state(left->get_state());
100 right->snapc = left->snapc;
101 right->set_journal_tid(left->journal_tid);
102
103 loff_t newleftlen = off - left->start();
104 right->set_start(off);
105 right->set_length(left->length() - newleftlen);
106
107 // shorten left
108 oc->bh_stat_sub(left);
109 left->set_length(newleftlen);
110 oc->bh_stat_add(left);
111
112 // add right
113 oc->bh_add(this, right);
114
115 // split buffers too
116 bufferlist bl;
117 bl.claim(left->bl);
118 if (bl.length()) {
119 assert(bl.length() == (left->length() + right->length()));
120 right->bl.substr_of(bl, left->length(), right->length());
121 left->bl.substr_of(bl, 0, left->length());
122 }
123
124 // move read waiters
125 if (!left->waitfor_read.empty()) {
126 map<loff_t, list<Context*> >::iterator start_remove
127 = left->waitfor_read.begin();
128 while (start_remove != left->waitfor_read.end() &&
129 start_remove->first < right->start())
130 ++start_remove;
131 for (map<loff_t, list<Context*> >::iterator p = start_remove;
132 p != left->waitfor_read.end(); ++p) {
133 ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
134 << " to right bh" << dendl;
135 right->waitfor_read[p->first].swap( p->second );
136 assert(p->second.empty());
137 }
138 left->waitfor_read.erase(start_remove, left->waitfor_read.end());
139 }
140
141 ldout(oc->cct, 20) << "split left is " << *left << dendl;
142 ldout(oc->cct, 20) << "split right is " << *right << dendl;
143 return right;
144 }
145
146
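// Merge 'right' into 'left'.  The two bhs must be adjacent, in the same
// state and have compatible journal tids; 'right' is deleted afterwards.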
147 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
148 {
149 assert(oc->lock.is_locked());
150 assert(left->end() == right->start());
151 assert(left->get_state() == right->get_state());
152 assert(left->can_merge_journal(right));
153
154 ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
155 if (left->get_journal_tid() == 0) {
156 left->set_journal_tid(right->get_journal_tid());
157 }
158 right->set_journal_tid(0);
159
160 oc->bh_remove(this, right);
161 oc->bh_stat_sub(left);
162 left->set_length(left->length() + right->length());
163 oc->bh_stat_add(left);
164
165 // data
166 left->bl.claim_append(right->bl);
167
168 // version
169 // note: this is sorta busted, but should only be used for dirty buffers
170 left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
171 left->last_write = MAX( left->last_write, right->last_write );
172
173 left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
174 left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
175
176 // waiters
177 for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
178 p != right->waitfor_read.end();
179 ++p)
180 left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
181 p->second );
182
183 // hose right
184 delete right;
185
186 ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
187 }
188
189 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
190 {
191 assert(oc->lock.is_locked());
192 ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
193
194 // do not merge rx buffers; last_read_tid may not match
195 if (bh->is_rx())
196 return;
197
198 // to the left?
199 map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
200 assert(p->second == bh);
201 if (p != data.begin()) {
202 --p;
203 if (p->second->end() == bh->start() &&
204 p->second->get_state() == bh->get_state() &&
205 p->second->can_merge_journal(bh)) {
206 merge_left(p->second, bh);
207 bh = p->second;
208 } else {
209 ++p;
210 }
211 }
212 // to the right?
213 assert(p->second == bh);
214 ++p;
215 if (p != data.end() &&
216 p->second->start() == bh->end() &&
217 p->second->get_state() == bh->get_state() &&
218 p->second->can_merge_journal(bh))
219 merge_left(bh, p->second);
220 }
221
222 /*
223  * return true only if the entire range [cur, cur+left) is cached
224 */
225 bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
226 {
227 assert(oc->lock.is_locked());
228 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
229 while (left > 0) {
230 if (p == data.end())
231 return false;
232
233 if (p->first <= cur) {
234 // have part of it
235 loff_t lenfromcur = MIN(p->second->end() - cur, left);
236 cur += lenfromcur;
237 left -= lenfromcur;
238 ++p;
239 continue;
240 } else if (p->first > cur) {
241 // gap
242 return false;
243 } else
244 ceph_abort();
245 }
246
247 return true;
248 }
249
250 /*
251  * return true if all of this object's cached data lies within [off, off+len]
252 */
253 bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
254 {
255 assert(oc->lock.is_locked());
256 if (data.empty())
257 return true;
258 map<loff_t, BufferHead*>::iterator first = data.begin();
259 map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
260 if (first->second->start() >= off && last->second->end() <= (off + len))
261 return true;
262 else
263 return false;
264 }
265
266 /*
267 * map a range of bytes into buffer_heads.
268 * - create missing buffer_heads as necessary.
269 */
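// Buffers are returned in four buckets: 'hits' are readable (clean, dirty,
// tx or zero), 'missing' are newly created bhs that still need a read,
// 'rx' already have a read in flight, and 'errors' hold a previous error.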
270 int ObjectCacher::Object::map_read(ObjectExtent &ex,
271 map<loff_t, BufferHead*>& hits,
272 map<loff_t, BufferHead*>& missing,
273 map<loff_t, BufferHead*>& rx,
274 map<loff_t, BufferHead*>& errors)
275 {
276 assert(oc->lock.is_locked());
277 ldout(oc->cct, 10) << "map_read " << ex.oid
278 << " " << ex.offset << "~" << ex.length
279 << dendl;
280
281 loff_t cur = ex.offset;
282 loff_t left = ex.length;
283
284 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
285 while (left > 0) {
286 // at end?
287 if (p == data.end()) {
288 // rest is a miss.
289 BufferHead *n = new BufferHead(this);
290 n->set_start(cur);
291 n->set_length(left);
292 oc->bh_add(this, n);
293 if (complete) {
294 oc->mark_zero(n);
295 hits[cur] = n;
296 ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
297 } else {
298 missing[cur] = n;
299 ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
300 }
301 cur += left;
302 assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
303 break; // no more.
304 }
305
306 if (p->first <= cur) {
307 // have it (or part of it)
308 BufferHead *e = p->second;
309
310 if (e->is_clean() ||
311 e->is_dirty() ||
312 e->is_tx() ||
313 e->is_zero()) {
314 hits[cur] = e; // readable!
315 ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
316 } else if (e->is_rx()) {
317 rx[cur] = e; // missing, not readable.
318 ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
319 } else if (e->is_error()) {
320 errors[cur] = e;
321 ldout(oc->cct, 20) << "map_read error " << *e << dendl;
322 } else {
323 ceph_abort();
324 }
325
326 loff_t lenfromcur = MIN(e->end() - cur, left);
327 cur += lenfromcur;
328 left -= lenfromcur;
329 ++p;
330 continue; // more?
331
332 } else if (p->first > cur) {
333 // gap.. miss
334 loff_t next = p->first;
335 BufferHead *n = new BufferHead(this);
336 loff_t len = MIN(next - cur, left);
337 n->set_start(cur);
338 n->set_length(len);
339 oc->bh_add(this,n);
340 if (complete) {
341 oc->mark_zero(n);
342 hits[cur] = n;
343 ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
344 } else {
345 missing[cur] = n;
346 ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
347 }
348 cur += MIN(left, n->length());
349 left -= MIN(left, n->length());
350 continue; // more?
351 } else {
352 ceph_abort();
353 }
354 }
355 return 0;
356 }
357
358 void ObjectCacher::Object::audit_buffers()
359 {
360 loff_t offset = 0;
361 for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
362 it != data.end(); ++it) {
363 if (it->first != it->second->start()) {
364 lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
365 << " does not match bh start position: "
366 << *it->second << dendl;
367 assert(it->first == it->second->start());
368 }
369 if (it->first < offset) {
370 lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
371 << " overlaps with previous bh " << *((--it)->second)
372 << dendl;
373 assert(it->first >= offset);
374 }
375 BufferHead *bh = it->second;
376 map<loff_t, list<Context*> >::const_iterator w_it;
377 for (w_it = bh->waitfor_read.begin();
378 w_it != bh->waitfor_read.end(); ++w_it) {
379 if (w_it->first < bh->start() ||
380 w_it->first >= bh->start() + bh->length()) {
381 lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
382 << " is not within bh " << *bh << dendl;
383 assert(w_it->first >= bh->start());
384 assert(w_it->first < bh->start() + bh->length());
385 }
386 }
387 offset = it->first + it->second->length();
388 }
389 }
390
391 /*
392 * map a range of extents on an object's buffer cache.
393 * - combine any bh's we're writing into one
394 * - break up bufferheads that don't fall completely within the range
395  * - return a single bh spanning exactly the write range (it does NOT
396  *   also swallow other dirty data to the left and/or right)
397 */
398 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
399 ceph_tid_t tid)
400 {
401 assert(oc->lock.is_locked());
402 BufferHead *final = 0;
403
404 ldout(oc->cct, 10) << "map_write oex " << ex.oid
405 << " " << ex.offset << "~" << ex.length << dendl;
406
407 loff_t cur = ex.offset;
408 loff_t left = ex.length;
409
410 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
411 while (left > 0) {
412 loff_t max = left;
413
414 // at end ?
415 if (p == data.end()) {
416 if (final == NULL) {
417 final = new BufferHead(this);
418 replace_journal_tid(final, tid);
419 final->set_start( cur );
420 final->set_length( max );
421 oc->bh_add(this, final);
422 ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
423 } else {
424 oc->bh_stat_sub(final);
425 final->set_length(final->length() + max);
426 oc->bh_stat_add(final);
427 }
428 left -= max;
429 cur += max;
430 continue;
431 }
432
433 ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
434 //oc->verify_stats();
435
436 if (p->first <= cur) {
437 BufferHead *bh = p->second;
438 ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
439
440 if (p->first < cur) {
441 assert(final == 0);
442 if (cur + max >= bh->end()) {
443 // we want right bit (one splice)
444 final = split(bh, cur); // just split it, take right half.
445 replace_journal_tid(final, tid);
446 ++p;
447 assert(p->second == final);
448 } else {
449 // we want middle bit (two splices)
450 final = split(bh, cur);
451 ++p;
452 assert(p->second == final);
453 split(final, cur+max);
454 replace_journal_tid(final, tid);
455 }
456 } else {
457 assert(p->first == cur);
458 if (bh->length() <= max) {
459 // whole bufferhead, piece of cake.
460 } else {
461 // we want left bit (one splice)
462 split(bh, cur + max); // just split
463 }
464 if (final) {
465 oc->mark_dirty(bh);
466 oc->mark_dirty(final);
467 --p; // move iterator back to final
468 assert(p->second == final);
469 replace_journal_tid(bh, tid);
470 merge_left(final, bh);
471 } else {
472 final = bh;
473 replace_journal_tid(final, tid);
474 }
475 }
476
477 // keep going.
478 loff_t lenfromcur = final->end() - cur;
479 cur += lenfromcur;
480 left -= lenfromcur;
481 ++p;
482 continue;
483 } else {
484 // gap!
485 loff_t next = p->first;
486 loff_t glen = MIN(next - cur, max);
487 ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
488 if (final) {
489 oc->bh_stat_sub(final);
490 final->set_length(final->length() + glen);
491 oc->bh_stat_add(final);
492 } else {
493 final = new BufferHead(this);
494 replace_journal_tid(final, tid);
495 final->set_start( cur );
496 final->set_length( glen );
497 oc->bh_add(this, final);
498 }
499
500 cur += glen;
501 left -= glen;
502 continue; // more?
503 }
504 }
505
506 // set version
507 assert(final);
508 assert(final->get_journal_tid() == tid);
509 ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
510
511 return final;
512 }
513
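// Associate 'bh' with journal entry 'tid'.  If the bh was already tied to
// an older journal tid, tell the writeback handler the extent has been
// overwritten so the journal stops expecting a writeback for it.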
514 void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
515 ceph_tid_t tid) {
516 ceph_tid_t bh_tid = bh->get_journal_tid();
517
518 assert(tid == 0 || bh_tid <= tid);
519 if (bh_tid != 0 && bh_tid != tid) {
520 // inform journal that it should not expect a writeback from this extent
521 oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
522 bh->length(), bh_tid, tid);
523 }
524 bh->set_journal_tid(tid);
525 }
526
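// Drop all cached buffers at or beyond offset 's', splitting a bh that
// straddles the truncation point so only the tail is removed.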
527 void ObjectCacher::Object::truncate(loff_t s)
528 {
529 assert(oc->lock.is_locked());
530 ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
531
532 while (!data.empty()) {
533 BufferHead *bh = data.rbegin()->second;
534 if (bh->end() <= s)
535 break;
536
537 // split bh at truncation point?
538 if (bh->start() < s) {
539 split(bh, s);
540 continue;
541 }
542
543 // remove bh entirely
544 assert(bh->start() >= s);
545 assert(bh->waitfor_read.empty());
546 replace_journal_tid(bh, 0);
547 oc->bh_remove(this, bh);
548 delete bh;
549 }
550 }
551
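// Drop cached buffers overlapping [off, off+len), splitting bhs at the
// boundaries so data outside the range is preserved.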
552 void ObjectCacher::Object::discard(loff_t off, loff_t len)
553 {
554 assert(oc->lock.is_locked());
555 ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
556 << dendl;
557
558 if (!exists) {
559 ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
560 exists = true;
561 }
562 if (complete) {
563 ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
564 complete = false;
565 }
566
567 map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
568 while (p != data.end()) {
569 BufferHead *bh = p->second;
570 if (bh->start() >= off + len)
571 break;
572
573     // split bh at the discard start point?
574 if (bh->start() < off) {
575 split(bh, off);
576 ++p;
577 continue;
578 }
579
580 assert(bh->start() >= off);
581 if (bh->end() > off + len) {
582 split(bh, off + len);
583 }
584
585 ++p;
586 ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
587 assert(bh->waitfor_read.empty());
588 replace_journal_tid(bh, 0);
589 oc->bh_remove(this, bh);
590 delete bh;
591 }
592 }
593
594
595
596 /*** ObjectCacher ***/
597
598 #undef dout_prefix
599 #define dout_prefix *_dout << "objectcacher "
600
601
602 ObjectCacher::ObjectCacher(CephContext *cct_, string name,
603 WritebackHandler& wb, Mutex& l,
604 flush_set_callback_t flush_callback,
605 void *flush_callback_arg, uint64_t max_bytes,
606 uint64_t max_objects, uint64_t max_dirty,
607 uint64_t target_dirty, double max_dirty_age,
608 bool block_writes_upfront)
609 : perfcounter(NULL),
610 cct(cct_), writeback_handler(wb), name(name), lock(l),
611 max_dirty(max_dirty), target_dirty(target_dirty),
612 max_size(max_bytes), max_objects(max_objects),
613 max_dirty_age(ceph::make_timespan(max_dirty_age)),
614 block_writes_upfront(block_writes_upfront),
615 flush_set_callback(flush_callback),
616 flush_set_callback_arg(flush_callback_arg),
617 last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
618 stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
619 stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
620 {
621 perf_start();
622 finisher.start();
623 scattered_write = writeback_handler.can_scattered_write();
624 }
625
626 ObjectCacher::~ObjectCacher()
627 {
628 finisher.stop();
629 perf_stop();
630 // we should be empty.
631 for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
632 = objects.begin();
633 i != objects.end();
634 ++i)
635 assert(i->empty());
636 assert(bh_lru_rest.lru_get_size() == 0);
637 assert(bh_lru_dirty.lru_get_size() == 0);
638 assert(ob_lru.lru_get_size() == 0);
639 assert(dirty_or_tx_bh.empty());
640 }
641
642 void ObjectCacher::perf_start()
643 {
644 string n = "objectcacher-" + name;
645 PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
646
647 plb.add_u64_counter(l_objectcacher_cache_ops_hit,
648 "cache_ops_hit", "Hit operations");
649 plb.add_u64_counter(l_objectcacher_cache_ops_miss,
650 "cache_ops_miss", "Miss operations");
651 plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
652 "cache_bytes_hit", "Hit data");
653 plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
654 "cache_bytes_miss", "Miss data");
655 plb.add_u64_counter(l_objectcacher_data_read,
656 "data_read", "Read data");
657 plb.add_u64_counter(l_objectcacher_data_written,
658 "data_written", "Data written to cache");
659 plb.add_u64_counter(l_objectcacher_data_flushed,
660 "data_flushed", "Data flushed");
661 plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
662 "data_overwritten_while_flushing",
663 "Data overwritten while flushing");
664 plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
665 "Write operations, delayed due to dirty limits");
666 plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
667 "write_bytes_blocked",
668 "Write data blocked on dirty limit");
669 plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
670 "Time spent blocking a write due to dirty limits");
671
672 perfcounter = plb.create_perf_counters();
673 cct->get_perfcounters_collection()->add(perfcounter);
674 }
675
676 void ObjectCacher::perf_stop()
677 {
678 assert(perfcounter);
679 cct->get_perfcounters_collection()->remove(perfcounter);
680 delete perfcounter;
681 }
682
683 /* private */
684 ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
685 uint64_t object_no,
686 ObjectSet *oset,
687 object_locator_t &l,
688 uint64_t truncate_size,
689 uint64_t truncate_seq)
690 {
691 // XXX: Add handling of nspace in object_locator_t in cache
692 assert(lock.is_locked());
693 // have it?
694 if ((uint32_t)l.pool < objects.size()) {
695 if (objects[l.pool].count(oid)) {
696 Object *o = objects[l.pool][oid];
697 o->object_no = object_no;
698 o->truncate_size = truncate_size;
699 o->truncate_seq = truncate_seq;
700 return o;
701 }
702 } else {
703 objects.resize(l.pool+1);
704 }
705
706 // create it.
707 Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
708 truncate_seq);
709 objects[l.pool][oid] = o;
710 ob_lru.lru_insert_top(o);
711 return o;
712 }
713
714 void ObjectCacher::close_object(Object *ob)
715 {
716 assert(lock.is_locked());
717 ldout(cct, 10) << "close_object " << *ob << dendl;
718 assert(ob->can_close());
719
720 // ok!
721 ob_lru.lru_remove(ob);
722 objects[ob->oloc.pool].erase(ob->get_soid());
723 ob->set_item.remove_myself();
724 delete ob;
725 }
726
727 void ObjectCacher::bh_read(BufferHead *bh, int op_flags)
728 {
729 assert(lock.is_locked());
730 ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
731 << reads_outstanding << dendl;
732
733 mark_rx(bh);
734 bh->last_read_tid = ++last_read_tid;
735
736 // finisher
737 C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
738 bh->start(), bh->length());
739 // go
740 writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
741 bh->ob->get_oloc(), bh->start(), bh->length(),
742 bh->ob->get_snap(), &onfinish->bl,
743 bh->ob->truncate_size, bh->ob->truncate_seq,
744 op_flags, onfinish);
745
746 ++reads_outstanding;
747 }
748
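// Apply a completed read to the rx buffers it was issued for (matching
// last_read_tid), handle -ENOENT (possibly marking the object complete and
// !exists), and wake any readers waiting on the affected buffers.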
749 void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
750 ceph_tid_t tid, loff_t start,
751 uint64_t length, bufferlist &bl, int r,
752 bool trust_enoent)
753 {
754 assert(lock.is_locked());
755 ldout(cct, 7) << "bh_read_finish "
756 << oid
757 << " tid " << tid
758 << " " << start << "~" << length
759 << " (bl is " << bl.length() << ")"
760 << " returned " << r
761 << " outstanding reads " << reads_outstanding
762 << dendl;
763
764 if (r >= 0 && bl.length() < length) {
765 ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
766 << length << " with " << length - bl.length() << " bytes of zeroes"
767 << dendl;
768 bl.append_zero(length - bl.length());
769 }
770
771 list<Context*> ls;
772 int err = 0;
773
774 if (objects[poolid].count(oid) == 0) {
775 ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
776 } else {
777 Object *ob = objects[poolid][oid];
778
779 if (r == -ENOENT && !ob->complete) {
780 // wake up *all* rx waiters, or else we risk reordering
781 // identical reads. e.g.
782 // read 1~1
783 // reply to unrelated 3~1 -> !exists
784 // read 1~1 -> immediate ENOENT
785 // reply to first 1~1 -> ooo ENOENT
786 bool allzero = true;
787 for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
788 p != ob->data.end(); ++p) {
789 BufferHead *bh = p->second;
790 for (map<loff_t, list<Context*> >::iterator p
791 = bh->waitfor_read.begin();
792 p != bh->waitfor_read.end();
793 ++p)
794 ls.splice(ls.end(), p->second);
795 bh->waitfor_read.clear();
796 if (!bh->is_zero() && !bh->is_rx())
797 allzero = false;
798 }
799
800 // just pass through and retry all waiters if we don't trust
801 // -ENOENT for this read
802 if (trust_enoent) {
803 ldout(cct, 7)
804 << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
805 << dendl;
806 ob->complete = true;
807 ob->exists = false;
808
809 /* If all the bhs are effectively zero, get rid of them. All
810 * the waiters will be retried and get -ENOENT immediately, so
811 * it's safe to clean up the unneeded bh's now. Since we know
812 * it's safe to remove them now, do so, so they aren't hanging
813          * around waiting for more -ENOENTs from rados while the cache
814 * is being shut down.
815 *
816 * Only do this when all the bhs are rx or clean, to match the
817 * condition in _readx(). If there are any non-rx or non-clean
818 * bhs, _readx() will wait for the final result instead of
819 * returning -ENOENT immediately.
820 */
821 if (allzero) {
822 ldout(cct, 10)
823 << "bh_read_finish ENOENT and allzero, getting rid of "
824 << "bhs for " << *ob << dendl;
825 map<loff_t, BufferHead*>::iterator p = ob->data.begin();
826 while (p != ob->data.end()) {
827 BufferHead *bh = p->second;
828 // current iterator will be invalidated by bh_remove()
829 ++p;
830 bh_remove(ob, bh);
831 delete bh;
832 }
833 }
834 }
835 }
836
837 // apply to bh's!
838 loff_t opos = start;
839 while (true) {
840 map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
841 if (p == ob->data.end())
842 break;
843 if (opos >= start+(loff_t)length) {
844 ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
845 << start << "+" << length << "=" << start+(loff_t)length
846 << dendl;
847 break;
848 }
849
850 BufferHead *bh = p->second;
851 ldout(cct, 20) << "checking bh " << *bh << dendl;
852
853 // finishers?
854 for (map<loff_t, list<Context*> >::iterator it
855 = bh->waitfor_read.begin();
856 it != bh->waitfor_read.end();
857 ++it)
858 ls.splice(ls.end(), it->second);
859 bh->waitfor_read.clear();
860
861 if (bh->start() > opos) {
862 ldout(cct, 1) << "bh_read_finish skipping gap "
863 << opos << "~" << bh->start() - opos
864 << dendl;
865 opos = bh->start();
866 continue;
867 }
868
869 if (!bh->is_rx()) {
870 ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
871 opos = bh->end();
872 continue;
873 }
874
875 if (bh->last_read_tid != tid) {
876 ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
877 << bh->last_read_tid << " != tid " << tid
878 << ", skipping" << dendl;
879 opos = bh->end();
880 continue;
881 }
882
883 assert(opos >= bh->start());
884 assert(bh->start() == opos); // we don't merge rx bh's... yet!
885 assert(bh->length() <= start+(loff_t)length-opos);
886
887 if (bh->error < 0)
888 err = bh->error;
889
890 opos = bh->end();
891
892 if (r == -ENOENT) {
893 if (trust_enoent) {
894 ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
895 bh_remove(ob, bh);
896 delete bh;
897 } else {
898 ldout(cct, 10) << "skipping unstrusted -ENOENT and will retry for "
899 << *bh << dendl;
900 }
901 continue;
902 }
903
904 if (r < 0) {
905 bh->error = r;
906 mark_error(bh);
907 } else {
908 bh->bl.substr_of(bl,
909 bh->start() - start,
910 bh->length());
911 mark_clean(bh);
912 }
913
914 ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
915
916 ob->try_merge_bh(bh);
917 }
918 }
919
920 // called with lock held.
921 ldout(cct, 20) << "finishing waiters " << ls << dendl;
922
923 finish_contexts(cct, ls, err);
924 retry_waiting_reads();
925
926 --reads_outstanding;
927 read_cond.Signal();
928 }
929
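// Gather dirty bhs adjacent to 'bh' (same object, last_write <= cutoff) in
// both directions, bounded by *max_amount / *max_count, and submit them as
// a single scattered write.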
930 void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
931 int64_t *max_amount, int *max_count)
932 {
933 list<BufferHead*> blist;
934
935 int count = 0;
936 int64_t total_len = 0;
937 set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
938 assert(it != dirty_or_tx_bh.end());
939 for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
940 p != dirty_or_tx_bh.end();
941 ++p) {
942 BufferHead *obh = *p;
943 if (obh->ob != bh->ob)
944 break;
945 if (obh->is_dirty() && obh->last_write <= cutoff) {
946 blist.push_back(obh);
947 ++count;
948 total_len += obh->length();
949 if ((max_count && count > *max_count) ||
950 (max_amount && total_len > *max_amount))
951 break;
952 }
953 }
954
955 while (it != dirty_or_tx_bh.begin()) {
956 --it;
957 BufferHead *obh = *it;
958 if (obh->ob != bh->ob)
959 break;
960 if (obh->is_dirty() && obh->last_write <= cutoff) {
961 blist.push_front(obh);
962 ++count;
963 total_len += obh->length();
964 if ((max_count && count > *max_count) ||
965 (max_amount && total_len > *max_amount))
966 break;
967 }
968 }
969 if (max_count)
970 *max_count -= count;
971 if (max_amount)
972 *max_amount -= total_len;
973
974 bh_write_scattered(blist);
975 }
976
977 class ObjectCacher::C_WriteCommit : public Context {
978 ObjectCacher *oc;
979 int64_t poolid;
980 sobject_t oid;
981 vector<pair<loff_t, uint64_t> > ranges;
982 public:
983 ceph_tid_t tid;
984 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
985 uint64_t l) :
986 oc(c), poolid(_poolid), oid(o), tid(0) {
987 ranges.push_back(make_pair(s, l));
988 }
989 C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
990 vector<pair<loff_t, uint64_t> >& _ranges) :
991 oc(c), poolid(_poolid), oid(o), tid(0) {
992 ranges.swap(_ranges);
993 }
994 void finish(int r) override {
995 oc->bh_write_commit(poolid, oid, ranges, tid, r);
996 }
997 };
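// Write back a list of bhs belonging to one object as a single scattered
// write; all of them are marked tx under the returned tid.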
998 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
999 {
1000 assert(lock.is_locked());
1001
1002 Object *ob = blist.front()->ob;
1003 ob->get();
1004
1005 ceph::real_time last_write;
1006 SnapContext snapc;
1007 vector<pair<loff_t, uint64_t> > ranges;
1008 vector<pair<uint64_t, bufferlist> > io_vec;
1009
1010 ranges.reserve(blist.size());
1011 io_vec.reserve(blist.size());
1012
1013 uint64_t total_len = 0;
1014 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1015 BufferHead *bh = *p;
1016 ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1017 assert(bh->ob == ob);
1018 assert(bh->bl.length() == bh->length());
1019 ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1020
1021 int n = io_vec.size();
1022 io_vec.resize(n + 1);
1023 io_vec[n].first = bh->start();
1024 io_vec[n].second = bh->bl;
1025
1026 total_len += bh->length();
1027 if (bh->snapc.seq > snapc.seq)
1028 snapc = bh->snapc;
1029 if (bh->last_write > last_write)
1030 last_write = bh->last_write;
1031 }
1032
1033 C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1034
1035 ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1036 io_vec, snapc, last_write,
1037 ob->truncate_size, ob->truncate_seq,
1038 oncommit);
1039 oncommit->tid = tid;
1040 ob->last_write_tid = tid;
1041 for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1042 BufferHead *bh = *p;
1043 bh->last_write_tid = tid;
1044 mark_tx(bh);
1045 }
1046
1047 if (perfcounter)
1048 perfcounter->inc(l_objectcacher_data_flushed, total_len);
1049 }
1050
1051 void ObjectCacher::bh_write(BufferHead *bh)
1052 {
1053 assert(lock.is_locked());
1054 ldout(cct, 7) << "bh_write " << *bh << dendl;
1055
1056 bh->ob->get();
1057
1058 // finishers
1059 C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1060 bh->ob->get_soid(), bh->start(),
1061 bh->length());
1062 // go
1063 ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1064 bh->ob->get_oloc(),
1065 bh->start(), bh->length(),
1066 bh->snapc, bh->bl, bh->last_write,
1067 bh->ob->truncate_size,
1068 bh->ob->truncate_seq,
1069 bh->journal_tid, oncommit);
1070 ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1071
1072 // set bh last_write_tid
1073 oncommit->tid = tid;
1074 bh->ob->last_write_tid = tid;
1075 bh->last_write_tid = tid;
1076
1077 if (perfcounter) {
1078 perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1079 }
1080
1081 mark_tx(bh);
1082 }
1083
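// Writeback completion: mark the covered tx bhs clean (or dirty again on
// error), update last_commit_tid and run any waiters; if the whole object
// set became clean, invoke the flush_set callback.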
1084 void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1085 vector<pair<loff_t, uint64_t> >& ranges,
1086 ceph_tid_t tid, int r)
1087 {
1088 assert(lock.is_locked());
1089 ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1090 << " ranges " << ranges << " returned " << r << dendl;
1091
1092 if (objects[poolid].count(oid) == 0) {
1093 ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1094 return;
1095 }
1096
1097 Object *ob = objects[poolid][oid];
1098 int was_dirty_or_tx = ob->oset->dirty_or_tx;
1099
1100 for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1101 p != ranges.end();
1102 ++p) {
1103 loff_t start = p->first;
1104 uint64_t length = p->second;
1105 if (!ob->exists) {
1106 ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1107 ob->exists = true;
1108
1109 if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1110 ob->get_snap())) {
1111 ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1112 "complete on " << *ob << dendl;
1113 ob->complete = false;
1114 }
1115 }
1116
1117 vector<pair<loff_t, BufferHead*>> hit;
1118 // apply to bh's!
1119 for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1120 p != ob->data.end();
1121 ++p) {
1122 BufferHead *bh = p->second;
1123
1124 if (bh->start() > start+(loff_t)length)
1125 break;
1126
1127 if (bh->start() < start &&
1128 bh->end() > start+(loff_t)length) {
1129 ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
1130 continue;
1131 }
1132
1133 // make sure bh is tx
1134 if (!bh->is_tx()) {
1135 ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1136 continue;
1137 }
1138
1139 // make sure bh tid matches
1140 if (bh->last_write_tid != tid) {
1141 assert(bh->last_write_tid > tid);
1142 ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1143 continue;
1144 }
1145
1146 if (r >= 0) {
1147 // ok! mark bh clean and error-free
1148 mark_clean(bh);
1149 bh->set_journal_tid(0);
1150 if (bh->get_nocache())
1151 bh_lru_rest.lru_bottouch(bh);
1152 hit.push_back(make_pair(bh->start(), bh));
1153 ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1154 } else {
1155 mark_dirty(bh);
1156 ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1157 << *bh << " r = " << r << " " << cpp_strerror(-r)
1158 << dendl;
1159 }
1160 }
1161
1162 for (auto& p : hit) {
1163       // p.second may have been merged and deleted by merge_left
1164 if (ob->data.count(p.first))
1165 ob->try_merge_bh(p.second);
1166 }
1167 }
1168
1169 // update last_commit.
1170 assert(ob->last_commit_tid < tid);
1171 ob->last_commit_tid = tid;
1172
1173 // waiters?
1174 list<Context*> ls;
1175 if (ob->waitfor_commit.count(tid)) {
1176 ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1177 ob->waitfor_commit.erase(tid);
1178 }
1179
1180 // is the entire object set now clean and fully committed?
1181 ObjectSet *oset = ob->oset;
1182 ob->put();
1183
1184 if (flush_set_callback &&
1185 was_dirty_or_tx > 0 &&
1186 oset->dirty_or_tx == 0) { // nothing dirty/tx
1187 flush_set_callback(flush_set_callback_arg, oset);
1188 }
1189
1190 if (!ls.empty())
1191 finish_contexts(cct, ls, r);
1192 }
1193
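// Write back roughly 'amount' bytes of dirty data, oldest first
// (amount == 0 flushes everything written before the current time).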
1194 void ObjectCacher::flush(loff_t amount)
1195 {
1196 assert(lock.is_locked());
1197 ceph::real_time cutoff = ceph::real_clock::now();
1198
1199 ldout(cct, 10) << "flush " << amount << dendl;
1200
1201 /*
1202 * NOTE: we aren't actually pulling things off the LRU here, just
1203 * looking at the tail item. Then we call bh_write, which moves it
1204 * to the other LRU, so that we can call
1205 * lru_dirty.lru_get_next_expire() again.
1206 */
1207 int64_t left = amount;
1208 while (amount == 0 || left > 0) {
1209 BufferHead *bh = static_cast<BufferHead*>(
1210 bh_lru_dirty.lru_get_next_expire());
1211 if (!bh) break;
1212 if (bh->last_write > cutoff) break;
1213
1214 if (scattered_write) {
1215 bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1216 } else {
1217 left -= bh->length();
1218 bh_write(bh);
1219 }
1220 }
1221 }
1222
1223
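// Evict clean buffers and then idle objects from the LRUs until we are back
// under max_size bytes and max_objects objects.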
1224 void ObjectCacher::trim()
1225 {
1226 assert(lock.is_locked());
1227 ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
1228 << get_stat_clean() << ", objects: max " << max_objects
1229 << " current " << ob_lru.lru_get_size() << dendl;
1230
1231 while (get_stat_clean() > 0 && (uint64_t) get_stat_clean() > max_size) {
1232 BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1233 if (!bh)
1234 break;
1235
1236 ldout(cct, 10) << "trim trimming " << *bh << dendl;
1237 assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1238
1239 Object *ob = bh->ob;
1240 bh_remove(ob, bh);
1241 delete bh;
1242
1243 if (ob->complete) {
1244 ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1245 ob->complete = false;
1246 }
1247 }
1248
1249 while (ob_lru.lru_get_size() > max_objects) {
1250 Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1251 if (!ob)
1252 break;
1253
1254 ldout(cct, 10) << "trim trimming " << *ob << dendl;
1255 close_object(ob);
1256 }
1257
1258 ldout(cct, 10) << "trim finish: max " << max_size << " clean "
1259 << get_stat_clean() << ", objects: max " << max_objects
1260 << " current " << ob_lru.lru_get_size() << dendl;
1261 }
1262
1263
1264
1265 /* public */
1266
1267 bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1268 snapid_t snapid)
1269 {
1270 assert(lock.is_locked());
1271 for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1272 ex_it != extents.end();
1273 ++ex_it) {
1274 ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1275
1276 // get Object cache
1277 sobject_t soid(ex_it->oid, snapid);
1278 Object *o = get_object_maybe(soid, ex_it->oloc);
1279 if (!o)
1280 return false;
1281 if (!o->is_cached(ex_it->offset, ex_it->length))
1282 return false;
1283 }
1284 return true;
1285 }
1286
1287
1288 /*
1289 * returns # bytes read (if in cache). onfinish is untouched (caller
1290 * must delete it)
1291 * returns 0 if doing async read
1292 */
1293 int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
1294 {
1295 return _readx(rd, oset, onfinish, true);
1296 }
1297
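// Core read path: map each extent onto bhs, issue reads for missing data
// and re-queue the request (via C_RetryRead) until everything is resident,
// then assemble the result into rd->bl.  Returns the number of bytes read
// when satisfied from cache, 0 if the read was deferred, or a negative
// error code.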
1298 int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1299 bool external_call)
1300 {
1301 assert(lock.is_locked());
1302 bool success = true;
1303 int error = 0;
1304 uint64_t bytes_in_cache = 0;
1305 uint64_t bytes_not_in_cache = 0;
1306 uint64_t total_bytes_read = 0;
1307 map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
1308 bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1309 bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1310
1311 /*
1312 * WARNING: we can only meaningfully return ENOENT if the read request
1313 * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
1314 * zeroed buffers needs to feed single extents into readx().
1315 */
1316 assert(!oset->return_enoent || rd->extents.size() == 1);
1317
1318 for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1319 ex_it != rd->extents.end();
1320 ++ex_it) {
1321 ldout(cct, 10) << "readx " << *ex_it << dendl;
1322
1323 total_bytes_read += ex_it->length;
1324
1325 // get Object cache
1326 sobject_t soid(ex_it->oid, rd->snap);
1327 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1328 ex_it->truncate_size, oset->truncate_seq);
1329 if (external_call)
1330 touch_ob(o);
1331
1332 // does not exist and no hits?
1333 if (oset->return_enoent && !o->exists) {
1334 ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
1335
1336 // should we worry about COW underneath us?
1337 if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1338 ex_it->length, soid.snap)) {
1339 ldout(cct, 20) << "readx may copy on write" << dendl;
1340 bool wait = false;
1341 list<BufferHead*> blist;
1342 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1343 bh_it != o->data.end();
1344 ++bh_it) {
1345 BufferHead *bh = bh_it->second;
1346 if (bh->is_dirty() || bh->is_tx()) {
1347 ldout(cct, 10) << "readx flushing " << *bh << dendl;
1348 wait = true;
1349 if (bh->is_dirty()) {
1350 if (scattered_write)
1351 blist.push_back(bh);
1352 else
1353 bh_write(bh);
1354 }
1355 }
1356 }
1357 if (scattered_write && !blist.empty())
1358 bh_write_scattered(blist);
1359 if (wait) {
1360 ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
1361 << " on " << *o << dendl;
1362 o->waitfor_commit[o->last_write_tid].push_back(
1363 new C_RetryRead(this,rd, oset, onfinish));
1364 // FIXME: perfcounter!
1365 return 0;
1366 }
1367 }
1368
1369 // can we return ENOENT?
1370 bool allzero = true;
1371 for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1372 bh_it != o->data.end();
1373 ++bh_it) {
1374 ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
1375 if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1376 allzero = false;
1377 break;
1378 }
1379 }
1380 if (allzero) {
1381 ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
1382 << dendl;
1383 delete rd;
1384 if (dontneed)
1385 bottouch_ob(o);
1386 return -ENOENT;
1387 }
1388 }
1389
1390 // map extent into bufferheads
1391 map<loff_t, BufferHead*> hits, missing, rx, errors;
1392 o->map_read(*ex_it, hits, missing, rx, errors);
1393 if (external_call) {
1394 // retry reading error buffers
1395 missing.insert(errors.begin(), errors.end());
1396 } else {
1397 // some reads had errors, fail later so completions
1398 // are cleaned up properly
1399 // TODO: make read path not call _readx for every completion
1400 hits.insert(errors.begin(), errors.end());
1401 }
1402
1403 if (!missing.empty() || !rx.empty()) {
1404 // read missing
1405 map<loff_t, BufferHead*>::iterator last = missing.end();
1406 for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1407 bh_it != missing.end();
1408 ++bh_it) {
1409 uint64_t rx_bytes = static_cast<uint64_t>(
1410 stat_rx + bh_it->second->length());
1411 bytes_not_in_cache += bh_it->second->length();
1412 if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1413 // cache is full with concurrent reads -- wait for rx's to complete
1414 // to constrain memory growth (especially during copy-ups)
1415 if (success) {
1416 ldout(cct, 10) << "readx missed, waiting on cache to complete "
1417 << waitfor_read.size() << " blocked reads, "
1418 << (MAX(rx_bytes, max_size) - max_size)
1419 << " read bytes" << dendl;
1420 waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
1421 }
1422
1423 bh_remove(o, bh_it->second);
1424 delete bh_it->second;
1425 } else {
1426 bh_it->second->set_nocache(nocache);
1427 bh_read(bh_it->second, rd->fadvise_flags);
1428 if ((success && onfinish) || last != missing.end())
1429 last = bh_it;
1430 }
1431 success = false;
1432 }
1433
1434       // add the waiter to the last missing bh to avoid waking up too
1434       // early; reads complete in order
1435 if (last != missing.end()) {
1436 ldout(cct, 10) << "readx missed, waiting on " << *last->second
1437 << " off " << last->first << dendl;
1438 last->second->waitfor_read[last->first].push_back(
1439 new C_RetryRead(this, rd, oset, onfinish) );
1440
1441 }
1442
1443 // bump rx
1444 for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1445 bh_it != rx.end();
1446 ++bh_it) {
1447 touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1448 if (success && onfinish) {
1449 ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1450 << " off " << bh_it->first << dendl;
1451 bh_it->second->waitfor_read[bh_it->first].push_back(
1452 new C_RetryRead(this, rd, oset, onfinish) );
1453 }
1454 bytes_not_in_cache += bh_it->second->length();
1455 success = false;
1456 }
1457
1458 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1459 bh_it != hits.end(); ++bh_it)
1460         // bump in lru, so we don't lose it on a later read
1461 touch_bh(bh_it->second);
1462
1463 } else {
1464 assert(!hits.empty());
1465
1466 // make a plain list
1467 for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1468 bh_it != hits.end();
1469 ++bh_it) {
1470 BufferHead *bh = bh_it->second;
1471 ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1472 if (bh->is_error() && bh->error)
1473 error = bh->error;
1474 bytes_in_cache += bh->length();
1475
1476 if (bh->get_nocache() && bh->is_clean())
1477 bh_lru_rest.lru_bottouch(bh);
1478 else
1479 touch_bh(bh);
1480         // must come after touch_bh, because touch_bh resets dontneed to false
1481 if (dontneed &&
1482 ((loff_t)ex_it->offset <= bh->start() &&
1483 (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1484 bh->set_dontneed(true); //if dirty
1485 if (bh->is_clean())
1486 bh_lru_rest.lru_bottouch(bh);
1487 }
1488 }
1489
1490 if (!error) {
1491 // create reverse map of buffer offset -> object for the
1492 // eventual result. this is over a single ObjectExtent, so we
1493 // know that
1494 // - the bh's are contiguous
1495 // - the buffer frags need not be (and almost certainly aren't)
1496 loff_t opos = ex_it->offset;
1497 map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1498 assert(bh_it->second->start() <= opos);
1499 uint64_t bhoff = opos - bh_it->second->start();
1500 vector<pair<uint64_t,uint64_t> >::iterator f_it
1501 = ex_it->buffer_extents.begin();
1502 uint64_t foff = 0;
1503 while (1) {
1504 BufferHead *bh = bh_it->second;
1505 assert(opos == (loff_t)(bh->start() + bhoff));
1506
1507 uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
1508 ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1509 << bhoff << " frag " << f_it->first << "~"
1510 << f_it->second << " +" << foff << "~" << len
1511 << dendl;
1512
1513 bufferlist bit;
1514 // put substr here first, since substr_of clobbers, and we
1515 // may get multiple bh's at this stripe_map position
1516 if (bh->is_zero()) {
1517 stripe_map[f_it->first].append_zero(len);
1518 } else {
1519 bit.substr_of(bh->bl,
1520 opos - bh->start(),
1521 len);
1522 stripe_map[f_it->first].claim_append(bit);
1523 }
1524
1525 opos += len;
1526 bhoff += len;
1527 foff += len;
1528 if (opos == bh->end()) {
1529 ++bh_it;
1530 bhoff = 0;
1531 }
1532 if (foff == f_it->second) {
1533 ++f_it;
1534 foff = 0;
1535 }
1536 if (bh_it == hits.end()) break;
1537 if (f_it == ex_it->buffer_extents.end())
1538 break;
1539 }
1540 assert(f_it == ex_it->buffer_extents.end());
1541 assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1542 }
1543
1544 if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1545 bottouch_ob(o);
1546 }
1547 }
1548
1549 if (!success) {
1550 if (perfcounter && external_call) {
1551 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1552 perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1553 perfcounter->inc(l_objectcacher_cache_ops_miss);
1554 }
1555 if (onfinish) {
1556 ldout(cct, 20) << "readx defer " << rd << dendl;
1557 } else {
1558 ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1559 << dendl;
1560 delete rd;
1561 }
1562 return 0; // wait!
1563 }
1564 if (perfcounter && external_call) {
1565 perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1566 perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1567 perfcounter->inc(l_objectcacher_cache_ops_hit);
1568 }
1569
1570 // no misses... success! do the read.
1571 ldout(cct, 10) << "readx has all buffers" << dendl;
1572
1573 // ok, assemble into result buffer.
1574 uint64_t pos = 0;
1575 if (rd->bl && !error) {
1576 rd->bl->clear();
1577 for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1578 i != stripe_map.end();
1579 ++i) {
1580 assert(pos == i->first);
1581 ldout(cct, 10) << "readx adding buffer len " << i->second.length()
1582 << " at " << pos << dendl;
1583 pos += i->second.length();
1584 rd->bl->claim_append(i->second);
1585 assert(rd->bl->length() == pos);
1586 }
1587 ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
1588 } else if (!error) {
1589 ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
1590 map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1591 pos = i->first + i->second.length();
1592 }
1593
1594 // done with read.
1595 int ret = error ? error : pos;
1596 ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1597 assert(pos <= (uint64_t) INT_MAX);
1598
1599 delete rd;
1600
1601 trim();
1602
1603 return ret;
1604 }
1605
1606 void ObjectCacher::retry_waiting_reads()
1607 {
1608 list<Context *> ls;
1609 ls.swap(waitfor_read);
1610
1611 while (!ls.empty() && waitfor_read.empty()) {
1612 Context *ctx = ls.front();
1613 ls.pop_front();
1614 ctx->complete(0);
1615 }
1616 waitfor_read.splice(waitfor_read.end(), ls);
1617 }
1618
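// Write path: map each extent to a single dirty bh (map_write), copy the
// caller's data into it, then block or queue the caller according to the
// dirty-throttling policy in _wait_for_write().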
1619 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
1620 {
1621 assert(lock.is_locked());
1622 ceph::real_time now = ceph::real_clock::now();
1623 uint64_t bytes_written = 0;
1624 uint64_t bytes_written_in_flush = 0;
1625 bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1626 bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1627
1628 for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1629 ex_it != wr->extents.end();
1630 ++ex_it) {
1631 // get object cache
1632 sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1633 Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1634 ex_it->truncate_size, oset->truncate_seq);
1635
1636 // map it all into a single bufferhead.
1637 BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1638 bool missing = bh->is_missing();
1639 bh->snapc = wr->snapc;
1640
1641 bytes_written += ex_it->length;
1642 if (bh->is_tx()) {
1643 bytes_written_in_flush += ex_it->length;
1644 }
1645
1646 // adjust buffer pointers (ie "copy" data into my cache)
1647 // this is over a single ObjectExtent, so we know that
1648 // - there is one contiguous bh
1649 // - the buffer frags need not be (and almost certainly aren't)
1650 // note: i assume striping is monotonic... no jumps backwards, ever!
1651 loff_t opos = ex_it->offset;
1652 for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1653 = ex_it->buffer_extents.begin();
1654 f_it != ex_it->buffer_extents.end();
1655 ++f_it) {
1656 ldout(cct, 10) << "writex writing " << f_it->first << "~"
1657 << f_it->second << " into " << *bh << " at " << opos
1658 << dendl;
1659 uint64_t bhoff = bh->start() - opos;
1660 assert(f_it->second <= bh->length() - bhoff);
1661
1662 // get the frag we're mapping in
1663 bufferlist frag;
1664 frag.substr_of(wr->bl,
1665 f_it->first, f_it->second);
1666
1667 // keep anything left of bhoff
1668 bufferlist newbl;
1669 if (bhoff)
1670 newbl.substr_of(bh->bl, 0, bhoff);
1671 newbl.claim_append(frag);
1672 bh->bl.swap(newbl);
1673
1674 opos += f_it->second;
1675 }
1676
1677 // ok, now bh is dirty.
1678 mark_dirty(bh);
1679 if (dontneed)
1680 bh->set_dontneed(true);
1681 else if (nocache && missing)
1682 bh->set_nocache(true);
1683 else
1684 touch_bh(bh);
1685
1686 bh->last_write = now;
1687
1688 o->try_merge_bh(bh);
1689 }
1690
1691 if (perfcounter) {
1692 perfcounter->inc(l_objectcacher_data_written, bytes_written);
1693 if (bytes_written_in_flush) {
1694 perfcounter->inc(l_objectcacher_overwritten_in_flush,
1695 bytes_written_in_flush);
1696 }
1697 }
1698
1699 int r = _wait_for_write(wr, bytes_written, oset, onfreespace);
1700 delete wr;
1701
1702 //verify_stats();
1703 trim();
1704 return r;
1705 }
1706
1707 class ObjectCacher::C_WaitForWrite : public Context {
1708 public:
1709 C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) :
1710 m_oc(oc), m_len(len), m_onfinish(onfinish) {}
1711 void finish(int r) override;
1712 private:
1713 ObjectCacher *m_oc;
1714 uint64_t m_len;
1715 Context *m_onfinish;
1716 };
1717
1718 void ObjectCacher::C_WaitForWrite::finish(int r)
1719 {
1720 Mutex::Locker l(m_oc->lock);
1721 m_oc->maybe_wait_for_writeback(m_len);
1722 m_onfinish->complete(r);
1723 }
1724
1725 void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
1726 {
1727 assert(lock.is_locked());
1728 ceph::mono_time start = ceph::mono_clock::now();
1729 int blocked = 0;
1730 // wait for writeback?
1731 // - wait for dirty and tx bytes (relative to the max_dirty threshold)
1732 // - do not wait for bytes other waiters are waiting on. this means that
1733 // threads do not wait for each other. this effectively allows the cache
1734 // size to balloon proportional to the data that is in flight.
1735 while (get_stat_dirty() + get_stat_tx() > 0 &&
1736 (uint64_t) (get_stat_dirty() + get_stat_tx()) >=
1737 max_dirty + get_stat_dirty_waiting()) {
1738 ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1739 << (get_stat_dirty() + get_stat_tx()) << " >= max "
1740 << max_dirty << " + dirty_waiting "
1741 << get_stat_dirty_waiting() << dendl;
1742 flusher_cond.Signal();
1743 stat_dirty_waiting += len;
1744 stat_cond.Wait(lock);
1745 stat_dirty_waiting -= len;
1746 ++blocked;
1747 ldout(cct, 10) << __func__ << " woke up" << dendl;
1748 }
1749 if (blocked && perfcounter) {
1750 perfcounter->inc(l_objectcacher_write_ops_blocked);
1751 perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1752 ceph::timespan blocked = ceph::mono_clock::now() - start;
1753 perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1754 }
1755 }
1756
1757 // blocking wait for write.
1758 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1759 Context *onfreespace)
1760 {
1761 assert(lock.is_locked());
1762 int ret = 0;
1763
1764 if (max_dirty > 0) {
1765 if (block_writes_upfront) {
1766 maybe_wait_for_writeback(len);
1767 if (onfreespace)
1768 onfreespace->complete(0);
1769 } else {
1770 assert(onfreespace);
1771 finisher.queue(new C_WaitForWrite(this, len, onfreespace));
1772 }
1773 } else {
1774 // write-thru! flush what we just wrote.
1775 Cond cond;
1776 bool done = false;
1777 Context *fin = block_writes_upfront ?
1778 new C_Cond(&cond, &done, &ret) : onfreespace;
1779 assert(fin);
1780 bool flushed = flush_set(oset, wr->extents, fin);
1781 assert(!flushed); // we just dirtied it, and didn't drop our lock!
1782 ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1783 << " bytes" << dendl;
1784 if (block_writes_upfront) {
1785 while (!done)
1786 cond.Wait(lock);
1787 ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1788 if (onfreespace)
1789 onfreespace->complete(ret);
1790 }
1791 }
1792
1793 // start writeback anyway?
1794 if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1795 ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1796 << target_dirty << ", nudging flusher" << dendl;
1797 flusher_cond.Signal();
1798 }
1799 return ret;
1800 }
1801
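// Background flusher thread: keeps dirty data below target_dirty and writes
// back buffers older than max_dirty_age, then waits for all outstanding
// reads before exiting.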
1802 void ObjectCacher::flusher_entry()
1803 {
1804 ldout(cct, 10) << "flusher start" << dendl;
1805 lock.Lock();
1806 while (!flusher_stop) {
1807 loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1808 get_stat_dirty();
1809 ldout(cct, 11) << "flusher "
1810 << all << " / " << max_size << ": "
1811 << get_stat_tx() << " tx, "
1812 << get_stat_rx() << " rx, "
1813 << get_stat_clean() << " clean, "
1814 << get_stat_dirty() << " dirty ("
1815 << target_dirty << " target, "
1816 << max_dirty << " max)"
1817 << dendl;
1818 loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1819 if (actual > 0 && (uint64_t) actual > target_dirty) {
1820 // flush some dirty pages
1821 ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1822 << get_stat_dirty_waiting() << " dirty_waiting > target "
1823 << target_dirty << ", flushing some dirty bhs" << dendl;
1824 flush(actual - target_dirty);
1825 } else {
1826 // check tail of lru for old dirty items
1827 ceph::real_time cutoff = ceph::real_clock::now();
1828 cutoff -= max_dirty_age;
1829 BufferHead *bh = 0;
1830 int max = MAX_FLUSH_UNDER_LOCK;
1831 while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1832 lru_get_next_expire())) != 0 &&
1833 bh->last_write <= cutoff &&
1834 max > 0) {
1835 ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1836 if (scattered_write) {
1837 bh_write_adjacencies(bh, cutoff, NULL, &max);
1838 } else {
1839 bh_write(bh);
1840 --max;
1841 }
1842 }
1843 if (!max) {
1844 // back off the lock to avoid starving other threads
1845 lock.Unlock();
1846 lock.Lock();
1847 continue;
1848 }
1849 }
1850 if (flusher_stop)
1851 break;
1852
1853 flusher_cond.WaitInterval(lock, seconds(1));
1854 }
1855
1856 /* Wait for reads to finish. This is only possible if handling
1857 * -ENOENT made some read completions finish before their rados read
1858 * came back. If we don't wait for them, and destroy the cache, when
1859 * the rados reads do come back their callback will try to access the
1860 * no-longer-valid ObjectCacher.
1861 */
1862 while (reads_outstanding > 0) {
1863 ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1864 << reads_outstanding << dendl;
1865 read_cond.Wait(lock);
1866 }
1867
1868 lock.Unlock();
1869 ldout(cct, 10) << "flusher finish" << dendl;
1870 }
1871
1872
1873 // -------------------------------------------------
1874
1875 bool ObjectCacher::set_is_empty(ObjectSet *oset)
1876 {
1877 assert(lock.is_locked());
1878 if (oset->objects.empty())
1879 return true;
1880
1881 for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
1882 if (!(*p)->is_empty())
1883 return false;
1884
1885 return true;
1886 }
1887
1888 bool ObjectCacher::set_is_cached(ObjectSet *oset)
1889 {
1890 assert(lock.is_locked());
1891 if (oset->objects.empty())
1892 return false;
1893
1894 for (xlist<Object*>::iterator p = oset->objects.begin();
1895 !p.end(); ++p) {
1896 Object *ob = *p;
1897 for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
1898 q != ob->data.end();
1899 ++q) {
1900 BufferHead *bh = q->second;
1901 if (!bh->is_dirty() && !bh->is_tx())
1902 return true;
1903 }
1904 }
1905
1906 return false;
1907 }
1908
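// true iff some buffer in the oset is dirty or in flight to the OSDs (tx)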
1909 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
1910 {
1911 assert(lock.is_locked());
1912 if (oset->objects.empty())
1913 return false;
1914
1915 for (xlist<Object*>::iterator i = oset->objects.begin();
1916 !i.end(); ++i) {
1917 Object *ob = *i;
1918
1919 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
1920 p != ob->data.end();
1921 ++p) {
1922 BufferHead *bh = p->second;
1923 if (bh->is_dirty() || bh->is_tx())
1924 return true;
1925 }
1926 }
1927
1928 return false;
1929 }
1930
1931
1932 // purge. non-blocking. violently removes dirty buffers from cache.
1933 void ObjectCacher::purge(Object *ob)
1934 {
1935 assert(lock.is_locked());
1936 ldout(cct, 10) << "purge " << *ob << dendl;
1937
1938 ob->truncate(0);
1939 }
1940
1941
1942 // flush. non-blocking. no callback.
1943 // returns true if the range was already clean (nothing to flush).
1944 // returns false if we wrote something, or if writeback is still in flight.
1945 // we are sloppy about ranges: any buffer the range touches gets flushed.
1946 bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
1947 {
1948 assert(lock.is_locked());
1949 list<BufferHead*> blist;
1950 bool clean = true;
1951 ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
1952 for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
1953 p != ob->data.end();
1954 ++p) {
1955 BufferHead *bh = p->second;
1956 ldout(cct, 20) << "flush " << *bh << dendl;
1957 if (length && bh->start() > offset+length) {
1958 break;
1959 }
1960 if (bh->is_tx()) {
1961 clean = false;
1962 continue;
1963 }
1964 if (!bh->is_dirty()) {
1965 continue;
1966 }
1967
1968 if (scattered_write)
1969 blist.push_back(bh);
1970 else
1971 bh_write(bh);
1972 clean = false;
1973 }
1974 if (scattered_write && !blist.empty())
1975 bh_write_scattered(blist);
1976
1977 return clean;
1978 }
1979
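// finish a flush_set/flush_all: if the gather picked up any sub-contexts,
// arm it with onfinish and return false (writeback still pending); otherwise
// nothing was dirty or tx, so complete onfinish now and return true.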
1980 bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
1981 Context *onfinish)
1982 {
1983 assert(lock.is_locked());
1984 if (gather->has_subs()) {
1985 gather->set_finisher(onfinish);
1986 gather->activate();
1987 return false;
1988 }
1989
1990 ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
1991 onfinish->complete(0);
1992 return true;
1993 }
1994
1995 // flush. non-blocking, takes callback.
1996 // returns true if already flushed
1997 bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
1998 {
1999 assert(lock.is_locked());
2000 assert(onfinish != NULL);
2001 if (oset->objects.empty()) {
2002 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2003 onfinish->complete(0);
2004 return true;
2005 }
2006
2007 ldout(cct, 10) << "flush_set " << oset << dendl;
2008
2009 // we'll need to wait for all objects to flush!
2010 C_GatherBuilder gather(cct);
2011 set<Object*> waitfor_commit;
2012
2013 list<BufferHead*> blist;
2014 Object *last_ob = NULL;
2015 set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2016
2017 // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2018 // order, but items in oset->objects are not, so lower_bound() may land on
2019 // any buffer head in this ObjectSet; we scan forward, then backward from it.
2020 BufferHead key(*oset->objects.begin());
2021 it = dirty_or_tx_bh.lower_bound(&key);
2022 p = q = it;
2023
2024 bool backwards = true;
2025 if (it != dirty_or_tx_bh.begin())
2026 --it;
2027 else
2028 backwards = false;
2029
2030 for (; p != dirty_or_tx_bh.end(); p = q) {
2031 ++q;
2032 BufferHead *bh = *p;
2033 if (bh->ob->oset != oset)
2034 break;
2035 waitfor_commit.insert(bh->ob);
2036 if (bh->is_dirty()) {
2037 if (scattered_write) {
2038 if (last_ob != bh->ob) {
2039 if (!blist.empty()) {
2040 bh_write_scattered(blist);
2041 blist.clear();
2042 }
2043 last_ob = bh->ob;
2044 }
2045 blist.push_back(bh);
2046 } else {
2047 bh_write(bh);
2048 }
2049 }
2050 }
2051
2052 if (backwards) {
2053 for (p = q = it; true; p = q) {
2054 if (q != dirty_or_tx_bh.begin())
2055 --q;
2056 else
2057 backwards = false;
2058 BufferHead *bh = *p;
2059 if (bh->ob->oset != oset)
2060 break;
2061 waitfor_commit.insert(bh->ob);
2062 if (bh->is_dirty()) {
2063 if (scattered_write) {
2064 if (last_ob != bh->ob) {
2065 if (!blist.empty()) {
2066 bh_write_scattered(blist);
2067 blist.clear();
2068 }
2069 last_ob = bh->ob;
2070 }
2071 blist.push_front(bh);
2072 } else {
2073 bh_write(bh);
2074 }
2075 }
2076 if (!backwards)
2077 break;
2078 }
2079 }
2080
2081 if (scattered_write && !blist.empty())
2082 bh_write_scattered(blist);
2083
2084 for (set<Object*>::iterator i = waitfor_commit.begin();
2085 i != waitfor_commit.end(); ++i) {
2086 Object *ob = *i;
2087
2088 // we'll need to gather...
2089 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2090 << ob->last_write_tid << " on " << *ob << dendl;
2091 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2092 }
2093
2094 return _flush_set_finish(&gather, onfinish);
2095 }
2096
2097 // flush. non-blocking, takes callback.
2098 // returns true if already flushed
2099 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
2100 Context *onfinish)
2101 {
2102 assert(lock.is_locked());
2103 assert(onfinish != NULL);
2104 if (oset->objects.empty()) {
2105 ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2106 onfinish->complete(0);
2107 return true;
2108 }
2109
2110 ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2111 << " ObjectExtents" << dendl;
2112
2113 // we'll need to wait for all objects to flush!
2114 C_GatherBuilder gather(cct);
2115
2116 for (vector<ObjectExtent>::iterator p = exv.begin();
2117 p != exv.end();
2118 ++p) {
2119 ObjectExtent &ex = *p;
2120 sobject_t soid(ex.oid, CEPH_NOSNAP);
2121 if (objects[oset->poolid].count(soid) == 0)
2122 continue;
2123 Object *ob = objects[oset->poolid][soid];
2124
2125 ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2126 << " " << ob << dendl;
2127
2128 if (!flush(ob, ex.offset, ex.length)) {
2129 // we'll need to gather...
2130 ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2131 << ob->last_write_tid << " on " << *ob << dendl;
2132 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2133 }
2134 }
2135
2136 return _flush_set_finish(&gather, onfinish);
2137 }
2138
2139 // flush all dirty data. non-blocking, takes callback.
2140 // returns true if already flushed
2141 bool ObjectCacher::flush_all(Context *onfinish)
2142 {
2143 assert(lock.is_locked());
2144 assert(onfinish != NULL);
2145
2146 ldout(cct, 10) << "flush_all " << dendl;
2147
2148 // we'll need to wait for all objects to flush!
2149 C_GatherBuilder gather(cct);
2150 set<Object*> waitfor_commit;
2151
2152 list<BufferHead*> blist;
2153 Object *last_ob = NULL;
2154 set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2155 next = it = dirty_or_tx_bh.begin();
2156 while (it != dirty_or_tx_bh.end()) {
2157 ++next;
2158 BufferHead *bh = *it;
2159 waitfor_commit.insert(bh->ob);
2160
2161 if (bh->is_dirty()) {
2162 if (scattered_write) {
2163 if (last_ob != bh->ob) {
2164 if (!blist.empty()) {
2165 bh_write_scattered(blist);
2166 blist.clear();
2167 }
2168 last_ob = bh->ob;
2169 }
2170 blist.push_back(bh);
2171 } else {
2172 bh_write(bh);
2173 }
2174 }
2175
2176 it = next;
2177 }
2178
2179 if (scattered_write && !blist.empty())
2180 bh_write_scattered(blist);
2181
2182 for (set<Object*>::iterator i = waitfor_commit.begin();
2183 i != waitfor_commit.end();
2184 ++i) {
2185 Object *ob = *i;
2186
2187 // we'll need to gather...
2188 ldout(cct, 10) << "flush_all will wait for ack tid "
2189 << ob->last_write_tid << " on " << *ob << dendl;
2190 ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2191 }
2192
2193 return _flush_set_finish(&gather, onfinish);
2194 }
2195
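// purge_set: drop every buffer (dirty or not) for all objects in the oset,
// then notify the flush callback if dirty data was thrown away.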
2196 void ObjectCacher::purge_set(ObjectSet *oset)
2197 {
2198 assert(lock.is_locked());
2199 if (oset->objects.empty()) {
2200 ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2201 return;
2202 }
2203
2204 ldout(cct, 10) << "purge_set " << oset << dendl;
2205 const bool were_dirty = oset->dirty_or_tx > 0;
2206
2207 for (xlist<Object*>::iterator i = oset->objects.begin();
2208 !i.end(); ++i) {
2209 Object *ob = *i;
2210 purge(ob);
2211 }
2212
2213 // Although we have purged rather than flushed, the caller should still
2214 // drop any resources associated with the dirty data.
2215 assert(oset->dirty_or_tx == 0);
2216 if (flush_set_callback && were_dirty) {
2217 flush_set_callback(flush_set_callback_arg, oset);
2218 }
2219 }
2220
2221
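// release: free this object's clean/zero/error buffers and close the object
// if nothing remains; returns the number of bytes still held in buffers that
// could not be released (e.g. dirty, tx, or rx).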
2222 loff_t ObjectCacher::release(Object *ob)
2223 {
2224 assert(lock.is_locked());
2225 list<BufferHead*> clean;
2226 loff_t o_unclean = 0;
2227
2228 for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2229 p != ob->data.end();
2230 ++p) {
2231 BufferHead *bh = p->second;
2232 if (bh->is_clean() || bh->is_zero() || bh->is_error())
2233 clean.push_back(bh);
2234 else
2235 o_unclean += bh->length();
2236 }
2237
2238 for (list<BufferHead*>::iterator p = clean.begin();
2239 p != clean.end();
2240 ++p) {
2241 bh_remove(ob, *p);
2242 delete *p;
2243 }
2244
2245 if (ob->can_close()) {
2246 ldout(cct, 10) << "release trimming " << *ob << dendl;
2247 close_object(ob);
2248 assert(o_unclean == 0);
2249 return 0;
2250 }
2251
2252 if (ob->complete) {
2253 ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2254 ob->complete = false;
2255 }
2256 if (!ob->exists) {
2257 ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2258 ob->exists = true;
2259 }
2260
2261 return o_unclean;
2262 }
2263
2264 loff_t ObjectCacher::release_set(ObjectSet *oset)
2265 {
2266 assert(lock.is_locked());
2267 // return # bytes not clean (and thus not released).
2268 loff_t unclean = 0;
2269
2270 if (oset->objects.empty()) {
2271 ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2272 return 0;
2273 }
2274
2275 ldout(cct, 10) << "release_set " << oset << dendl;
2276
2277 xlist<Object*>::iterator q;
2278 for (xlist<Object*>::iterator p = oset->objects.begin();
2279 !p.end(); ) {
2280 q = p;
2281 ++q;
2282 Object *ob = *p;
2283
2284 loff_t o_unclean = release(ob);
2285 unclean += o_unclean;
2286
2287 if (o_unclean)
2288 ldout(cct, 10) << "release_set " << oset << " " << *ob
2289 << " has " << o_unclean << " bytes left"
2290 << dendl;
2291 p = q;
2292 }
2293
2294 if (unclean) {
2295 ldout(cct, 10) << "release_set " << oset
2296 << ", " << unclean << " bytes left" << dendl;
2297 }
2298
2299 return unclean;
2300 }
2301
2302
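// release_all: release clean buffers for every object in every pool;
// returns the number of bytes left unreleased.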
2303 uint64_t ObjectCacher::release_all()
2304 {
2305 assert(lock.is_locked());
2306 ldout(cct, 10) << "release_all" << dendl;
2307 uint64_t unclean = 0;
2308
2309 vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2310 = objects.begin();
2311 while (i != objects.end()) {
2312 ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2313 while (p != i->end()) {
2314 ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2315 ++n;
2316
2317 Object *ob = p->second;
2318
2319 loff_t o_unclean = release(ob);
2320 unclean += o_unclean;
2321
2322 if (o_unclean)
2323 ldout(cct, 10) << "release_all " << *ob
2324 << " has " << o_unclean << " bytes left"
2325 << dendl;
2326 p = n;
2327 }
2328 ++i;
2329 }
2330
2331 if (unclean) {
2332 ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2333 << dendl;
2334 }
2335
2336 return unclean;
2337 }
2338
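// clear_nonexistence: forget cached non-existence for the oset (previously
// missing objects are marked existing and incomplete) and tell in-flight
// reads not to trust -ENOENT replies.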
2339 void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2340 {
2341 assert(lock.is_locked());
2342 ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2343
2344 for (xlist<Object*>::iterator p = oset->objects.begin();
2345 !p.end(); ++p) {
2346 Object *ob = *p;
2347 if (!ob->exists) {
2348 ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2349 ob->exists = true;
2350 ob->complete = false;
2351 }
2352 for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2353 !q.end(); ++q) {
2354 C_ReadFinish *comp = *q;
2355 comp->distrust_enoent();
2356 }
2357 }
2358 }
2359
2360 /**
2361 * discard object extents from an ObjectSet: drop the cached buffers
2362 * covering the extents in exls from the in-memory oset.
2363 */
2364 void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2365 {
2366 assert(lock.is_locked());
2367 if (oset->objects.empty()) {
2368 ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
2369 return;
2370 }
2371
2372 ldout(cct, 10) << "discard_set " << oset << dendl;
2373
2374 bool were_dirty = oset->dirty_or_tx > 0;
2375
2376 for (vector<ObjectExtent>::const_iterator p = exls.begin();
2377 p != exls.end();
2378 ++p) {
2379 ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
2380 const ObjectExtent &ex = *p;
2381 sobject_t soid(ex.oid, CEPH_NOSNAP);
2382 if (objects[oset->poolid].count(soid) == 0)
2383 continue;
2384 Object *ob = objects[oset->poolid][soid];
2385
2386 ob->discard(ex.offset, ex.length);
2387 }
2388
2389 // did discarding drop the last of the dirty data?
2390 if (flush_set_callback &&
2391 were_dirty && oset->dirty_or_tx == 0)
2392 flush_set_callback(flush_set_callback_arg, oset);
2393 }
2394
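// verify_stats: recount bytes per buffer state across all objects and assert
// the totals match the cached stat_* counters (debug consistency check).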
2395 void ObjectCacher::verify_stats() const
2396 {
2397 assert(lock.is_locked());
2398 ldout(cct, 10) << "verify_stats" << dendl;
2399
2400 loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2401 error = 0;
2402 for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2403 = objects.begin();
2404 i != objects.end();
2405 ++i) {
2406 for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2407 = i->begin();
2408 p != i->end();
2409 ++p) {
2410 Object *ob = p->second;
2411 for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2412 q != ob->data.end();
2413 ++q) {
2414 BufferHead *bh = q->second;
2415 switch (bh->get_state()) {
2416 case BufferHead::STATE_MISSING:
2417 missing += bh->length();
2418 break;
2419 case BufferHead::STATE_CLEAN:
2420 clean += bh->length();
2421 break;
2422 case BufferHead::STATE_ZERO:
2423 zero += bh->length();
2424 break;
2425 case BufferHead::STATE_DIRTY:
2426 dirty += bh->length();
2427 break;
2428 case BufferHead::STATE_TX:
2429 tx += bh->length();
2430 break;
2431 case BufferHead::STATE_RX:
2432 rx += bh->length();
2433 break;
2434 case BufferHead::STATE_ERROR:
2435 error += bh->length();
2436 break;
2437 default:
2438 ceph_abort();
2439 }
2440 }
2441 }
2442 }
2443
2444 ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2445 << " dirty " << dirty << " missing " << missing
2446 << " error " << error << dendl;
2447 assert(clean == stat_clean);
2448 assert(rx == stat_rx);
2449 assert(tx == stat_tx);
2450 assert(dirty == stat_dirty);
2451 assert(missing == stat_missing);
2452 assert(zero == stat_zero);
2453 assert(error == stat_error);
2454 }
2455
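// bh_stat_add/bh_stat_sub: adjust the per-state byte counters (plus the
// per-object and per-oset dirty_or_tx totals for DIRTY and TX buffers) as a
// bh enters or leaves a state; bh_stat_add also wakes dirty-stat waiters.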
2456 void ObjectCacher::bh_stat_add(BufferHead *bh)
2457 {
2458 assert(lock.is_locked());
2459 switch (bh->get_state()) {
2460 case BufferHead::STATE_MISSING:
2461 stat_missing += bh->length();
2462 break;
2463 case BufferHead::STATE_CLEAN:
2464 stat_clean += bh->length();
2465 break;
2466 case BufferHead::STATE_ZERO:
2467 stat_zero += bh->length();
2468 break;
2469 case BufferHead::STATE_DIRTY:
2470 stat_dirty += bh->length();
2471 bh->ob->dirty_or_tx += bh->length();
2472 bh->ob->oset->dirty_or_tx += bh->length();
2473 break;
2474 case BufferHead::STATE_TX:
2475 stat_tx += bh->length();
2476 bh->ob->dirty_or_tx += bh->length();
2477 bh->ob->oset->dirty_or_tx += bh->length();
2478 break;
2479 case BufferHead::STATE_RX:
2480 stat_rx += bh->length();
2481 break;
2482 case BufferHead::STATE_ERROR:
2483 stat_error += bh->length();
2484 break;
2485 default:
2486 assert(0 == "bh_stat_add: invalid bufferhead state");
2487 }
2488 if (get_stat_dirty_waiting() > 0)
2489 stat_cond.Signal();
2490 }
2491
2492 void ObjectCacher::bh_stat_sub(BufferHead *bh)
2493 {
2494 assert(lock.is_locked());
2495 switch (bh->get_state()) {
2496 case BufferHead::STATE_MISSING:
2497 stat_missing -= bh->length();
2498 break;
2499 case BufferHead::STATE_CLEAN:
2500 stat_clean -= bh->length();
2501 break;
2502 case BufferHead::STATE_ZERO:
2503 stat_zero -= bh->length();
2504 break;
2505 case BufferHead::STATE_DIRTY:
2506 stat_dirty -= bh->length();
2507 bh->ob->dirty_or_tx -= bh->length();
2508 bh->ob->oset->dirty_or_tx -= bh->length();
2509 break;
2510 case BufferHead::STATE_TX:
2511 stat_tx -= bh->length();
2512 bh->ob->dirty_or_tx -= bh->length();
2513 bh->ob->oset->dirty_or_tx -= bh->length();
2514 break;
2515 case BufferHead::STATE_RX:
2516 stat_rx -= bh->length();
2517 break;
2518 case BufferHead::STATE_ERROR:
2519 stat_error -= bh->length();
2520 break;
2521 default:
2522 assert(0 == "bh_stat_sub: invalid bufferhead state");
2523 }
2524 }
2525
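// bh_set_state: change a bh's state, moving it between the rest and dirty
// LRUs and the dirty_or_tx index as needed, clearing any stale error, and
// keeping the stat counters up to date.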
2526 void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2527 {
2528 assert(lock.is_locked());
2529 int state = bh->get_state();
2530 // move between lru lists?
2531 if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2532 bh_lru_rest.lru_remove(bh);
2533 bh_lru_dirty.lru_insert_top(bh);
2534 } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2535 bh_lru_dirty.lru_remove(bh);
2536 if (bh->get_dontneed())
2537 bh_lru_rest.lru_insert_bot(bh);
2538 else
2539 bh_lru_rest.lru_insert_top(bh);
2540 }
2541
2542 if ((s == BufferHead::STATE_TX ||
2543 s == BufferHead::STATE_DIRTY) &&
2544 state != BufferHead::STATE_TX &&
2545 state != BufferHead::STATE_DIRTY) {
2546 dirty_or_tx_bh.insert(bh);
2547 } else if ((state == BufferHead::STATE_TX ||
2548 state == BufferHead::STATE_DIRTY) &&
2549 s != BufferHead::STATE_TX &&
2550 s != BufferHead::STATE_DIRTY) {
2551 dirty_or_tx_bh.erase(bh);
2552 }
2553
2554 if (s != BufferHead::STATE_ERROR &&
2555 state == BufferHead::STATE_ERROR) {
2556 bh->error = 0;
2557 }
2558
2559 // set state
2560 bh_stat_sub(bh);
2561 bh->set_state(s);
2562 bh_stat_add(bh);
2563 }
2564
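// bh_add: attach a bh to its object and index it on the appropriate LRU
// (and in dirty_or_tx_bh if dirty or tx); updates the stat counters.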
2565 void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2566 {
2567 assert(lock.is_locked());
2568 ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2569 ob->add_bh(bh);
2570 if (bh->is_dirty()) {
2571 bh_lru_dirty.lru_insert_top(bh);
2572 dirty_or_tx_bh.insert(bh);
2573 } else {
2574 if (bh->get_dontneed())
2575 bh_lru_rest.lru_insert_bot(bh);
2576 else
2577 bh_lru_rest.lru_insert_top(bh);
2578 }
2579
2580 if (bh->is_tx()) {
2581 dirty_or_tx_bh.insert(bh);
2582 }
2583 bh_stat_add(bh);
2584 }
2585
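// bh_remove: detach a bh from its object, the LRUs and dirty_or_tx_bh,
// update the stat counters, and wake anyone waiting on dirty accounting.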
2586 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2587 {
2588 assert(lock.is_locked());
2589 assert(bh->get_journal_tid() == 0);
2590 ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2591 ob->remove_bh(bh);
2592 if (bh->is_dirty()) {
2593 bh_lru_dirty.lru_remove(bh);
2594 dirty_or_tx_bh.erase(bh);
2595 } else {
2596 bh_lru_rest.lru_remove(bh);
2597 }
2598
2599 if (bh->is_tx()) {
2600 dirty_or_tx_bh.erase(bh);
2601 }
2602 bh_stat_sub(bh);
2603 if (get_stat_dirty_waiting() > 0)
2604 stat_cond.Signal();
2605 }
2606