]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Journaler.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / osdc / Journaler.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "common/perf_counters.h"
16#include "common/dout.h"
17#include "include/Context.h"
18#include "msg/Messenger.h"
19#include "osdc/Journaler.h"
20#include "common/errno.h"
11fdf7f2 21#include "include/ceph_assert.h"
7c673cae
FG
22#include "common/Finisher.h"
23
#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
// Tag every journaler log line with the messenger name, the journal's name,
// and whether the journaler is currently read-only or read-write.
#define dout_prefix *_dout << objecter->messenger->get_myname() \
  << ".journaler." << name << (readonly ? "(ro) ":"(rw) ")

using std::chrono::seconds;
30
31
// Completion context that invokes the journaler's deferred flush
// (_do_delayed_flush) when it fires.
class Journaler::C_DelayFlush : public Context {
  Journaler *journaler;
  public:
  explicit C_DelayFlush(Journaler *j) : journaler(j) {}
  void finish(int r) override {
    journaler->_do_delayed_flush();
  }
};
40
41void Journaler::set_readonly()
42{
43 lock_guard l(lock);
44
45 ldout(cct, 1) << "set_readonly" << dendl;
46 readonly = true;
47}
48
49void Journaler::set_writeable()
50{
51 lock_guard l(lock);
52
53 ldout(cct, 1) << "set_writeable" << dendl;
54 readonly = false;
55}
56
// Initialize a brand-new, empty journal with the given layout and stream
// format, and go straight to STATE_ACTIVE (no recovery needed).
void Journaler::create(file_layout_t *l, stream_format_t const sf)
{
  lock_guard lk(lock);

  ceph_assert(!readonly);
  state = STATE_ACTIVE;

  stream_format = sf;
  journal_stream.set_format(sf);
  _set_layout(l);

  // A blank journal starts one full period in: every positional pointer
  // (write/flush/safe/read/expire/trim/prezero) begins at the same offset.
  prezeroing_pos = prezero_pos = write_pos = flush_pos =
    safe_pos = read_pos = requested_pos = received_pos =
    expire_pos = trimming_pos = trimmed_pos =
    next_safe_pos = layout.get_period();

  ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
		<< std::dec << ", format=" << stream_format << dendl;
}
76
77void Journaler::set_layout(file_layout_t const *l)
78{
79 lock_guard lk(lock);
80 _set_layout(l);
81}
82
83void Journaler::_set_layout(file_layout_t const *l)
84{
85 layout = *l;
86
b32b8144
FG
87 if (layout.pool_id != pg_pool) {
88 // user can reset pool id through cephfs-journal-tool
89 lderr(cct) << "may got older pool id from header layout" << dendl;
90 ceph_abort();
91 }
7c673cae
FG
92 last_written.layout = layout;
93 last_committed.layout = layout;
94
95 // prefetch intelligently.
96 // (watch out, this is big if you use big objects or weird striping)
11fdf7f2 97 uint64_t periods = cct->_conf.get_val<uint64_t>("journaler_prefetch_periods");
7c673cae
FG
98 fetch_len = layout.get_period() * periods;
99}
100
101
102/***************** HEADER *******************/
103
104ostream& operator<<(ostream &out, const Journaler::Header &h)
105{
106 return out << "loghead(trim " << h.trimmed_pos
107 << ", expire " << h.expire_pos
108 << ", write " << h.write_pos
109 << ", stream_format " << (int)(h.stream_format)
110 << ")";
111}
112
// Completion for the initial header read during recovery; delivers the raw
// header bytes to _finish_read_head.
class Journaler::C_ReadHead : public Context {
  Journaler *ls;
public:
  bufferlist bl;
  explicit C_ReadHead(Journaler *l) : ls(l) {}
  void finish(int r) override {
    ls->_finish_read_head(r, bl);
  }
};

// Completion for an explicit header re-read; forwards the result (and the
// caller's continuation) to _finish_reread_head.
class Journaler::C_RereadHead : public Context {
  Journaler *ls;
  Context *onfinish;
public:
  bufferlist bl;
  C_RereadHead(Journaler *l, Context *onfinish_) : ls (l),
						   onfinish(onfinish_) {}
  void finish(int r) override {
    ls->_finish_reread_head(r, bl, onfinish);
  }
};

// Completion for the end-of-log probe issued during recovery; `end` is
// filled in by the probe before finish() runs.
class Journaler::C_ProbeEnd : public Context {
  Journaler *ls;
public:
  uint64_t end;
  explicit C_ProbeEnd(Journaler *l) : ls(l), end(-1) {}
  void finish(int r) override {
    ls->_finish_probe_end(r, end);
  }
};

// Completion for a re-probe of the log end on an already-active journaler.
class Journaler::C_ReProbe : public Context {
  Journaler *ls;
  C_OnFinisher *onfinish;
public:
  uint64_t end;
  C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) :
    ls(l), onfinish(onfinish_), end(0) {}
  void finish(int r) override {
    ls->_finish_reprobe(r, end, onfinish);
  }
};
156
// Begin journal recovery: read the on-disk header, then probe for the true
// end of the log. `onread` is completed when recovery finishes (or fails).
void Journaler::recover(Context *onread)
{
  lock_guard l(lock);
  if (is_stopping()) {
    onread->complete(-EAGAIN);
    return;
  }

  ldout(cct, 1) << "recover start" << dendl;
  ceph_assert(state != STATE_ACTIVE);
  ceph_assert(readonly);

  if (onread)
    waitfor_recover.push_back(wrap_finisher(onread));

  // A recovery is already in flight; it will complete our waiter too.
  if (state != STATE_UNDEF) {
    ldout(cct, 1) << "recover - already recovering" << dendl;
    return;
  }

  ldout(cct, 1) << "read_head" << dendl;
  state = STATE_READHEAD;
  C_ReadHead *fin = new C_ReadHead(this);
  _read_head(fin, &fin->bl);
}
182
183void Journaler::_read_head(Context *on_finish, bufferlist *bl)
184{
185 // lock is locked
11fdf7f2 186 ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
7c673cae
FG
187
188 object_t oid = file_object_t(ino, 0);
189 object_locator_t oloc(pg_pool);
190 objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish));
191}
192
193void Journaler::reread_head(Context *onfinish)
194{
195 lock_guard l(lock);
196 _reread_head(wrap_finisher(onfinish));
197}
198
/**
 * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos
 * from the on-disk header. This switches the state to STATE_REREADHEAD for
 * the duration, and you shouldn't start a re-read while other operations are
 * in-flight, nor start other operations while a re-read is in progress.
 * Also, don't call this until the Journaler has finished its recovery and has
 * gone STATE_ACTIVE!
 */
