]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Journaler.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / osdc / Journaler.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "common/perf_counters.h"
16#include "common/dout.h"
17#include "include/Context.h"
18#include "msg/Messenger.h"
19#include "osdc/Journaler.h"
20#include "common/errno.h"
11fdf7f2 21#include "include/ceph_assert.h"
7c673cae
FG
22#include "common/Finisher.h"
23
24#define dout_subsys ceph_subsys_journaler
25#undef dout_prefix
26#define dout_prefix *_dout << objecter->messenger->get_myname() \
27 << ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
28
20effc67 29using namespace std;
7c673cae
FG
30using std::chrono::seconds;
31
32
33class Journaler::C_DelayFlush : public Context {
34 Journaler *journaler;
35 public:
11fdf7f2 36 explicit C_DelayFlush(Journaler *j) : journaler(j) {}
7c673cae
FG
37 void finish(int r) override {
38 journaler->_do_delayed_flush();
39 }
40};
41
42void Journaler::set_readonly()
43{
44 lock_guard l(lock);
45
46 ldout(cct, 1) << "set_readonly" << dendl;
47 readonly = true;
48}
49
50void Journaler::set_writeable()
51{
52 lock_guard l(lock);
53
54 ldout(cct, 1) << "set_writeable" << dendl;
55 readonly = false;
56}
57
/*
 * Initialize a brand-new, empty journal.
 *
 * Installs the stream format and file layout, then points every journal
 * cursor (prezeroing/prezero/write/flush/safe/read/requested/received/
 * expire/trimming/trimmed/next_safe) at the end of the first layout
 * period, so the first object remains reserved for the header.
 *
 * Caller must be writeable; the journaler lock is taken here.
 */
void Journaler::create(file_layout_t *l, stream_format_t const sf)
{
  lock_guard lk(lock);

  ceph_assert(!readonly);
  state = STATE_ACTIVE;

  stream_format = sf;
  journal_stream.set_format(sf);
  _set_layout(l);

  // All positions start one full layout period in: object 0 holds the
  // journal header, not log data.
  prezeroing_pos = prezero_pos = write_pos = flush_pos =
    safe_pos = read_pos = requested_pos = received_pos =
    expire_pos = trimming_pos = trimmed_pos =
    next_safe_pos = layout.get_period();

  ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
		<< std::dec << ", format=" << stream_format << dendl;
}
77
78void Journaler::set_layout(file_layout_t const *l)
79{
80 lock_guard lk(lock);
81 _set_layout(l);
82}
83
/*
 * Install layout *l (caller holds the journaler lock).
 *
 * The layout's pool id must match the pool this journaler was opened
 * against; a mismatch (e.g. a stale pool id written into the header via
 * cephfs-journal-tool) is treated as fatal and aborts.  The layout is
 * mirrored into the cached header copies and the prefetch window is
 * recomputed from the configured number of periods.
 */
void Journaler::_set_layout(file_layout_t const *l)
{
  layout = *l;

  if (layout.pool_id != pg_pool) {
    // user can reset pool id through cephfs-journal-tool
    lderr(cct) << "may got older pool id from header layout" << dendl;
    ceph_abort();
  }
  last_written.layout = layout;
  last_committed.layout = layout;

  // prefetch intelligently.
  // (watch out, this is big if you use big objects or weird striping)
  uint64_t periods = cct->_conf.get_val<uint64_t>("journaler_prefetch_periods");
  fetch_len = layout.get_period() * periods;
}
101
102
103/***************** HEADER *******************/
104
105ostream& operator<<(ostream &out, const Journaler::Header &h)
106{
107 return out << "loghead(trim " << h.trimmed_pos
108 << ", expire " << h.expire_pos
109 << ", write " << h.write_pos
110 << ", stream_format " << (int)(h.stream_format)
111 << ")";
112}
113
114class Journaler::C_ReadHead : public Context {
115 Journaler *ls;
116public:
117 bufferlist bl;
118 explicit C_ReadHead(Journaler *l) : ls(l) {}
119 void finish(int r) override {
120 ls->_finish_read_head(r, bl);
121 }
122};
123
124class Journaler::C_RereadHead : public Context {
125 Journaler *ls;
126 Context *onfinish;
127public:
128 bufferlist bl;
129 C_RereadHead(Journaler *l, Context *onfinish_) : ls (l),
130 onfinish(onfinish_) {}
131 void finish(int r) override {
132 ls->_finish_reread_head(r, bl, onfinish);
133 }
134};
135
136class Journaler::C_ProbeEnd : public Context {
137 Journaler *ls;
138public:
139 uint64_t end;
140 explicit C_ProbeEnd(Journaler *l) : ls(l), end(-1) {}
141 void finish(int r) override {
142 ls->_finish_probe_end(r, end);
143 }
144};
145
146class Journaler::C_ReProbe : public Context {
147 Journaler *ls;
148 C_OnFinisher *onfinish;
149public:
150 uint64_t end;
151 C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) :
152 ls(l), onfinish(onfinish_), end(0) {}
153 void finish(int r) override {
154 ls->_finish_reprobe(r, end, onfinish);
155 }
156};
157
158void Journaler::recover(Context *onread)
159{
160 lock_guard l(lock);
b32b8144 161 if (is_stopping()) {
7c673cae
FG
162 onread->complete(-EAGAIN);
163 return;
164 }
165
166 ldout(cct, 1) << "recover start" << dendl;
11fdf7f2
TL
167 ceph_assert(state != STATE_ACTIVE);
168 ceph_assert(readonly);
7c673cae
FG
169
170 if (onread)
171 waitfor_recover.push_back(wrap_finisher(onread));
172
173 if (state != STATE_UNDEF) {
174 ldout(cct, 1) << "recover - already recovering" << dendl;
175 return;
176 }
177
178 ldout(cct, 1) << "read_head" << dendl;
179 state = STATE_READHEAD;
180 C_ReadHead *fin = new C_ReadHead(this);
181 _read_head(fin, &fin->bl);
182}
183
184void Journaler::_read_head(Context *on_finish, bufferlist *bl)
185{
186 // lock is locked
11fdf7f2 187 ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
7c673cae
FG
188
189 object_t oid = file_object_t(ino, 0);
190 object_locator_t oloc(pg_pool);
191 objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish));
192}
193
194void Journaler::reread_head(Context *onfinish)
195{
196 lock_guard l(lock);
197 _reread_head(wrap_finisher(onfinish));
198}
199
/**
 * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos
 * from the on-disk header. This switches the state to STATE_REREADHEAD for
 * the duration, and you shouldn't start a re-read while other operations are
 * in-flight, nor start other operations while a re-read is in progress.
 * Also, don't call this until the Journaler has finished its recovery and has
 * gone STATE_ACTIVE!
 *
 * Caller holds the journaler lock; the result is delivered to 'onfinish'
 * via _finish_reread_head().
 */
