]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Journaler.cc
bump version to 15.2.6-pve1
[ceph.git] / ceph / src / osdc / Journaler.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "common/perf_counters.h"
16#include "common/dout.h"
17#include "include/Context.h"
18#include "msg/Messenger.h"
19#include "osdc/Journaler.h"
20#include "common/errno.h"
11fdf7f2 21#include "include/ceph_assert.h"
7c673cae
FG
22#include "common/Finisher.h"
23
24#define dout_subsys ceph_subsys_journaler
25#undef dout_prefix
26#define dout_prefix *_dout << objecter->messenger->get_myname() \
27 << ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
28
29using std::chrono::seconds;
30
31
32class Journaler::C_DelayFlush : public Context {
33 Journaler *journaler;
34 public:
11fdf7f2 35 explicit C_DelayFlush(Journaler *j) : journaler(j) {}
7c673cae
FG
36 void finish(int r) override {
37 journaler->_do_delayed_flush();
38 }
39};
40
41void Journaler::set_readonly()
42{
43 lock_guard l(lock);
44
45 ldout(cct, 1) << "set_readonly" << dendl;
46 readonly = true;
47}
48
49void Journaler::set_writeable()
50{
51 lock_guard l(lock);
52
53 ldout(cct, 1) << "set_writeable" << dendl;
54 readonly = false;
55}
56
57void Journaler::create(file_layout_t *l, stream_format_t const sf)
58{
59 lock_guard lk(lock);
60
11fdf7f2 61 ceph_assert(!readonly);
7c673cae
FG
62 state = STATE_ACTIVE;
63
64 stream_format = sf;
65 journal_stream.set_format(sf);
66 _set_layout(l);
67
68 prezeroing_pos = prezero_pos = write_pos = flush_pos =
69 safe_pos = read_pos = requested_pos = received_pos =
70 expire_pos = trimming_pos = trimmed_pos =
71 next_safe_pos = layout.get_period();
72
73 ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
74 << std::dec << ", format=" << stream_format << dendl;
75}
76
77void Journaler::set_layout(file_layout_t const *l)
78{
79 lock_guard lk(lock);
80 _set_layout(l);
81}
82
83void Journaler::_set_layout(file_layout_t const *l)
84{
85 layout = *l;
86
b32b8144
FG
87 if (layout.pool_id != pg_pool) {
88 // user can reset pool id through cephfs-journal-tool
89 lderr(cct) << "may got older pool id from header layout" << dendl;
90 ceph_abort();
91 }
7c673cae
FG
92 last_written.layout = layout;
93 last_committed.layout = layout;
94
95 // prefetch intelligently.
96 // (watch out, this is big if you use big objects or weird striping)
11fdf7f2 97 uint64_t periods = cct->_conf.get_val<uint64_t>("journaler_prefetch_periods");
7c673cae
FG
98 fetch_len = layout.get_period() * periods;
99}
100
101
102/***************** HEADER *******************/
103
104ostream& operator<<(ostream &out, const Journaler::Header &h)
105{
106 return out << "loghead(trim " << h.trimmed_pos
107 << ", expire " << h.expire_pos
108 << ", write " << h.write_pos
109 << ", stream_format " << (int)(h.stream_format)
110 << ")";
111}
112
113class Journaler::C_ReadHead : public Context {
114 Journaler *ls;
115public:
116 bufferlist bl;
117 explicit C_ReadHead(Journaler *l) : ls(l) {}
118 void finish(int r) override {
119 ls->_finish_read_head(r, bl);
120 }
121};
122
123class Journaler::C_RereadHead : public Context {
124 Journaler *ls;
125 Context *onfinish;
126public:
127 bufferlist bl;
128 C_RereadHead(Journaler *l, Context *onfinish_) : ls (l),
129 onfinish(onfinish_) {}
130 void finish(int r) override {
131 ls->_finish_reread_head(r, bl, onfinish);
132 }
133};
134
135class Journaler::C_ProbeEnd : public Context {
136 Journaler *ls;
137public:
138 uint64_t end;
139 explicit C_ProbeEnd(Journaler *l) : ls(l), end(-1) {}
140 void finish(int r) override {
141 ls->_finish_probe_end(r, end);
142 }
143};
144
145class Journaler::C_ReProbe : public Context {
146 Journaler *ls;
147 C_OnFinisher *onfinish;
148public:
149 uint64_t end;
150 C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) :
151 ls(l), onfinish(onfinish_), end(0) {}
152 void finish(int r) override {
153 ls->_finish_reprobe(r, end, onfinish);
154 }
155};
156
157void Journaler::recover(Context *onread)
158{
159 lock_guard l(lock);
b32b8144 160 if (is_stopping()) {
7c673cae
FG
161 onread->complete(-EAGAIN);
162 return;
163 }
164
165 ldout(cct, 1) << "recover start" << dendl;
11fdf7f2
TL
166 ceph_assert(state != STATE_ACTIVE);
167 ceph_assert(readonly);
7c673cae
FG
168
169 if (onread)
170 waitfor_recover.push_back(wrap_finisher(onread));
171
172 if (state != STATE_UNDEF) {
173 ldout(cct, 1) << "recover - already recovering" << dendl;
174 return;
175 }
176
177 ldout(cct, 1) << "read_head" << dendl;
178 state = STATE_READHEAD;
179 C_ReadHead *fin = new C_ReadHead(this);
180 _read_head(fin, &fin->bl);
181}
182
183void Journaler::_read_head(Context *on_finish, bufferlist *bl)
184{
185 // lock is locked
11fdf7f2 186 ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
7c673cae
FG
187
188 object_t oid = file_object_t(ino, 0);
189 object_locator_t oloc(pg_pool);
190 objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish));
191}
192
193void Journaler::reread_head(Context *onfinish)
194{
195 lock_guard l(lock);
196 _reread_head(wrap_finisher(onfinish));
197}
198
199/**
200 * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos
201 * from the on-disk header. This switches the state to STATE_REREADHEAD for
202 * the duration, and you shouldn't start a re-read while other operations are
203 * in-flight, nor start other operations while a re-read is in progress.
204 * Also, don't call this until the Journaler has finished its recovery and has
205 * gone STATE_ACTIVE!
206 */
207void Journaler::_reread_head(Context *onfinish)
208{
209 ldout(cct, 10) << "reread_head" << dendl;
11fdf7f2 210 ceph_assert(state == STATE_ACTIVE);
7c673cae
FG
211
212 state = STATE_REREADHEAD;
213 C_RereadHead *fin = new C_RereadHead(this, onfinish);
214 _read_head(fin, &fin->bl);
215}
216
217void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
218{
219 lock_guard l(lock);
b32b8144
FG
220 if (is_stopping()) {
221 finish->complete(-EAGAIN);
222 return;
223 }
7c673cae
FG
224
225 //read on-disk header into
11fdf7f2 226 ceph_assert(bl.length() || r < 0 );
7c673cae
FG
227
228 // unpack header
229 if (r == 0) {
230 Header h;
11fdf7f2 231 auto p = bl.cbegin();
7c673cae 232 try {
11fdf7f2 233 decode(h, p);
7c673cae
FG
234 } catch (const buffer::error &e) {
235 finish->complete(-EINVAL);
236 return;
237 }
238 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
239 = h.write_pos;
240 expire_pos = h.expire_pos;
241 trimmed_pos = trimming_pos = h.trimmed_pos;
242 init_headers(h);
243 state = STATE_ACTIVE;
244 }
245
246 finish->complete(r);
247}
248
// Callback for the initial header read during recovery. Validates the
// header, installs its pointers and layout, then moves on to probing for
// the actual end of the log. On error or corruption, all recovery waiters
// are completed with the failure code.
void Journaler::_finish_read_head(int r, bufferlist& bl)
{
  lock_guard l(lock);
  if (is_stopping())
    return;

  ceph_assert(state == STATE_READHEAD);

  if (r!=0) {
    // read error: fail every recovery waiter with it
    ldout(cct, 0) << "error getting journal off disk" << dendl;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, r);
    return;
  }

  if (bl.length() == 0) {
    // no header object yet: treat as a brand-new, empty journal
    ldout(cct, 1) << "_finish_read_head r=" << r
		  << " read 0 bytes, assuming empty log" << dendl;
    state = STATE_ACTIVE;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, 0);
    return;
  }

  // unpack header
  bool corrupt = false;
  Header h;
  auto p = bl.cbegin();
  try {
    decode(h, p);

    if (h.magic != magic) {
      ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
		    << magic << "'" << dendl;
      corrupt = true;
    } else if (h.write_pos < h.expire_pos || h.expire_pos < h.trimmed_pos) {
      // pointers must obey trimmed <= expire <= write
      ldout(cct, 0) << "Corrupt header (bad offsets): " << h << dendl;
      corrupt = true;
    }
  } catch (const buffer::error &e) {
    corrupt = true;
  }

  if (corrupt) {
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, -EINVAL);
    return;
  }

  // adopt the header's pointers and layout
  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
    = h.write_pos;
  read_pos = requested_pos = received_pos = expire_pos = h.expire_pos;
  trimmed_pos = trimming_pos = h.trimmed_pos;

  init_headers(h);
  _set_layout(&h.layout);
  stream_format = h.stream_format;
  journal_stream.set_format(h.stream_format);

  // the header's write_pos is only a lower bound; probe for the real tail
  ldout(cct, 1) << "_finish_read_head " << h
		<< ". probing for end of log (from " << write_pos << ")..."
		<< dendl;
  C_ProbeEnd *fin = new C_ProbeEnd(this);
  state = STATE_PROBING;
  _probe(fin, &fin->end);
}
318
319void Journaler::_probe(Context *finish, uint64_t *end)
320{
321 // lock is locked
322 ldout(cct, 1) << "probing for end of the log" << dendl;
11fdf7f2 323 ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
7c673cae
FG
324 // probe the log
325 filer.probe(ino, &layout, CEPH_NOSNAP,
326 write_pos, end, true, 0, wrap_finisher(finish));
327}
328
329void Journaler::_reprobe(C_OnFinisher *finish)
330{
331 ldout(cct, 10) << "reprobe" << dendl;
11fdf7f2 332 ceph_assert(state == STATE_ACTIVE);
7c673cae
FG
333
334 state = STATE_REPROBING;
335 C_ReProbe *fin = new C_ReProbe(this, finish);
336 _probe(fin, &fin->end);
337}
338
339
340void Journaler::_finish_reprobe(int r, uint64_t new_end,
341 C_OnFinisher *onfinish)
342{
343 lock_guard l(lock);
b32b8144
FG
344 if (is_stopping()) {
345 onfinish->complete(-EAGAIN);
346 return;
347 }
7c673cae 348
11fdf7f2 349 ceph_assert(new_end >= write_pos || r < 0);
7c673cae
FG
350 ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
351 << " (header had " << write_pos << ")."
352 << dendl;
353 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end;
354 state = STATE_ACTIVE;
355 onfinish->complete(r);
356}
357
// Callback for the recovery-time end-of-log probe. On success, positions
// all write-side pointers at the discovered end and completes the recovery
// waiters with 0; on probe error, waiters get the error code instead.
void Journaler::_finish_probe_end(int r, uint64_t end)
{
  lock_guard l(lock);
  if (is_stopping())
    return;

  ceph_assert(state == STATE_PROBING);
  if (r < 0) { // error in probing
    goto out;
  }
  if (((int64_t)end) == -1) {
    // probe found no data at all, yet the header claimed some existed
    end = write_pos;
    ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had "
		  << write_pos << "). log was empty. recovered." << dendl;
    ceph_abort(); // hrm.
  } else {
    // probed end can only be at or beyond the header's write_pos
    ceph_assert(end >= write_pos);
    ldout(cct, 1) << "_finish_probe_end write_pos = " << end
		  << " (header had " << write_pos << "). recovered."
		  << dendl;
  }

  state = STATE_ACTIVE;

  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end;

out:
  // done: wake everyone waiting on recover()
  list<Context*> ls;
  ls.swap(waitfor_recover);
  finish_contexts(cct, ls, r);
}
390
391class Journaler::C_RereadHeadProbe : public Context
392{
393 Journaler *ls;
394 C_OnFinisher *final_finish;
395public:
396 C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) :
397 ls(l), final_finish(finish) {}
398 void finish(int r) override {
399 ls->_finish_reread_head_and_probe(r, final_finish);
400 }
401};
402
403void Journaler::reread_head_and_probe(Context *onfinish)
404{
405 lock_guard l(lock);
406
11fdf7f2 407 ceph_assert(state == STATE_ACTIVE);
7c673cae
FG
408 _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
409}
410
// Continuation between the header re-read and the re-probe stages.
// Stopping and blacklisting are reported straight to the caller; any other
// error is fatal (asserted), otherwise the re-probe is started.
void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
{
  // Expect to be called back from finish_reread_head, which already takes lock
  // lock is locked
  if (is_stopping()) {
    onfinish->complete(-EAGAIN);
    return;
  }

  // Let the caller know that the operation has failed or was intentionally
  // failed since the caller has been blacklisted.
  if (r == -EBLACKLISTED) {
    onfinish->complete(r);
    return;
  }

  ceph_assert(!r); //if we get an error, we're boned
  _reprobe(onfinish);
}
430
431
432// WRITING
433
434class Journaler::C_WriteHead : public Context {
435public:
436 Journaler *ls;
437 Header h;
438 C_OnFinisher *oncommit;
439 C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_),
440 oncommit(c) {}
441 void finish(int r) override {
442 ls->_finish_write_head(r, h, oncommit);
443 }
444};
445
446void Journaler::write_head(Context *oncommit)
447{
448 lock_guard l(lock);
449 _write_head(oncommit);
450}
451
452
// Serialize the current in-memory pointers into the header object.
// Note that safe_pos (not write_pos) is persisted as the header's
// write_pos: only data known durable may be advertised. Caller must hold
// the lock; oncommit (may be null) fires when the header write is durable.
void Journaler::_write_head(Context *oncommit)
{
  ceph_assert(!readonly);
  ceph_assert(state == STATE_ACTIVE);
  last_written.trimmed_pos = trimmed_pos;
  last_written.expire_pos = expire_pos;
  last_written.unused_field = expire_pos;
  last_written.write_pos = safe_pos;
  last_written.stream_format = stream_format;
  ldout(cct, 10) << "write_head " << last_written << dendl;

  // Avoid persisting bad pointers in case of bugs
  ceph_assert(last_written.write_pos >= last_written.expire_pos);
  ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);

  last_wrote_head = ceph::real_clock::now();

  bufferlist bl;
  encode(last_written, bl);
  SnapContext snapc;

  object_t oid = file_object_t(ino, 0);
  object_locator_t oloc(pg_pool);
  objecter->write_full(oid, oloc, snapc, bl, ceph::real_clock::now(), 0,
		       wrap_finisher(new C_WriteHead(
				       this, last_written,
				       wrap_finisher(oncommit))),
		       0, 0, write_iohint);
}
482
483void Journaler::_finish_write_head(int r, Header &wrote,
484 C_OnFinisher *oncommit)
485{
486 lock_guard l(lock);
487
488 if (r < 0) {
489 lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl;
490 handle_write_error(r);
491 return;
492 }
11fdf7f2 493 ceph_assert(!readonly);
7c673cae
FG
494 ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
495 last_committed = wrote;
496 if (oncommit) {
497 oncommit->complete(r);
498 }
499
500 _trim(); // trim?
501}
502
503
504/***************** WRITING *******************/
505
506class Journaler::C_Flush : public Context {
507 Journaler *ls;
508 uint64_t start;
509 ceph::real_time stamp;
510public:
511 C_Flush(Journaler *l, int64_t s, ceph::real_time st)
512 : ls(l), start(s), stamp(st) {}
513 void finish(int r) override {
514 ls->_finish_flush(r, start, stamp);
515 }
516};
517
// Callback for a completed flush write that began at `start`. Updates
// safe_pos from the pending_safe map (oldest outstanding flush bounds how
// far safety can advance) and wakes waiters at or below the new safe_pos.
void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
  lock_guard l(lock);
  ceph_assert(!readonly);

  if (r < 0) {
    lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  ceph_assert(start < flush_pos);

  // calc latency?
  if (logger) {
    ceph::timespan lat = ceph::real_clock::now() - stamp;
    logger->tinc(logger_key_lat, lat);
  }

  // adjust safe_pos
  auto it = pending_safe.find(start);
  ceph_assert(it != pending_safe.end());
  // snapshot the oldest in-flight flush's next_safe_pos before erasing ours
  uint64_t min_next_safe_pos = pending_safe.begin()->second;
  pending_safe.erase(it);
  if (pending_safe.empty())
    safe_pos = next_safe_pos;
  else
    safe_pos = min_next_safe_pos;

  ldout(cct, 10) << "_finish_flush safe from " << start
		 << ", pending_safe " << pending_safe
		 << ", (prezeroing/prezero)/write/flush/safe positions now "
		 << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
		 << write_pos << "/" << flush_pos << "/" << safe_pos
		 << dendl;

  // kick waiters <= safe_pos
  if (!waitfor_safe.empty()) {
    list<Context*> ls;
    while (!waitfor_safe.empty()) {
      auto it = waitfor_safe.begin();
      if (it->first > safe_pos)
	break;
      ls.splice(ls.end(), it->second);
      waitfor_safe.erase(it);
    }
    finish_contexts(cct, ls);
  }
}
567
568
569
// Append one entry (stream envelope + payload) to the in-memory write
// buffer, blocking (with the lock dropped) if the write-buffer throttle is
// full. Returns the new write_pos, i.e. the end of the appended entry.
uint64_t Journaler::append_entry(bufferlist& bl)
{
  unique_lock l(lock);

  ceph_assert(!readonly);
  uint32_t s = bl.length();

  // append
  size_t delta = bl.length() + journal_stream.get_envelope_size();
  // write_buf space is nearly full
  if (!write_buf_throttle.get_or_fail(delta)) {
    // drop the lock so flush callbacks can drain the buffer while we block
    l.unlock();
    ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
    write_buf_throttle.get(delta);
    l.lock();
  }
  ldout(cct, 20) << "write_buf_throttle get, delta " << delta << dendl;
  size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
  ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~"
		 << wrote << dendl;
  write_pos += wrote;

  // flush previous object?
  uint64_t su = get_layout_period();
  ceph_assert(su > 0);
  uint64_t write_off = write_pos % su;
  uint64_t write_obj = write_pos / su;
  uint64_t flush_obj = flush_pos / su;
  if (write_obj != flush_obj) {
    // this entry crossed an object boundary: flush up to the start of the
    // current (partial) object
    ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
		   << write_obj << " flo " << flush_obj << ")" << dendl;
    _do_flush(write_buf.length() - write_off);

    // if _do_flush() skips flushing some data, it does do a best effort to
    // update next_safe_pos.
    if (write_buf.length() > 0 &&
	write_buf.length() <= wrote) { // the unflushed data are within this entry
      // set next_safe_pos to end of previous entry
      next_safe_pos = write_pos - wrote;
    }
  }

  return write_pos;
}
614
615
// Flush up to `amount` bytes (0 = everything buffered) of write_buf out to
// the journal objects, keeping prezeroing at least one full period ahead
// so the write never lands on a possibly-stale object. If zeroing has not
// caught up, the flush is deferred (waiting_for_zero_pos) and retried from
// _finish_prezero. Caller must hold the lock.
void Journaler::_do_flush(unsigned amount)
{
  if (is_stopping())
    return;
  if (write_pos == flush_pos)
    return;
  ceph_assert(write_pos > flush_pos);
  ceph_assert(!readonly);

  // flush
  uint64_t len = write_pos - flush_pos;
  ceph_assert(len == write_buf.length());
  if (amount && amount < len)
    len = amount;

  // zero at least two full periods ahead.  this ensures
  // that the next object will not exist.
  uint64_t period = get_layout_period();
  if (flush_pos + len + 2*period > prezero_pos) {
    _issue_prezero();

    int64_t newlen = prezero_pos - flush_pos - period;
    if (newlen <= 0) {
      // nothing is zeroed far enough ahead; defer until prezero catches up
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
		     << " already too close to prezero_pos " << prezero_pos
		     << ", zeroing first" << dendl;
      waiting_for_zero_pos = flush_pos + len;
      return;
    }
    if (static_cast<uint64_t>(newlen) < len) {
      // partially constrained: flush only the safely-prezeroed prefix
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
		     << " but hit prezero_pos " << prezero_pos
		     << ", will do " << flush_pos << "~" << newlen << dendl;
      waiting_for_zero_pos = flush_pos + len;
      len = newlen;
    }
  }
  ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;

  // submit write for anything pending
  // flush _start_ pos to _finish_flush
  ceph::real_time now = ceph::real_clock::now();
  SnapContext snapc;

  Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT
  pending_safe[flush_pos] = next_safe_pos;

  bufferlist write_bl;

  // adjust pointers
  if (len == write_buf.length()) {
    // flushing everything: the whole buffer moves, write_pos becomes safe
    // once this write commits
    write_bl.swap(write_buf);
    next_safe_pos = write_pos;
  } else {
    write_buf.splice(0, len, &write_bl);
    // Keys of waitfor_safe map are journal entry boundaries.
    // Try finding a journal entry that we are actually flushing
    // and set next_safe_pos to end of it. This is best effort.
    // The one we found may not be the lastest flushing entry.
    auto p = waitfor_safe.lower_bound(flush_pos + len);
    if (p != waitfor_safe.end()) {
      if (p->first > flush_pos + len && p != waitfor_safe.begin())
	--p;
      if (p->first <= flush_pos + len && p->first > next_safe_pos)
	next_safe_pos = p->first;
    }
  }

  filer.write(ino, &layout, snapc,
	      flush_pos, len, write_bl, ceph::real_clock::now(),
	      0,
	      wrap_finisher(onsafe), write_iohint);

  flush_pos += len;
  ceph_assert(write_buf.length() == write_pos - flush_pos);
  write_buf_throttle.put(len);
  ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;

  ldout(cct, 10)
    << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
    << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos
    << "/" << flush_pos << "/" << safe_pos << dendl;

  _issue_prezero();
}
701
702
703void Journaler::wait_for_flush(Context *onsafe)
704{
705 lock_guard l(lock);
b32b8144 706 if (is_stopping()) {
7c673cae
FG
707 onsafe->complete(-EAGAIN);
708 return;
709 }
710 _wait_for_flush(onsafe);
711}
712
713void Journaler::_wait_for_flush(Context *onsafe)
714{
11fdf7f2 715 ceph_assert(!readonly);
7c673cae
FG
716
717 // all flushed and safe?
718 if (write_pos == safe_pos) {
11fdf7f2 719 ceph_assert(write_buf.length() == 0);
7c673cae
FG
720 ldout(cct, 10)
721 << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
722 "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
723 << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
724 if (onsafe) {
725 finisher->queue(onsafe, 0);
726 }
727 return;
728 }
729
730 // queue waiter
731 if (onsafe) {
732 waitfor_safe[write_pos].push_back(wrap_finisher(onsafe));
733 }
734}
735
736void Journaler::flush(Context *onsafe)
737{
738 lock_guard l(lock);
b32b8144
FG
739 if (is_stopping()) {
740 onsafe->complete(-EAGAIN);
741 return;
742 }
7c673cae
FG
743 _flush(wrap_finisher(onsafe));
744}
745
746void Journaler::_flush(C_OnFinisher *onsafe)
747{
11fdf7f2 748 ceph_assert(!readonly);
7c673cae
FG
749
750 if (write_pos == flush_pos) {
11fdf7f2 751 ceph_assert(write_buf.length() == 0);
7c673cae
FG
752 ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
753 "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
754 << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
755 << dendl;
756 if (onsafe) {
757 onsafe->complete(0);
758 }
759 } else {
760 _do_flush();
761 _wait_for_flush(onsafe);
762 }
763
764 // write head?
765 if (_write_head_needed()) {
766 _write_head();
767 }
768}
769
770bool Journaler::_write_head_needed()
771{
11fdf7f2 772 return last_wrote_head + seconds(cct->_conf.get_val<int64_t>("journaler_write_head_interval"))
7c673cae
FG
773 < ceph::real_clock::now();
774}
775
776
777/*************** prezeroing ******************/
778
779struct C_Journaler_Prezero : public Context {
780 Journaler *journaler;
781 uint64_t from, len;
782 C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l)
783 : journaler(j), from(f), len(l) {}
784 void finish(int r) override {
785 journaler->_finish_prezero(r, from, len);
786 }
787};
788
// Issue zero requests so the journal is pre-zeroed a configurable number
// of periods beyond write_pos. Issued ranges are tracked by advancing
// prezeroing_pos; completion advances prezero_pos (see _finish_prezero).
// Caller must hold the lock.
void Journaler::_issue_prezero()
{
  ceph_assert(prezeroing_pos >= flush_pos);

  uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
  /*
   * issue zero requests based on write_pos, even though the invariant
   * is that we zero ahead of flush_pos.
   */
  uint64_t period = get_layout_period();
  uint64_t to = write_pos + period * num_periods + period - 1;
  to -= to % period;  // round the target down to a period boundary

  if (prezeroing_pos >= to) {
    ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos "
		   << prezeroing_pos << dendl;
    return;
  }

  while (prezeroing_pos < to) {
    uint64_t len;
    if (prezeroing_pos % period == 0) {
      len = period;
      ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~"
		     << period << " (full period)" << dendl;
    } else {
      // first bring prezeroing_pos up to the next period boundary
      len = period - (prezeroing_pos % period);
      ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~"
		     << len << " (partial period)" << dendl;
    }
    SnapContext snapc;
    Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos,
						       len));
    filer.zero(ino, &layout, snapc, prezeroing_pos, len,
	       ceph::real_clock::now(), 0, c);
    prezeroing_pos += len;
  }
}
827
// Lock cycle because we get called out of objecter callback (holding
// objecter read lock), but there are also cases where we take the journaler
// lock before calling into objecter to do I/O.
//
// Completion for one prezero request covering [start, start+len). Zeroes
// may complete out of order: only a completion at prezero_pos advances it
// (absorbing any buffered contiguous completions); others are parked in
// pending_zero until the gap closes. Also retries a flush that was
// deferred waiting for zeroing, and wakes wait_for_prezero() waiters once
// everything issued has completed.
void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
{
  lock_guard l(lock);

  ldout(cct, 10) << "_prezeroed to " << start << "~" << len
		 << ", prezeroing/prezero was " << prezeroing_pos << "/"
		 << prezero_pos << ", pending " << pending_zero
		 << dendl;
  if (r < 0 && r != -ENOENT) {
    lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  // -ENOENT is fine: the object we zeroed/removed did not exist
  ceph_assert(r == 0 || r == -ENOENT);

  if (start == prezero_pos) {
    // extends the contiguous zeroed range
    prezero_pos += len;
    // absorb previously completed, now-contiguous out-of-order ranges
    while (!pending_zero.empty() &&
	   pending_zero.begin().get_start() == prezero_pos) {
      interval_set<uint64_t>::iterator b(pending_zero.begin());
      prezero_pos += b.get_len();
      pending_zero.erase(b);
    }

    if (waiting_for_zero_pos > flush_pos) {
      // a flush was deferred waiting for zeroing; retry it now
      _do_flush(waiting_for_zero_pos - flush_pos);
    }

    if (prezero_pos == prezeroing_pos &&
	!waitfor_prezero.empty()) {
      list<Context*> ls;
      ls.swap(waitfor_prezero);
      finish_contexts(cct, ls, 0);
    }
  } else {
    // completed out of order; remember it until the gap before it closes
    pending_zero.insert(start, len);
  }
  ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos
		 << "/" << prezero_pos
		 << ", pending " << pending_zero
		 << dendl;
}
874
28e407b8
AA
875void Journaler::wait_for_prezero(Context *onfinish)
876{
11fdf7f2 877 ceph_assert(onfinish);
28e407b8
AA
878 lock_guard l(lock);
879
880 if (prezero_pos == prezeroing_pos) {
881 finisher->queue(onfinish, 0);
882 return;
883 }
884 waitfor_prezero.push_back(wrap_finisher(onfinish));
885}
7c673cae
FG
886
887
888/***************** READING *******************/
889
890
891class Journaler::C_Read : public Context {
892 Journaler *ls;
893 uint64_t offset;
894 uint64_t length;
895public:
896 bufferlist bl;
897 C_Read(Journaler *j, uint64_t o, uint64_t l) : ls(j), offset(o), length(l) {}
898 void finish(int r) override {
899 ls->_finish_read(r, offset, length, bl);
900 }
901};
902
903class Journaler::C_RetryRead : public Context {
904 Journaler *ls;
905public:
906 explicit C_RetryRead(Journaler *l) : ls(l) {}
907
908 void finish(int r) override {
909 // Should only be called from waitfor_safe i.e. already inside lock
910 // (ls->lock is locked
911 ls->_prefetch();
912 }
913};
914
// Callback for one prefetch read: stash the returned data, fold any
// now-contiguous buffers into read_buf, and continue prefetching. Errors
// (including short reads and decode failures in assimilation) are latched
// in `error` and reported through on_readable.
void Journaler::_finish_read(int r, uint64_t offset, uint64_t length,
			     bufferlist& bl)
{
  lock_guard l(lock);

  if (r < 0) {
    ldout(cct, 0) << "_finish_read got error " << r << dendl;
    error = r;
  } else {
    ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length()
		   << dendl;
    if (bl.length() < length) {
      // short read: we only request ranges that should exist
      ldout(cct, 0) << "_finish_read got less than expected (" << length << ")"
		    << dendl;
      error = -EINVAL;
    }
  }

  if (error) {
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }

  prefetch_buf[offset].swap(bl);

  try {
    _assimilate_prefetch();
  } catch (const buffer::error &err) {
    // corrupt journal data surfaced while updating readability
    lderr(cct) << "_decode error from assimilate_prefetch" << dendl;
    error = -EINVAL;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }
  _prefetch();
}
958
// Move contiguous prefetched buffers (starting exactly at received_pos)
// from prefetch_buf into read_buf, recompute readability, and fire
// on_readable if the journal just became readable (or the reader is at
// the journal end). Caller must hold the lock.
void Journaler::_assimilate_prefetch()
{
  bool was_readable = readable;

  bool got_any = false;
  while (!prefetch_buf.empty()) {
    map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
    if (p->first != received_pos) {
      // out-of-order completion left a hole; stop until it is filled
      uint64_t gap = p->first - received_pos;
      ldout(cct, 10) << "_assimilate_prefetch gap of " << gap
		     << " from received_pos " << received_pos
		     << " to first prefetched buffer " << p->first << dendl;
      break;
    }

    ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~"
		   << p->second.length() << dendl;
    received_pos += p->second.length();
    read_buf.claim_append(p->second);
    ceph_assert(received_pos <= requested_pos);
    prefetch_buf.erase(p);
    got_any = true;
  }

  if (got_any) {
    ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
		   << read_buf.length() << ", read pointers read_pos=" << read_pos
		   << " received_pos=" << received_pos << " requested_pos=" << requested_pos
		   << dendl;

    // Update readability (this will also hit any decode errors resulting
    // from bad data)
    readable = _is_readable();
  }

  if ((got_any && !was_readable && readable) || read_pos == write_pos) {
    // readable!
    ldout(cct, 10) << "_finish_read now readable (or at journal end) readable="
		   << readable << " read_pos=" << read_pos << " write_pos="
		   << write_pos << dendl;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(0);
    }
  }
}
1006
// Issue read(s) for up to `len` bytes starting at requested_pos, one
// stripe period at a time so data can be assimilated as it arrives.
// Reads never pass safe_pos; when stuck exactly at it, a flush is kicked
// (if none is pending) and the read is retried once more data is safe.
// Caller must hold the lock.
void Journaler::_issue_read(uint64_t len)
{
  // stuck at safe_pos? (this is needed if we are reading the tail of
  // a journal we are also writing to)
  ceph_assert(requested_pos <= safe_pos);
  if (requested_pos == safe_pos) {
    ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
		   << ", waiting" << dendl;
    ceph_assert(write_pos > requested_pos);
    if (pending_safe.empty()) {
      _flush(NULL);
    }

    // Make sure keys of waitfor_safe map are journal entry boundaries.
    // The key we used here is either next_safe_pos or old value of
    // next_safe_pos. next_safe_pos is always set to journal entry
    // boundary.
    auto p = pending_safe.rbegin();
    if (p != pending_safe.rend())
      waitfor_safe[p->second].push_back(new C_RetryRead(this));
    else
      waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
    return;
  }

  // don't read too much
  if (requested_pos + len > safe_pos) {
    len = safe_pos - requested_pos;
    ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos
		   << dendl;
  }

  // go.
  ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
		 << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos
		 << " requested_pos+len=" << (requested_pos+len) << dendl;

  // step by period (object). _don't_ do a single big filer.read()
  // here because it will wait for all object reads to complete before
  // giving us back any data. this way we can process whatever bits
  // come in that are contiguous.
  uint64_t period = get_layout_period();
  while (len > 0) {
    uint64_t e = requested_pos + period;
    e -= e % period;           // end of the period containing requested_pos
    uint64_t l = e - requested_pos;
    if (l > len)
      l = len;
    C_Read *c = new C_Read(this, requested_pos, l);
    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0,
	       wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
    requested_pos += l;
    len -= l;
  }
}
1062
// Extend outstanding reads so that roughly fetch_len bytes (rounded up to
// whole periods, capped at write_pos) are requested beyond read_pos.
// Caller must hold the lock.
void Journaler::_prefetch()
{
  if (is_stopping())
    return;

  ldout(cct, 10) << "_prefetch" << dendl;
  // prefetch
  uint64_t pf;
  if (temp_fetch_len) {
    // one-shot override of the prefetch window; consumed here
    ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
    pf = temp_fetch_len;
    temp_fetch_len = 0;
  } else {
    pf = fetch_len;
  }

  uint64_t raw_target = read_pos + pf;

  // read full log segments, so increase if necessary
  uint64_t period = get_layout_period();
  uint64_t remainder = raw_target % period;
  uint64_t adjustment = remainder ? period - remainder : 0;
  uint64_t target = raw_target + adjustment;

  // don't read past the log tail
  if (target > write_pos)
    target = write_pos;

  if (requested_pos < target) {
    uint64_t len = target - requested_pos;
    ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos
		   << " < target " << target << " (" << raw_target
		   << "), prefetching " << len << dendl;

    if (pending_safe.empty() && write_pos > safe_pos) {
      // If we are reading and writing the journal, then we may need
      // to issue a flush if one isn't already in progress.
      // Avoid doing a flush every time so that if we do write/read/write/read
      // we don't end up flushing after every write.
      ldout(cct, 10) << "_prefetch: requested_pos=" << requested_pos
		     << ", read_pos=" << read_pos
		     << ", write_pos=" << write_pos
		     << ", safe_pos=" << safe_pos << dendl;
      _do_flush();
    }

    _issue_read(len);
  }
}
1112
1113
1114/*
1115 * _is_readable() - return true if next entry is ready.
1116 */
bool Journaler::_is_readable()
{
  // anything to read?
  if (read_pos == write_pos)
    return false;

  // Check if the retrieve bytestream has enough for an entry
  uint64_t need;
  if (journal_stream.readable(read_buf, &need)) {
    return true;
  }

  ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
		  << ", but need " << need << " for next entry; fetch_len is "
		  << fetch_len << dendl;

  // partial fragment at the end?
  // We have already received everything up to write_pos and still cannot
  // decode a whole entry: the journal tail ends in a partial write.
  if (received_pos == write_pos) {
    ldout(cct, 10) << "is_readable() detected partial entry at tail, "
      "adjusting write_pos to " << read_pos << dendl;

    // adjust write_pos: roll every write-side position back to read_pos,
    // discarding the partial fragment.  Only legal while nothing is
    // buffered or awaiting safety (asserted below).
    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
    ceph_assert(write_buf.length() == 0);
    ceph_assert(waitfor_safe.empty());

    // reset read state
    requested_pos = received_pos = read_pos;
    read_buf.clear();

    // FIXME: truncate on disk?

    return false;
  }

  if (need > fetch_len) {
    // Next entry is larger than the normal prefetch window; record how
    // much _prefetch() must request so it arrives in one go.
    temp_fetch_len = need;
    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
		   << dendl;
  }

  ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
  return false;
}
1161
1162/*
1163 * is_readable() - kickstart prefetch, too
1164 */
1165bool Journaler::is_readable()
1166{
1167 lock_guard l(lock);
1168
1169 if (error != 0) {
1170 return false;
1171 }
1172
1173 bool r = readable;
1174 _prefetch();
1175 return r;
1176}
1177
1178class Journaler::C_EraseFinish : public Context {
1179 Journaler *journaler;
1180 C_OnFinisher *completion;
1181 public:
1182 C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {}
1183 void finish(int r) override {
1184 journaler->_finish_erase(r, completion);
1185 }
1186};
1187
1188/**
1189 * Entirely erase the journal, including header. For use when you
1190 * have already made a copy of the journal somewhere else.
1191 */
1192void Journaler::erase(Context *completion)
1193{
1194 lock_guard l(lock);
1195
1196 // Async delete the journal data
1197 uint64_t first = trimmed_pos / get_layout_period();
1198 uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
1199 filer.purge_range(ino, &layout, SnapContext(), first, num,
1200 ceph::real_clock::now(), 0,
1201 wrap_finisher(new C_EraseFinish(
1202 this, wrap_finisher(completion))));
1203
1204 // We will not start the operation to delete the header until
1205 // _finish_erase has seen the data deletion succeed: otherwise if
1206 // there was an error deleting data we might prematurely delete the
1207 // header thereby lose our reference to the data.
1208}
1209
1210void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
1211{
1212 lock_guard l(lock);
b32b8144
FG
1213 if (is_stopping()) {
1214 completion->complete(-EAGAIN);
1215 return;
1216 }
7c673cae
FG
1217
1218 if (data_result == 0) {
1219 // Async delete the journal header
1220 filer.purge_range(ino, &layout, SnapContext(), 0, 1,
1221 ceph::real_clock::now(),
1222 0, wrap_finisher(completion));
1223 } else {
1224 lderr(cct) << "Failed to delete journal " << ino << " data: "
1225 << cpp_strerror(data_result) << dendl;
1226 completion->complete(data_result);
1227 }
1228}
1229
1230/* try_read_entry(bl)
1231 * read entry into bl if it's ready.
1232 * otherwise, do nothing.
1233 */
bool Journaler::try_read_entry(bufferlist& bl)
{
  lock_guard l(lock);

  if (!readable) {
    ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable"
		   << dendl;
    return false;
  }

  uint64_t start_ptr;
  size_t consumed;
  try {
    // Splice the next entry's payload out of read_buf into bl.
    consumed = journal_stream.read(read_buf, &bl, &start_ptr);
    if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
      // Resilient entries carry their own start offset; it must agree
      // with where we believe we are reading.
      ceph_assert(start_ptr == read_pos);
    }
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
    error = -EINVAL;
    return false;
  }

  ldout(cct, 10) << "try_read_entry at " << read_pos << " read "
		 << read_pos << "~" << consumed << " (have "
		 << read_buf.length() << ")" << dendl;

  // Advance past the consumed envelope + payload.
  read_pos += consumed;
  try {
    // We were readable, we might not be any more
    readable = _is_readable();
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
    error = -EINVAL;
    return false;
  }

  // prefetch?
  _prefetch();

  // If bufferlist consists of discontiguous memory, decoding types whose
  // denc_traits needs contiguous memory is inefficient. The bufferlist may
  // get copied to temporary memory multiple times (copy_shallow() in
  // src/include/denc.h actually does deep copy)
  if (bl.get_num_buffers() > 1)
    bl.rebuild();

  return true;
}
1282
1283void Journaler::wait_for_readable(Context *onreadable)
1284{
1285 lock_guard l(lock);
b32b8144 1286 if (is_stopping()) {
31f18b77 1287 finisher->queue(onreadable, -EAGAIN);
7c673cae
FG
1288 return;
1289 }
1290
11fdf7f2 1291 ceph_assert(on_readable == 0);
7c673cae
FG
1292 if (!readable) {
1293 ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
1294 << onreadable << dendl;
1295 on_readable = wrap_finisher(onreadable);
1296 } else {
1297 // race with OSD reply
1298 finisher->queue(onreadable, 0);
1299 }
1300}
1301
1302bool Journaler::have_waiter() const
1303{
1304 return on_readable != nullptr;
1305}
1306
1307
1308
1309
1310/***************** TRIMMING *******************/
1311
1312
1313class Journaler::C_Trim : public Context {
1314 Journaler *ls;
1315 uint64_t to;
1316public:
1317 C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {}
1318 void finish(int r) override {
1319 ls->_finish_trim(r, to);
1320 }
1321};
1322
// Public entry point: take the Journaler lock and attempt to trim
// journal objects up to the last committed expire position.
void Journaler::trim()
{
  lock_guard l(lock);
  _trim();
}
1328
void Journaler::_trim()
{
  if (is_stopping())
    return;

  // Trimming writes (purges objects), so requires a writeable journaler.
  ceph_assert(!readonly);
  uint64_t period = get_layout_period();
  // Only whole layout periods (objects) can be purged, so round the
  // expire position down to a period boundary.
  uint64_t trim_to = last_committed.expire_pos;
  trim_to -= trim_to % period;
  ldout(cct, 10) << "trim last_commited head was " << last_committed
	   << ", can trim to " << trim_to
	   << dendl;
  if (trim_to == 0 || trim_to == trimming_pos) {
    ldout(cct, 10) << "trim already trimmed/trimming to "
		   << trimmed_pos << "/" << trimming_pos << dendl;
    return;
  }

  if (trimming_pos > trimmed_pos) {
    // A previous purge is still in flight; _finish_trim will advance
    // trimmed_pos and a later call can trim further.
    ldout(cct, 10) << "trim already trimming atm, try again later.  "
      "trimmed/trimming is " << trimmed_pos << "/" << trimming_pos << dendl;
    return;
  }

  // trim
  ceph_assert(trim_to <= write_pos);
  ceph_assert(trim_to <= expire_pos);
  ceph_assert(trim_to > trimming_pos);
  ldout(cct, 10) << "trim trimming to " << trim_to
		 << ", trimmed/trimming/expire are "
		 << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
		 << dendl;

  // delete range of objects
  uint64_t first = trimming_pos / period;
  uint64_t num = (trim_to - trimming_pos) / period;
  SnapContext snapc;
  filer.purge_range(ino, &layout, snapc, first, num,
		    ceph::real_clock::now(), 0,
		    wrap_finisher(new C_Trim(this, trim_to)));
  // Mark the range as trimming; trimmed_pos catches up in _finish_trim.
  trimming_pos = trim_to;
}
1371
void Journaler::_finish_trim(int r, uint64_t to)
{
  lock_guard l(lock);

  ceph_assert(!readonly);
  ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
	   << ", trimmed/trimming/expire now "
	   << to << "/" << trimming_pos << "/" << expire_pos
	   << dendl;
  if (r < 0 && r != -ENOENT) {
    // Purge failed for a real reason; trimming is a write-side operation,
    // so route it through the write error handler.
    lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  // -ENOENT is acceptable: the objects were already gone.
  ceph_assert(r >= 0 || r == -ENOENT);

  ceph_assert(to <= trimming_pos);
  ceph_assert(to > trimmed_pos);
  trimmed_pos = to;
}
1393
1394void Journaler::handle_write_error(int r)
1395{
1396 // lock is locked
1397
1398 lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl;
1399 if (on_write_error) {
1400 on_write_error->complete(r);
1401 on_write_error = NULL;
1402 called_write_error = true;
1403 } else if (called_write_error) {
1404 /* We don't call error handler more than once, subsequent errors
1405 * are dropped -- this is okay as long as the error handler does
1406 * something dramatic like respawn */
1407 lderr(cct) << __func__ << ": multiple write errors, handler already called"
1408 << dendl;
1409 } else {
11fdf7f2 1410 ceph_abort_msg("unhandled write error");
7c673cae
FG
1411 }
1412}
1413
1414
1415/**
1416 * Test whether the 'read_buf' byte stream has enough data to read
1417 * an entry
1418 *
1419 * sets 'next_envelope_size' to the number of bytes needed to advance (enough
1420 * to get the next header if header was unavailable, or enough to get the whole
1421 * next entry if the header was available but the body wasn't).
1422 */
1423bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
1424{
11fdf7f2 1425 ceph_assert(need != NULL);
7c673cae
FG
1426
1427 uint32_t entry_size = 0;
1428 uint64_t entry_sentinel = 0;
11fdf7f2 1429 auto p = read_buf.cbegin();
7c673cae
FG
1430
1431 // Do we have enough data to decode an entry prefix?
1432 if (format >= JOURNAL_FORMAT_RESILIENT) {
1433 *need = sizeof(entry_size) + sizeof(entry_sentinel);
1434 } else {
1435 *need = sizeof(entry_size);
1436 }
1437 if (read_buf.length() >= *need) {
1438 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1439 decode(entry_sentinel, p);
7c673cae
FG
1440 if (entry_sentinel != sentinel) {
1441 throw buffer::malformed_input("Invalid sentinel");
1442 }
1443 }
1444
11fdf7f2 1445 decode(entry_size, p);
7c673cae
FG
1446 } else {
1447 return false;
1448 }
1449
1450 // Do we have enough data to decode an entry prefix, payload and suffix?
1451 if (format >= JOURNAL_FORMAT_RESILIENT) {
1452 *need = JOURNAL_ENVELOPE_RESILIENT + entry_size;
1453 } else {
1454 *need = JOURNAL_ENVELOPE_LEGACY + entry_size;
1455 }
1456 if (read_buf.length() >= *need) {
1457 return true; // No more bytes needed
1458 }
1459
1460 return false;
1461}
1462
1463
1464/**
1465 * Consume one entry from a journal byte stream 'from', splicing a
1466 * serialized LogEvent blob into 'entry'.
1467 *
1468 * 'entry' must be non null and point to an empty bufferlist.
1469 *
1470 * 'from' must contain sufficient valid data (i.e. readable is true).
1471 *
1472 * 'start_ptr' will be set to the entry's start pointer, if the collection
1473 * format provides it. It may not be null.
1474 *
1475 * @returns The number of bytes consumed from the `from` byte stream. Note
1476 * that this is not equal to the length of `entry`, which contains
1477 * the inner serialized LogEvent and not the envelope.
1478 */
size_t JournalStream::read(bufferlist &from, bufferlist *entry,
			   uint64_t *start_ptr)
{
  ceph_assert(start_ptr != NULL);
  ceph_assert(entry != NULL);
  ceph_assert(entry->length() == 0);

  uint32_t entry_size = 0;

  // Consume envelope prefix: entry_size and entry_sentinel
  auto from_ptr = from.cbegin();
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    uint64_t entry_sentinel = 0;
    decode(entry_sentinel, from_ptr);
    // Assertion instead of clean check because of precondition of this
    // fn is that readable() already passed
    ceph_assert(entry_sentinel == sentinel);
  }
  decode(entry_size, from_ptr);

  // Read out the payload
  from_ptr.copy(entry_size, *entry);

  // Consume the envelope suffix (start_ptr)
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    decode(*start_ptr, from_ptr);
  } else {
    // Legacy format records no start pointer; report 0.
    *start_ptr = 0;
  }

  // Trim the input buffer to discard the bytes we have consumed
  from.splice(0, from_ptr.get_off());

  // Total bytes consumed = envelope + payload (iterator offset after
  // decoding everything).
  return from_ptr.get_off();
}
1514
1515
1516/**
1517 * Append one entry
1518 */
1519size_t JournalStream::write(bufferlist &entry, bufferlist *to,
1520 uint64_t const &start_ptr)
1521{
11fdf7f2 1522 ceph_assert(to != NULL);
7c673cae
FG
1523
1524 uint32_t const entry_size = entry.length();
1525 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1526 encode(sentinel, *to);
7c673cae 1527 }
11fdf7f2 1528 encode(entry_size, *to);
7c673cae
FG
1529 to->claim_append(entry);
1530 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1531 encode(start_ptr, *to);
7c673cae
FG
1532 }
1533
1534 if (format >= JOURNAL_FORMAT_RESILIENT) {
1535 return JOURNAL_ENVELOPE_RESILIENT + entry_size;
1536 } else {
1537 return JOURNAL_ENVELOPE_LEGACY + entry_size;
1538 }
1539}
1540
1541/**
1542 * set write error callback
1543 *
1544 * Set a callback/context to trigger if we get a write error from
1545 * the objecter. This may be from an explicit request (e.g., flush)
1546 * or something async the journaler did on its own (e.g., journal
1547 * header update).
1548 *
1549 * It is only used once; if the caller continues to use the
1550 * Journaler and wants to hear about errors, it needs to reset the
1551 * error_handler.
1552 *
1553 * @param c callback/context to trigger on error
1554 */
1555void Journaler::set_write_error_handler(Context *c) {
1556 lock_guard l(lock);
11fdf7f2 1557 ceph_assert(!on_write_error);
7c673cae
FG
1558 on_write_error = wrap_finisher(c);
1559 called_write_error = false;
1560}
1561
1562
1563/**
1564 * Wrap a context in a C_OnFinisher, if it is non-NULL
1565 *
1566 * Utility function to avoid lots of error-prone and verbose
1567 * NULL checking on contexts passed in.
1568 */
1569C_OnFinisher *Journaler::wrap_finisher(Context *c)
1570{
1571 if (c != NULL) {
1572 return new C_OnFinisher(c, finisher);
1573 } else {
1574 return NULL;
1575 }
1576}
1577
1578void Journaler::shutdown()
1579{
1580 lock_guard l(lock);
1581
1582 ldout(cct, 1) << __func__ << dendl;
1583
b32b8144 1584 state = STATE_STOPPING;
7c673cae 1585 readable = false;
7c673cae
FG
1586
1587 // Kick out anyone reading from journal
1588 error = -EAGAIN;
1589 if (on_readable) {
1590 C_OnFinisher *f = on_readable;
1591 on_readable = 0;
1592 f->complete(-EAGAIN);
1593 }
1594
b32b8144
FG
1595 list<Context*> ls;
1596 ls.swap(waitfor_recover);
1597 finish_contexts(cct, ls, -ESHUTDOWN);
7c673cae
FG
1598
1599 std::map<uint64_t, std::list<Context*> >::iterator i;
1600 for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
1601 finish_contexts(cct, i->second, -EAGAIN);
1602 }
1603 waitfor_safe.clear();
1604}
1605