void Journaler::_reread_head(Context *onfinish)
{
  ldout(cct, 10) << "reread_head" << dendl;
  ceph_assert(state == STATE_ACTIVE);

  state = STATE_REREADHEAD;
  C_RereadHead *fin = new C_RereadHead(this, onfinish);
  _read_head(fin, &fin->bl);
}
216
// Callback for _reread_head: decode the freshly read header, reset all
// write-side pointers from it, and return to STATE_ACTIVE.
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
{
  lock_guard l(lock);
  if (is_stopping()) {
    finish->complete(-EAGAIN);
    return;
  }

  //read on-disk header into
  ceph_assert(bl.length() || r < 0 );

  // unpack header
  if (r == 0) {
    Header h;
    auto p = bl.cbegin();
    try {
      decode(h, p);
    } catch (const buffer::error &e) {
      // undecodable header: report corruption to the caller
      finish->complete(-EINVAL);
      return;
    }
    // every write-side pointer restarts from the persisted write_pos
    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
      = h.write_pos;
    expire_pos = h.expire_pos;
    trimmed_pos = trimming_pos = h.trimmed_pos;
    init_headers(h);
    state = STATE_ACTIVE;
  }

  finish->complete(r);
}
248
// Callback for the recovery-time header read. Validates and decodes the
// header, adopts its pointers/layout/format, then kicks off the probe for
// the true end of the log. Failure paths complete all recovery waiters.
void Journaler::_finish_read_head(int r, bufferlist& bl)
{
  lock_guard l(lock);
  if (is_stopping())
    return;

  ceph_assert(state == STATE_READHEAD);

  if (r!=0) {
    // read error: fail all waiters with the OSD error code
    ldout(cct, 0) << "error getting journal off disk" << dendl;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, r);
    return;
  }

  if (bl.length() == 0) {
    // no header object at all: treat as an empty journal and go active
    ldout(cct, 1) << "_finish_read_head r=" << r
		  << " read 0 bytes, assuming empty log" << dendl;
    state = STATE_ACTIVE;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, 0);
    return;
  }

  // unpack header
  bool corrupt = false;
  Header h;
  auto p = bl.cbegin();
  try {
    decode(h, p);

    if (h.magic != magic) {
      ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
		    << magic << "'" << dendl;
      corrupt = true;
    } else if (h.write_pos < h.expire_pos || h.expire_pos < h.trimmed_pos) {
      // pointers must be ordered trimmed <= expire <= write
      ldout(cct, 0) << "Corrupt header (bad offsets): " << h << dendl;
      corrupt = true;
    }
  } catch (const buffer::error &e) {
    corrupt = true;
  }

  if (corrupt) {
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, -EINVAL);
    return;
  }

  // adopt the persisted pointers; read-side pointers start at expire_pos
  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
    = h.write_pos;
  read_pos = requested_pos = received_pos = expire_pos = h.expire_pos;
  trimmed_pos = trimming_pos = h.trimmed_pos;

  init_headers(h);
  _set_layout(&h.layout);
  stream_format = h.stream_format;
  journal_stream.set_format(h.stream_format);

  // the header's write_pos may be stale; probe the objects to find the
  // actual end of the log before declaring recovery complete.
  ldout(cct, 1) << "_finish_read_head " << h
		<< ". probing for end of log (from " << write_pos << ")..."
		<< dendl;
  C_ProbeEnd *fin = new C_ProbeEnd(this);
  state = STATE_PROBING;
  _probe(fin, &fin->end);
}
318
319void Journaler::_probe(Context *finish, uint64_t *end)
320{
321 // lock is locked
322 ldout(cct, 1) << "probing for end of the log" << dendl;
11fdf7f2 323 ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
7c673cae
FG
324 // probe the log
325 filer.probe(ino, &layout, CEPH_NOSNAP,
326 write_pos, end, true, 0, wrap_finisher(finish));
327}
328
329void Journaler::_reprobe(C_OnFinisher *finish)
330{
331 ldout(cct, 10) << "reprobe" << dendl;
11fdf7f2 332 ceph_assert(state == STATE_ACTIVE);
7c673cae
FG
333
334 state = STATE_REPROBING;
335 C_ReProbe *fin = new C_ReProbe(this, finish);
336 _probe(fin, &fin->end);
337}
338
339
// Callback for _reprobe: adopt the discovered end of log as the new
// write-side position and return to STATE_ACTIVE.
void Journaler::_finish_reprobe(int r, uint64_t new_end,
				C_OnFinisher *onfinish)
{
  lock_guard l(lock);
  if (is_stopping()) {
    onfinish->complete(-EAGAIN);
    return;
  }

  // the probed end can only be at or beyond what the header claimed
  ceph_assert(new_end >= write_pos || r < 0);
  ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
		<< " (header had " << write_pos << ")."
		<< dendl;
  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end;
  state = STATE_ACTIVE;
  onfinish->complete(r);
}
357
// Callback for the recovery-time probe: adopt the discovered end of the log
// and complete all recovery waiters. On error, waiters get the error code.
void Journaler::_finish_probe_end(int r, uint64_t end)
{
  lock_guard l(lock);
  if (is_stopping())
    return;

  ceph_assert(state == STATE_PROBING);
  if (r < 0) { // error in probing
    goto out;
  }
  if (((int64_t)end) == -1) {
    // probe found no data at all; header said there should be some
    end = write_pos;
    ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had "
		  << write_pos << "). log was empty. recovered." << dendl;
    ceph_abort(); // hrm.
  } else {
    ceph_assert(end >= write_pos);
    ldout(cct, 1) << "_finish_probe_end write_pos = " << end
		  << " (header had " << write_pos << "). recovered."
		  << dendl;
  }

  state = STATE_ACTIVE;

  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end;

out:
  // done.
  list<Context*> ls;
  ls.swap(waitfor_recover);
  finish_contexts(cct, ls, r);
}
390
// Chains a header re-read into a subsequent end-of-log probe; used by
// reread_head_and_probe().
class Journaler::C_RereadHeadProbe : public Context
{
  Journaler *ls;
  C_OnFinisher *final_finish;
public:
  C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) :
    ls(l), final_finish(finish) {}
  void finish(int r) override {
    ls->_finish_reread_head_and_probe(r, final_finish);
  }
};
402
403void Journaler::reread_head_and_probe(Context *onfinish)
404{
405 lock_guard l(lock);
406
11fdf7f2 407 ceph_assert(state == STATE_ACTIVE);
7c673cae
FG
408 _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
409}
410
// Second stage of reread_head_and_probe: the header re-read has finished,
// so start the re-probe (or propagate a blocklist/stop failure).
void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
{
  // Expect to be called back from finish_reread_head, which already takes lock
  // lock is locked
  if (is_stopping()) {
    onfinish->complete(-EAGAIN);
    return;
  }

  // Let the caller know that the operation has failed or was intentionally
  // failed since the caller has been blocklisted.
  if (r == -EBLOCKLISTED) {
    onfinish->complete(r);
    return;
  }

  ceph_assert(!r); //if we get an error, we're boned
  _reprobe(onfinish);
}
430
431
432// WRITING
433
// Completion for a header write; carries a copy of the header that was
// written so _finish_write_head can record it as committed.
class Journaler::C_WriteHead : public Context {
public:
  Journaler *ls;
  Header h;
  C_OnFinisher *oncommit;
  C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_),
							   oncommit(c) {}
  void finish(int r) override {
    ls->_finish_write_head(r, h, oncommit);
  }
};
445
446void Journaler::write_head(Context *oncommit)
447{
448 lock_guard l(lock);
449 _write_head(oncommit);
450}
451
452
// Persist the current journal pointers to the header object (object 0).
// Note the advertised write_pos is safe_pos, i.e. only data known durable.
void Journaler::_write_head(Context *oncommit)
{
  ceph_assert(!readonly);
  ceph_assert(state == STATE_ACTIVE);
  last_written.trimmed_pos = trimmed_pos;
  last_written.expire_pos = expire_pos;
  last_written.unused_field = expire_pos;
  last_written.write_pos = safe_pos;
  last_written.stream_format = stream_format;
  ldout(cct, 10) << "write_head " << last_written << dendl;

  // Avoid persisting bad pointers in case of bugs
  ceph_assert(last_written.write_pos >= last_written.expire_pos);
  ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);

  last_wrote_head = ceph::real_clock::now();

  bufferlist bl;
  encode(last_written, bl);
  SnapContext snapc;

  object_t oid = file_object_t(ino, 0);
  object_locator_t oloc(pg_pool);
  objecter->write_full(oid, oloc, snapc, bl, ceph::real_clock::now(), 0,
		       wrap_finisher(new C_WriteHead(
				       this, last_written,
				       wrap_finisher(oncommit))),
		       0, 0, write_iohint);
}
482
// Callback for _write_head: record the now-durable header as last_committed,
// notify the caller, and opportunistically trim the journal.
void Journaler::_finish_write_head(int r, Header &wrote,
				   C_OnFinisher *oncommit)
{
  lock_guard l(lock);

  if (r < 0) {
    lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }
  ceph_assert(!readonly);
  ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
  last_committed = wrote;
  if (oncommit) {
    oncommit->complete(r);
  }

  _trim(); // trim?
}
502
503
504/***************** WRITING *******************/
505
/***************** WRITING *******************/