void Journaler::_reread_head(Context *onfinish)
{
  ldout(cct, 10) << "reread_head" << dendl;
  ceph_assert(state == STATE_ACTIVE);

  state = STATE_REREADHEAD;
  C_RereadHead *fin = new C_RereadHead(this, onfinish);
  _read_head(fin, &fin->bl);
}
217
/*
 * Completion for _reread_head(): decode the freshly read header, re-sync
 * the in-memory write/expire/trim pointers from it, return to
 * STATE_ACTIVE, and hand the result code to the caller's continuation.
 * Completes 'finish' with -EAGAIN when shutting down, -EINVAL on an
 * undecodable header, otherwise with r.
 */
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
{
  lock_guard l(lock);
  if (is_stopping()) {
    finish->complete(-EAGAIN);
    return;
  }

  //read on-disk header into
  // (we must have data unless the read itself failed)
  ceph_assert(bl.length() || r < 0 );

  // unpack header
  if (r == 0) {
    Header h;
    auto p = bl.cbegin();
    try {
      decode(h, p);
    } catch (const buffer::error &e) {
      // header is present but undecodable: report corruption
      finish->complete(-EINVAL);
      return;
    }
    // resync every write-side pointer to the on-disk write position
    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
      = h.write_pos;
    expire_pos = h.expire_pos;
    trimmed_pos = trimming_pos = h.trimmed_pos;
    init_headers(h);
    state = STATE_ACTIVE;
  }

  finish->complete(r);
}
249
/*
 * Completion for the recovery-time header read.
 *
 * Outcomes:
 *  - read error: complete all recovery waiters with r;
 *  - zero-length object: treat as a freshly created (empty) journal and
 *    go straight to STATE_ACTIVE;
 *  - undecodable / inconsistent header: complete waiters with -EINVAL;
 *  - otherwise: adopt the header's pointers, layout and stream format,
 *    then probe past write_pos for the actual end of the log.
 */
void Journaler::_finish_read_head(int r, bufferlist& bl)
{
  lock_guard l(lock);
  if (is_stopping())
    return;

  ceph_assert(state == STATE_READHEAD);

  if (r!=0) {
    ldout(cct, 0) << "error getting journal off disk" << dendl;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, r);
    return;
  }

  if (bl.length() == 0) {
    ldout(cct, 1) << "_finish_read_head r=" << r
		  << " read 0 bytes, assuming empty log" << dendl;
    state = STATE_ACTIVE;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, 0);
    return;
  }

  // unpack header
  bool corrupt = false;
  Header h;
  auto p = bl.cbegin();
  try {
    decode(h, p);

    // sanity-check what we decoded before trusting it
    if (h.magic != magic) {
      ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
		    << magic << "'" << dendl;
      corrupt = true;
    } else if (h.write_pos < h.expire_pos || h.expire_pos < h.trimmed_pos) {
      // pointers must be ordered trimmed <= expire <= write
      ldout(cct, 0) << "Corrupt header (bad offsets): " << h << dendl;
      corrupt = true;
    }
  } catch (const buffer::error &e) {
    corrupt = true;
  }

  if (corrupt) {
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, -EINVAL);
    return;
  }

  // adopt the on-disk pointers; write-side pointers all start at the
  // header's write_pos (the probe below may advance them further)
  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
    = h.write_pos;
  read_pos = requested_pos = received_pos = expire_pos = h.expire_pos;
  trimmed_pos = trimming_pos = h.trimmed_pos;

  init_headers(h);
  _set_layout(&h.layout);
  stream_format = h.stream_format;
  journal_stream.set_format(h.stream_format);

  ldout(cct, 1) << "_finish_read_head " << h
		<< ". probing for end of log (from " << write_pos << ")..."
		<< dendl;
  C_ProbeEnd *fin = new C_ProbeEnd(this);
  state = STATE_PROBING;
  _probe(fin, &fin->end);
}
319
320void Journaler::_probe(Context *finish, uint64_t *end)
321{
322 // lock is locked
323 ldout(cct, 1) << "probing for end of the log" << dendl;
11fdf7f2 324 ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
7c673cae
FG
325 // probe the log
326 filer.probe(ino, &layout, CEPH_NOSNAP,
327 write_pos, end, true, 0, wrap_finisher(finish));
328}
329
330void Journaler::_reprobe(C_OnFinisher *finish)
331{
332 ldout(cct, 10) << "reprobe" << dendl;
11fdf7f2 333 ceph_assert(state == STATE_ACTIVE);
7c673cae
FG
334
335 state = STATE_REPROBING;
336 C_ReProbe *fin = new C_ReProbe(this, finish);
337 _probe(fin, &fin->end);
338}
339
340
/*
 * Completion for _reprobe(): adopt the newly discovered end of the log
 * as the write-side position, return to STATE_ACTIVE and complete the
 * caller's continuation with r (-EAGAIN when shutting down).
 */
void Journaler::_finish_reprobe(int r, uint64_t new_end,
                                C_OnFinisher *onfinish)
{
  lock_guard l(lock);
  if (is_stopping()) {
    onfinish->complete(-EAGAIN);
    return;
  }

  // the probe can only move the end forward (unless it failed)
  ceph_assert(new_end >= write_pos || r < 0);
  ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
	  << " (header had " << write_pos << ")."
	  << dendl;
  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end;
  state = STATE_ACTIVE;
  onfinish->complete(r);
}
358
/*
 * Completion for the recovery-time end-of-log probe.  On success, adopts
 * the probed end as the write-side position and goes STATE_ACTIVE; in
 * all cases (including probe error) the queued recovery waiters are
 * completed with r.
 */
void Journaler::_finish_probe_end(int r, uint64_t end)
{
  lock_guard l(lock);
  if (is_stopping())
    return;

  ceph_assert(state == STATE_PROBING);
  if (r < 0) { // error in probing
    goto out;
  }
  if (((int64_t)end) == -1) {
    // probe found no data at all; with a non-empty header this is not
    // expected to happen, hence the abort below
    end = write_pos;
    ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had "
		  << write_pos << "). log was empty. recovered." << dendl;
    ceph_abort(); // hrm.
  } else {
    ceph_assert(end >= write_pos);
    ldout(cct, 1) << "_finish_probe_end write_pos = " << end
		  << " (header had " << write_pos << "). recovered."
		  << dendl;
  }

  state = STATE_ACTIVE;

  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end;

out:
  // done.
  list<Context*> ls;
  ls.swap(waitfor_recover);
  finish_contexts(cct, ls, r);
}
391
392class Journaler::C_RereadHeadProbe : public Context
393{
394 Journaler *ls;
395 C_OnFinisher *final_finish;
396public:
397 C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) :
398 ls(l), final_finish(finish) {}
399 void finish(int r) override {
400 ls->_finish_reread_head_and_probe(r, final_finish);
401 }
402};
403
404void Journaler::reread_head_and_probe(Context *onfinish)
405{
406 lock_guard l(lock);
407
11fdf7f2 408 ceph_assert(state == STATE_ACTIVE);
7c673cae
FG
409 _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
410}
411
/*
 * Second stage of reread_head_and_probe(): runs after the header
 * re-read and kicks off the re-probe.  Short-circuits to the caller on
 * shutdown (-EAGAIN) or blocklisting (-EBLOCKLISTED); any other error
 * is fatal (asserted).
 */
