1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "common/debug.h"
17 #include "common/errno.h"
18 #include "common/safe_io.h"
19 #include "FileJournal.h"
20 #include "include/color.h"
21 #include "common/perf_counters.h"
22 #include "FileStore.h"
24 #include "include/compat.h"
31 #include <sys/types.h>
33 #include <sys/mount.h>
35 #include "common/blkdev.h"
36 #if defined(__linux__)
37 #include "common/linux_version.h"
40 #if defined(__FreeBSD__)
41 #define O_DSYNC O_SYNC
44 #define dout_context cct
45 #define dout_subsys ceph_subsys_journal
47 #define dout_prefix *_dout << "journal "
52 using std::ostringstream
;
56 using std::stringstream
;
59 using ceph::bufferlist
;
60 using ceph::bufferptr
;
61 using ceph::Formatter
;
62 using ceph::JSONFormatter
;
// 1 MiB — minimum usable journal size and the unit used when zeroing.
64 const static int64_t ONE_MEG(1 << 20);
// Required buffer/offset alignment (bytes) for O_DIRECT journal I/O.
65 const static int CEPH_DIRECTIO_ALIGNMENT(4096);
68 int FileJournal::_open(bool forwrite
, bool create
)
75 flags
|= O_DIRECT
| O_DSYNC
;
83 if (TEMP_FAILURE_RETRY(::close(fd
))) {
85 derr
<< "FileJournal::_open: error closing old fd: "
86 << cpp_strerror(err
) << dendl
;
89 fd
= TEMP_FAILURE_RETRY(::open(fn
.c_str(), flags
|O_CLOEXEC
, 0644));
92 dout(2) << "FileJournal::_open unable to open journal "
93 << fn
<< ": " << cpp_strerror(err
) << dendl
;
98 ret
= ::fstat(fd
, &st
);
101 derr
<< "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret
) << dendl
;
106 if (S_ISBLK(st
.st_mode
)) {
107 ret
= _open_block_device();
108 } else if (S_ISREG(st
.st_mode
)) {
109 if (aio
&& !force_aio
) {
110 derr
<< "FileJournal::_open: disabling aio for non-block journal. Use "
111 << "journal_force_aio to force use of aio anyway" << dendl
;
114 ret
= _open_file(st
.st_size
, st
.st_blksize
, create
);
116 derr
<< "FileJournal::_open: wrong journal file type: " << st
.st_mode
127 ret
= io_setup(128, &aio_ctx
);
130 // Contrary to naive expectations -EAGIAN means ...
132 derr
<< "FileJournal::_open: user's limit of aio events exceeded. "
133 << "Try increasing /proc/sys/fs/aio-max-nr" << dendl
;
136 derr
<< "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret
) << dendl
;
144 /* We really want max_size to be a multiple of block_size. */
145 max_size
-= max_size
% block_size
;
147 dout(1) << "_open " << fn
<< " fd " << fd
149 << " bytes, block size " << block_size
150 << " bytes, directio = " << directio
156 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Size and configure the journal when it lives on a raw block device:
// use the device's entire size (any configured osd journal size is
// ignored), take block_size from config, and probe discard (TRIM)
// support when journal_discard is enabled.
// NOTE(review): extraction gaps — the error-return paths between the
// visible statements are not shown here.
161 int FileJournal::_open_block_device()
165 int ret
= blkdev
.get_size(&bdev_sz
);
167 dout(0) << __func__
<< ": failed to read block device size." << dendl
;
171 /* Check for bdev_sz too small */
172 if (bdev_sz
< ONE_MEG
) {
173 dout(0) << __func__
<< ": your block device must be at least "
174 << ONE_MEG
<< " bytes to be used for a Ceph journal." << dendl
;
// Block devices are always used whole, regardless of configured size.
178 dout(10) << __func__
<< ": ignoring osd journal size. "
179 << "We'll use the entire block device (size: " << bdev_sz
<< ")"
// Block size comes from configuration, not from the device.
183 block_size
= cct
->_conf
->journal_block_size
;
185 if (cct
->_conf
->journal_discard
) {
186 discard
= blkdev
.support_discard();
187 dout(10) << fn
<< " support discard: " << (int)discard
<< dendl
;
193 int FileJournal::_open_file(int64_t oldsize
, blksize_t blksize
,
197 int64_t conf_journal_sz(cct
->_conf
->osd_journal_size
);
198 conf_journal_sz
<<= 20;
200 if ((cct
->_conf
->osd_journal_size
== 0) && (oldsize
< ONE_MEG
)) {
201 derr
<< "I'm sorry, I don't know how large of a journal to create."
202 << "Please specify a block device to use as the journal OR "
203 << "set osd_journal_size in your ceph.conf" << dendl
;
207 if (create
&& (oldsize
< conf_journal_sz
)) {
208 uint64_t newsize(conf_journal_sz
);
209 dout(10) << __func__
<< " _open extending to " << newsize
<< " bytes" << dendl
;
210 ret
= ::ftruncate(fd
, newsize
);
213 derr
<< "FileJournal::_open_file : unable to extend journal to "
214 << newsize
<< " bytes: " << cpp_strerror(err
) << dendl
;
217 ret
= ceph_posix_fallocate(fd
, 0, newsize
);
219 derr
<< "FileJournal::_open_file : unable to preallocation journal to "
220 << newsize
<< " bytes: " << cpp_strerror(ret
) << dendl
;
228 block_size
= cct
->_conf
->journal_block_size
;
230 if (create
&& cct
->_conf
->journal_zero_on_create
) {
231 derr
<< "FileJournal::_open_file : zeroing journal" << dendl
;
232 uint64_t write_size
= 1 << 20;
234 ret
= ::posix_memalign((void **)&buf
, block_size
, write_size
);
238 memset(static_cast<void*>(buf
), 0, write_size
);
240 for (; (i
+ write_size
) <= (uint64_t)max_size
; i
+= write_size
) {
241 ret
= ::pwrite(fd
, static_cast<void*>(buf
), write_size
, i
);
247 if (i
< (uint64_t)max_size
) {
248 ret
= ::pwrite(fd
, static_cast<void*>(buf
), max_size
- i
, i
);
258 dout(10) << "_open journal is not a block device, NOT checking disk "
259 << "write cache on '" << fn
<< "'" << dendl
;
264 // This can not be used on an active journal
// Open the journal read-only, read its header, and verify that the
// on-disk fsid matches the expected one. Requires the journal to be
// closed (fd == -1) on entry.
265 int FileJournal::check()
269 ceph_assert(fd
== -1);
270 ret
= _open(false, false);
274 ret
= read_header(&header
);
// A mismatched fsid means this journal belongs to a different OSD.
278 if (header
.fsid
!= fsid
) {
279 derr
<< "check: ondisk fsid " << header
.fsid
<< " doesn't match expected " << fsid
280 << ", invalid (someone else's?) journal" << dendl
;
285 dout(1) << "check: header looks ok" << dendl
;
294 int FileJournal::create()
297 int64_t needed_space
;
299 ceph::buffer::ptr bp
;
300 dout(2) << "create " << fn
<< " fsid " << fsid
<< dendl
;
302 ret
= _open(true, true);
306 // write empty header
308 header
.flags
= header_t::FLAG_CRC
; // enable crcs on any new journal.
310 header
.max_size
= max_size
;
311 header
.block_size
= block_size
;
312 if (cct
->_conf
->journal_block_align
|| directio
)
313 header
.alignment
= block_size
;
315 header
.alignment
= 16; // at least stay word aligned on 64bit machines...
317 header
.start
= get_top();
318 header
.start_seq
= 0;
320 print_header(header
);
322 // static zeroed buffer for alignment padding
324 zero_buf
= new char[header
.alignment
];
325 memset(zero_buf
, 0, header
.alignment
);
327 bp
= prepare_header();
328 if (TEMP_FAILURE_RETRY(::pwrite(fd
, bp
.c_str(), bp
.length(), 0)) < 0) {
330 derr
<< "FileJournal::create : create write header error "
331 << cpp_strerror(ret
) << dendl
;
335 // zero first little bit, too.
336 ret
= posix_memalign(&buf
, block_size
, block_size
);
339 derr
<< "FileJournal::create: failed to allocate " << block_size
340 << " bytes of memory: " << cpp_strerror(ret
) << dendl
;
343 memset(buf
, 0, block_size
);
344 if (TEMP_FAILURE_RETRY(::pwrite(fd
, buf
, block_size
, get_top())) < 0) {
346 derr
<< "FileJournal::create: error zeroing first " << block_size
347 << " bytes " << cpp_strerror(ret
) << dendl
;
351 needed_space
= cct
->_conf
->osd_max_write_size
<< 20;
352 needed_space
+= (2 * sizeof(entry_header_t
)) + get_top();
353 if (header
.max_size
- header
.start
< needed_space
) {
354 derr
<< "FileJournal::create: OSD journal is not large enough to hold "
355 << "osd_max_write_size bytes!" << dendl
;
360 dout(2) << "create done" << dendl
;
367 if (TEMP_FAILURE_RETRY(::close(fd
)) < 0) {
369 derr
<< "FileJournal::create: error closing fd: " << cpp_strerror(ret
)
377 // This can not be used on an active journal
// Read just the header so the caller can learn the journal's fsid
// without opening it for use. Requires the journal to be closed
// (fd == -1) on entry.
// NOTE(review): extraction gap — the code that copies header.fsid into
// the out-parameter and the cleanup path are not visible here.
378 int FileJournal::peek_fsid(uuid_d
& fsid
)
380 ceph_assert(fd
== -1);
381 int r
= _open(false, false);
384 r
= read_header(&header
);
393 int FileJournal::open(uint64_t fs_op_seq
)
395 dout(2) << "open " << fn
<< " fsid " << fsid
<< " fs_op_seq " << fs_op_seq
<< dendl
;
397 uint64_t next_seq
= fs_op_seq
+ 1;
400 int err
= _open(false);
404 // assume writeable, unless...
406 write_pos
= get_top();
409 err
= read_header(&header
);
413 // static zeroed buffer for alignment padding
415 zero_buf
= new char[header
.alignment
];
416 memset(zero_buf
, 0, header
.alignment
);
418 dout(10) << "open header.fsid = " << header
.fsid
419 //<< " vs expected fsid = " << fsid
421 if (header
.fsid
!= fsid
) {
422 derr
<< "FileJournal::open: ondisk fsid " << header
.fsid
<< " doesn't match expected " << fsid
423 << ", invalid (someone else's?) journal" << dendl
;
427 if (header
.max_size
> max_size
) {
428 dout(2) << "open journal size " << header
.max_size
<< " > current " << max_size
<< dendl
;
432 if (header
.block_size
!= block_size
) {
433 dout(2) << "open journal block size " << header
.block_size
<< " != current " << block_size
<< dendl
;
437 if (header
.max_size
% header
.block_size
) {
438 dout(2) << "open journal max size " << header
.max_size
439 << " not a multiple of block size " << header
.block_size
<< dendl
;
443 if (header
.alignment
!= block_size
&& directio
) {
444 dout(0) << "open journal alignment " << header
.alignment
<< " does not match block size "
445 << block_size
<< " (required for direct_io journal mode)" << dendl
;
449 if ((header
.alignment
% CEPH_DIRECTIO_ALIGNMENT
) && directio
) {
450 dout(0) << "open journal alignment " << header
.alignment
451 << " is not multiple of minimum directio alignment "
452 << CEPH_DIRECTIO_ALIGNMENT
<< " (required for direct_io journal mode)"
458 // looks like a valid header.
459 write_pos
= 0; // not writeable yet
461 journaled_seq
= header
.committed_up_to
;
464 read_pos
= header
.start
;
465 seq
= header
.start_seq
;
469 off64_t old_pos
= read_pos
;
470 if (!read_entry(bl
, seq
)) {
471 dout(10) << "open reached end of journal." << dendl
;
474 if (seq
> next_seq
) {
475 dout(10) << "open entry " << seq
<< " len " << bl
.length() << " > next_seq " << next_seq
476 << ", ignoring journal contents"
479 last_committed_seq
= 0;
482 if (seq
== next_seq
) {
483 dout(10) << "open reached seq " << seq
<< dendl
;
487 seq
++; // next event should follow.
// Close a journal fd, retrying on EINTR; the close() result is
// deliberately discarded (VOID_TEMP_FAILURE_RETRY).
496 void FileJournal::_close(int fd
) const
498 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Shut the journal down: stop the writer thread and release the fd.
// By this point nothing may still be queued or pending (asserted).
501 void FileJournal::close()
503 dout(1) << "close " << fn
<< dendl
;
505 // stop writer thread
// Once the writer has stopped, everything must already be on disk.
509 ceph_assert(writeq_empty());
510 ceph_assert(!must_write_header
);
511 ceph_assert(fd
>= 0);
// Detailed dump (simple=false): entries are decoded per-transaction.
517 int FileJournal::dump(ostream
& out
)
519 return _dump(out
, false);
// Abbreviated dump (simple=true): skips per-transaction decoding.
522 int FileJournal::simple_dump(ostream
& out
)
524 return _dump(out
, true);
// Dump via a pretty-printing JSON formatter, delegating the real work
// to _fdump().
// NOTE(review): extraction gap — the flush of 'f' into 'out' and the
// return statement are not visible here.
527 int FileJournal::_dump(ostream
& out
, bool simple
)
529 JSONFormatter
f(true);
530 int ret
= _fdump(f
, simple
);
535 int FileJournal::_fdump(Formatter
&f
, bool simple
)
537 dout(10) << "_fdump" << dendl
;
539 ceph_assert(fd
== -1);
540 int err
= _open(false, false);
544 err
= read_header(&header
);
550 off64_t next_pos
= header
.start
;
552 f
.open_object_section("journal");
554 f
.open_object_section("header");
555 f
.dump_unsigned("flags", header
.flags
);
558 f
.dump_string("fsid", os
.str());
559 f
.dump_unsigned("block_size", header
.block_size
);
560 f
.dump_unsigned("alignment", header
.alignment
);
561 f
.dump_int("max_size", header
.max_size
);
562 f
.dump_int("start", header
.start
);
563 f
.dump_unsigned("committed_up_to", header
.committed_up_to
);
564 f
.dump_unsigned("start_seq", header
.start_seq
);
567 f
.open_array_section("entries");
568 uint64_t seq
= header
.start_seq
;
571 off64_t pos
= next_pos
;
574 dout(2) << "_dump -- not readable" << dendl
;
579 read_entry_result result
= do_read_entry(
585 if (result
!= SUCCESS
) {
586 if (seq
< header
.committed_up_to
) {
587 dout(2) << "Unable to read past sequence " << seq
588 << " but header indicates the journal has committed up through "
589 << header
.committed_up_to
<< ", journal is corrupt" << dendl
;
592 dout(25) << ss
.str() << dendl
;
593 dout(25) << "No further valid entries found, journal is most likely valid"
598 f
.open_object_section("entry");
599 f
.dump_unsigned("offset", pos
);
600 f
.dump_unsigned("seq", seq
);
602 f
.dump_unsigned("bl.length", bl
.length());
604 f
.open_array_section("transactions");
605 auto p
= bl
.cbegin();
608 ObjectStore::Transaction
t(p
);
609 f
.open_object_section("transaction");
610 f
.dump_unsigned("trans_num", trans_num
);
622 dout(10) << "dump finish" << dendl
;
// Launch the journal's worker threads.
629 void FileJournal::start_writer()
// Thread that batches queued writes and issues them to the journal.
633 write_thread
.create("journal_write");
// Thread that reaps completed writes and fires their completions.
// NOTE(review): extraction gap — lines 634-635 (presumably an aio
// guard around this) are not visible here.
636 write_finish_thread
.create("journal_wrt_fin");
// Stop the writer thread (and, when aio is in use, the write-finish
// thread), waking every condition a worker could be sleeping on.
640 void FileJournal::stop_writer()
642 // Do nothing if writer already stopped or never started
// Hold both write_lock and writeq_lock while signalling.
646 std::lock_guard l
{write_lock
};
647 std::lock_guard p
{writeq_lock
};
649 writeq_cond
.notify_all();
650 // Doesn't hurt to signal commit_cond in case thread is waiting there
651 // and caller didn't use committed_thru() first.
652 commit_cond
.notify_all();
656 // write journal header now so that we have less to replay on remount
661 // stop aio completion thread *after* writer thread has stopped
662 // and has submitted all of its io
663 if (aio
&& !aio_stop
) {
666 aio_cond
.notify_all();
667 write_finish_cond
.notify_all();
669 write_finish_thread
.join();
// Debug-log (level 10) the interesting header fields plus the current
// write_pos.
676 void FileJournal::print_header(const header_t
&header
) const
678 dout(10) << "header: block_size " << header
.block_size
679 << " alignment " << header
.alignment
680 << " max_size " << header
.max_size
682 dout(10) << "header: start " << header
.start
<< dendl
;
683 dout(10) << " write_pos " << write_pos
<< dendl
;
// Read and decode the on-disk journal header (the first block_size
// bytes of the journal) into *hdr. Reads into a page-aligned buffer so
// the pread also works when the fd was opened with O_DIRECT.
686 int FileJournal::read_header(header_t
*hdr
) const
688 dout(10) << "read_header" << dendl
;
691 ceph::buffer::ptr bp
= ceph::buffer::create_small_page_aligned(block_size
);
692 char* bpdata
= bp
.c_str();
693 int r
= ::pread(fd
, bpdata
, bp
.length(), 0);
697 dout(0) << "read_header got " << cpp_strerror(err
) << dendl
;
701 // don't use bp.zero() here, because it also invalidates
702 // crc cache (which is not yet populated anyway)
// Zero-fill whatever pread did not cover so decoding sees defined bytes.
703 if (bp
.length() != (size_t)r
) {
704 // r will be always less or equal than bp.length
706 memset(bpdata
, 0, bp
.length() - r
);
709 bl
.push_back(std::move(bp
));
712 auto p
= bl
.cbegin();
// Decode failure (truncated/garbage header) is logged, not fatal here.
715 catch (ceph::buffer::error
& e
) {
716 derr
<< "read_header error decoding journal header" << dendl
;
722 * Unfortunately we weren't initializing the flags field for new
723 * journals! Aie. This is safe(ish) now that we have only one
724 * flag. Probably around when we add the next flag we need to
725 * remove this or else this (eventually old) code will clobber newer
728 if (hdr
->flags
> 3) {
729 derr
<< "read_header appears to have gibberish flags; assuming 0" << dendl
;
// Build a page-aligned buffer holding the encoded header, zero-padded
// out to get_top(). header.committed_up_to is refreshed from
// journaled_seq under finisher_lock first.
738 bufferptr
FileJournal::prepare_header()
742 std::lock_guard l
{finisher_lock
};
743 header
.committed_up_to
= journaled_seq
;
746 bufferptr bp
= ceph::buffer::create_small_page_aligned(get_top());
747 // don't use bp.zero() here, because it also invalidates
748 // crc cache (which is not yet populated anyway)
749 char* data
= bp
.c_str();
750 memcpy(data
, bl
.c_str(), bl
.length());
// Zero the tail beyond the encoded header bytes.
752 memset(data
, 0, bp
.length()-bl
.length());
// Force a header write: mark must_write_header under write_lock.
// NOTE(review): extraction gap — the code between lines 759 and 762
// (presumably the synchronous flush this function is named for) is not
// visible here.
756 void FileJournal::write_header_sync()
758 std::lock_guard locker
{write_lock
};
759 must_write_header
= true;
762 dout(20) << __func__
<< " finish" << dendl
;
765 int FileJournal::check_for_full(uint64_t seq
, off64_t pos
, off64_t size
)
768 if (full_state
!= FULL_NOTFULL
)
771 // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
773 if (pos
>= header
.start
)
774 room
= (header
.max_size
- pos
) + (header
.start
- get_top()) - 1;
776 room
= header
.start
- pos
- 1;
777 dout(10) << "room " << room
<< " max_size " << max_size
<< " pos " << pos
<< " header.start " << header
.start
778 << " top " << get_top() << dendl
;
781 if (room
>= (header
.max_size
>> 1) &&
782 room
- size
< (header
.max_size
>> 1)) {
783 dout(10) << " passing half full mark, triggering commit" << dendl
;
784 #ifdef CEPH_DEBUG_MUTEX
785 do_sync_cond
->notify_all(true); // initiate a real commit so we can trim
787 do_sync_cond
->notify_all();
793 dout(10) << "check_for_full at " << pos
<< " : " << size
<< " < " << room
<< dendl
;
794 if (pos
+ size
> header
.max_size
)
795 must_write_header
= true;
800 dout(1) << "check_for_full at " << pos
<< " : JOURNAL FULL "
801 << pos
<< " >= " << room
802 << " (max_size " << header
.max_size
<< " start " << header
.start
<< ")"
805 off64_t max
= header
.max_size
- get_top();
807 dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size
<< " > journal " << max
<< " (usable)" << dendl
;
812 int FileJournal::prepare_multi_write(bufferlist
& bl
, uint64_t& orig_ops
, uint64_t& orig_bytes
)
814 // gather queued writes
815 off64_t queue_pos
= write_pos
;
817 int eleft
= cct
->_conf
->journal_max_write_entries
;
818 unsigned bmax
= cct
->_conf
->journal_max_write_bytes
;
820 if (full_state
!= FULL_NOTFULL
)
823 while (!writeq_empty()) {
824 list
<write_item
> items
;
825 batch_pop_write(items
);
826 list
<write_item
>::iterator it
= items
.begin();
827 while (it
!= items
.end()) {
828 uint64_t bytes
= it
->bl
.length();
829 int r
= prepare_single_write(*it
, bl
, queue_pos
, orig_ops
, orig_bytes
);
830 if (r
== 0) { // prepare ok, delete it
834 std::lock_guard locker
{aio_lock
};
835 ceph_assert(aio_write_queue_ops
> 0);
836 aio_write_queue_ops
--;
837 ceph_assert(aio_write_queue_bytes
>= bytes
);
838 aio_write_queue_bytes
-= bytes
;
845 // the journal maybe full, insert the left item to writeq
846 batch_unpop_write(items
);
848 goto out
; // commit what we have
851 logger
->inc(l_filestore_journal_full
);
854 dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl
;
856 dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl
;
858 // throw out what we have so far
859 full_state
= FULL_FULL
;
860 while (!writeq_empty()) {
861 complete_write(1, peek_write().orig_len
);
864 print_header(header
);
867 return -ENOSPC
; // hrm, full on first op
871 dout(20) << "prepare_multi_write hit max events per write "
872 << cct
->_conf
->journal_max_write_entries
<< dendl
;
873 batch_unpop_write(items
);
878 if (bl
.length() >= bmax
) {
879 dout(20) << "prepare_multi_write hit max write size "
880 << cct
->_conf
->journal_max_write_bytes
<< dendl
;
881 batch_unpop_write(items
);
889 dout(20) << "prepare_multi_write queue_pos now " << queue_pos
<< dendl
;
890 ceph_assert((write_pos
+ bl
.length() == queue_pos
) ||
891 (write_pos
+ bl
.length() - header
.max_size
+ get_top() == queue_pos
));
// Record completion state for a submitted journal write: remember its
// seq and queue its completion Context. If anything is still waiting
// for the journal to leave the FULL state, defer this callback behind
// those waiters via a C_Gather so ordering is preserved.
896 void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
898 writing_seq.push_back(seq);
899 if (!waiting_for_notfull.empty()) {
900 // make sure previously unjournaled stuff waiting for UNFULL triggers
901 // _before_ newly journaled stuff does
902 dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
903 << " until after UNFULL" << dendl;
// The original fin fires only after both gather sub-contexts (journal
// write completion AND the UNFULL wakeup) have completed.
904 C_Gather *g = new C_Gather(writeq.front().fin);
905 writing_fin.push_back(g->new_sub());
906 waiting_for_notfull.push_back(g->new_sub());
908 writing_fin.push_back(writeq.front().fin);
909 dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
// Hand completed journal entries to the finisher. Pops the completion
// queue, queues each item's finish Context, records latency and
// per-op trace events, then wakes anyone waiting in flush().
// Caller must hold finisher_lock (asserted below).
914 void FileJournal::queue_completions_thru(uint64_t seq
)
916 ceph_assert(ceph_mutex_is_locked(finisher_lock
));
917 utime_t now
= ceph_clock_now();
918 list
<completion_item
> items
;
919 batch_pop_completions(items
);
920 list
<completion_item
>::iterator it
= items
.begin();
921 while (it
!= items
.end()) {
922 completion_item
& next
= *it
;
927 dout(10) << "queue_completions_thru seq " << seq
928 << " queueing seq " << next
.seq
929 << " " << next
.finish
930 << " lat " << lat
<< dendl
;
932 logger
->tinc(l_filestore_journal_latency
, lat
);
935 finisher
->queue(next
.finish
);
936 if (next
.tracked_op
) {
937 next
.tracked_op
->mark_event("journaled_completion_queued");
938 next
.tracked_op
->journal_trace
.event("queued completion");
939 next
.tracked_op
->journal_trace
.keyval("completed through", seq
);
// Any items not consumed go back onto the completion queue.
943 batch_unpop_completions(items
);
944 finisher_cond
.notify_all();
948 int FileJournal::prepare_single_write(write_item
&next_write
, bufferlist
& bl
, off64_t
& queue_pos
, uint64_t& orig_ops
, uint64_t& orig_bytes
)
950 uint64_t seq
= next_write
.seq
;
951 bufferlist
&ebl
= next_write
.bl
;
952 off64_t size
= ebl
.length();
954 int r
= check_for_full(seq
, queue_pos
, size
);
956 return r
; // ENOSPC or EAGAIN
958 uint32_t orig_len
= next_write
.orig_len
;
959 orig_bytes
+= orig_len
;
962 // add to write buffer
963 dout(15) << "prepare_single_write " << orig_ops
<< " will write " << queue_pos
<< " : seq " << seq
964 << " len " << orig_len
<< " -> " << size
<< dendl
;
966 unsigned seq_offset
= offsetof(entry_header_t
, seq
);
967 unsigned magic1_offset
= offsetof(entry_header_t
, magic1
);
968 unsigned magic2_offset
= offsetof(entry_header_t
, magic2
);
970 bufferptr headerptr
= ebl
.buffers().front();
972 uint64_t _queue_pos
= queue_pos
;
973 uint64_t magic2
= entry_header_t::make_magic(seq
, orig_len
, header
.get_fsid64());
974 headerptr
.copy_in(seq_offset
, sizeof(uint64_t), (char *)&_seq
);
975 headerptr
.copy_in(magic1_offset
, sizeof(uint64_t), (char *)&_queue_pos
);
976 headerptr
.copy_in(magic2_offset
, sizeof(uint64_t), (char *)&magic2
);
978 bufferptr footerptr
= ebl
.buffers().back();
979 unsigned post_offset
= footerptr
.length() - sizeof(entry_header_t
);
980 footerptr
.copy_in(post_offset
+ seq_offset
, sizeof(uint64_t), (char *)&_seq
);
981 footerptr
.copy_in(post_offset
+ magic1_offset
, sizeof(uint64_t), (char *)&_queue_pos
);
982 footerptr
.copy_in(post_offset
+ magic2_offset
, sizeof(uint64_t), (char *)&magic2
);
984 bl
.claim_append(ebl
);
985 if (next_write
.tracked_op
) {
986 next_write
.tracked_op
->mark_event("write_thread_in_journal_buffer");
987 next_write
.tracked_op
->journal_trace
.event("prepare_single_write");
990 journalq
.push_back(pair
<uint64_t,off64_t
>(seq
, queue_pos
));
994 if (queue_pos
>= header
.max_size
)
995 queue_pos
= queue_pos
+ get_top() - header
.max_size
;
// Direct-I/O sanity check: in directio mode every buffer segment and
// the target offset must satisfy block_size / CEPH_DIRECTIO_ALIGNMENT
// alignment; abort if the bufferlist was assembled misaligned.
1000 void FileJournal::check_align(off64_t pos
, bufferlist
& bl
)
1002 // make sure list segments are page aligned
1003 if (directio
&& !bl
.is_aligned_size_and_memory(block_size
, CEPH_DIRECTIO_ALIGNMENT
)) {
// The asserts narrow down which invariant broke before aborting.
1004 ceph_assert((bl
.length() & (CEPH_DIRECTIO_ALIGNMENT
- 1)) == 0);
1005 ceph_assert((pos
& (CEPH_DIRECTIO_ALIGNMENT
- 1)) == 0);
1006 ceph_abort_msg("bl was not aligned");
1010 int FileJournal::write_bl(off64_t
& pos
, bufferlist
& bl
)
1014 off64_t spos
= ::lseek64(fd
, pos
, SEEK_SET
);
1017 derr
<< "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret
) << dendl
;
1020 ret
= bl
.write_fd(fd
);
1022 derr
<< "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret
) << dendl
;
1026 if (pos
== header
.max_size
)
1031 void FileJournal::do_write(bufferlist
& bl
)
1034 if (bl
.length() == 0 && !must_write_header
)
1037 ceph::buffer::ptr hbp
;
1038 if (cct
->_conf
->journal_write_header_frequency
&&
1039 (((++journaled_since_start
) %
1040 cct
->_conf
->journal_write_header_frequency
) == 0)) {
1041 must_write_header
= true;
1044 if (must_write_header
) {
1045 must_write_header
= false;
1046 hbp
= prepare_header();
1049 dout(15) << "do_write writing " << write_pos
<< "~" << bl
.length()
1050 << (hbp
.length() ? " + header":"")
1053 utime_t from
= ceph_clock_now();
1056 off64_t pos
= write_pos
;
1059 write_pos
+= bl
.length();
1060 if (write_pos
>= header
.max_size
)
1061 write_pos
= write_pos
- header
.max_size
+ get_top();
1063 write_lock
.unlock();
1067 if (pos
+ bl
.length() > header
.max_size
) {
1068 bufferlist first
, second
;
1069 split
= header
.max_size
- pos
;
1070 first
.substr_of(bl
, 0, split
);
1071 second
.substr_of(bl
, split
, bl
.length() - split
);
1072 ceph_assert(first
.length() + second
.length() == bl
.length());
1073 dout(10) << "do_write wrapping, first bit at " << pos
<< " len " << first
.length()
1074 << " second bit len " << second
.length() << " (orig len " << bl
.length() << ")" << dendl
;
1076 //Save pos to write first piece second
1077 off64_t first_pos
= pos
;
1082 // be sneaky: include the header in the second fragment
1085 tmp
.claim_append(second
);
1087 pos
= 0; // we included the header
1089 // Write the second portion first possible with the header, so
1090 // do_read_entry() won't even get a valid entry_header_t if there
1091 // is a crash between the two writes.
1093 if (write_bl(pos
, second
)) {
1094 derr
<< "FileJournal::do_write: write_bl(pos=" << orig_pos
1095 << ") failed" << dendl
;
1096 check_align(pos
, second
);
1099 orig_pos
= first_pos
;
1100 if (write_bl(first_pos
, first
)) {
1101 derr
<< "FileJournal::do_write: write_bl(pos=" << orig_pos
1102 << ") failed" << dendl
;
1103 check_align(first_pos
, first
);
1106 ceph_assert(first_pos
== get_top());
1110 if (TEMP_FAILURE_RETRY(::pwrite(fd
, hbp
.c_str(), hbp
.length(), 0)) < 0) {
1112 derr
<< "FileJournal::do_write: pwrite(fd=" << fd
1113 << ", hbp.length=" << hbp
.length() << ") failed :"
1114 << cpp_strerror(err
) << dendl
;
1119 if (write_bl(pos
, bl
)) {
1120 derr
<< "FileJournal::do_write: write_bl(pos=" << pos
1121 << ") failed" << dendl
;
1122 check_align(pos
, bl
);
1128 dout(20) << "do_write fsync" << dendl
;
1131 * We'd really love to have a fsync_range or fdatasync_range and do a:
1134 * ::fsync_range(fd, header.max_size - split, split)l
1135 * ::fsync_range(fd, get_top(), bl.length() - split);
1137 * ::fsync_range(fd, write_pos, bl.length())
1139 * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
1140 * too hard given all the underlying infrastructure already exist.
1142 * NOTE: using sync_file_range here would not be safe as it does not
1143 * flush disk caches or commits any sort of metadata.
1146 #if defined(__APPLE__) || defined(__FreeBSD__)
1149 ret
= ::fdatasync(fd
);
1152 derr
<< __func__
<< " fsync/fdatasync failed: " << cpp_strerror(errno
) << dendl
;
1155 #ifdef HAVE_POSIX_FADVISE
1156 if (cct
->_conf
->filestore_fadvise
)
1157 posix_fadvise(fd
, 0, 0, POSIX_FADV_DONTNEED
);
1161 utime_t lat
= ceph_clock_now() - from
;
1162 dout(20) << "do_write latency " << lat
<< dendl
;
1166 ceph_assert(write_pos
== pos
);
1167 ceph_assert(write_pos
% header
.alignment
== 0);
1170 std::lock_guard locker
{finisher_lock
};
1171 journaled_seq
= writing_seq
;
1174 // only if we haven't filled up recently!
1175 if (full_state
!= FULL_NOTFULL
) {
1176 dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
1177 << ", full_commit_seq|full_restart_seq" << dendl
;
1179 if (plug_journal_completions
) {
1180 dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
1181 << " due to completion plug" << dendl
;
1183 dout(20) << "do_write queueing finishers through seq " << journaled_seq
<< dendl
;
1184 queue_completions_thru(journaled_seq
);
// Block until every queued completion has been handed to the finisher
// and the finisher itself has drained.
1190 void FileJournal::flush()
1192 dout(10) << "waiting for completions to empty" << dendl
;
// Wait (under finisher_lock) for queue_completions_thru to drain the
// completion queue; it signals finisher_cond when it does.
1194 std::unique_lock l
{finisher_lock
};
1195 finisher_cond
.wait(l
, [this] { return completions_empty(); });
1197 dout(10) << "flush waiting for finisher" << dendl
;
1198 finisher
->wait_for_empty();
1199 dout(10) << "flush done" << dendl
;
1203 void FileJournal::write_thread_entry()
1205 dout(10) << "write_thread_entry start" << dendl
;
1208 std::unique_lock locker
{writeq_lock
};
1209 if (writeq
.empty() && !must_write_header
) {
1212 dout(20) << "write_thread_entry going to sleep" << dendl
;
1213 writeq_cond
.wait(locker
);
1214 dout(20) << "write_thread_entry woke up" << dendl
;
1221 std::unique_lock locker
{aio_lock
};
1222 // should we back off to limit aios in flight? try to do this
1223 // adaptively so that we submit larger aios once we have lots of
1226 // NOTE: our condition here is based on aio_num (protected by
1227 // aio_lock) and throttle_bytes (part of the write queue). when
1228 // we sleep, we *only* wait for aio_num to change, and do not
1229 // wake when more data is queued. this is not strictly correct,
1230 // but should be fine given that we will have plenty of aios in
1231 // flight if we hit this limit to ensure we keep the device
1233 while (aio_num
> 0) {
1234 int exp
= std::min
<int>(aio_num
* 2, 24);
1235 long unsigned min_new
= 1ull << exp
;
1236 uint64_t cur
= aio_write_queue_bytes
;
1237 dout(20) << "write_thread_entry aio throttle: aio num " << aio_num
<< " bytes " << aio_bytes
1238 << " ... exp " << exp
<< " min_new " << min_new
1239 << " ... pending " << cur
<< dendl
;
1242 dout(20) << "write_thread_entry deferring until more aios complete: "
1243 << aio_num
<< " aios with " << aio_bytes
<< " bytes needs " << min_new
1244 << " bytes to start a new aio (currently " << cur
<< " pending)" << dendl
;
1245 aio_cond
.wait(locker
);
1246 dout(20) << "write_thread_entry woke up" << dendl
;
1251 std::unique_lock locker
{write_lock
};
1252 uint64_t orig_ops
= 0;
1253 uint64_t orig_bytes
= 0;
1256 int r
= prepare_multi_write(bl
, orig_ops
, orig_bytes
);
1257 // Don't care about journal full if stoppping, so drop queue and
1258 // possibly let header get written and loop above to notice stop
1261 dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl
;
1262 while (!writeq_empty()) {
1263 complete_write(1, peek_write().orig_len
);
1266 print_header(header
);
1269 dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl
;
1270 commit_cond
.wait(locker
);
1271 dout(20) << "write_thread_entry woke up" << dendl
;
1275 ceph_assert(r
== 0);
1278 logger
->inc(l_filestore_journal_wr
);
1279 logger
->inc(l_filestore_journal_wr_bytes
, bl
.length());
1290 complete_write(orig_ops
, orig_bytes
);
1293 dout(10) << "write_thread_entry finish" << dendl
;
1297 void FileJournal::do_aio_write(bufferlist
& bl
)
1300 if (cct
->_conf
->journal_write_header_frequency
&&
1301 (((++journaled_since_start
) %
1302 cct
->_conf
->journal_write_header_frequency
) == 0)) {
1303 must_write_header
= true;
1307 if (bl
.length() == 0 && !must_write_header
)
1310 ceph::buffer::ptr hbp
;
1311 if (must_write_header
) {
1312 must_write_header
= false;
1313 hbp
= prepare_header();
1317 off64_t pos
= write_pos
;
1319 dout(15) << "do_aio_write writing " << pos
<< "~" << bl
.length()
1320 << (hbp
.length() ? " + header":"")
1325 if (pos
+ bl
.length() > header
.max_size
) {
1326 bufferlist first
, second
;
1327 split
= header
.max_size
- pos
;
1328 first
.substr_of(bl
, 0, split
);
1329 second
.substr_of(bl
, split
, bl
.length() - split
);
1330 ceph_assert(first
.length() + second
.length() == bl
.length());
1331 dout(10) << "do_aio_write wrapping, first bit at " << pos
<< "~" << first
.length() << dendl
;
1333 if (write_aio_bl(pos
, first
, 0)) {
1334 derr
<< "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1335 << ") failed" << dendl
;
1338 ceph_assert(pos
== header
.max_size
);
1340 // be sneaky: include the header in the second fragment
1343 tmp
.claim_append(second
);
1345 pos
= 0; // we included the header
1347 pos
= get_top(); // no header, start after that
1348 if (write_aio_bl(pos
, second
, writing_seq
)) {
1349 derr
<< "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1350 << ") failed" << dendl
;
1359 if (write_aio_bl(pos
, hbl
, 0)) {
1360 derr
<< "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl
;
1365 if (write_aio_bl(pos
, bl
, writing_seq
)) {
1366 derr
<< "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1367 << ") failed" << dendl
;
1373 if (write_pos
== header
.max_size
)
1374 write_pos
= get_top();
1375 ceph_assert(write_pos
% header
.alignment
== 0);
1379 * write a buffer using aio
1381 * @param seq seq to trigger when this aio completes. if 0, do not update any state
1384 int FileJournal::write_aio_bl(off64_t
& pos
, bufferlist
& bl
, uint64_t seq
)
1386 dout(20) << "write_aio_bl " << pos
<< "~" << bl
.length() << " seq " << seq
<< dendl
;
1388 while (bl
.length() > 0) {
1389 int max
= std::min
<int>(bl
.get_num_buffers(), IOV_MAX
-1);
1390 iovec
*iov
= new iovec
[max
];
1393 for (auto p
= std::cbegin(bl
.buffers()); n
< max
; ++p
, ++n
) {
1394 ceph_assert(p
!= std::cend(bl
.buffers()));
1395 iov
[n
].iov_base
= const_cast<void*>(static_cast<const void*>(p
->c_str()));
1396 iov
[n
].iov_len
= p
->length();
1401 bl
.splice(0, len
, &tbl
); // move bytes from bl -> tbl
1403 // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
1404 // modified in check_aio_completion
1406 aio_queue
.push_back(aio_info(tbl
, pos
, bl
.length() > 0 ? 0 : seq
));
1407 aio_info
& aio
= aio_queue
.back();
1410 io_prep_pwritev(&aio
.iocb
, fd
, aio
.iov
, n
, pos
);
1412 dout(20) << "write_aio_bl .. " << aio
.off
<< "~" << aio
.len
1413 << " in " << n
<< dendl
;
1416 aio_bytes
+= aio
.len
;
1418 // need to save current aio len to update write_pos later because current
1419 aio could be erased from aio_queue once it is done
1420 uint64_t cur_len
= aio
.len
;
1421 // unlock aio_lock because following io_submit might take time to return
1424 iocb
*piocb
= &aio
.iocb
;
1426 // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
1430 int r
= io_submit(aio_ctx
, 1, &piocb
);
1431 dout(20) << "write_aio_bl io_submit return value: " << r
<< dendl
;
1433 derr
<< "io_submit to " << aio
.off
<< "~" << cur_len
1434 << " got " << cpp_strerror(r
) << dendl
;
1435 if (r
== -EAGAIN
&& attempts
-- > 0) {
1440 check_align(pos
, tbl
);
1441 ceph_abort_msg("io_submit got unexpected error");
1449 write_finish_cond
.notify_all();
1455 void FileJournal::write_finish_thread_entry()
1458 dout(10) << __func__
<< " enter" << dendl
;
1461 std::unique_lock locker
{aio_lock
};
1462 if (aio_queue
.empty()) {
1465 dout(20) << __func__
<< " sleeping" << dendl
;
1466 write_finish_cond
.wait(locker
);
1471 dout(20) << __func__
<< " waiting for aio(s)" << dendl
;
1473 int r
= io_getevents(aio_ctx
, 1, 16, event
, NULL
);
1476 dout(0) << "io_getevents got " << cpp_strerror(r
) << dendl
;
1479 derr
<< "io_getevents got " << cpp_strerror(r
) << dendl
;
1481 note_io_error_event(devname
.c_str(), fn
.c_str(), -EIO
, 0, 0, 0);
1483 ceph_abort_msg("got unexpected error from io_getevents");
1487 std::lock_guard locker
{aio_lock
};
1488 for (int i
=0; i
<r
; i
++) {
1489 aio_info
*ai
= (aio_info
*)event
[i
].obj
;
1490 if (event
[i
].res
!= ai
->len
) {
1491 derr
<< "aio to " << ai
->off
<< "~" << ai
->len
1492 << " returned: " << (int)event
[i
].res
<< dendl
;
1493 ceph_abort_msg("unexpected aio error");
1495 dout(10) << __func__
<< " aio " << ai
->off
1496 << "~" << ai
->len
<< " done" << dendl
;
1499 check_aio_completion();
1502 dout(10) << __func__
<< " exit" << dendl
;
1508 * check aio_wait for completed aio, and update state appropriately.
1510 void FileJournal::check_aio_completion()
1512 ceph_assert(ceph_mutex_is_locked(aio_lock
));
1513 dout(20) << "check_aio_completion" << dendl
;
1515 bool completed_something
= false, signal
= false;
1516 uint64_t new_journaled_seq
= 0;
1518 list
<aio_info
>::iterator p
= aio_queue
.begin();
1519 while (p
!= aio_queue
.end() && p
->done
) {
1520 dout(20) << "check_aio_completion completed seq " << p
->seq
<< " "
1521 << p
->off
<< "~" << p
->len
<< dendl
;
1523 new_journaled_seq
= p
->seq
;
1524 completed_something
= true;
1527 aio_bytes
-= p
->len
;
1528 aio_queue
.erase(p
++);
1532 if (completed_something
) {
1534 // only if we haven't filled up recently!
1535 std::lock_guard locker
{finisher_lock
};
1536 journaled_seq
= new_journaled_seq
;
1537 if (full_state
!= FULL_NOTFULL
) {
1538 dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
1539 << ", full_commit_seq|full_restart_seq" << dendl
;
1541 if (plug_journal_completions
) {
1542 dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
1543 << " due to completion plug" << dendl
;
1545 dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq
<< dendl
;
1546 queue_completions_thru(journaled_seq
);
1551 // maybe write queue was waiting for aio count to drop?
1552 aio_cond
.notify_all();
1557 int FileJournal::prepare_entry(vector
<ObjectStore::Transaction
>& tls
, bufferlist
* tbl
) {
1558 dout(10) << "prepare_entry " << tls
<< dendl
;
1559 int data_len
= cct
->_conf
->journal_align_min_size
- 1;
1560 int data_align
= -1; // -1 indicates that we don't care about the alignment
1562 for (vector
<ObjectStore::Transaction
>::iterator p
= tls
.begin();
1563 p
!= tls
.end(); ++p
) {
1564 if ((int)(*p
).get_data_length() > data_len
) {
1565 data_len
= (*p
).get_data_length();
1566 data_align
= ((*p
).get_data_alignment() - bl
.length()) & ~CEPH_PAGE_MASK
;
1570 if (tbl
->length()) {
1571 bl
.claim_append(*tbl
);
1573 // add it this entry
1575 unsigned head_size
= sizeof(entry_header_t
);
1576 off64_t base_size
= 2*head_size
+ bl
.length();
1577 memset(&h
, 0, sizeof(h
));
1578 if (data_align
>= 0)
1579 h
.pre_pad
= ((unsigned int)data_align
- (unsigned int)head_size
) & ~CEPH_PAGE_MASK
;
1580 off64_t size
= round_up_to(base_size
+ h
.pre_pad
, header
.alignment
);
1581 unsigned post_pad
= size
- base_size
- h
.pre_pad
;
1582 h
.len
= bl
.length();
1583 h
.post_pad
= post_pad
;
1584 h
.crc32c
= bl
.crc32c(0);
1585 dout(10) << " len " << bl
.length() << " -> " << size
1586 << " (head " << head_size
<< " pre_pad " << h
.pre_pad
1587 << " bl " << bl
.length() << " post_pad " << post_pad
<< " tail " << head_size
<< ")"
1588 << " (bl alignment " << data_align
<< ")"
1592 ebl
.append((const char*)&h
, sizeof(h
));
1594 ebl
.push_back(ceph::buffer::create_static(h
.pre_pad
, zero_buf
));
1597 ebl
.claim_append(bl
);
1599 ebl
.push_back(ceph::buffer::create_static(h
.post_pad
, zero_buf
));
1602 ebl
.append((const char*)&h
, sizeof(h
));
1604 ebl
.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT
);
1605 *tbl
= std::move(ebl
);
1609 void FileJournal::submit_entry(uint64_t seq
, bufferlist
& e
, uint32_t orig_len
,
1610 Context
*oncommit
, TrackedOpRef osd_op
)
1613 dout(5) << "submit_entry seq " << seq
1614 << " len " << e
.length()
1615 << " (" << oncommit
<< ")" << dendl
;
1616 ceph_assert(e
.length() > 0);
1617 ceph_assert(e
.length() < header
.max_size
);
1620 logger
->inc(l_filestore_journal_queue_bytes
, orig_len
);
1621 logger
->inc(l_filestore_journal_queue_ops
, 1);
1624 throttle
.register_throttle_seq(seq
, e
.length());
1626 logger
->inc(l_filestore_journal_ops
, 1);
1627 logger
->inc(l_filestore_journal_bytes
, e
.length());
1631 osd_op
->mark_event("commit_queued_for_journal_write");
1632 if (osd_op
->store_trace
) {
1633 osd_op
->journal_trace
.init("journal", &trace_endpoint
, &osd_op
->store_trace
);
1634 osd_op
->journal_trace
.event("submit_entry");
1635 osd_op
->journal_trace
.keyval("seq", seq
);
1639 std::lock_guard l1
{writeq_lock
};
1641 std::lock_guard l2
{aio_lock
};
1643 std::lock_guard l3
{completions_lock
};
1646 aio_write_queue_ops
++;
1647 aio_write_queue_bytes
+= e
.length();
1648 aio_cond
.notify_all();
1651 completions
.push_back(
1653 seq
, oncommit
, ceph_clock_now(), osd_op
));
1655 writeq_cond
.notify_all();
1656 writeq
.push_back(write_item(seq
, e
, orig_len
, osd_op
));
1658 osd_op
->journal_trace
.keyval("queue depth", writeq
.size());
1662 bool FileJournal::writeq_empty()
1664 std::lock_guard locker
{writeq_lock
};
1665 return writeq
.empty();
/// Return a reference to the oldest queued write item without removing it.
/// Caller must already hold write_lock (asserted); writeq_lock is taken
/// internally to guard the queue structure itself.  The returned reference
/// remains valid only while the caller keeps write_lock held, since only
/// pop_write()/batch_pop_write() (also under write_lock) remove items.
FileJournal::write_item &FileJournal::peek_write()
{
  ceph_assert(ceph_mutex_is_locked(write_lock));
  std::lock_guard locker{writeq_lock};
  return writeq.front();
}
1675 void FileJournal::pop_write()
1677 ceph_assert(ceph_mutex_is_locked(write_lock
));
1678 std::lock_guard locker
{writeq_lock
};
1680 logger
->dec(l_filestore_journal_queue_bytes
, writeq
.front().orig_len
);
1681 logger
->dec(l_filestore_journal_queue_ops
, 1);
1686 void FileJournal::batch_pop_write(list
<write_item
> &items
)
1688 ceph_assert(ceph_mutex_is_locked(write_lock
));
1690 std::lock_guard locker
{writeq_lock
};
1693 for (auto &&i
: items
) {
1695 logger
->dec(l_filestore_journal_queue_bytes
, i
.orig_len
);
1696 logger
->dec(l_filestore_journal_queue_ops
, 1);
/// Return a batch of previously popped items to the FRONT of the write
/// queue (the inverse of batch_pop_write), preserving their order ahead
/// of anything queued in the meantime.  Caller must hold write_lock
/// (asserted).  Perf counters are re-incremented for every returned item
/// before the splice; the splice itself is O(1) and empties @p items.
void FileJournal::batch_unpop_write(list<write_item> &items)
{
  ceph_assert(ceph_mutex_is_locked(write_lock));
  for (auto &&i : items) {
    logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
    logger->inc(l_filestore_journal_queue_ops, 1);
  }
  // writeq_lock only guards the queue structure; the counter updates
  // above are already serialized by write_lock.
  std::lock_guard locker{writeq_lock};
  writeq.splice(writeq.begin(), items);
}
1714 void FileJournal::commit_start(uint64_t seq
)
1716 dout(10) << "commit_start" << dendl
;
1719 switch (full_state
) {
1724 if (seq
>= journaled_seq
) {
1725 dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq "
1726 << seq
<< " > journaled_seq " << journaled_seq
1727 << ", moving to FULL_WAIT."
1729 full_state
= FULL_WAIT
;
1731 dout(1) << "FULL_FULL commit_start on seq "
1732 << seq
<< " < journaled_seq " << journaled_seq
1733 << ", remaining in FULL_FULL"
1739 dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl
;
1740 full_state
= FULL_NOTFULL
;
1741 plug_journal_completions
= true;
1747 * Send discard command to the journal block device.
1749 void FileJournal::do_discard(int64_t offset
, int64_t end
)
1751 dout(10) << __func__
<< " trim(" << offset
<< ", " << end
<< dendl
;
1753 offset
= round_up_to(offset
, block_size
);
1756 end
= round_up_to(end
- block_size
, block_size
);
1757 ceph_assert(end
>= offset
);
1760 if (blkdev
.discard(offset
, end
- offset
) < 0) {
1761 dout(1) << __func__
<< "ioctl(BLKDISCARD) error:" << cpp_strerror(errno
) << dendl
;
1766 void FileJournal::committed_thru(uint64_t seq
)
1768 std::lock_guard locker
{write_lock
};
1770 auto released
= throttle
.flush(seq
);
1772 logger
->dec(l_filestore_journal_ops
, released
.first
);
1773 logger
->dec(l_filestore_journal_bytes
, released
.second
);
1776 if (seq
< last_committed_seq
) {
1777 dout(5) << "committed_thru " << seq
<< " < last_committed_seq " << last_committed_seq
<< dendl
;
1778 ceph_assert(seq
>= last_committed_seq
);
1781 if (seq
== last_committed_seq
) {
1782 dout(5) << "committed_thru " << seq
<< " == last_committed_seq " << last_committed_seq
<< dendl
;
1786 dout(5) << "committed_thru " << seq
<< " (last_committed_seq " << last_committed_seq
<< ")" << dendl
;
1787 last_committed_seq
= seq
;
1791 std::lock_guard locker
{finisher_lock
};
1792 queue_completions_thru(seq
);
1793 if (plug_journal_completions
&& seq
>= header
.start_seq
) {
1794 dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq
<< dendl
;
1795 plug_journal_completions
= false;
1796 queue_completions_thru(journaled_seq
);
1800 // adjust start pointer
1801 while (!journalq
.empty() && journalq
.front().first
<= seq
) {
1802 journalq
.pop_front();
1805 int64_t old_start
= header
.start
;
1806 if (!journalq
.empty()) {
1807 header
.start
= journalq
.front().second
;
1808 header
.start_seq
= journalq
.front().first
;
1810 header
.start
= write_pos
;
1811 header
.start_seq
= seq
+ 1;
1815 dout(10) << __func__
<< " will trim (" << old_start
<< ", " << header
.start
<< ")" << dendl
;
1816 if (old_start
< header
.start
)
1817 do_discard(old_start
, header
.start
- 1);
1819 do_discard(old_start
, header
.max_size
- 1);
1820 do_discard(get_top(), header
.start
- 1);
1824 must_write_header
= true;
1825 print_header(header
);
1827 // committed but unjournaled items
1828 while (!writeq_empty() && peek_write().seq
<= seq
) {
1829 dout(15) << " dropping committed but unwritten seq " << peek_write().seq
1830 << " len " << peek_write().bl
.length()
1832 complete_write(1, peek_write().orig_len
);
1836 commit_cond
.notify_all();
1838 dout(10) << "committed_thru done" << dendl
;
/// Log that a batch of writes finished.  Purely informational: the
/// visible body only emits a debug message; ops/bytes are the counts
/// reported by the caller for the completed batch.
void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
{
  dout(5) << __func__ << " finished " << ops << " ops and "
	  << bytes << " bytes" << dendl;
}
1848 int FileJournal::make_writeable()
1850 dout(10) << __func__
<< dendl
;
1851 int r
= set_throttle_params();
1860 write_pos
= read_pos
;
1862 write_pos
= get_top();
1865 must_write_header
= true;
1871 int FileJournal::set_throttle_params()
1874 bool valid
= throttle
.set_params(
1875 cct
->_conf
->journal_throttle_low_threshhold
,
1876 cct
->_conf
->journal_throttle_high_threshhold
,
1877 cct
->_conf
->filestore_expected_throughput_bytes
,
1878 cct
->_conf
->journal_throttle_high_multiple
,
1879 cct
->_conf
->journal_throttle_max_multiple
,
1880 header
.max_size
- get_top(),
1884 derr
<< "tried to set invalid params: "
1888 return valid
? 0 : -EINVAL
;
1891 const char** FileJournal::get_tracked_conf_keys() const
1893 static const char *KEYS
[] = {
1894 "journal_throttle_low_threshhold",
1895 "journal_throttle_high_threshhold",
1896 "journal_throttle_high_multiple",
1897 "journal_throttle_max_multiple",
1898 "filestore_expected_throughput_bytes",
1903 void FileJournal::wrap_read_bl(
1911 while (pos
>= header
.max_size
)
1912 pos
= pos
+ get_top() - header
.max_size
;
1915 if (pos
+ olen
> header
.max_size
)
1916 len
= header
.max_size
- pos
; // partial
1920 int64_t actual
= ::lseek64(fd
, pos
, SEEK_SET
);
1921 ceph_assert(actual
== pos
);
1923 bufferptr bp
= ceph::buffer::create(len
);
1924 int r
= safe_read_exact(fd
, bp
.c_str(), len
);
1926 derr
<< "FileJournal::wrap_read_bl: safe_read_exact " << pos
<< "~" << len
<< " returned "
1927 << cpp_strerror(r
) << dendl
;
1930 bl
->push_back(std::move(bp
));
1934 if (pos
>= header
.max_size
)
1935 pos
= pos
+ get_top() - header
.max_size
;
1940 bool FileJournal::read_entry(
1947 uint64_t seq
= next_seq
;
1950 dout(2) << "read_entry -- not readable" << dendl
;
1954 off64_t pos
= read_pos
;
1955 off64_t next_pos
= pos
;
1957 read_entry_result result
= do_read_entry(
1963 if (result
== SUCCESS
) {
1964 journalq
.push_back( pair
<uint64_t,off64_t
>(seq
, pos
));
1965 uint64_t amount_to_take
=
1968 (header
.max_size
- pos
) + (next_pos
- get_top());
1969 throttle
.take(amount_to_take
);
1970 throttle
.register_throttle_seq(next_seq
, amount_to_take
);
1972 logger
->inc(l_filestore_journal_ops
, 1);
1973 logger
->inc(l_filestore_journal_bytes
, amount_to_take
);
1975 if (next_seq
> seq
) {
1978 read_pos
= next_pos
;
1980 if (seq
> journaled_seq
)
1981 journaled_seq
= seq
;
1985 derr
<< "do_read_entry(" << pos
<< "): " << ss
.str() << dendl
;
1988 if (seq
&& seq
< header
.committed_up_to
) {
1989 derr
<< "Unable to read past sequence " << seq
1990 << " but header indicates the journal has committed up through "
1991 << header
.committed_up_to
<< ", journal is corrupt" << dendl
;
1992 if (cct
->_conf
->journal_ignore_corruption
) {
2001 dout(2) << "No further valid entries found, journal is most likely valid"
2006 FileJournal::read_entry_result
FileJournal::do_read_entry(
2012 entry_header_t
*_h
) const
2014 off64_t cur_pos
= init_pos
;
2023 wrap_read_bl(cur_pos
, sizeof(*h
), &hbl
, &_next_pos
);
2024 h
= reinterpret_cast<entry_header_t
*>(hbl
.c_str());
2026 if (!h
->check_magic(cur_pos
, header
.get_fsid64())) {
2027 dout(25) << "read_entry " << init_pos
2028 << " : bad header magic, end of journal" << dendl
;
2030 *ss
<< "bad header magic";
2032 *next_pos
= init_pos
+ (4<<10); // check 4k ahead
2033 return MAYBE_CORRUPT
;
2035 cur_pos
= _next_pos
;
2039 cur_pos
+= h
->pre_pad
;
2042 wrap_read_bl(cur_pos
, h
->len
, bl
, &cur_pos
);
2045 cur_pos
+= h
->post_pad
;
2050 wrap_read_bl(cur_pos
, sizeof(*f
), &fbl
, &cur_pos
);
2051 f
= reinterpret_cast<entry_header_t
*>(fbl
.c_str());
2052 if (memcmp(f
, h
, sizeof(*f
))) {
2054 *ss
<< "bad footer magic, partial entry";
2056 *next_pos
= cur_pos
;
2057 return MAYBE_CORRUPT
;
2060 if ((header
.flags
& header_t::FLAG_CRC
) || // if explicitly enabled (new journal)
2061 h
->crc32c
!= 0) { // newer entry in old journal
2062 uint32_t actual_crc
= bl
->crc32c(0);
2063 if (actual_crc
!= h
->crc32c
) {
2065 *ss
<< "header crc (" << h
->crc32c
2066 << ") doesn't match body crc (" << actual_crc
<< ")";
2068 *next_pos
= cur_pos
;
2069 return MAYBE_CORRUPT
;
2074 dout(2) << "read_entry " << init_pos
<< " : seq " << h
->seq
2075 << " " << h
->len
<< " bytes"
2084 *next_pos
= cur_pos
;
2089 ceph_assert(cur_pos
% header
.alignment
== 0);
/// Reserve @p count units from the journal throttle.  Presumably blocks
/// (or backs off) until the throttle grants the budget, per the name —
/// TODO confirm against JournalThrottle::get's semantics.
void FileJournal::reserve_throttle_and_backoff(uint64_t count)
{
  throttle.get(count);
}
2098 void FileJournal::get_header(
2099 uint64_t wanted_seq
,
2103 off64_t pos
= header
.start
;
2104 off64_t next_pos
= pos
;
2107 dout(2) << __func__
<< dendl
;
2111 read_entry_result result
= do_read_entry(
2118 if (result
== FAILURE
|| result
== MAYBE_CORRUPT
)
2120 if (seq
== wanted_seq
) {
2126 ceph_abort(); // not reachable
2129 void FileJournal::corrupt(
2133 dout(2) << __func__
<< dendl
;
2134 if (corrupt_at
>= header
.max_size
)
2135 corrupt_at
= corrupt_at
+ get_top() - header
.max_size
;
2137 int64_t actual
= ::lseek64(fd
, corrupt_at
, SEEK_SET
);
2138 ceph_assert(actual
== corrupt_at
);
2141 int r
= safe_read_exact(fd
, buf
, 1);
2142 ceph_assert(r
== 0);
2144 actual
= ::lseek64(wfd
, corrupt_at
, SEEK_SET
);
2145 ceph_assert(actual
== corrupt_at
);
2148 r
= safe_write(wfd
, buf
, 1);
2149 ceph_assert(r
== 0);
2152 void FileJournal::corrupt_payload(
2156 dout(2) << __func__
<< dendl
;
2159 get_header(seq
, &pos
, &h
);
2160 off64_t corrupt_at
=
2161 pos
+ sizeof(entry_header_t
) + h
.pre_pad
;
2162 corrupt(wfd
, corrupt_at
);
2166 void FileJournal::corrupt_footer_magic(
2170 dout(2) << __func__
<< dendl
;
2173 get_header(seq
, &pos
, &h
);
2174 off64_t corrupt_at
=
2175 pos
+ sizeof(entry_header_t
) + h
.pre_pad
+
2176 h
.len
+ h
.post_pad
+
2177 (reinterpret_cast<char*>(&h
.magic2
) - reinterpret_cast<char*>(&h
));
2178 corrupt(wfd
, corrupt_at
);
2182 void FileJournal::corrupt_header_magic(
2186 dout(2) << __func__
<< dendl
;
2189 get_header(seq
, &pos
, &h
);
2190 off64_t corrupt_at
=
2192 (reinterpret_cast<char*>(&h
.magic2
) - reinterpret_cast<char*>(&h
));
2193 corrupt(wfd
, corrupt_at
);
2196 off64_t
FileJournal::get_journal_size_estimate()
2198 off64_t size
, start
= header
.start
;
2199 if (write_pos
< start
) {
2200 size
= (max_size
- start
) + write_pos
;
2202 size
= write_pos
- start
;
2204 dout(20) << __func__
<< " journal size=" << size
<< dendl
;
2208 void FileJournal::get_devices(set
<string
> *ls
)
2212 if (int rc
= blkdev
.wholedisk(&dev_node
); rc
) {
2215 get_raw_devices(dev_node
, ls
);
2218 void FileJournal::collect_metadata(map
<string
,string
> *pm
)
2221 char partition_path
[PATH_MAX
];
2222 char dev_node
[PATH_MAX
];
2223 if (blkdev
.partition(partition_path
, PATH_MAX
)) {
2224 (*pm
)["backend_filestore_journal_partition_path"] = "unknown";
2226 (*pm
)["backend_filestore_journal_partition_path"] = string(partition_path
);
2228 if (blkdev
.wholedisk(dev_node
, PATH_MAX
)) {
2229 (*pm
)["backend_filestore_journal_dev_node"] = "unknown";
2231 (*pm
)["backend_filestore_journal_dev_node"] = string(dev_node
);