// Completion for a data flush: remembers the flush's start offset and the
// submission timestamp (for latency accounting in _finish_flush).
class Journaler::C_Flush : public Context {
  Journaler *ls;
  uint64_t start;
  ceph::real_time stamp;
public:
  C_Flush(Journaler *l, int64_t s, ceph::real_time st)
    : ls(l), start(s), stamp(st) {}
  void finish(int r) override {
    ls->_finish_flush(r, start, stamp);
  }
};
517
// Callback when a flushed extent is safe on disk. Advances safe_pos (taking
// care with out-of-order completions) and wakes waiters below it.
void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
  lock_guard l(lock);
  ceph_assert(!readonly);

  if (r < 0) {
    lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  ceph_assert(start < flush_pos);

  // calc latency?
  if (logger) {
    ceph::timespan lat = ceph::real_clock::now() - stamp;
    logger->tinc(logger_key_lat, lat);
  }

  // adjust safe_pos
  auto it = pending_safe.find(start);
  ceph_assert(it != pending_safe.end());
  // Flushes may complete out of order: safe_pos can only advance to the
  // next_safe_pos associated with the oldest still-pending flush.
  uint64_t min_next_safe_pos = pending_safe.begin()->second;
  pending_safe.erase(it);
  if (pending_safe.empty())
    safe_pos = next_safe_pos;
  else
    safe_pos = min_next_safe_pos;

  ldout(cct, 10) << "_finish_flush safe from " << start
		 << ", pending_safe " << pending_safe
		 << ", (prezeroing/prezero)/write/flush/safe positions now "
		 << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
		 << write_pos << "/" << flush_pos << "/" << safe_pos
		 << dendl;

  // kick waiters <= safe_pos
  if (!waitfor_safe.empty()) {
    list<Context*> ls;
    while (!waitfor_safe.empty()) {
      auto it = waitfor_safe.begin();
      if (it->first > safe_pos)
	break;
      ls.splice(ls.end(), it->second);
      waitfor_safe.erase(it);
    }
    finish_contexts(cct, ls);
  }
}
567
568
569
// Append one entry to the in-memory write buffer, throttled by
// write_buf_throttle, and trigger a flush when the write crosses an object
// boundary. Returns the new write_pos (end of the appended entry).
uint64_t Journaler::append_entry(bufferlist& bl)
{
  unique_lock l(lock);

  ceph_assert(!readonly);
  uint32_t s = bl.length();

  // append
  size_t delta = bl.length() + journal_stream.get_envelope_size();
  // write_buf space is nearly full
  if (!write_buf_throttle.get_or_fail(delta)) {
    // drop the lock while blocking on the throttle so in-flight flushes
    // can complete and release buffer space
    l.unlock();
    ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
    write_buf_throttle.get(delta);
    l.lock();
  }
  ldout(cct, 20) << "write_buf_throttle get, delta " << delta << dendl;
  size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
  ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~"
		 << wrote << dendl;
  write_pos += wrote;

  // flush previous object?
  uint64_t su = get_layout_period();
  ceph_assert(su > 0);
  uint64_t write_off = write_pos % su;
  uint64_t write_obj = write_pos / su;
  uint64_t flush_obj = flush_pos / su;
  if (write_obj != flush_obj) {
    ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
		   << write_obj << " flo " << flush_obj << ")" << dendl;
    _do_flush(write_buf.length() - write_off);

    // if _do_flush() skips flushing some data, it does do a best effort to
    // update next_safe_pos.
    if (write_buf.length() > 0 &&
	write_buf.length() <= wrote) { // the unflushed data are within this entry
      // set next_safe_pos to end of previous entry
      next_safe_pos = write_pos - wrote;
    }
  }

  return write_pos;
}
614
615
// Flush up to `amount` bytes (0 = everything) of write_buf to the OSDs,
// clipped so we never write into a region that has not been pre-zeroed.
// Also maintains next_safe_pos so safe_pos only lands on entry boundaries.
void Journaler::_do_flush(unsigned amount)
{
  if (is_stopping())
    return;
  if (write_pos == flush_pos)
    return;
  ceph_assert(write_pos > flush_pos);
  ceph_assert(!readonly);

  // flush
  uint64_t len = write_pos - flush_pos;
  ceph_assert(len == write_buf.length());
  if (amount && amount < len)
    len = amount;

  // zero at least two full periods ahead. this ensures
  // that the next object will not exist.
  uint64_t period = get_layout_period();
  if (flush_pos + len + 2*period > prezero_pos) {
    _issue_prezero();

    int64_t newlen = prezero_pos - flush_pos - period;
    if (newlen <= 0) {
      // cannot flush anything yet; remember where we wanted to get to so
      // _finish_prezero can resume the flush
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
		     << " already too close to prezero_pos " << prezero_pos
		     << ", zeroing first" << dendl;
      waiting_for_zero_pos = flush_pos + len;
      return;
    }
    if (static_cast<uint64_t>(newlen) < len) {
      // partial flush only, up to the prezeroed frontier
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
		     << " but hit prezero_pos " << prezero_pos
		     << ", will do " << flush_pos << "~" << newlen << dendl;
      waiting_for_zero_pos = flush_pos + len;
      len = newlen;
    }
  }
  ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;

  // submit write for anything pending
  // flush _start_ pos to _finish_flush
  ceph::real_time now = ceph::real_clock::now();
  SnapContext snapc;

  Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT
  pending_safe[flush_pos] = next_safe_pos;

  bufferlist write_bl;

  // adjust pointers
  if (len == write_buf.length()) {
    write_bl.swap(write_buf);
    next_safe_pos = write_pos;
  } else {
    write_buf.splice(0, len, &write_bl);
    // Keys of waitfor_safe map are journal entry boundaries.
    // Try finding a journal entry that we are actually flushing
    // and set next_safe_pos to end of it. This is best effort.
    // The one we found may not be the lastest flushing entry.
    auto p = waitfor_safe.lower_bound(flush_pos + len);
    if (p != waitfor_safe.end()) {
      if (p->first > flush_pos + len && p != waitfor_safe.begin())
	--p;
      if (p->first <= flush_pos + len && p->first > next_safe_pos)
	next_safe_pos = p->first;
    }
  }

  filer.write(ino, &layout, snapc,
	      flush_pos, len, write_bl, ceph::real_clock::now(),
	      0,
	      wrap_finisher(onsafe), write_iohint);

  flush_pos += len;
  ceph_assert(write_buf.length() == write_pos - flush_pos);
  write_buf_throttle.put(len);
  ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;

  ldout(cct, 10)
    << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
    << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos
    << "/" << flush_pos << "/" << safe_pos << dendl;

  _issue_prezero();
}
701
702
703void Journaler::wait_for_flush(Context *onsafe)
704{
705 lock_guard l(lock);
b32b8144 706 if (is_stopping()) {
f91f0fd5
TL
707 if (onsafe)
708 onsafe->complete(-EAGAIN);
7c673cae
FG
709 return;
710 }
711 _wait_for_flush(onsafe);
712}
713
// Queue `onsafe` to run once everything written so far is safe on disk.
// If all data is already safe, completes it immediately via the finisher.
void Journaler::_wait_for_flush(Context *onsafe)
{
  ceph_assert(!readonly);

  // all flushed and safe?
  if (write_pos == safe_pos) {
    ceph_assert(write_buf.length() == 0);
    ldout(cct, 10)
      << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
      "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
      << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
    if (onsafe) {
      finisher->queue(onsafe, 0);
    }
    return;
  }

  // queue waiter, keyed by the position it is waiting for
  if (onsafe) {
    waitfor_safe[write_pos].push_back(wrap_finisher(onsafe));
  }
}
736
737void Journaler::flush(Context *onsafe)
738{
739 lock_guard l(lock);
b32b8144 740 if (is_stopping()) {
f91f0fd5
TL
741 if (onsafe)
742 onsafe->complete(-EAGAIN);
b32b8144
FG
743 return;
744 }
7c673cae
FG
745 _flush(wrap_finisher(onsafe));
746}
747
// Flush any buffered data and register `onsafe` to fire once it is durable;
// also writes the header if it is due.
void Journaler::_flush(C_OnFinisher *onsafe)
{
  ceph_assert(!readonly);

  if (write_pos == flush_pos) {
    ceph_assert(write_buf.length() == 0);
    ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
      "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
		   << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
		   << dendl;
    if (onsafe) {
      onsafe->complete(0);
    }
  } else {
    _do_flush();
    _wait_for_flush(onsafe);
  }

  // write head?
  if (_write_head_needed()) {
    _write_head();
  }
}
771
772bool Journaler::_write_head_needed()
773{
11fdf7f2 774 return last_wrote_head + seconds(cct->_conf.get_val<int64_t>("journaler_write_head_interval"))
7c673cae
FG
775 < ceph::real_clock::now();
776}
777
778
779/*************** prezeroing ******************/
780
/*************** prezeroing ******************/