void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
{
  // Expect to be called back from finish_reread_head, which already takes lock
  // lock is locked
  if (is_stopping()) {
    onfinish->complete(-EAGAIN);
    return;
  }

  // Let the caller know that the operation has failed or was intentionally
  // failed since the caller has been blocklisted.
  if (r == -EBLOCKLISTED) {
    onfinish->complete(r);
    return;
  }

  ceph_assert(!r); //if we get an error, we're boned
  _reprobe(onfinish);
}
431
432
433// WRITING
434
// Completion for a header write: carries the exact header that was
// written so _finish_write_head() can record it as last_committed, plus
// the caller's oncommit continuation.
class Journaler::C_WriteHead : public Context {
public:
  Journaler *ls;
  Header h;                // copy of the header as written
  C_OnFinisher *oncommit;  // may be null
  C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_),
						     oncommit(c) {}
  void finish(int r) override {
    ls->_finish_write_head(r, h, oncommit);
  }
};
446
447void Journaler::write_head(Context *oncommit)
448{
449 lock_guard l(lock);
450 _write_head(oncommit);
451}
452
453
/*
 * Persist the current journal head to object 0.
 *
 * Caller holds the journaler lock; the journal must be writeable and
 * fully recovered (STATE_ACTIVE).  'oncommit' (may be null) fires once
 * the header write is safe on disk, via _finish_write_head().
 *
 * Note: only safe_pos is advertised as write_pos in the header — never
 * data that is still in flight.
 */
void Journaler::_write_head(Context *oncommit)
{
  ceph_assert(!readonly);
  ceph_assert(state == STATE_ACTIVE);
  last_written.trimmed_pos = trimmed_pos;
  last_written.expire_pos = expire_pos;
  // unused_field mirrors expire_pos — presumably a legacy/compat field;
  // TODO confirm against Header's encode/decode definition
  last_written.unused_field = expire_pos;
  last_written.write_pos = safe_pos;
  last_written.stream_format = stream_format;
  ldout(cct, 10) << "write_head " << last_written << dendl;

  // Avoid persisting bad pointers in case of bugs
  ceph_assert(last_written.write_pos >= last_written.expire_pos);
  ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);

  last_wrote_head = ceph::real_clock::now();

  bufferlist bl;
  encode(last_written, bl);
  SnapContext snapc;

  object_t oid = file_object_t(ino, 0);
  object_locator_t oloc(pg_pool);
  objecter->write_full(oid, oloc, snapc, bl, ceph::real_clock::now(), 0,
		       wrap_finisher(new C_WriteHead(
				       this, last_written,
				       wrap_finisher(oncommit))),
		       0, 0, write_iohint);
}
483
/*
 * Completion for a header write: record the written header as
 * last_committed, complete the caller's oncommit, and opportunistically
 * trim now that the head is durable.
 *
 * NOTE(review): on write error we divert into handle_write_error() and
 * return without completing 'oncommit' — verify the error path makes
 * that acceptable for callers.
 */
void Journaler::_finish_write_head(int r, Header &wrote,
				   C_OnFinisher *oncommit)
{
  lock_guard l(lock);

  if (r < 0) {
    lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }
  ceph_assert(!readonly);
  ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
  last_committed = wrote;
  if (oncommit) {
    oncommit->complete(r);
  }

  _trim();  // trim?
}
503
504
505/***************** WRITING *******************/
506
507class Journaler::C_Flush : public Context {
508 Journaler *ls;
509 uint64_t start;
510 ceph::real_time stamp;
511public:
512 C_Flush(Journaler *l, int64_t s, ceph::real_time st)
513 : ls(l), start(s), stamp(st) {}
514 void finish(int r) override {
515 ls->_finish_flush(r, start, stamp);
516 }
517};
518
/*
 * Completion for one flush write that started at 'start'.
 *
 * Retires the matching pending_safe entry, advances safe_pos — either
 * all the way to next_safe_pos when no flushes remain in flight, or to
 * the next_safe_pos that was recorded for the oldest entry (captured
 * before erasing this one) — records commit latency, and completes any
 * waiters whose positions are now safe.
 */
void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
  lock_guard l(lock);
  ceph_assert(!readonly);

  if (r < 0) {
    lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  ceph_assert(start < flush_pos);

  // calc latency?
  if (logger) {
    ceph::timespan lat = ceph::real_clock::now() - stamp;
    logger->tinc(logger_key_lat, lat);
  }

  // adjust safe_pos
  auto it = pending_safe.find(start);
  ceph_assert(it != pending_safe.end());
  // snapshot the front entry's recorded next_safe_pos before erasing
  uint64_t min_next_safe_pos = pending_safe.begin()->second;
  pending_safe.erase(it);
  if (pending_safe.empty())
    safe_pos = next_safe_pos;
  else
    safe_pos = min_next_safe_pos;

  ldout(cct, 10) << "_finish_flush safe from " << start
		 << ", pending_safe " << pending_safe
		 << ", (prezeroing/prezero)/write/flush/safe positions now "
		 << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
		 << write_pos << "/" << flush_pos << "/" << safe_pos
		 << dendl;

  // kick waiters <= safe_pos
  if (!waitfor_safe.empty()) {
    list<Context*> ls;
    while (!waitfor_safe.empty()) {
      auto it = waitfor_safe.begin();
      if (it->first > safe_pos)
	break;
      ls.splice(ls.end(), it->second);
      waitfor_safe.erase(it);
    }
    finish_contexts(cct, ls);
  }
}
568
569
570
/*
 * Append one entry (payload 'bl' plus the stream envelope) to the
 * in-memory write buffer, blocking on the write-buffer throttle when it
 * is nearly full.  When the append crosses an object boundary, the
 * completed object's data is flushed immediately.
 *
 * @return the new write_pos (end of the appended entry).
 *
 * Note: the lock is dropped while waiting on the throttle, so other
 * appends/flushes may interleave there.
 */
uint64_t Journaler::append_entry(bufferlist& bl)
{
  unique_lock l(lock);

  ceph_assert(!readonly);
  uint32_t s = bl.length();

  // append
  size_t delta = bl.length() + journal_stream.get_envelope_size();
  // write_buf space is nearly full
  if (!write_buf_throttle.get_or_fail(delta)) {
    // must not hold the journaler lock while blocking on the throttle
    l.unlock();
    ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
    write_buf_throttle.get(delta);
    l.lock();
  }
  ldout(cct, 20) << "write_buf_throttle get, delta " << delta << dendl;
  size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
  ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~"
		 << wrote << dendl;
  write_pos += wrote;

  // flush previous object?
  uint64_t su = get_layout_period();
  ceph_assert(su > 0);
  uint64_t write_off = write_pos % su;
  uint64_t write_obj = write_pos / su;
  uint64_t flush_obj = flush_pos / su;
  if (write_obj != flush_obj) {
    ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
		   << write_obj << " flo " << flush_obj << ")" << dendl;
    _do_flush(write_buf.length() - write_off);

    // if _do_flush() skips flushing some data, it does do a best effort to
    // update next_safe_pos.
    if (write_buf.length() > 0 &&
	write_buf.length() <= wrote) { // the unflushed data are within this entry
      // set next_safe_pos to end of previous entry
      next_safe_pos = write_pos - wrote;
    }
  }

  return write_pos;
}
615
616
/*
 * Flush up to 'amount' bytes (0 = everything) of the write buffer to
 * the OSDs.  Caller holds the journaler lock.
 *
 * The flush is capped so it never runs into the un-prezeroed region:
 * if the target range is too close to prezero_pos the flush is deferred
 * (waiting_for_zero_pos is set and _finish_prezero() retries) or
 * shortened.  pending_safe/next_safe_pos bookkeeping is updated so
 * _finish_flush() can advance safe_pos on entry boundaries.
 */
