]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Journaler.cc
bump version to 12.2.12-pve1
[ceph.git] / ceph / src / osdc / Journaler.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "common/perf_counters.h"
16#include "common/dout.h"
17#include "include/Context.h"
18#include "msg/Messenger.h"
19#include "osdc/Journaler.h"
20#include "common/errno.h"
21#include "include/assert.h"
22#include "common/Finisher.h"
23
24#define dout_subsys ceph_subsys_journaler
25#undef dout_prefix
26#define dout_prefix *_dout << objecter->messenger->get_myname() \
27 << ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
28
29using std::chrono::seconds;
30
31
32class Journaler::C_DelayFlush : public Context {
33 Journaler *journaler;
34 public:
35 C_DelayFlush(Journaler *j) : journaler(j) {}
36 void finish(int r) override {
37 journaler->_do_delayed_flush();
38 }
39};
40
41void Journaler::set_readonly()
42{
43 lock_guard l(lock);
44
45 ldout(cct, 1) << "set_readonly" << dendl;
46 readonly = true;
47}
48
49void Journaler::set_writeable()
50{
51 lock_guard l(lock);
52
53 ldout(cct, 1) << "set_writeable" << dendl;
54 readonly = false;
55}
56
57void Journaler::create(file_layout_t *l, stream_format_t const sf)
58{
59 lock_guard lk(lock);
60
61 assert(!readonly);
62 state = STATE_ACTIVE;
63
64 stream_format = sf;
65 journal_stream.set_format(sf);
66 _set_layout(l);
67
68 prezeroing_pos = prezero_pos = write_pos = flush_pos =
69 safe_pos = read_pos = requested_pos = received_pos =
70 expire_pos = trimming_pos = trimmed_pos =
71 next_safe_pos = layout.get_period();
72
73 ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
74 << std::dec << ", format=" << stream_format << dendl;
75}
76
77void Journaler::set_layout(file_layout_t const *l)
78{
79 lock_guard lk(lock);
80 _set_layout(l);
81}
82
83void Journaler::_set_layout(file_layout_t const *l)
84{
85 layout = *l;
86
b32b8144
FG
87 if (layout.pool_id != pg_pool) {
88 // user can reset pool id through cephfs-journal-tool
89 lderr(cct) << "may got older pool id from header layout" << dendl;
90 ceph_abort();
91 }
7c673cae
FG
92 last_written.layout = layout;
93 last_committed.layout = layout;
94
95 // prefetch intelligently.
96 // (watch out, this is big if you use big objects or weird striping)
97 uint64_t periods = cct->_conf->journaler_prefetch_periods;
98 if (periods < 2)
99 periods = 2; // we need at least 2 periods to make progress.
100 fetch_len = layout.get_period() * periods;
101}
102
103
104/***************** HEADER *******************/
105
106ostream& operator<<(ostream &out, const Journaler::Header &h)
107{
108 return out << "loghead(trim " << h.trimmed_pos
109 << ", expire " << h.expire_pos
110 << ", write " << h.write_pos
111 << ", stream_format " << (int)(h.stream_format)
112 << ")";
113}
114
115class Journaler::C_ReadHead : public Context {
116 Journaler *ls;
117public:
118 bufferlist bl;
119 explicit C_ReadHead(Journaler *l) : ls(l) {}
120 void finish(int r) override {
121 ls->_finish_read_head(r, bl);
122 }
123};
124
125class Journaler::C_RereadHead : public Context {
126 Journaler *ls;
127 Context *onfinish;
128public:
129 bufferlist bl;
130 C_RereadHead(Journaler *l, Context *onfinish_) : ls (l),
131 onfinish(onfinish_) {}
132 void finish(int r) override {
133 ls->_finish_reread_head(r, bl, onfinish);
134 }
135};
136
137class Journaler::C_ProbeEnd : public Context {
138 Journaler *ls;
139public:
140 uint64_t end;
141 explicit C_ProbeEnd(Journaler *l) : ls(l), end(-1) {}
142 void finish(int r) override {
143 ls->_finish_probe_end(r, end);
144 }
145};
146
147class Journaler::C_ReProbe : public Context {
148 Journaler *ls;
149 C_OnFinisher *onfinish;
150public:
151 uint64_t end;
152 C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) :
153 ls(l), onfinish(onfinish_), end(0) {}
154 void finish(int r) override {
155 ls->_finish_reprobe(r, end, onfinish);
156 }
157};
158
159void Journaler::recover(Context *onread)
160{
161 lock_guard l(lock);
b32b8144 162 if (is_stopping()) {
7c673cae
FG
163 onread->complete(-EAGAIN);
164 return;
165 }
166
167 ldout(cct, 1) << "recover start" << dendl;
168 assert(state != STATE_ACTIVE);
169 assert(readonly);
170
171 if (onread)
172 waitfor_recover.push_back(wrap_finisher(onread));
173
174 if (state != STATE_UNDEF) {
175 ldout(cct, 1) << "recover - already recovering" << dendl;
176 return;
177 }
178
179 ldout(cct, 1) << "read_head" << dendl;
180 state = STATE_READHEAD;
181 C_ReadHead *fin = new C_ReadHead(this);
182 _read_head(fin, &fin->bl);
183}
184
185void Journaler::_read_head(Context *on_finish, bufferlist *bl)
186{
187 // lock is locked
188 assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
189
190 object_t oid = file_object_t(ino, 0);
191 object_locator_t oloc(pg_pool);
192 objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish));
193}
194
195void Journaler::reread_head(Context *onfinish)
196{
197 lock_guard l(lock);
198 _reread_head(wrap_finisher(onfinish));
199}
200
201/**
202 * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos
203 * from the on-disk header. This switches the state to STATE_REREADHEAD for
204 * the duration, and you shouldn't start a re-read while other operations are
205 * in-flight, nor start other operations while a re-read is in progress.
206 * Also, don't call this until the Journaler has finished its recovery and has
207 * gone STATE_ACTIVE!
208 */
209void Journaler::_reread_head(Context *onfinish)
210{
211 ldout(cct, 10) << "reread_head" << dendl;
212 assert(state == STATE_ACTIVE);
213
214 state = STATE_REREADHEAD;
215 C_RereadHead *fin = new C_RereadHead(this, onfinish);
216 _read_head(fin, &fin->bl);
217}
218
219void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
220{
221 lock_guard l(lock);
b32b8144
FG
222 if (is_stopping()) {
223 finish->complete(-EAGAIN);
224 return;
225 }
7c673cae
FG
226
227 //read on-disk header into
228 assert(bl.length() || r < 0 );
229
230 // unpack header
231 if (r == 0) {
232 Header h;
233 bufferlist::iterator p = bl.begin();
234 try {
235 ::decode(h, p);
236 } catch (const buffer::error &e) {
237 finish->complete(-EINVAL);
238 return;
239 }
240 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
241 = h.write_pos;
242 expire_pos = h.expire_pos;
243 trimmed_pos = trimming_pos = h.trimmed_pos;
244 init_headers(h);
245 state = STATE_ACTIVE;
246 }
247
248 finish->complete(r);
249}
250
251void Journaler::_finish_read_head(int r, bufferlist& bl)
252{
253 lock_guard l(lock);
b32b8144
FG
254 if (is_stopping())
255 return;
7c673cae
FG
256
257 assert(state == STATE_READHEAD);
258
259 if (r!=0) {
260 ldout(cct, 0) << "error getting journal off disk" << dendl;
261 list<Context*> ls;
262 ls.swap(waitfor_recover);
263 finish_contexts(cct, ls, r);
264 return;
265 }
266
267 if (bl.length() == 0) {
268 ldout(cct, 1) << "_finish_read_head r=" << r
269 << " read 0 bytes, assuming empty log" << dendl;
270 state = STATE_ACTIVE;
271 list<Context*> ls;
272 ls.swap(waitfor_recover);
273 finish_contexts(cct, ls, 0);
274 return;
275 }
276
277 // unpack header
278 bool corrupt = false;
279 Header h;
280 bufferlist::iterator p = bl.begin();
281 try {
282 ::decode(h, p);
283
284 if (h.magic != magic) {
285 ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
286 << magic << "'" << dendl;
287 corrupt = true;
288 } else if (h.write_pos < h.expire_pos || h.expire_pos < h.trimmed_pos) {
289 ldout(cct, 0) << "Corrupt header (bad offsets): " << h << dendl;
290 corrupt = true;
291 }
292 } catch (const buffer::error &e) {
293 corrupt = true;
294 }
295
296 if (corrupt) {
297 list<Context*> ls;
298 ls.swap(waitfor_recover);
299 finish_contexts(cct, ls, -EINVAL);
300 return;
301 }
302
303 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
304 = h.write_pos;
305 read_pos = requested_pos = received_pos = expire_pos = h.expire_pos;
306 trimmed_pos = trimming_pos = h.trimmed_pos;
307
308 init_headers(h);
309 _set_layout(&h.layout);
310 stream_format = h.stream_format;
311 journal_stream.set_format(h.stream_format);
312
313 ldout(cct, 1) << "_finish_read_head " << h
314 << ". probing for end of log (from " << write_pos << ")..."
315 << dendl;
316 C_ProbeEnd *fin = new C_ProbeEnd(this);
317 state = STATE_PROBING;
318 _probe(fin, &fin->end);
319}
320
321void Journaler::_probe(Context *finish, uint64_t *end)
322{
323 // lock is locked
324 ldout(cct, 1) << "probing for end of the log" << dendl;
325 assert(state == STATE_PROBING || state == STATE_REPROBING);
326 // probe the log
327 filer.probe(ino, &layout, CEPH_NOSNAP,
328 write_pos, end, true, 0, wrap_finisher(finish));
329}
330
331void Journaler::_reprobe(C_OnFinisher *finish)
332{
333 ldout(cct, 10) << "reprobe" << dendl;
334 assert(state == STATE_ACTIVE);
335
336 state = STATE_REPROBING;
337 C_ReProbe *fin = new C_ReProbe(this, finish);
338 _probe(fin, &fin->end);
339}
340
341
342void Journaler::_finish_reprobe(int r, uint64_t new_end,
343 C_OnFinisher *onfinish)
344{
345 lock_guard l(lock);
b32b8144
FG
346 if (is_stopping()) {
347 onfinish->complete(-EAGAIN);
348 return;
349 }
7c673cae
FG
350
351 assert(new_end >= write_pos || r < 0);
352 ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
353 << " (header had " << write_pos << ")."
354 << dendl;
355 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end;
356 state = STATE_ACTIVE;
357 onfinish->complete(r);
358}
359
360void Journaler::_finish_probe_end(int r, uint64_t end)
361{
362 lock_guard l(lock);
b32b8144
FG
363 if (is_stopping())
364 return;
7c673cae
FG
365
366 assert(state == STATE_PROBING);
367 if (r < 0) { // error in probing
368 goto out;
369 }
370 if (((int64_t)end) == -1) {
371 end = write_pos;
372 ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had "
373 << write_pos << "). log was empty. recovered." << dendl;
374 ceph_abort(); // hrm.
375 } else {
376 assert(end >= write_pos);
377 ldout(cct, 1) << "_finish_probe_end write_pos = " << end
378 << " (header had " << write_pos << "). recovered."
379 << dendl;
380 }
381
382 state = STATE_ACTIVE;
383
384 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end;
385
386out:
387 // done.
388 list<Context*> ls;
389 ls.swap(waitfor_recover);
390 finish_contexts(cct, ls, r);
391}
392
393class Journaler::C_RereadHeadProbe : public Context
394{
395 Journaler *ls;
396 C_OnFinisher *final_finish;
397public:
398 C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) :
399 ls(l), final_finish(finish) {}
400 void finish(int r) override {
401 ls->_finish_reread_head_and_probe(r, final_finish);
402 }
403};
404
405void Journaler::reread_head_and_probe(Context *onfinish)
406{
407 lock_guard l(lock);
408
409 assert(state == STATE_ACTIVE);
410 _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
411}
412
413void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
414{
415 // Expect to be called back from finish_reread_head, which already takes lock
416 // lock is locked
b32b8144
FG
417 if (is_stopping()) {
418 onfinish->complete(-EAGAIN);
419 return;
420 }
7c673cae
FG
421
422 assert(!r); //if we get an error, we're boned
423 _reprobe(onfinish);
424}
425
426
427// WRITING
428
429class Journaler::C_WriteHead : public Context {
430public:
431 Journaler *ls;
432 Header h;
433 C_OnFinisher *oncommit;
434 C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_),
435 oncommit(c) {}
436 void finish(int r) override {
437 ls->_finish_write_head(r, h, oncommit);
438 }
439};
440
441void Journaler::write_head(Context *oncommit)
442{
443 lock_guard l(lock);
444 _write_head(oncommit);
445}
446
447
448void Journaler::_write_head(Context *oncommit)
449{
450 assert(!readonly);
451 assert(state == STATE_ACTIVE);
452 last_written.trimmed_pos = trimmed_pos;
453 last_written.expire_pos = expire_pos;
454 last_written.unused_field = expire_pos;
455 last_written.write_pos = safe_pos;
456 last_written.stream_format = stream_format;
457 ldout(cct, 10) << "write_head " << last_written << dendl;
458
459 // Avoid persisting bad pointers in case of bugs
460 assert(last_written.write_pos >= last_written.expire_pos);
461 assert(last_written.expire_pos >= last_written.trimmed_pos);
462
463 last_wrote_head = ceph::real_clock::now();
464
465 bufferlist bl;
466 ::encode(last_written, bl);
467 SnapContext snapc;
468
469 object_t oid = file_object_t(ino, 0);
470 object_locator_t oloc(pg_pool);
471 objecter->write_full(oid, oloc, snapc, bl, ceph::real_clock::now(), 0,
472 wrap_finisher(new C_WriteHead(
473 this, last_written,
474 wrap_finisher(oncommit))),
475 0, 0, write_iohint);
476}
477
478void Journaler::_finish_write_head(int r, Header &wrote,
479 C_OnFinisher *oncommit)
480{
481 lock_guard l(lock);
482
483 if (r < 0) {
484 lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl;
485 handle_write_error(r);
486 return;
487 }
488 assert(!readonly);
489 ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
490 last_committed = wrote;
491 if (oncommit) {
492 oncommit->complete(r);
493 }
494
495 _trim(); // trim?
496}
497
498
499/***************** WRITING *******************/
500
501class Journaler::C_Flush : public Context {
502 Journaler *ls;
503 uint64_t start;
504 ceph::real_time stamp;
505public:
506 C_Flush(Journaler *l, int64_t s, ceph::real_time st)
507 : ls(l), start(s), stamp(st) {}
508 void finish(int r) override {
509 ls->_finish_flush(r, start, stamp);
510 }
511};
512
513void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
514{
515 lock_guard l(lock);
516 assert(!readonly);
517
518 if (r < 0) {
519 lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
520 handle_write_error(r);
521 return;
522 }
523
524 assert(start < flush_pos);
525
526 // calc latency?
527 if (logger) {
528 ceph::timespan lat = ceph::real_clock::now() - stamp;
529 logger->tinc(logger_key_lat, lat);
530 }
531
532 // adjust safe_pos
533 auto it = pending_safe.find(start);
534 assert(it != pending_safe.end());
f64942e4 535 uint64_t min_next_safe_pos = pending_safe.begin()->second;
7c673cae
FG
536 pending_safe.erase(it);
537 if (pending_safe.empty())
538 safe_pos = next_safe_pos;
539 else
f64942e4 540 safe_pos = min_next_safe_pos;
7c673cae
FG
541
542 ldout(cct, 10) << "_finish_flush safe from " << start
543 << ", pending_safe " << pending_safe
544 << ", (prezeroing/prezero)/write/flush/safe positions now "
545 << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
546 << write_pos << "/" << flush_pos << "/" << safe_pos
547 << dendl;
548
549 // kick waiters <= safe_pos
31f18b77
FG
550 if (!waitfor_safe.empty()) {
551 list<Context*> ls;
552 while (!waitfor_safe.empty()) {
553 auto it = waitfor_safe.begin();
554 if (it->first > safe_pos)
555 break;
556 ls.splice(ls.end(), it->second);
557 waitfor_safe.erase(it);
558 }
559 finish_contexts(cct, ls);
7c673cae
FG
560 }
561}
562
563
564
565uint64_t Journaler::append_entry(bufferlist& bl)
566{
567 unique_lock l(lock);
568
569 assert(!readonly);
570 uint32_t s = bl.length();
571
572 // append
573 size_t delta = bl.length() + journal_stream.get_envelope_size();
574 // write_buf space is nearly full
575 if (!write_buf_throttle.get_or_fail(delta)) {
576 l.unlock();
577 ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
578 write_buf_throttle.get(delta);
579 l.lock();
580 }
581 ldout(cct, 20) << "write_buf_throttle get, delta " << delta << dendl;
582 size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
583 ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~"
584 << wrote << dendl;
585 write_pos += wrote;
586
587 // flush previous object?
588 uint64_t su = get_layout_period();
589 assert(su > 0);
590 uint64_t write_off = write_pos % su;
591 uint64_t write_obj = write_pos / su;
592 uint64_t flush_obj = flush_pos / su;
593 if (write_obj != flush_obj) {
594 ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
595 << write_obj << " flo " << flush_obj << ")" << dendl;
596 _do_flush(write_buf.length() - write_off);
31f18b77
FG
597
598 // if _do_flush() skips flushing some data, it does do a best effort to
599 // update next_safe_pos.
600 if (write_buf.length() > 0 &&
601 write_buf.length() <= wrote) { // the unflushed data are within this entry
602 // set next_safe_pos to end of previous entry
7c673cae
FG
603 next_safe_pos = write_pos - wrote;
604 }
605 }
606
607 return write_pos;
608}
609
610
611void Journaler::_do_flush(unsigned amount)
612{
b32b8144
FG
613 if (is_stopping())
614 return;
7c673cae
FG
615 if (write_pos == flush_pos)
616 return;
617 assert(write_pos > flush_pos);
618 assert(!readonly);
619
620 // flush
621 uint64_t len = write_pos - flush_pos;
622 assert(len == write_buf.length());
623 if (amount && amount < len)
624 len = amount;
625
626 // zero at least two full periods ahead. this ensures
627 // that the next object will not exist.
628 uint64_t period = get_layout_period();
629 if (flush_pos + len + 2*period > prezero_pos) {
630 _issue_prezero();
631
632 int64_t newlen = prezero_pos - flush_pos - period;
633 if (newlen <= 0) {
634 ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
635 << " already too close to prezero_pos " << prezero_pos
636 << ", zeroing first" << dendl;
94b18763 637 waiting_for_zero_pos = flush_pos + len;
7c673cae
FG
638 return;
639 }
640 if (static_cast<uint64_t>(newlen) < len) {
641 ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
642 << " but hit prezero_pos " << prezero_pos
643 << ", will do " << flush_pos << "~" << newlen << dendl;
94b18763 644 waiting_for_zero_pos = flush_pos + len;
7c673cae 645 len = newlen;
7c673cae 646 }
7c673cae
FG
647 }
648 ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
649
650 // submit write for anything pending
651 // flush _start_ pos to _finish_flush
652 ceph::real_time now = ceph::real_clock::now();
653 SnapContext snapc;
654
655 Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT
656 pending_safe[flush_pos] = next_safe_pos;
657
658 bufferlist write_bl;
659
660 // adjust pointers
661 if (len == write_buf.length()) {
662 write_bl.swap(write_buf);
663 next_safe_pos = write_pos;
664 } else {
665 write_buf.splice(0, len, &write_bl);
31f18b77
FG
666 // Keys of waitfor_safe map are journal entry boundaries.
667 // Try finding a journal entry that we are actually flushing
668 // and set next_safe_pos to end of it. This is best effort.
669 // The one we found may not be the lastest flushing entry.
670 auto p = waitfor_safe.lower_bound(flush_pos + len);
671 if (p != waitfor_safe.end()) {
672 if (p->first > flush_pos + len && p != waitfor_safe.begin())
673 --p;
674 if (p->first <= flush_pos + len && p->first > next_safe_pos)
675 next_safe_pos = p->first;
676 }
7c673cae
FG
677 }
678
679 filer.write(ino, &layout, snapc,
680 flush_pos, len, write_bl, ceph::real_clock::now(),
681 0,
682 wrap_finisher(onsafe), write_iohint);
683
684 flush_pos += len;
685 assert(write_buf.length() == write_pos - flush_pos);
686 write_buf_throttle.put(len);
687 ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;
688
689 ldout(cct, 10)
690 << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
691 << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos
692 << "/" << flush_pos << "/" << safe_pos << dendl;
693
694 _issue_prezero();
695}
696
697
698void Journaler::wait_for_flush(Context *onsafe)
699{
700 lock_guard l(lock);
b32b8144 701 if (is_stopping()) {
7c673cae
FG
702 onsafe->complete(-EAGAIN);
703 return;
704 }
705 _wait_for_flush(onsafe);
706}
707
708void Journaler::_wait_for_flush(Context *onsafe)
709{
710 assert(!readonly);
711
712 // all flushed and safe?
713 if (write_pos == safe_pos) {
714 assert(write_buf.length() == 0);
715 ldout(cct, 10)
716 << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
717 "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
718 << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
719 if (onsafe) {
720 finisher->queue(onsafe, 0);
721 }
722 return;
723 }
724
725 // queue waiter
726 if (onsafe) {
727 waitfor_safe[write_pos].push_back(wrap_finisher(onsafe));
728 }
729}
730
731void Journaler::flush(Context *onsafe)
732{
733 lock_guard l(lock);
b32b8144
FG
734 if (is_stopping()) {
735 onsafe->complete(-EAGAIN);
736 return;
737 }
7c673cae
FG
738 _flush(wrap_finisher(onsafe));
739}
740
741void Journaler::_flush(C_OnFinisher *onsafe)
742{
743 assert(!readonly);
744
745 if (write_pos == flush_pos) {
746 assert(write_buf.length() == 0);
747 ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
748 "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
749 << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
750 << dendl;
751 if (onsafe) {
752 onsafe->complete(0);
753 }
754 } else {
755 _do_flush();
756 _wait_for_flush(onsafe);
757 }
758
759 // write head?
760 if (_write_head_needed()) {
761 _write_head();
762 }
763}
764
765bool Journaler::_write_head_needed()
766{
767 return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval)
768 < ceph::real_clock::now();
769}
770
771
772/*************** prezeroing ******************/
773
774struct C_Journaler_Prezero : public Context {
775 Journaler *journaler;
776 uint64_t from, len;
777 C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l)
778 : journaler(j), from(f), len(l) {}
779 void finish(int r) override {
780 journaler->_finish_prezero(r, from, len);
781 }
782};
783
784void Journaler::_issue_prezero()
785{
786 assert(prezeroing_pos >= flush_pos);
787
788 // we need to zero at least two periods, minimum, to ensure that we
789 // have a full empty object/period in front of us.
790 uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);
791
792 /*
793 * issue zero requests based on write_pos, even though the invariant
794 * is that we zero ahead of flush_pos.
795 */
796 uint64_t period = get_layout_period();
797 uint64_t to = write_pos + period * num_periods + period - 1;
798 to -= to % period;
799
800 if (prezeroing_pos >= to) {
801 ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos "
802 << prezeroing_pos << dendl;
803 return;
804 }
805
806 while (prezeroing_pos < to) {
807 uint64_t len;
808 if (prezeroing_pos % period == 0) {
809 len = period;
810 ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~"
811 << period << " (full period)" << dendl;
812 } else {
813 len = period - (prezeroing_pos % period);
814 ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~"
815 << len << " (partial period)" << dendl;
816 }
817 SnapContext snapc;
818 Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos,
819 len));
820 filer.zero(ino, &layout, snapc, prezeroing_pos, len,
821 ceph::real_clock::now(), 0, c);
822 prezeroing_pos += len;
823 }
824}
825
826// Lock cycle because we get called out of objecter callback (holding
827// objecter read lock), but there are also cases where we take the journaler
828// lock before calling into objecter to do I/O.
829void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
830{
831 lock_guard l(lock);
832
833 ldout(cct, 10) << "_prezeroed to " << start << "~" << len
834 << ", prezeroing/prezero was " << prezeroing_pos << "/"
835 << prezero_pos << ", pending " << pending_zero
836 << dendl;
837 if (r < 0 && r != -ENOENT) {
838 lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl;
839 handle_write_error(r);
840 return;
841 }
842
843 assert(r == 0 || r == -ENOENT);
844
845 if (start == prezero_pos) {
846 prezero_pos += len;
847 while (!pending_zero.empty() &&
848 pending_zero.begin().get_start() == prezero_pos) {
849 interval_set<uint64_t>::iterator b(pending_zero.begin());
850 prezero_pos += b.get_len();
851 pending_zero.erase(b);
852 }
853
94b18763
FG
854 if (waiting_for_zero_pos > flush_pos) {
855 _do_flush(waiting_for_zero_pos - flush_pos);
7c673cae 856 }
28e407b8
AA
857
858 if (prezero_pos == prezeroing_pos &&
859 !waitfor_prezero.empty()) {
860 list<Context*> ls;
861 ls.swap(waitfor_prezero);
862 finish_contexts(cct, ls, 0);
863 }
7c673cae
FG
864 } else {
865 pending_zero.insert(start, len);
866 }
867 ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos
868 << "/" << prezero_pos
869 << ", pending " << pending_zero
870 << dendl;
871}
872
28e407b8
AA
873void Journaler::wait_for_prezero(Context *onfinish)
874{
875 assert(onfinish);
876 lock_guard l(lock);
877
878 if (prezero_pos == prezeroing_pos) {
879 finisher->queue(onfinish, 0);
880 return;
881 }
882 waitfor_prezero.push_back(wrap_finisher(onfinish));
883}
7c673cae
FG
884
885
886/***************** READING *******************/
887
888
889class Journaler::C_Read : public Context {
890 Journaler *ls;
891 uint64_t offset;
892 uint64_t length;
893public:
894 bufferlist bl;
895 C_Read(Journaler *j, uint64_t o, uint64_t l) : ls(j), offset(o), length(l) {}
896 void finish(int r) override {
897 ls->_finish_read(r, offset, length, bl);
898 }
899};
900
901class Journaler::C_RetryRead : public Context {
902 Journaler *ls;
903public:
904 explicit C_RetryRead(Journaler *l) : ls(l) {}
905
906 void finish(int r) override {
907 // Should only be called from waitfor_safe i.e. already inside lock
908 // (ls->lock is locked
909 ls->_prefetch();
910 }
911};
912
913void Journaler::_finish_read(int r, uint64_t offset, uint64_t length,
914 bufferlist& bl)
915{
916 lock_guard l(lock);
917
918 if (r < 0) {
919 ldout(cct, 0) << "_finish_read got error " << r << dendl;
920 error = r;
921 } else {
922 ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length()
923 << dendl;
924 if (bl.length() < length) {
925 ldout(cct, 0) << "_finish_read got less than expected (" << length << ")"
926 << dendl;
927 error = -EINVAL;
928 }
929 }
930
931 if (error) {
932 if (on_readable) {
933 C_OnFinisher *f = on_readable;
934 on_readable = 0;
935 f->complete(error);
936 }
937 return;
938 }
939
940 prefetch_buf[offset].swap(bl);
941
942 try {
943 _assimilate_prefetch();
944 } catch (const buffer::error &err) {
945 lderr(cct) << "_decode error from assimilate_prefetch" << dendl;
946 error = -EINVAL;
947 if (on_readable) {
948 C_OnFinisher *f = on_readable;
949 on_readable = 0;
950 f->complete(error);
951 }
952 return;
953 }
954 _prefetch();
955}
956
957void Journaler::_assimilate_prefetch()
958{
959 bool was_readable = readable;
960
961 bool got_any = false;
962 while (!prefetch_buf.empty()) {
963 map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
964 if (p->first != received_pos) {
965 uint64_t gap = p->first - received_pos;
966 ldout(cct, 10) << "_assimilate_prefetch gap of " << gap
967 << " from received_pos " << received_pos
968 << " to first prefetched buffer " << p->first << dendl;
969 break;
970 }
971
972 ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~"
973 << p->second.length() << dendl;
974 received_pos += p->second.length();
975 read_buf.claim_append(p->second);
976 assert(received_pos <= requested_pos);
977 prefetch_buf.erase(p);
978 got_any = true;
979 }
980
981 if (got_any) {
982 ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
983 << read_buf.length() << ", read pointers " << read_pos
984 << "/" << received_pos << "/" << requested_pos
985 << dendl;
986
987 // Update readability (this will also hit any decode errors resulting
988 // from bad data)
989 readable = _is_readable();
990 }
991
992 if ((got_any && !was_readable && readable) || read_pos == write_pos) {
993 // readable!
994 ldout(cct, 10) << "_finish_read now readable (or at journal end) readable="
995 << readable << " read_pos=" << read_pos << " write_pos="
996 << write_pos << dendl;
997 if (on_readable) {
998 C_OnFinisher *f = on_readable;
999 on_readable = 0;
1000 f->complete(0);
1001 }
1002 }
1003}
1004
1005void Journaler::_issue_read(uint64_t len)
1006{
1007 // stuck at safe_pos? (this is needed if we are reading the tail of
1008 // a journal we are also writing to)
1009 assert(requested_pos <= safe_pos);
1010 if (requested_pos == safe_pos) {
1011 ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
1012 << ", waiting" << dendl;
1013 assert(write_pos > requested_pos);
1014 if (pending_safe.empty()) {
1015 _flush(NULL);
1016 }
31f18b77
FG
1017
1018 // Make sure keys of waitfor_safe map are journal entry boundaries.
1019 // The key we used here is either next_safe_pos or old value of
1020 // next_safe_pos. next_safe_pos is always set to journal entry
1021 // boundary.
1022 auto p = pending_safe.rbegin();
1023 if (p != pending_safe.rend())
1024 waitfor_safe[p->second].push_back(new C_RetryRead(this));
1025 else
1026 waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
7c673cae
FG
1027 return;
1028 }
1029
1030 // don't read too much
1031 if (requested_pos + len > safe_pos) {
1032 len = safe_pos - requested_pos;
1033 ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos
1034 << dendl;
1035 }
1036
1037 // go.
1038 ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
1039 << ", read pointers " << read_pos << "/" << received_pos
1040 << "/" << (requested_pos+len) << dendl;
1041
1042 // step by period (object). _don't_ do a single big filer.read()
1043 // here because it will wait for all object reads to complete before
1044 // giving us back any data. this way we can process whatever bits
1045 // come in that are contiguous.
1046 uint64_t period = get_layout_period();
1047 while (len > 0) {
1048 uint64_t e = requested_pos + period;
1049 e -= e % period;
1050 uint64_t l = e - requested_pos;
1051 if (l > len)
1052 l = len;
1053 C_Read *c = new C_Read(this, requested_pos, l);
1054 filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0,
1055 wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1056 requested_pos += l;
1057 len -= l;
1058 }
1059}
1060
1061void Journaler::_prefetch()
1062{
b32b8144
FG
1063 if (is_stopping())
1064 return;
1065
7c673cae
FG
1066 ldout(cct, 10) << "_prefetch" << dendl;
1067 // prefetch
1068 uint64_t pf;
1069 if (temp_fetch_len) {
1070 ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
1071 pf = temp_fetch_len;
1072 temp_fetch_len = 0;
1073 } else {
1074 pf = fetch_len;
1075 }
1076
1077 uint64_t raw_target = read_pos + pf;
1078
1079 // read full log segments, so increase if necessary
1080 uint64_t period = get_layout_period();
1081 uint64_t remainder = raw_target % period;
1082 uint64_t adjustment = remainder ? period - remainder : 0;
1083 uint64_t target = raw_target + adjustment;
1084
1085 // don't read past the log tail
1086 if (target > write_pos)
1087 target = write_pos;
1088
1089 if (requested_pos < target) {
1090 uint64_t len = target - requested_pos;
1091 ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos
1092 << " < target " << target << " (" << raw_target
1093 << "), prefetching " << len << dendl;
1094
1095 if (pending_safe.empty() && write_pos > safe_pos) {
1096 // If we are reading and writing the journal, then we may need
1097 // to issue a flush if one isn't already in progress.
1098 // Avoid doing a flush every time so that if we do write/read/write/read
1099 // we don't end up flushing after every write.
1100 ldout(cct, 10) << "_prefetch: requested_pos=" << requested_pos
1101 << ", read_pos=" << read_pos
1102 << ", write_pos=" << write_pos
1103 << ", safe_pos=" << safe_pos << dendl;
1104 _do_flush();
1105 }
1106
1107 _issue_read(len);
1108 }
1109}
1110
1111
1112/*
1113 * _is_readable() - return true if next entry is ready.
1114 */
1115bool Journaler::_is_readable()
1116{
1117 // anything to read?
1118 if (read_pos == write_pos)
1119 return false;
1120
1121 // Check if the retrieve bytestream has enough for an entry
1122 uint64_t need;
1123 if (journal_stream.readable(read_buf, &need)) {
1124 return true;
1125 }
1126
1127 ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
1128 << ", but need " << need << " for next entry; fetch_len is "
1129 << fetch_len << dendl;
1130
1131 // partial fragment at the end?
1132 if (received_pos == write_pos) {
1133 ldout(cct, 10) << "is_readable() detected partial entry at tail, "
1134 "adjusting write_pos to " << read_pos << dendl;
1135
1136 // adjust write_pos
1137 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
1138 assert(write_buf.length() == 0);
31f18b77 1139 assert(waitfor_safe.empty());
7c673cae
FG
1140
1141 // reset read state
1142 requested_pos = received_pos = read_pos;
1143 read_buf.clear();
1144
1145 // FIXME: truncate on disk?
1146
1147 return false;
1148 }
1149
1150 if (need > fetch_len) {
1151 temp_fetch_len = need;
1152 ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
1153 << dendl;
1154 }
1155
1156 ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
1157 return false;
1158}
1159
1160/*
1161 * is_readable() - kickstart prefetch, too
1162 */
1163bool Journaler::is_readable()
1164{
1165 lock_guard l(lock);
1166
1167 if (error != 0) {
1168 return false;
1169 }
1170
1171 bool r = readable;
1172 _prefetch();
1173 return r;
1174}
1175
1176class Journaler::C_EraseFinish : public Context {
1177 Journaler *journaler;
1178 C_OnFinisher *completion;
1179 public:
1180 C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {}
1181 void finish(int r) override {
1182 journaler->_finish_erase(r, completion);
1183 }
1184};
1185
1186/**
1187 * Entirely erase the journal, including header. For use when you
1188 * have already made a copy of the journal somewhere else.
1189 */
1190void Journaler::erase(Context *completion)
1191{
1192 lock_guard l(lock);
1193
1194 // Async delete the journal data
1195 uint64_t first = trimmed_pos / get_layout_period();
1196 uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
1197 filer.purge_range(ino, &layout, SnapContext(), first, num,
1198 ceph::real_clock::now(), 0,
1199 wrap_finisher(new C_EraseFinish(
1200 this, wrap_finisher(completion))));
1201
1202 // We will not start the operation to delete the header until
1203 // _finish_erase has seen the data deletion succeed: otherwise if
1204 // there was an error deleting data we might prematurely delete the
1205 // header thereby lose our reference to the data.
1206}
1207
1208void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
1209{
1210 lock_guard l(lock);
b32b8144
FG
1211 if (is_stopping()) {
1212 completion->complete(-EAGAIN);
1213 return;
1214 }
7c673cae
FG
1215
1216 if (data_result == 0) {
1217 // Async delete the journal header
1218 filer.purge_range(ino, &layout, SnapContext(), 0, 1,
1219 ceph::real_clock::now(),
1220 0, wrap_finisher(completion));
1221 } else {
1222 lderr(cct) << "Failed to delete journal " << ino << " data: "
1223 << cpp_strerror(data_result) << dendl;
1224 completion->complete(data_result);
1225 }
1226}
1227
1228/* try_read_entry(bl)
1229 * read entry into bl if it's ready.
1230 * otherwise, do nothing.
1231 */
1232bool Journaler::try_read_entry(bufferlist& bl)
1233{
1234 lock_guard l(lock);
1235
1236 if (!readable) {
1237 ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable"
1238 << dendl;
1239 return false;
1240 }
1241
1242 uint64_t start_ptr;
1243 size_t consumed;
1244 try {
1245 consumed = journal_stream.read(read_buf, &bl, &start_ptr);
1246 if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
1247 assert(start_ptr == read_pos);
1248 }
1249 } catch (const buffer::error &e) {
1250 lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
1251 error = -EINVAL;
1252 return false;
1253 }
1254
1255 ldout(cct, 10) << "try_read_entry at " << read_pos << " read "
1256 << read_pos << "~" << consumed << " (have "
1257 << read_buf.length() << ")" << dendl;
1258
1259 read_pos += consumed;
1260 try {
1261 // We were readable, we might not be any more
1262 readable = _is_readable();
1263 } catch (const buffer::error &e) {
1264 lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
1265 error = -EINVAL;
1266 return false;
1267 }
1268
1269 // prefetch?
1270 _prefetch();
f64942e4
AA
1271
1272 // If bufferlist consists of discontiguous memory, decoding types whose
1273 // denc_traits needs contiguous memory is inefficient. The bufferlist may
1274 // get copied to temporary memory multiple times (copy_shallow() in
1275 // src/include/denc.h actually does deep copy)
1276 if (bl.get_num_buffers() > 1)
1277 bl.rebuild();
7c673cae
FG
1278 return true;
1279}
1280
1281void Journaler::wait_for_readable(Context *onreadable)
1282{
1283 lock_guard l(lock);
b32b8144 1284 if (is_stopping()) {
31f18b77 1285 finisher->queue(onreadable, -EAGAIN);
7c673cae
FG
1286 return;
1287 }
1288
1289 assert(on_readable == 0);
1290 if (!readable) {
1291 ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
1292 << onreadable << dendl;
1293 on_readable = wrap_finisher(onreadable);
1294 } else {
1295 // race with OSD reply
1296 finisher->queue(onreadable, 0);
1297 }
1298}
1299
1300bool Journaler::have_waiter() const
1301{
1302 return on_readable != nullptr;
1303}
1304
1305
1306
1307
1308/***************** TRIMMING *******************/
1309
1310
1311class Journaler::C_Trim : public Context {
1312 Journaler *ls;
1313 uint64_t to;
1314public:
1315 C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {}
1316 void finish(int r) override {
1317 ls->_finish_trim(r, to);
1318 }
1319};
1320
1321void Journaler::trim()
1322{
1323 lock_guard l(lock);
1324 _trim();
1325}
1326
1327void Journaler::_trim()
1328{
b32b8144
FG
1329 if (is_stopping())
1330 return;
1331
7c673cae
FG
1332 assert(!readonly);
1333 uint64_t period = get_layout_period();
1334 uint64_t trim_to = last_committed.expire_pos;
1335 trim_to -= trim_to % period;
1336 ldout(cct, 10) << "trim last_commited head was " << last_committed
1337 << ", can trim to " << trim_to
1338 << dendl;
1339 if (trim_to == 0 || trim_to == trimming_pos) {
1340 ldout(cct, 10) << "trim already trimmed/trimming to "
1341 << trimmed_pos << "/" << trimming_pos << dendl;
1342 return;
1343 }
1344
1345 if (trimming_pos > trimmed_pos) {
1346 ldout(cct, 10) << "trim already trimming atm, try again later. "
1347 "trimmed/trimming is " << trimmed_pos << "/" << trimming_pos << dendl;
1348 return;
1349 }
1350
1351 // trim
1352 assert(trim_to <= write_pos);
1353 assert(trim_to <= expire_pos);
1354 assert(trim_to > trimming_pos);
1355 ldout(cct, 10) << "trim trimming to " << trim_to
1356 << ", trimmed/trimming/expire are "
1357 << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
1358 << dendl;
1359
1360 // delete range of objects
1361 uint64_t first = trimming_pos / period;
1362 uint64_t num = (trim_to - trimming_pos) / period;
1363 SnapContext snapc;
1364 filer.purge_range(ino, &layout, snapc, first, num,
1365 ceph::real_clock::now(), 0,
1366 wrap_finisher(new C_Trim(this, trim_to)));
1367 trimming_pos = trim_to;
1368}
1369
1370void Journaler::_finish_trim(int r, uint64_t to)
1371{
1372 lock_guard l(lock);
1373
1374 assert(!readonly);
1375 ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
1376 << ", trimmed/trimming/expire now "
1377 << to << "/" << trimming_pos << "/" << expire_pos
1378 << dendl;
1379 if (r < 0 && r != -ENOENT) {
1380 lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl;
1381 handle_write_error(r);
1382 return;
1383 }
1384
1385 assert(r >= 0 || r == -ENOENT);
1386
1387 assert(to <= trimming_pos);
1388 assert(to > trimmed_pos);
1389 trimmed_pos = to;
1390}
1391
1392void Journaler::handle_write_error(int r)
1393{
1394 // lock is locked
1395
1396 lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl;
1397 if (on_write_error) {
1398 on_write_error->complete(r);
1399 on_write_error = NULL;
1400 called_write_error = true;
1401 } else if (called_write_error) {
1402 /* We don't call error handler more than once, subsequent errors
1403 * are dropped -- this is okay as long as the error handler does
1404 * something dramatic like respawn */
1405 lderr(cct) << __func__ << ": multiple write errors, handler already called"
1406 << dendl;
1407 } else {
1408 assert(0 == "unhandled write error");
1409 }
1410}
1411
1412
1413/**
1414 * Test whether the 'read_buf' byte stream has enough data to read
1415 * an entry
1416 *
1417 * sets 'next_envelope_size' to the number of bytes needed to advance (enough
1418 * to get the next header if header was unavailable, or enough to get the whole
1419 * next entry if the header was available but the body wasn't).
1420 */
1421bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
1422{
1423 assert(need != NULL);
1424
1425 uint32_t entry_size = 0;
1426 uint64_t entry_sentinel = 0;
1427 bufferlist::iterator p = read_buf.begin();
1428
1429 // Do we have enough data to decode an entry prefix?
1430 if (format >= JOURNAL_FORMAT_RESILIENT) {
1431 *need = sizeof(entry_size) + sizeof(entry_sentinel);
1432 } else {
1433 *need = sizeof(entry_size);
1434 }
1435 if (read_buf.length() >= *need) {
1436 if (format >= JOURNAL_FORMAT_RESILIENT) {
1437 ::decode(entry_sentinel, p);
1438 if (entry_sentinel != sentinel) {
1439 throw buffer::malformed_input("Invalid sentinel");
1440 }
1441 }
1442
1443 ::decode(entry_size, p);
1444 } else {
1445 return false;
1446 }
1447
1448 // Do we have enough data to decode an entry prefix, payload and suffix?
1449 if (format >= JOURNAL_FORMAT_RESILIENT) {
1450 *need = JOURNAL_ENVELOPE_RESILIENT + entry_size;
1451 } else {
1452 *need = JOURNAL_ENVELOPE_LEGACY + entry_size;
1453 }
1454 if (read_buf.length() >= *need) {
1455 return true; // No more bytes needed
1456 }
1457
1458 return false;
1459}
1460
1461
1462/**
1463 * Consume one entry from a journal byte stream 'from', splicing a
1464 * serialized LogEvent blob into 'entry'.
1465 *
1466 * 'entry' must be non null and point to an empty bufferlist.
1467 *
1468 * 'from' must contain sufficient valid data (i.e. readable is true).
1469 *
1470 * 'start_ptr' will be set to the entry's start pointer, if the collection
1471 * format provides it. It may not be null.
1472 *
1473 * @returns The number of bytes consumed from the `from` byte stream. Note
1474 * that this is not equal to the length of `entry`, which contains
1475 * the inner serialized LogEvent and not the envelope.
1476 */
1477size_t JournalStream::read(bufferlist &from, bufferlist *entry,
1478 uint64_t *start_ptr)
1479{
1480 assert(start_ptr != NULL);
1481 assert(entry != NULL);
1482 assert(entry->length() == 0);
1483
1484 uint32_t entry_size = 0;
1485
1486 // Consume envelope prefix: entry_size and entry_sentinel
1487 bufferlist::iterator from_ptr = from.begin();
1488 if (format >= JOURNAL_FORMAT_RESILIENT) {
1489 uint64_t entry_sentinel = 0;
1490 ::decode(entry_sentinel, from_ptr);
1491 // Assertion instead of clean check because of precondition of this
1492 // fn is that readable() already passed
1493 assert(entry_sentinel == sentinel);
1494 }
1495 ::decode(entry_size, from_ptr);
1496
1497 // Read out the payload
1498 from_ptr.copy(entry_size, *entry);
1499
1500 // Consume the envelope suffix (start_ptr)
1501 if (format >= JOURNAL_FORMAT_RESILIENT) {
1502 ::decode(*start_ptr, from_ptr);
1503 } else {
1504 *start_ptr = 0;
1505 }
1506
1507 // Trim the input buffer to discard the bytes we have consumed
1508 from.splice(0, from_ptr.get_off());
1509
1510 return from_ptr.get_off();
1511}
1512
1513
1514/**
1515 * Append one entry
1516 */
1517size_t JournalStream::write(bufferlist &entry, bufferlist *to,
1518 uint64_t const &start_ptr)
1519{
1520 assert(to != NULL);
1521
1522 uint32_t const entry_size = entry.length();
1523 if (format >= JOURNAL_FORMAT_RESILIENT) {
1524 ::encode(sentinel, *to);
1525 }
1526 ::encode(entry_size, *to);
1527 to->claim_append(entry);
1528 if (format >= JOURNAL_FORMAT_RESILIENT) {
1529 ::encode(start_ptr, *to);
1530 }
1531
1532 if (format >= JOURNAL_FORMAT_RESILIENT) {
1533 return JOURNAL_ENVELOPE_RESILIENT + entry_size;
1534 } else {
1535 return JOURNAL_ENVELOPE_LEGACY + entry_size;
1536 }
1537}
1538
1539/**
1540 * set write error callback
1541 *
1542 * Set a callback/context to trigger if we get a write error from
1543 * the objecter. This may be from an explicit request (e.g., flush)
1544 * or something async the journaler did on its own (e.g., journal
1545 * header update).
1546 *
1547 * It is only used once; if the caller continues to use the
1548 * Journaler and wants to hear about errors, it needs to reset the
1549 * error_handler.
1550 *
1551 * @param c callback/context to trigger on error
1552 */
1553void Journaler::set_write_error_handler(Context *c) {
1554 lock_guard l(lock);
1555 assert(!on_write_error);
1556 on_write_error = wrap_finisher(c);
1557 called_write_error = false;
1558}
1559
1560
1561/**
1562 * Wrap a context in a C_OnFinisher, if it is non-NULL
1563 *
1564 * Utility function to avoid lots of error-prone and verbose
1565 * NULL checking on contexts passed in.
1566 */
1567C_OnFinisher *Journaler::wrap_finisher(Context *c)
1568{
1569 if (c != NULL) {
1570 return new C_OnFinisher(c, finisher);
1571 } else {
1572 return NULL;
1573 }
1574}
1575
1576void Journaler::shutdown()
1577{
1578 lock_guard l(lock);
1579
1580 ldout(cct, 1) << __func__ << dendl;
1581
b32b8144 1582 state = STATE_STOPPING;
7c673cae 1583 readable = false;
7c673cae
FG
1584
1585 // Kick out anyone reading from journal
1586 error = -EAGAIN;
1587 if (on_readable) {
1588 C_OnFinisher *f = on_readable;
1589 on_readable = 0;
1590 f->complete(-EAGAIN);
1591 }
1592
b32b8144
FG
1593 list<Context*> ls;
1594 ls.swap(waitfor_recover);
1595 finish_contexts(cct, ls, -ESHUTDOWN);
7c673cae
FG
1596
1597 std::map<uint64_t, std::list<Context*> >::iterator i;
1598 for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
1599 finish_contexts(cct, i->second, -EAGAIN);
1600 }
1601 waitfor_safe.clear();
1602}
1603