// Completion for one zeroing operation; reports the zeroed extent back to
// _finish_prezero.
struct C_Journaler_Prezero : public Context {
  Journaler *journaler;
  uint64_t from, len;
  C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l)
    : journaler(j), from(f), len(l) {}
  void finish(int r) override {
    journaler->_finish_prezero(r, from, len);
  }
};
790
// Issue zeroing ops so the region ahead of the write position is known to
// be empty before we flush into it. Advances prezeroing_pos optimistically;
// prezero_pos advances only as zeroing ops complete.
void Journaler::_issue_prezero()
{
  ceph_assert(prezeroing_pos >= flush_pos);

  uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
  /*
   * issue zero requests based on write_pos, even though the invariant
   * is that we zero ahead of flush_pos.
   */
  uint64_t period = get_layout_period();
  // round the target up to the next period boundary
  uint64_t to = write_pos + period * num_periods + period - 1;
  to -= to % period;

  if (prezeroing_pos >= to) {
    ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos "
		   << prezeroing_pos << dendl;
    return;
  }

  while (prezeroing_pos < to) {
    uint64_t len;
    if (prezeroing_pos % period == 0) {
      len = period;
      ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~"
		     << period << " (full period)" << dendl;
    } else {
      // first iteration may be a partial period to reach alignment
      len = period - (prezeroing_pos % period);
      ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~"
		     << len << " (partial period)" << dendl;
    }
    SnapContext snapc;
    Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos,
						       len));
    filer.zero(ino, &layout, snapc, prezeroing_pos, len,
	       ceph::real_clock::now(), 0, c);
    prezeroing_pos += len;
  }
}
829
// Lock cycle because we get called out of objecter callback (holding
// objecter read lock), but there are also cases where we take the journaler
// lock before calling into objecter to do I/O.
//
// Completion for one zeroing op: advance prezero_pos across any contiguous
// completed extents, resume flushes stalled on zeroing, and wake
// wait_for_prezero waiters once fully caught up.
void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
{
  lock_guard l(lock);

  ldout(cct, 10) << "_prezeroed to " << start << "~" << len
		 << ", prezeroing/prezero was " << prezeroing_pos << "/"
		 << prezero_pos << ", pending " << pending_zero
		 << dendl;
  if (r < 0 && r != -ENOENT) {
    lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  // -ENOENT is fine: zeroing a non-existent object is a no-op
  ceph_assert(r == 0 || r == -ENOENT);

  if (start == prezero_pos) {
    prezero_pos += len;
    // absorb any previously completed extents that are now contiguous
    while (!pending_zero.empty() &&
	   pending_zero.begin().get_start() == prezero_pos) {
      interval_set<uint64_t>::iterator b(pending_zero.begin());
      prezero_pos += b.get_len();
      pending_zero.erase(b);
    }

    // a flush was waiting on this zeroing; resume it
    if (waiting_for_zero_pos > flush_pos) {
      _do_flush(waiting_for_zero_pos - flush_pos);
    }

    if (prezero_pos == prezeroing_pos &&
	!waitfor_prezero.empty()) {
      list<Context*> ls;
      ls.swap(waitfor_prezero);
      finish_contexts(cct, ls, 0);
    }
  } else {
    // out-of-order completion: remember it until the gap before it closes
    pending_zero.insert(start, len);
  }
  ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos
		 << "/" << prezero_pos
		 << ", pending " << pending_zero
		 << dendl;
}
876
28e407b8
AA
877void Journaler::wait_for_prezero(Context *onfinish)
878{
11fdf7f2 879 ceph_assert(onfinish);
28e407b8
AA
880 lock_guard l(lock);
881
882 if (prezero_pos == prezeroing_pos) {
883 finisher->queue(onfinish, 0);
884 return;
885 }
886 waitfor_prezero.push_back(wrap_finisher(onfinish));
887}
7c673cae
FG
888
889
890/***************** READING *******************/
891
892
// Completion for one prefetch read; delivers the data (and the extent that
// was requested, so short reads can be detected) to _finish_read.
class Journaler::C_Read : public Context {
  Journaler *ls;
  uint64_t offset;
  uint64_t length;
public:
  bufferlist bl;
  C_Read(Journaler *j, uint64_t o, uint64_t l) : ls(j), offset(o), length(l) {}
  void finish(int r) override {
    ls->_finish_read(r, offset, length, bl);
  }
};

// Queued on waitfor_safe so a read stalled at safe_pos is retried once the
// relevant flush completes.
class Journaler::C_RetryRead : public Context {
  Journaler *ls;
public:
  explicit C_RetryRead(Journaler *l) : ls(l) {}

  void finish(int r) override {
    // Should only be called from waitfor_safe i.e. already inside lock
    // (ls->lock is locked
    ls->_prefetch();
  }
};
916
// Completion for one prefetch read: record errors (including short reads),
// stash the data keyed by offset, assimilate whatever is now contiguous,
// and keep prefetching.
void Journaler::_finish_read(int r, uint64_t offset, uint64_t length,
			     bufferlist& bl)
{
  lock_guard l(lock);

  if (r < 0) {
    ldout(cct, 0) << "_finish_read got error " << r << dendl;
    error = r;
  } else {
    ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length()
		   << dendl;
    if (bl.length() < length) {
      // short read: journal is truncated relative to what we expected
      ldout(cct, 0) << "_finish_read got less than expected (" << length << ")"
		    << dendl;
      error = -EINVAL;
    }
  }

  if (error) {
    // propagate the error to whoever is waiting for readability
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }

  prefetch_buf[offset].swap(bl);

  try {
    _assimilate_prefetch();
  } catch (const buffer::error &err) {
    // undecodable journal data surfaced while checking readability
    lderr(cct) << "_decode error from assimilate_prefetch" << dendl;
    error = -EINVAL;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }
  _prefetch();
}
960
// Move contiguous prefetched buffers (starting exactly at received_pos) into
// read_buf, update readability, and complete the on_readable waiter when the
// journal becomes readable or we are at the journal end.
void Journaler::_assimilate_prefetch()
{
  bool was_readable = readable;

  bool got_any = false;
  while (!prefetch_buf.empty()) {
    map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
    if (p->first != received_pos) {
      // a read for an earlier extent has not completed yet; stop here
      uint64_t gap = p->first - received_pos;
      ldout(cct, 10) << "_assimilate_prefetch gap of " << gap
		     << " from received_pos " << received_pos
		     << " to first prefetched buffer " << p->first << dendl;
      break;
    }

    ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~"
		   << p->second.length() << dendl;
    received_pos += p->second.length();
    read_buf.claim_append(p->second);
    ceph_assert(received_pos <= requested_pos);
    prefetch_buf.erase(p);
    got_any = true;
  }

  if (got_any) {
    ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
		   << read_buf.length() << ", read pointers read_pos=" << read_pos
		   << " received_pos=" << received_pos << " requested_pos=" << requested_pos
		   << dendl;

    // Update readability (this will also hit any decode errors resulting
    // from bad data)
    readable = _is_readable();
  }

  if ((got_any && !was_readable && readable) || read_pos == write_pos) {
    // readable!
    ldout(cct, 10) << "_finish_read now readable (or at journal end) readable="
		   << readable << " read_pos=" << read_pos << " write_pos="
		   << write_pos << dendl;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(0);
    }
  }
}
1008
// Issue read ops for up to `len` bytes starting at requested_pos. Reads
// never pass safe_pos; if we are already there, a flush is kicked and the
// read is retried via waitfor_safe. Reads are split per period so data can
// be assimilated as each object read completes.
void Journaler::_issue_read(uint64_t len)
{
  // stuck at safe_pos? (this is needed if we are reading the tail of
  // a journal we are also writing to)
  ceph_assert(requested_pos <= safe_pos);
  if (requested_pos == safe_pos) {
    ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
		   << ", waiting" << dendl;
    ceph_assert(write_pos > requested_pos);
    if (pending_safe.empty()) {
      _flush(NULL);
    }

    // Make sure keys of waitfor_safe map are journal entry boundaries.
    // The key we used here is either next_safe_pos or old value of
    // next_safe_pos. next_safe_pos is always set to journal entry
    // boundary.
    auto p = pending_safe.rbegin();
    if (p != pending_safe.rend())
      waitfor_safe[p->second].push_back(new C_RetryRead(this));
    else
      waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
    return;
  }

  // don't read too much
  if (requested_pos + len > safe_pos) {
    len = safe_pos - requested_pos;
    ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos
		   << dendl;
  }

  // go.
  ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
		 << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos
		 << " requested_pos+len=" << (requested_pos+len) << dendl;

  // step by period (object). _don't_ do a single big filer.read()
  // here because it will wait for all object reads to complete before
  // giving us back any data. this way we can process whatever bits
  // come in that are contiguous.
  uint64_t period = get_layout_period();
  while (len > 0) {
    uint64_t e = requested_pos + period;
    e -= e % period;
    uint64_t l = e - requested_pos;
    if (l > len)
      l = len;
    C_Read *c = new C_Read(this, requested_pos, l);
    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0,
	       wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
    requested_pos += l;
    len -= l;
  }
}
1064
// Extend the read-ahead window: compute a period-aligned target past
// read_pos (capped at write_pos) and issue reads for anything not yet
// requested. Kicks a flush first if reads would otherwise stall on
// unflushed data.
void Journaler::_prefetch()
{
  if (is_stopping())
    return;

  ldout(cct, 10) << "_prefetch" << dendl;
  // prefetch
  uint64_t pf;
  if (temp_fetch_len) {
    // one-shot override of the prefetch window size
    ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
    pf = temp_fetch_len;
    temp_fetch_len = 0;
  } else {
    pf = fetch_len;
  }

  uint64_t raw_target = read_pos + pf;

  // read full log segments, so increase if necessary
  uint64_t period = get_layout_period();
  uint64_t remainder = raw_target % period;
  uint64_t adjustment = remainder ? period - remainder : 0;
  uint64_t target = raw_target + adjustment;

  // don't read past the log tail
  if (target > write_pos)
    target = write_pos;

  if (requested_pos < target) {
    uint64_t len = target - requested_pos;
    ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos
		   << " < target " << target << " (" << raw_target
		   << "), prefetching " << len << dendl;

    if (pending_safe.empty() && write_pos > safe_pos) {
      // If we are reading and writing the journal, then we may need
      // to issue a flush if one isn't already in progress.
      // Avoid doing a flush every time so that if we do write/read/write/read
      // we don't end up flushing after every write.
      ldout(cct, 10) << "_prefetch: requested_pos=" << requested_pos
		     << ", read_pos=" << read_pos
		     << ", write_pos=" << write_pos
		     << ", safe_pos=" << safe_pos << dendl;
      _do_flush();
    }

    _issue_read(len);
  }
}
1114
1115
/*
 * _is_readable() - return true if next entry is ready.
 *
 * Side effects: if a partial entry is detected at the journal tail, the
 * write/flush/safe positions are rolled back to read_pos and the read
 * state is reset.  May also set temp_fetch_len to grow the next prefetch.
 */