void Journaler::_do_flush(unsigned amount)
{
  if (is_stopping())
    return;
  if (write_pos == flush_pos)
    return;  // nothing buffered
  ceph_assert(write_pos > flush_pos);
  ceph_assert(!readonly);

  // flush
  uint64_t len = write_pos - flush_pos;
  ceph_assert(len == write_buf.length());
  if (amount && amount < len)
    len = amount;

  // zero at least two full periods ahead. this ensures
  // that the next object will not exist.
  uint64_t period = get_layout_period();
  if (flush_pos + len + 2*period > prezero_pos) {
    _issue_prezero();

    int64_t newlen = prezero_pos - flush_pos - period;
    if (newlen <= 0) {
      // can't flush anything yet; retry from _finish_prezero()
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
		     << " already too close to prezero_pos " << prezero_pos
		     << ", zeroing first" << dendl;
      waiting_for_zero_pos = flush_pos + len;
      return;
    }
    if (static_cast<uint64_t>(newlen) < len) {
      // flush only the part that is already prezeroed
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
		     << " but hit prezero_pos " << prezero_pos
		     << ", will do " << flush_pos << "~" << newlen << dendl;
      waiting_for_zero_pos = flush_pos + len;
      len = newlen;
    }
  }
  ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;

  // submit write for anything pending
  // flush _start_ pos to _finish_flush
  ceph::real_time now = ceph::real_clock::now();
  SnapContext snapc;

  Context *onsafe = new C_Flush(this, flush_pos, now);  // on COMMIT
  pending_safe[flush_pos] = next_safe_pos;

  bufferlist write_bl;

  // adjust pointers
  if (len == write_buf.length()) {
    // flushing everything buffered: safe boundary becomes write_pos
    write_bl.swap(write_buf);
    next_safe_pos = write_pos;
  } else {
    write_buf.splice(0, len, &write_bl);
    // Keys of waitfor_safe map are journal entry boundaries.
    // Try finding a journal entry that we are actually flushing
    // and set next_safe_pos to end of it. This is best effort.
    // The one we found may not be the latest flushing entry.
    auto p = waitfor_safe.lower_bound(flush_pos + len);
    if (p != waitfor_safe.end()) {
      if (p->first > flush_pos + len && p != waitfor_safe.begin())
	--p;
      if (p->first <= flush_pos + len && p->first > next_safe_pos)
	next_safe_pos = p->first;
    }
  }

  filer.write(ino, &layout, snapc,
	      flush_pos, len, write_bl, ceph::real_clock::now(),
	      0,
	      wrap_finisher(onsafe), write_iohint);

  flush_pos += len;
  ceph_assert(write_buf.length() == write_pos - flush_pos);
  // release throttle capacity for the bytes handed to the Filer
  write_buf_throttle.put(len);
  ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;

  ldout(cct, 10)
    << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
    << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos
    << "/" << flush_pos << "/" << safe_pos << dendl;

  _issue_prezero();
}
702
703
704void Journaler::wait_for_flush(Context *onsafe)
705{
706 lock_guard l(lock);
b32b8144 707 if (is_stopping()) {
f91f0fd5
TL
708 if (onsafe)
709 onsafe->complete(-EAGAIN);
7c673cae
FG
710 return;
711 }
712 _wait_for_flush(onsafe);
713}
714
/*
 * Arrange for 'onsafe' (may be null) to run once everything written so
 * far is safe on disk.  If write_pos is already safe, the callback is
 * queued on the finisher immediately; otherwise it is parked in
 * waitfor_safe keyed by the current write_pos (an entry boundary).
 * Caller holds the journaler lock.
 */
void Journaler::_wait_for_flush(Context *onsafe)
{
  ceph_assert(!readonly);

  // all flushed and safe?
  if (write_pos == safe_pos) {
    ceph_assert(write_buf.length() == 0);
    ldout(cct, 10)
      << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
      "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
      << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
    if (onsafe) {
      finisher->queue(onsafe, 0);
    }
    return;
  }

  // queue waiter
  if (onsafe) {
    waitfor_safe[write_pos].push_back(wrap_finisher(onsafe));
  }
}
737
738void Journaler::flush(Context *onsafe)
739{
740 lock_guard l(lock);
b32b8144 741 if (is_stopping()) {
f91f0fd5
TL
742 if (onsafe)
743 onsafe->complete(-EAGAIN);
b32b8144
FG
744 return;
745 }
7c673cae
FG
746 _flush(wrap_finisher(onsafe));
747}
748
/*
 * Flush all buffered journal data and register 'onsafe' (may be null)
 * to fire once it is safe on disk.  Also opportunistically rewrites the
 * journal head if it hasn't been persisted recently.  Caller holds the
 * journaler lock.
 */
void Journaler::_flush(C_OnFinisher *onsafe)
{
  ceph_assert(!readonly);

  if (write_pos == flush_pos) {
    // nothing buffered; complete immediately
    ceph_assert(write_buf.length() == 0);
    ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
      "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
      << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
      << dendl;
    if (onsafe) {
      onsafe->complete(0);
    }
  } else {
    _do_flush();
    _wait_for_flush(onsafe);
  }

  // write head?
  if (_write_head_needed()) {
    _write_head();
  }
}
772
773bool Journaler::_write_head_needed()
774{
11fdf7f2 775 return last_wrote_head + seconds(cct->_conf.get_val<int64_t>("journaler_write_head_interval"))
7c673cae
FG
776 < ceph::real_clock::now();
777}
778
779
780/*************** prezeroing ******************/
781
// Completion for one zero operation issued by _issue_prezero(); reports
// the zeroed extent back to Journaler::_finish_prezero().
struct C_Journaler_Prezero : public Context {
  Journaler *journaler;
  uint64_t from, len;  // extent that was zeroed
  C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l)
    : journaler(j), from(f), len(l) {}
  void finish(int r) override {
    journaler->_finish_prezero(r, from, len);
  }
};
791
/*
 * Issue zero operations ahead of the write position so that journal
 * objects are guaranteed not to pre-exist when we write into them.
 * The target is write_pos rounded up by journaler_prezero_periods full
 * periods; zeroing is issued in period-aligned chunks from
 * prezeroing_pos up to that target.  Caller holds the journaler lock.
 */