bool Journaler::_is_readable()
{
  // anything to read?
  if (read_pos == write_pos)
    return false;

  // Check if the retrieve bytestream has enough for an entry
  uint64_t need;
  if (journal_stream.readable(read_buf, &need)) {
    return true;
  }

  ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
		  << ", but need " << need << " for next entry; fetch_len is "
		  << fetch_len << dendl;

  // partial fragment at the end?
  if (received_pos == write_pos) {
    ldout(cct, 10) << "is_readable() detected partial entry at tail, "
      "adjusting write_pos to " << read_pos << dendl;

    // adjust write_pos: discard the incomplete tail entry entirely.
    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
    // Nothing may be buffered or awaited while we rewind the tail.
    ceph_assert(write_buf.length() == 0);
    ceph_assert(waitfor_safe.empty());

    // reset read state
    requested_pos = received_pos = read_pos;
    read_buf.clear();

    // FIXME: truncate on disk?

    return false;
  }

  if (need > fetch_len) {
    // The next entry is bigger than our normal prefetch window; remember
    // how much to fetch so _prefetch() can get the whole entry in one go.
    temp_fetch_len = need;
    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
		   << dendl;
  }

  ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
  return false;
}
1163
/*
 * is_readable() - kickstart prefetch, too
 */
bool Journaler::is_readable()
{
  lock_guard l(lock);

  // Once an error is latched the journal is never readable again.
  if (error != 0) {
    return false;
  }

  // Snapshot the cached readability flag, then opportunistically issue
  // more reads so a subsequent call is more likely to succeed.
  bool r = readable;
  _prefetch();
  return r;
}
1179
1180class Journaler::C_EraseFinish : public Context {
1181 Journaler *journaler;
1182 C_OnFinisher *completion;
1183 public:
1184 C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {}
1185 void finish(int r) override {
1186 journaler->_finish_erase(r, completion);
1187 }
1188};
1189
/**
 * Entirely erase the journal, including header. For use when you
 * have already made a copy of the journal somewhere else.
 */