void Journaler::_issue_prezero()
{
  ceph_assert(prezeroing_pos >= flush_pos);

  uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
  /*
   * issue zero requests based on write_pos, even though the invariant
   * is that we zero ahead of flush_pos.
   */
  uint64_t period = get_layout_period();
  uint64_t to = write_pos + period * num_periods + period - 1;
  to -= to % period;  // round down to a period boundary

  if (prezeroing_pos >= to) {
    ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos "
		   << prezeroing_pos << dendl;
    return;
  }

  while (prezeroing_pos < to) {
    uint64_t len;
    if (prezeroing_pos % period == 0) {
      len = period;
      ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~"
		     << period << " (full period)" << dendl;
    } else {
      // finish out the partially-zeroed period first
      len = period - (prezeroing_pos % period);
      ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~"
		     << len << " (partial period)" << dendl;
    }
    SnapContext snapc;
    Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos,
						       len));
    filer.zero(ino, &layout, snapc, prezeroing_pos, len,
	       ceph::real_clock::now(), 0, c);
    prezeroing_pos += len;
  }
}
830
// Lock cycle because we get called out of objecter callback (holding
// objecter read lock), but there are also cases where we take the journaler
// lock before calling into objecter to do I/O.
/*
 * Completion for one prezero extent.  Extents can complete out of
 * order: contiguous completions advance prezero_pos directly, while
 * out-of-order ones are parked in pending_zero until the gap before
 * them closes.  Once prezero_pos moves, any flush that was deferred for
 * zeroing is resumed and prezero waiters are completed.
 */
void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
{
  lock_guard l(lock);

  ldout(cct, 10) << "_prezeroed to " << start << "~" << len
		 << ", prezeroing/prezero was " << prezeroing_pos << "/"
		 << prezero_pos << ", pending " << pending_zero
		 << dendl;
  if (r < 0 && r != -ENOENT) {
    lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  // -ENOENT is fine: the object didn't exist, which is as good as zeroed
  ceph_assert(r == 0 || r == -ENOENT);

  if (start == prezero_pos) {
    prezero_pos += len;
    // absorb any previously-completed extents that are now contiguous
    while (!pending_zero.empty() &&
	   pending_zero.begin().get_start() == prezero_pos) {
      interval_set<uint64_t>::iterator b(pending_zero.begin());
      prezero_pos += b.get_len();
      pending_zero.erase(b);
    }

    // resume a flush that was deferred waiting for zeroing
    if (waiting_for_zero_pos > flush_pos) {
      _do_flush(waiting_for_zero_pos - flush_pos);
    }

    if (prezero_pos == prezeroing_pos &&
	!waitfor_prezero.empty()) {
      list<Context*> ls;
      ls.swap(waitfor_prezero);
      finish_contexts(cct, ls, 0);
    }
  } else {
    // completed out of order; remember it until the gap closes
    pending_zero.insert(start, len);
  }
  ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos
		 << "/" << prezero_pos
		 << ", pending " << pending_zero
		 << dendl;
}
877
28e407b8
AA
878void Journaler::wait_for_prezero(Context *onfinish)
879{
11fdf7f2 880 ceph_assert(onfinish);
28e407b8
AA
881 lock_guard l(lock);
882
883 if (prezero_pos == prezeroing_pos) {
884 finisher->queue(onfinish, 0);
885 return;
886 }
887 waitfor_prezero.push_back(wrap_finisher(onfinish));
888}
7c673cae
FG
889
890
891/***************** READING *******************/
892
893
894class Journaler::C_Read : public Context {
895 Journaler *ls;
896 uint64_t offset;
897 uint64_t length;
898public:
899 bufferlist bl;
900 C_Read(Journaler *j, uint64_t o, uint64_t l) : ls(j), offset(o), length(l) {}
901 void finish(int r) override {
902 ls->_finish_read(r, offset, length, bl);
903 }
904};
905
906class Journaler::C_RetryRead : public Context {
907 Journaler *ls;
908public:
909 explicit C_RetryRead(Journaler *l) : ls(l) {}
910
911 void finish(int r) override {
912 // Should only be called from waitfor_safe i.e. already inside lock
913 // (ls->lock is locked
914 ls->_prefetch();
915 }
916};
917
/*
 * Completion for one prefetch read.  On error (including a short read)
 * the journaler's error state is set and any on_readable waiter is
 * failed.  Otherwise the buffer is stashed in prefetch_buf, contiguous
 * data is assimilated into read_buf, and further prefetching is kicked.
 */
void Journaler::_finish_read(int r, uint64_t offset, uint64_t length,
			     bufferlist& bl)
{
  lock_guard l(lock);

  if (r < 0) {
    ldout(cct, 0) << "_finish_read got error " << r << dendl;
    error = r;
  } else {
    ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length()
		   << dendl;
    if (bl.length() < length) {
      // short read: data we expected is missing
      ldout(cct, 0) << "_finish_read got less than expected (" << length << ")"
		    << dendl;
      error = -EINVAL;
    }
  }

  if (error) {
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }

  prefetch_buf[offset].swap(bl);

  try {
    _assimilate_prefetch();
  } catch (const buffer::error &err) {
    // decode failure while checking readability: surface as corruption
    lderr(cct) << "_decode error from assimilate_prefetch" << dendl;
    error = -EINVAL;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }
  _prefetch();
}
961
/*
 * Move contiguous prefetched buffers (starting exactly at received_pos)
 * from prefetch_buf into read_buf, stopping at the first gap.  Updates
 * 'readable' and completes the on_readable waiter when the journal has
 * become readable (or the read position has reached the tail).
 * Caller holds the journaler lock; may throw buffer::error from entry
 * readability checks.
 */
void Journaler::_assimilate_prefetch()
{
  bool was_readable = readable;

  bool got_any = false;
  while (!prefetch_buf.empty()) {
    map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
    if (p->first != received_pos) {
      // out-of-order buffer: wait until the gap before it is filled
      uint64_t gap = p->first - received_pos;
      ldout(cct, 10) << "_assimilate_prefetch gap of " << gap
		     << " from received_pos " << received_pos
		     << " to first prefetched buffer " << p->first << dendl;
      break;
    }

    ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~"
		   << p->second.length() << dendl;
    received_pos += p->second.length();
    read_buf.claim_append(p->second);
    ceph_assert(received_pos <= requested_pos);
    prefetch_buf.erase(p);
    got_any = true;
  }

  if (got_any) {
    ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
		   << read_buf.length() << ", read pointers read_pos=" << read_pos
		   << " received_pos=" << received_pos << " requested_pos=" << requested_pos
		   << dendl;

    // Update readability (this will also hit any decode errors resulting
    // from bad data)
    readable = _have_next_entry();
  }

  if ((got_any && !was_readable && readable) || read_pos == write_pos) {
    // readable!
    ldout(cct, 10) << "_finish_read now readable (or at journal end) readable="
		   << readable << " read_pos=" << read_pos << " write_pos="
		   << write_pos << dendl;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(0);
    }
  }
}
1009
/*
 * Issue async reads for 'len' bytes starting at requested_pos.
 *
 * Never reads past safe_pos: if we are already stuck at safe_pos (which
 * happens when reading the tail of a journal we are also writing), a
 * flush is kicked if necessary and a C_RetryRead is parked on
 * waitfor_safe to re-run _prefetch() once more data is safe.  Reads are
 * issued one layout period at a time so contiguous chunks can be
 * assimilated as they arrive.  Caller holds the journaler lock.
 */