void Journaler::erase(Context *completion)
{
  lock_guard l(lock);

  // Async delete the journal data
  uint64_t first = trimmed_pos / get_layout_period();
  // +2: include the object containing write_pos plus one beyond, so the
  // whole live range [trimmed_pos, write_pos] is covered — TODO confirm
  // the extra object is intentional slack.
  uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
  filer.purge_range(ino, &layout, SnapContext(), first, num,
		    ceph::real_clock::now(), 0,
		    wrap_finisher(new C_EraseFinish(
				    this, wrap_finisher(completion))));

  // We will not start the operation to delete the header until
  // _finish_erase has seen the data deletion succeed: otherwise if
  // there was an error deleting data we might prematurely delete the
  // header thereby lose our reference to the data.
}
1211
1212void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
1213{
1214 lock_guard l(lock);
b32b8144
FG
1215 if (is_stopping()) {
1216 completion->complete(-EAGAIN);
1217 return;
1218 }
7c673cae
FG
1219
1220 if (data_result == 0) {
1221 // Async delete the journal header
1222 filer.purge_range(ino, &layout, SnapContext(), 0, 1,
1223 ceph::real_clock::now(),
1224 0, wrap_finisher(completion));
1225 } else {
1226 lderr(cct) << "Failed to delete journal " << ino << " data: "
1227 << cpp_strerror(data_result) << dendl;
1228 completion->complete(data_result);
1229 }
1230}
1231
/* try_read_entry(bl)
 * read entry into bl if it's ready.
 * otherwise, do nothing.
 *
 * Returns true and fills 'bl' with one decoded entry on success;
 * returns false if not readable or on decode error (sets error=-EINVAL).
 */
bool Journaler::try_read_entry(bufferlist& bl)
{
  lock_guard l(lock);

  if (!readable) {
    ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable"
		   << dendl;
    return false;
  }

  uint64_t start_ptr;
  size_t consumed;
  try {
    consumed = journal_stream.read(read_buf, &bl, &start_ptr);
    if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
      // Resilient entries carry their own start offset; it must match
      // where we believe we are reading.
      ceph_assert(start_ptr == read_pos);
    }
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
    error = -EINVAL;
    return false;
  }

  ldout(cct, 10) << "try_read_entry at " << read_pos << " read "
		 << read_pos << "~" << consumed << " (have "
		 << read_buf.length() << ")" << dendl;

  // Advance past the envelope+payload we just consumed.
  read_pos += consumed;
  try {
    // We were readable, we might not be any more
    readable = _is_readable();
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
    error = -EINVAL;
    return false;
  }

  // prefetch?
  _prefetch();

  // If bufferlist consists of discontiguous memory, decoding types whose
  // denc_traits needs contiguous memory is inefficient. The bufferlist may
  // get copied to temporary memory multiple times (copy_shallow() in
  // src/include/denc.h actually does deep copy)
  if (bl.get_num_buffers() > 1)
    bl.rebuild();

  return true;
}
1284
// Register 'onreadable' to fire when the next entry becomes readable.
// If the journal is already readable (or stopping), the callback is
// queued immediately.  At most one waiter may be registered at a time.
void Journaler::wait_for_readable(Context *onreadable)
{
  lock_guard l(lock);
  if (is_stopping()) {
    finisher->queue(onreadable, -EAGAIN);
    return;
  }

  // Only a single outstanding readability waiter is supported.
  ceph_assert(on_readable == 0);
  if (!readable) {
    ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
		   << onreadable << dendl;
    on_readable = wrap_finisher(onreadable);
  } else {
    // race with OSD reply
    finisher->queue(onreadable, 0);
  }
}
1303
1304bool Journaler::have_waiter() const
1305{
1306 return on_readable != nullptr;
1307}
1308
1309
1310
1311
1312/***************** TRIMMING *******************/
1313
1314
1315class Journaler::C_Trim : public Context {
1316 Journaler *ls;
1317 uint64_t to;
1318public:
1319 C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {}
1320 void finish(int r) override {
1321 ls->_finish_trim(r, to);
1322 }
1323};
1324
// Public entry point: take the Journaler lock and trim fully-expired
// journal objects (see _trim for the logic).
void Journaler::trim()
{
  lock_guard l(lock);
  _trim();
}
1330
// Delete journal objects that lie entirely before the committed
// expire_pos.  Lock must be held; no-op if stopping, nothing to trim,
// or a previous trim is still in flight.
void Journaler::_trim()
{
  if (is_stopping())
    return;

  ceph_assert(!readonly);
  uint64_t period = get_layout_period();
  uint64_t trim_to = last_committed.expire_pos;
  // Only whole objects can be removed: round down to an object boundary.
  trim_to -= trim_to % period;
  ldout(cct, 10) << "trim last_commited head was " << last_committed
	   << ", can trim to " << trim_to
	   << dendl;
  if (trim_to == 0 || trim_to == trimming_pos) {
    ldout(cct, 10) << "trim already trimmed/trimming to "
		   << trimmed_pos << "/" << trimming_pos << dendl;
    return;
  }

  if (trimming_pos > trimmed_pos) {
    // A previous purge_range has not completed yet; _finish_trim will
    // advance trimmed_pos and a later trim() can make more progress.
    ldout(cct, 10) << "trim already trimming atm, try again later.  "
      "trimmed/trimming is " << trimmed_pos << "/" << trimming_pos << dendl;
    return;
  }

  // trim
  ceph_assert(trim_to <= write_pos);
  ceph_assert(trim_to <= expire_pos);
  ceph_assert(trim_to > trimming_pos);
  ldout(cct, 10) << "trim trimming to " << trim_to
		 << ", trimmed/trimming/expire are "
		 << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
		 << dendl;

  // delete range of objects
  uint64_t first = trimming_pos / period;
  uint64_t num = (trim_to - trimming_pos) / period;
  SnapContext snapc;
  filer.purge_range(ino, &layout, snapc, first, num,
		    ceph::real_clock::now(), 0,
		    wrap_finisher(new C_Trim(this, trim_to)));
  // Mark the range as trim-in-flight; trimmed_pos catches up on completion.
  trimming_pos = trim_to;
}
1373
// Completion of an async trim: advance trimmed_pos to 'to'.  -ENOENT is
// tolerated (objects already absent); any other error goes through the
// write-error handler.
void Journaler::_finish_trim(int r, uint64_t to)
{
  lock_guard l(lock);

  ceph_assert(!readonly);
  ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
	   << ", trimmed/trimming/expire now "
	   << to << "/" << trimming_pos << "/" << expire_pos
	   << dendl;
  if (r < 0 && r != -ENOENT) {
    lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  ceph_assert(r >= 0 || r == -ENOENT);

  // Completion must correspond to an in-flight trim and move us forward.
  ceph_assert(to <= trimming_pos);
  ceph_assert(to > trimmed_pos);
  trimmed_pos = to;
}
1395
// Dispatch a write error to the registered handler.  The handler fires
// at most once (called_write_error latch); with no handler registered
// and no prior invocation, a write error is fatal.
void Journaler::handle_write_error(int r)
{
  // lock is locked

  lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl;
  if (on_write_error) {
    on_write_error->complete(r);
    on_write_error = NULL;
    called_write_error = true;
  } else if (called_write_error) {
    /* We don't call error handler more than once, subsequent errors
     * are dropped -- this is okay as long as the error handler does
     * something dramatic like respawn */
    lderr(cct) << __func__ << ": multiple write errors, handler already called"
	       << dendl;
  } else {
    ceph_abort_msg("unhandled write error");
  }
}
1415
1416
/**
 * Test whether the 'read_buf' byte stream has enough data to read
 * an entry
 *
 * sets 'next_envelope_size' to the number of bytes needed to advance (enough
 * to get the next header if header was unavailable, or enough to get the whole
 * next entry if the header was available but the body wasn't).
 *
 * @throws buffer::malformed_input if the resilient-format sentinel
 *         at the head of the buffer does not match.
 */
bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
{
  ceph_assert(need != NULL);

  uint32_t entry_size = 0;
  uint64_t entry_sentinel = 0;
  auto p = read_buf.cbegin();

  // Do we have enough data to decode an entry prefix?
  // Resilient format prefixes each entry with a sentinel before the size.
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    *need = sizeof(entry_size) + sizeof(entry_sentinel);
  } else {
    *need = sizeof(entry_size);
  }
  if (read_buf.length() >= *need) {
    if (format >= JOURNAL_FORMAT_RESILIENT) {
      decode(entry_sentinel, p);
      if (entry_sentinel != sentinel) {
	throw buffer::malformed_input("Invalid sentinel");
      }
    }

    decode(entry_size, p);
  } else {
    return false;
  }

  // Do we have enough data to decode an entry prefix, payload and suffix?
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    *need = JOURNAL_ENVELOPE_RESILIENT + entry_size;
  } else {
    *need = JOURNAL_ENVELOPE_LEGACY + entry_size;
  }
  if (read_buf.length() >= *need) {
    return true;  // No more bytes needed
  }

  return false;
}
1464
1465
/**
 * Consume one entry from a journal byte stream 'from', splicing a
 * serialized LogEvent blob into 'entry'.
 *
 * 'entry' must be non null and point to an empty bufferlist.
 *
 * 'from' must contain sufficient valid data (i.e. readable is true).
 *
 * 'start_ptr' will be set to the entry's start pointer, if the collection
 * format provides it. It may not be null.
 *
 * @returns The number of bytes consumed from the `from` byte stream. Note
 * that this is not equal to the length of `entry`, which contains
 * the inner serialized LogEvent and not the envelope.
 */