void Journaler::_issue_read(uint64_t len)
{
  // stuck at safe_pos? (this is needed if we are reading the tail of
  // a journal we are also writing to)
  ceph_assert(requested_pos <= safe_pos);
  if (requested_pos == safe_pos) {
    ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
		   << ", waiting" << dendl;
    ceph_assert(write_pos > requested_pos);
    if (pending_safe.empty()) {
      _flush(NULL);
    }

    // Make sure keys of waitfor_safe map are journal entry boundaries.
    // The key we used here is either next_safe_pos or old value of
    // next_safe_pos. next_safe_pos is always set to journal entry
    // boundary.
    auto p = pending_safe.rbegin();
    if (p != pending_safe.rend())
      waitfor_safe[p->second].push_back(new C_RetryRead(this));
    else
      waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
    return;
  }

  // don't read too much
  if (requested_pos + len > safe_pos) {
    len = safe_pos - requested_pos;
    ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos
		   << dendl;
  }

  // go.
  ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
		 << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos
		 << " requested_pos+len=" << (requested_pos+len) << dendl;

  // step by period (object). _don't_ do a single big filer.read()
  // here because it will wait for all object reads to complete before
  // giving us back any data. this way we can process whatever bits
  // come in that are contiguous.
  uint64_t period = get_layout_period();
  while (len > 0) {
    uint64_t e = requested_pos + period;
    e -= e % period;  // end of the period containing requested_pos
    uint64_t l = e - requested_pos;
    if (l > len)
      l = len;
    C_Read *c = new C_Read(this, requested_pos, l);
    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0,
	       wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
    requested_pos += l;
    len -= l;
  }
}
1065
/*
 * Top up the read pipeline: compute a prefetch target ahead of read_pos
 * (temp_fetch_len overrides fetch_len once, when set), round it up to a
 * full layout period, clamp it to the log tail, and issue reads for the
 * portion not yet requested.  May kick a flush first when reading and
 * writing the same journal.  Caller holds the journaler lock.
 */
void Journaler::_prefetch()
{
  if (is_stopping())
    return;

  ldout(cct, 10) << "_prefetch" << dendl;
  // prefetch
  uint64_t pf;
  if (temp_fetch_len) {
    // one-shot override of the prefetch window
    ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
    pf = temp_fetch_len;
    temp_fetch_len = 0;
  } else {
    pf = fetch_len;
  }

  uint64_t raw_target = read_pos + pf;

  // read full log segments, so increase if necessary
  uint64_t period = get_layout_period();
  uint64_t remainder = raw_target % period;
  uint64_t adjustment = remainder ? period - remainder : 0;
  uint64_t target = raw_target + adjustment;

  // don't read past the log tail
  if (target > write_pos)
    target = write_pos;

  if (requested_pos < target) {
    uint64_t len = target - requested_pos;
    ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos
		   << " < target " << target << " (" << raw_target
		   << "), prefetching " << len << dendl;

    if (pending_safe.empty() && write_pos > safe_pos) {
      // If we are reading and writing the journal, then we may need
      // to issue a flush if one isn't already in progress.
      // Avoid doing a flush every time so that if we do write/read/write/read
      // we don't end up flushing after every write.
      ldout(cct, 10) << "_prefetch: requested_pos=" << requested_pos
		     << ", read_pos=" << read_pos
		     << ", write_pos=" << write_pos
		     << ", safe_pos=" << safe_pos << dendl;
      _do_flush();
    }

    _issue_read(len);
  }
}
1115
1116
1117/*
39ae355f 1118 * _have_next_entry() - return true if next entry is ready.
7c673cae 1119 */
39ae355f 1120bool Journaler::_have_next_entry()
7c673cae
FG
1121{
1122 // anything to read?
1123 if (read_pos == write_pos)
1124 return false;
1125
1126 // Check if the retrieve bytestream has enough for an entry
1127 uint64_t need;
1128 if (journal_stream.readable(read_buf, &need)) {
1129 return true;
1130 }
1131
39ae355f 1132 ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
7c673cae
FG
1133 << ", but need " << need << " for next entry; fetch_len is "
1134 << fetch_len << dendl;
1135
1136 // partial fragment at the end?
1137 if (received_pos == write_pos) {
39ae355f 1138 ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
7c673cae
FG
1139 "adjusting write_pos to " << read_pos << dendl;
1140
1141 // adjust write_pos
1142 prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
11fdf7f2
TL
1143 ceph_assert(write_buf.length() == 0);
1144 ceph_assert(waitfor_safe.empty());
7c673cae
FG
1145
1146 // reset read state
1147 requested_pos = received_pos = read_pos;
1148 read_buf.clear();
1149
1150 // FIXME: truncate on disk?
1151
1152 return false;
1153 }
1154
1155 if (need > fetch_len) {
1156 temp_fetch_len = need;
39ae355f 1157 ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
7c673cae
FG
1158 << dendl;
1159 }
1160
39ae355f 1161 ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
7c673cae
FG
1162 return false;
1163}
1164
1165/*
1166 * is_readable() - kickstart prefetch, too
1167 */
1168bool Journaler::is_readable()
1169{
1170 lock_guard l(lock);
39ae355f
TL
1171 return _is_readable();
1172}
7c673cae 1173
39ae355f
TL
1174bool Journaler::_is_readable()
1175{
7c673cae
FG
1176 if (error != 0) {
1177 return false;
1178 }
1179
1180 bool r = readable;
1181 _prefetch();
1182 return r;
1183}
1184
1185class Journaler::C_EraseFinish : public Context {
1186 Journaler *journaler;
1187 C_OnFinisher *completion;
1188 public:
1189 C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {}
1190 void finish(int r) override {
1191 journaler->_finish_erase(r, completion);
1192 }
1193};
1194
1195/**
1196 * Entirely erase the journal, including header. For use when you
1197 * have already made a copy of the journal somewhere else.
1198 */
1199void Journaler::erase(Context *completion)
1200{
1201 lock_guard l(lock);
1202
1203 // Async delete the journal data
1204 uint64_t first = trimmed_pos / get_layout_period();
1205 uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
1206 filer.purge_range(ino, &layout, SnapContext(), first, num,
1207 ceph::real_clock::now(), 0,
1208 wrap_finisher(new C_EraseFinish(
1209 this, wrap_finisher(completion))));
1210
1211 // We will not start the operation to delete the header until
1212 // _finish_erase has seen the data deletion succeed: otherwise if
1213 // there was an error deleting data we might prematurely delete the
1214 // header thereby lose our reference to the data.
1215}
1216
1217void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
1218{
1219 lock_guard l(lock);
b32b8144
FG
1220 if (is_stopping()) {
1221 completion->complete(-EAGAIN);
1222 return;
1223 }
7c673cae
FG
1224
1225 if (data_result == 0) {
1226 // Async delete the journal header
1227 filer.purge_range(ino, &layout, SnapContext(), 0, 1,
1228 ceph::real_clock::now(),
1229 0, wrap_finisher(completion));
1230 } else {
1231 lderr(cct) << "Failed to delete journal " << ino << " data: "
1232 << cpp_strerror(data_result) << dendl;
1233 completion->complete(data_result);
1234 }
1235}
1236
/* try_read_entry(bl)
 * read entry into bl if it's ready.
 * otherwise, do nothing.
 *
 * Returns true and fills 'bl' with one decoded entry payload (envelope
 * stripped) on success; returns false if nothing is readable yet or if
 * a decode error occurred (in which case 'error' is set to -EINVAL).
 */