size_t JournalStream::read(bufferlist &from, bufferlist *entry,
			   uint64_t *start_ptr)
{
  ceph_assert(start_ptr != NULL);
  ceph_assert(entry != NULL);
  ceph_assert(entry->length() == 0);

  uint32_t entry_size = 0;

  // Consume envelope prefix: entry_size and entry_sentinel
  auto from_ptr = from.cbegin();
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    uint64_t entry_sentinel = 0;
    decode(entry_sentinel, from_ptr);
    // Assertion instead of clean check because of precondition of this
    // fn is that readable() already passed
    ceph_assert(entry_sentinel == sentinel);
  }
  decode(entry_size, from_ptr);

  // Read out the payload
  from_ptr.copy(entry_size, *entry);

  // Consume the envelope suffix (start_ptr)
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    decode(*start_ptr, from_ptr);
  } else {
    // Legacy format carries no start pointer; report zero.
    *start_ptr = 0;
  }

  // Trim the input buffer to discard the bytes we have consumed
  from.splice(0, from_ptr.get_off());

  // from_ptr.get_off() is the total envelope+payload size consumed.
  return from_ptr.get_off();
}
1516
1517
1518/**
1519 * Append one entry
1520 */
1521size_t JournalStream::write(bufferlist &entry, bufferlist *to,
1522 uint64_t const &start_ptr)
1523{
11fdf7f2 1524 ceph_assert(to != NULL);
7c673cae
FG
1525
1526 uint32_t const entry_size = entry.length();
1527 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1528 encode(sentinel, *to);
7c673cae 1529 }
11fdf7f2 1530 encode(entry_size, *to);
7c673cae
FG
1531 to->claim_append(entry);
1532 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1533 encode(start_ptr, *to);
7c673cae
FG
1534 }
1535
1536 if (format >= JOURNAL_FORMAT_RESILIENT) {
1537 return JOURNAL_ENVELOPE_RESILIENT + entry_size;
1538 } else {
1539 return JOURNAL_ENVELOPE_LEGACY + entry_size;
1540 }
1541}
1542
/**
 * set write error callback
 *
 * Set a callback/context to trigger if we get a write error from
 * the objecter. This may be from an explicit request (e.g., flush)
 * or something async the journaler did on its own (e.g., journal
 * header update).
 *
 * It is only used once; if the caller continues to use the
 * Journaler and wants to hear about errors, it needs to reset the
 * error_handler.
 *
 * @param c callback/context to trigger on error; completed via the
 *          Journaler's finisher (see wrap_finisher).
 */
void Journaler::set_write_error_handler(Context *c) {
  lock_guard l(lock);
  // Replacing an unfired handler is not supported.
  ceph_assert(!on_write_error);
  on_write_error = wrap_finisher(c);
  called_write_error = false;
}
1563
1564
1565/**
1566 * Wrap a context in a C_OnFinisher, if it is non-NULL
1567 *
1568 * Utility function to avoid lots of error-prone and verbose
1569 * NULL checking on contexts passed in.
1570 */
1571C_OnFinisher *Journaler::wrap_finisher(Context *c)
1572{
1573 if (c != NULL) {
1574 return new C_OnFinisher(c, finisher);
1575 } else {
1576 return NULL;
1577 }
1578}
1579
// Stop the journaler: mark it stopping, latch -EAGAIN as the error, and
// fail every outstanding readability/recovery/safe waiter.
void Journaler::shutdown()
{
  lock_guard l(lock);

  ldout(cct, 1) << __func__ << dendl;

  state = STATE_STOPPING;
  readable = false;

  // Kick out anyone reading from journal
  error = -EAGAIN;
  if (on_readable) {
    C_OnFinisher *f = on_readable;
    on_readable = 0;
    f->complete(-EAGAIN);
  }

  // Fail anyone still waiting for recovery to complete.
  list<Context*> ls;
  ls.swap(waitfor_recover);
  finish_contexts(cct, ls, -ESHUTDOWN);

  // Fail anyone waiting for writes to become safe.
  std::map<uint64_t, std::list<Context*> >::iterator i;
  for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
    finish_contexts(cct, i->second, -EAGAIN);
  }
  waitfor_safe.clear();
}
1607