bool Journaler::try_read_entry(bufferlist& bl)
{
  lock_guard l(lock);

  if (!readable) {
    ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable"
                   << dendl;
    return false;
  }

  uint64_t start_ptr;
  size_t consumed;
  try {
    consumed = journal_stream.read(read_buf, &bl, &start_ptr);
    if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
      // Resilient format embeds the entry's own offset; it must agree
      // with where we think we are reading.
      ceph_assert(start_ptr == read_pos);
    }
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
    error = -EINVAL;
    return false;
  }

  ldout(cct, 10) << "try_read_entry at " << read_pos << " read "
                 << read_pos << "~" << consumed << " (have "
                 << read_buf.length() << ")" << dendl;

  read_pos += consumed;
  try {
    // We were readable, we might not be any more
    readable = _have_next_entry();
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
    error = -EINVAL;
    return false;
  }

  // prefetch?
  _prefetch();

  // If bufferlist consists of discontiguous memory, decoding types whose
  // denc_traits needs contiguous memory is inefficient. The bufferlist may
  // get copied to temporary memory multiple times (copy_shallow() in
  // src/include/denc.h actually does deep copy)
  if (bl.get_num_buffers() > 1)
    bl.rebuild();

  return true;
}
1289
1290void Journaler::wait_for_readable(Context *onreadable)
1291{
1292 lock_guard l(lock);
39ae355f
TL
1293 _wait_for_readable(onreadable);
1294}
1295
1296void Journaler::_wait_for_readable(Context *onreadable)
1297{
b32b8144 1298 if (is_stopping()) {
31f18b77 1299 finisher->queue(onreadable, -EAGAIN);
7c673cae
FG
1300 return;
1301 }
1302
11fdf7f2 1303 ceph_assert(on_readable == 0);
7c673cae
FG
1304 if (!readable) {
1305 ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
1306 << onreadable << dendl;
1307 on_readable = wrap_finisher(onreadable);
1308 } else {
1309 // race with OSD reply
1310 finisher->queue(onreadable, 0);
1311 }
1312}
1313
1314bool Journaler::have_waiter() const
1315{
1316 return on_readable != nullptr;
1317}
1318
1319
1320
1321
1322/***************** TRIMMING *******************/
1323
1324
1325class Journaler::C_Trim : public Context {
1326 Journaler *ls;
1327 uint64_t to;
1328public:
1329 C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {}
1330 void finish(int r) override {
1331 ls->_finish_trim(r, to);
1332 }
1333};
1334
1335void Journaler::trim()
1336{
1337 lock_guard l(lock);
1338 _trim();
1339}
1340
// Purge journal objects up to the last committed expire_pos, rounded
// down to a whole layout period.  At most one trim is in flight at a
// time; caller holds 'lock' and the journal must be writeable.
void Journaler::_trim()
{
  if (is_stopping())
    return;

  ceph_assert(!readonly);
  uint64_t period = get_layout_period();
  // Only whole objects can be purged, so round the target down.
  uint64_t trim_to = last_committed.expire_pos;
  trim_to -= trim_to % period;
  ldout(cct, 10) << "trim last_commited head was " << last_committed
           << ", can trim to " << trim_to
           << dendl;
  if (trim_to == 0 || trim_to == trimming_pos) {
    // Nothing new to trim (or a trim to this target is already done /
    // in flight).
    ldout(cct, 10) << "trim already trimmed/trimming to "
                   << trimmed_pos << "/" << trimming_pos << dendl;
    return;
  }

  if (trimming_pos > trimmed_pos) {
    // A previous purge has not completed yet; retry on a later pass.
    ldout(cct, 10) << "trim already trimming atm, try again later. "
      "trimmed/trimming is " << trimmed_pos << "/" << trimming_pos << dendl;
    return;
  }

  // trim
  ceph_assert(trim_to <= write_pos);
  ceph_assert(trim_to <= expire_pos);
  ceph_assert(trim_to > trimming_pos);
  ldout(cct, 10) << "trim trimming to " << trim_to
                 << ", trimmed/trimming/expire are "
                 << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
                 << dendl;

  // delete range of objects
  uint64_t first = trimming_pos / period;
  uint64_t num = (trim_to - trimming_pos) / period;
  SnapContext snapc;
  filer.purge_range(ino, &layout, snapc, first, num,
                    ceph::real_clock::now(), 0,
                    wrap_finisher(new C_Trim(this, trim_to)));
  // Mark the trim as in flight; _finish_trim advances trimmed_pos.
  trimming_pos = trim_to;
}
1383
// Completion of a purge issued by _trim(): advance trimmed_pos to the
// target 'to' on success.  -ENOENT is tolerated (objects may already
// be gone); any other error goes through the write-error handler.
void Journaler::_finish_trim(int r, uint64_t to)
{
  lock_guard l(lock);

  ceph_assert(!readonly);
  ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
           << ", trimmed/trimming/expire now "
           << to << "/" << trimming_pos << "/" << expire_pos
           << dendl;
  if (r < 0 && r != -ENOENT) {
    lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  ceph_assert(r >= 0 || r == -ENOENT);

  // The completed trim must lie strictly inside (trimmed_pos, trimming_pos].
  ceph_assert(to <= trimming_pos);
  ceph_assert(to > trimmed_pos);
  trimmed_pos = to;
}
1405
1406void Journaler::handle_write_error(int r)
1407{
1408 // lock is locked
1409
1410 lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl;
1411 if (on_write_error) {
1412 on_write_error->complete(r);
1413 on_write_error = NULL;
1414 called_write_error = true;
1415 } else if (called_write_error) {
1416 /* We don't call error handler more than once, subsequent errors
1417 * are dropped -- this is okay as long as the error handler does
1418 * something dramatic like respawn */
1419 lderr(cct) << __func__ << ": multiple write errors, handler already called"
1420 << dendl;
1421 } else {
11fdf7f2 1422 ceph_abort_msg("unhandled write error");
7c673cae
FG
1423 }
1424}
1425
1426
1427/**
1428 * Test whether the 'read_buf' byte stream has enough data to read
1429 * an entry
1430 *
1431 * sets 'next_envelope_size' to the number of bytes needed to advance (enough
1432 * to get the next header if header was unavailable, or enough to get the whole
1433 * next entry if the header was available but the body wasn't).
1434 */
1435bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
1436{
11fdf7f2 1437 ceph_assert(need != NULL);
7c673cae
FG
1438
1439 uint32_t entry_size = 0;
1440 uint64_t entry_sentinel = 0;
11fdf7f2 1441 auto p = read_buf.cbegin();
7c673cae
FG
1442
1443 // Do we have enough data to decode an entry prefix?
1444 if (format >= JOURNAL_FORMAT_RESILIENT) {
1445 *need = sizeof(entry_size) + sizeof(entry_sentinel);
1446 } else {
1447 *need = sizeof(entry_size);
1448 }
1449 if (read_buf.length() >= *need) {
1450 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1451 decode(entry_sentinel, p);
7c673cae
FG
1452 if (entry_sentinel != sentinel) {
1453 throw buffer::malformed_input("Invalid sentinel");
1454 }
1455 }
1456
11fdf7f2 1457 decode(entry_size, p);
7c673cae
FG
1458 } else {
1459 return false;
1460 }
1461
1462 // Do we have enough data to decode an entry prefix, payload and suffix?
1463 if (format >= JOURNAL_FORMAT_RESILIENT) {
1464 *need = JOURNAL_ENVELOPE_RESILIENT + entry_size;
1465 } else {
1466 *need = JOURNAL_ENVELOPE_LEGACY + entry_size;
1467 }
1468 if (read_buf.length() >= *need) {
1469 return true; // No more bytes needed
1470 }
1471
1472 return false;
1473}
1474
1475
1476/**
1477 * Consume one entry from a journal byte stream 'from', splicing a
1478 * serialized LogEvent blob into 'entry'.
1479 *
1480 * 'entry' must be non null and point to an empty bufferlist.
1481 *
1482 * 'from' must contain sufficient valid data (i.e. readable is true).
1483 *
1484 * 'start_ptr' will be set to the entry's start pointer, if the collection
1485 * format provides it. It may not be null.
1486 *
1487 * @returns The number of bytes consumed from the `from` byte stream. Note
1488 * that this is not equal to the length of `entry`, which contains
1489 * the inner serialized LogEvent and not the envelope.
1490 */
1491size_t JournalStream::read(bufferlist &from, bufferlist *entry,
1492 uint64_t *start_ptr)
1493{
11fdf7f2
TL
1494 ceph_assert(start_ptr != NULL);
1495 ceph_assert(entry != NULL);
1496 ceph_assert(entry->length() == 0);
7c673cae
FG
1497
1498 uint32_t entry_size = 0;
1499
1500 // Consume envelope prefix: entry_size and entry_sentinel
11fdf7f2 1501 auto from_ptr = from.cbegin();
7c673cae
FG
1502 if (format >= JOURNAL_FORMAT_RESILIENT) {
1503 uint64_t entry_sentinel = 0;
11fdf7f2 1504 decode(entry_sentinel, from_ptr);
7c673cae
FG
1505 // Assertion instead of clean check because of precondition of this
1506 // fn is that readable() already passed
11fdf7f2 1507 ceph_assert(entry_sentinel == sentinel);
7c673cae 1508 }
11fdf7f2 1509 decode(entry_size, from_ptr);
7c673cae
FG
1510
1511 // Read out the payload
1512 from_ptr.copy(entry_size, *entry);
1513
1514 // Consume the envelope suffix (start_ptr)
1515 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1516 decode(*start_ptr, from_ptr);
7c673cae
FG
1517 } else {
1518 *start_ptr = 0;
1519 }
1520
1521 // Trim the input buffer to discard the bytes we have consumed
1522 from.splice(0, from_ptr.get_off());
1523
1524 return from_ptr.get_off();
1525}
1526
1527
1528/**
1529 * Append one entry
1530 */
1531size_t JournalStream::write(bufferlist &entry, bufferlist *to,
1532 uint64_t const &start_ptr)
1533{
11fdf7f2 1534 ceph_assert(to != NULL);
7c673cae
FG
1535
1536 uint32_t const entry_size = entry.length();
1537 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1538 encode(sentinel, *to);
7c673cae 1539 }
11fdf7f2 1540 encode(entry_size, *to);
7c673cae
FG
1541 to->claim_append(entry);
1542 if (format >= JOURNAL_FORMAT_RESILIENT) {
11fdf7f2 1543 encode(start_ptr, *to);
7c673cae
FG
1544 }
1545
1546 if (format >= JOURNAL_FORMAT_RESILIENT) {
1547 return JOURNAL_ENVELOPE_RESILIENT + entry_size;
1548 } else {
1549 return JOURNAL_ENVELOPE_LEGACY + entry_size;
1550 }
1551}
1552
1553/**
1554 * set write error callback
1555 *
1556 * Set a callback/context to trigger if we get a write error from
1557 * the objecter. This may be from an explicit request (e.g., flush)
1558 * or something async the journaler did on its own (e.g., journal
1559 * header update).
1560 *
1561 * It is only used once; if the caller continues to use the
1562 * Journaler and wants to hear about errors, it needs to reset the
1563 * error_handler.
1564 *
1565 * @param c callback/context to trigger on error
1566 */
1567void Journaler::set_write_error_handler(Context *c) {
1568 lock_guard l(lock);
11fdf7f2 1569 ceph_assert(!on_write_error);
7c673cae
FG
1570 on_write_error = wrap_finisher(c);
1571 called_write_error = false;
1572}
1573
1574
1575/**
1576 * Wrap a context in a C_OnFinisher, if it is non-NULL
1577 *
1578 * Utility function to avoid lots of error-prone and verbose
1579 * NULL checking on contexts passed in.
1580 */
1581C_OnFinisher *Journaler::wrap_finisher(Context *c)
1582{
1583 if (c != NULL) {
1584 return new C_OnFinisher(c, finisher);
1585 } else {
1586 return NULL;
1587 }
1588}
1589
// Stop the journaler: mark it stopping, poison reads with -EAGAIN, and
// fail all pending waiters (readable waiter with -EAGAIN, recovery
// waiters with -ESHUTDOWN, safe waiters with -EAGAIN).
void Journaler::shutdown()
{
  lock_guard l(lock);

  ldout(cct, 1) << __func__ << dendl;

  state = STATE_STOPPING;
  readable = false;

  // Kick out anyone reading from journal
  error = -EAGAIN;
  if (on_readable) {
    // Detach before completing so the callback cannot observe a stale
    // on_readable pointer.
    C_OnFinisher *f = on_readable;
    on_readable = 0;
    f->complete(-EAGAIN);
  }

  // Fail anyone still waiting for journal recovery to finish.
  list<Context*> ls;
  ls.swap(waitfor_recover);
  finish_contexts(cct, ls, -ESHUTDOWN);

  // Fail all flush-safety waiters, regardless of position.
  std::map<uint64_t, std::list<Context*> >::iterator i;
  for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
    finish_contexts(cct, i->second, -EAGAIN);
  }
  waitfor_safe.clear();
}
1617
39ae355f
TL
1618void Journaler::check_isreadable()
1619{
1620 std::unique_lock l(lock);
1621 while (!_is_readable() &&
1622 get_read_pos() < get_write_pos() &&
1623 !get_error()) {
1624 C_SaferCond readable_waiter;
1625 _wait_for_readable(&readable_waiter);
1626 l.unlock();
1627 readable_waiter.wait();
1628 l.lock();
1629 }
1630 return ;
1631}