]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileJournal.cc
7e6a19cbf0e1f66c4037d2ff33bca696e8121df7
[ceph.git] / ceph / src / os / filestore / FileJournal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include "acconfig.h"
15
16 #include "common/debug.h"
17 #include "common/errno.h"
18 #include "common/safe_io.h"
19 #include "FileJournal.h"
20 #include "include/color.h"
21 #include "common/perf_counters.h"
22 #include "FileStore.h"
23
24 #include "include/compat.h"
25
26 #include <fcntl.h>
27 #include <limits.h>
28 #include <sstream>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <sys/mount.h>
34
35 #include "common/blkdev.h"
36 #if defined(__linux__)
37 #include "common/linux_version.h"
38 #endif
39
40 #if defined(__FreeBSD__)
41 #define O_DSYNC O_SYNC
42 #endif
43
// Route this file's dout()/derr output through the journal debug
// subsystem, using the journal's CephContext and a "journal " prefix.
#define dout_context cct
#define dout_subsys ceph_subsys_journal
#undef dout_prefix
#define dout_prefix *_dout << "journal "

// Minimum journal size (1 MiB) used by the block-device and file size checks.
static const int64_t ONE_MEG = int64_t(1) << 20;
// Alignment, in bytes, that direct_io journal mode requires of the journal
// alignment and of I/O positions/lengths.
static const int CEPH_DIRECTIO_ALIGNMENT = 4096;
51
52
53 int FileJournal::_open(bool forwrite, bool create)
54 {
55 int flags, ret;
56
57 if (forwrite) {
58 flags = O_RDWR;
59 if (directio)
60 flags |= O_DIRECT | O_DSYNC;
61 } else {
62 flags = O_RDONLY;
63 }
64 if (create)
65 flags |= O_CREAT;
66
67 if (fd >= 0) {
68 if (TEMP_FAILURE_RETRY(::close(fd))) {
69 int err = errno;
70 derr << "FileJournal::_open: error closing old fd: "
71 << cpp_strerror(err) << dendl;
72 }
73 }
74 fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
75 if (fd < 0) {
76 int err = errno;
77 dout(2) << "FileJournal::_open unable to open journal "
78 << fn << ": " << cpp_strerror(err) << dendl;
79 return -err;
80 }
81
82 struct stat st;
83 ret = ::fstat(fd, &st);
84 if (ret) {
85 ret = errno;
86 derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl;
87 ret = -ret;
88 goto out_fd;
89 }
90
91 if (S_ISBLK(st.st_mode)) {
92 ret = _open_block_device();
93 } else if (S_ISREG(st.st_mode)) {
94 if (aio && !force_aio) {
95 derr << "FileJournal::_open: disabling aio for non-block journal. Use "
96 << "journal_force_aio to force use of aio anyway" << dendl;
97 aio = false;
98 }
99 ret = _open_file(st.st_size, st.st_blksize, create);
100 } else {
101 derr << "FileJournal::_open: wrong journal file type: " << st.st_mode
102 << dendl;
103 ret = -EINVAL;
104 }
105
106 if (ret)
107 goto out_fd;
108
109 #ifdef HAVE_LIBAIO
110 if (aio) {
111 aio_ctx = 0;
112 ret = io_setup(128, &aio_ctx);
113 if (ret < 0) {
114 switch (ret) {
115 // Contrary to naive expectations -EAGIAN means ...
116 case -EAGAIN:
117 derr << "FileJournal::_open: user's limit of aio events exceeded. "
118 << "Try increasing /proc/sys/fs/aio-max-nr" << dendl;
119 break;
120 default:
121 derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl;
122 break;
123 }
124 goto out_fd;
125 }
126 }
127 #endif
128
129 /* We really want max_size to be a multiple of block_size. */
130 max_size -= max_size % block_size;
131
132 dout(1) << "_open " << fn << " fd " << fd
133 << ": " << max_size
134 << " bytes, block size " << block_size
135 << " bytes, directio = " << directio
136 << ", aio = " << aio
137 << dendl;
138 return 0;
139
140 out_fd:
141 VOID_TEMP_FAILURE_RETRY(::close(fd));
142 fd = -1;
143 return ret;
144 }
145
146 int FileJournal::_open_block_device()
147 {
148 int64_t bdev_sz = 0;
149 int ret = get_block_device_size(fd, &bdev_sz);
150 if (ret) {
151 dout(0) << __func__ << ": failed to read block device size." << dendl;
152 return -EIO;
153 }
154
155 /* Check for bdev_sz too small */
156 if (bdev_sz < ONE_MEG) {
157 dout(0) << __func__ << ": your block device must be at least "
158 << ONE_MEG << " bytes to be used for a Ceph journal." << dendl;
159 return -EINVAL;
160 }
161
162 dout(10) << __func__ << ": ignoring osd journal size. "
163 << "We'll use the entire block device (size: " << bdev_sz << ")"
164 << dendl;
165 max_size = bdev_sz;
166
167 block_size = cct->_conf->journal_block_size;
168
169 if (cct->_conf->journal_discard) {
170 discard = block_device_support_discard(fn.c_str());
171 dout(10) << fn << " support discard: " << (int)discard << dendl;
172 }
173
174 return 0;
175 }
176
177 int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
178 bool create)
179 {
180 int ret;
181 int64_t conf_journal_sz(cct->_conf->osd_journal_size);
182 conf_journal_sz <<= 20;
183
184 if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) {
185 derr << "I'm sorry, I don't know how large of a journal to create."
186 << "Please specify a block device to use as the journal OR "
187 << "set osd_journal_size in your ceph.conf" << dendl;
188 return -EINVAL;
189 }
190
191 if (create && (oldsize < conf_journal_sz)) {
192 uint64_t newsize(conf_journal_sz);
193 dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl;
194 ret = ::ftruncate(fd, newsize);
195 if (ret < 0) {
196 int err = errno;
197 derr << "FileJournal::_open_file : unable to extend journal to "
198 << newsize << " bytes: " << cpp_strerror(err) << dendl;
199 return -err;
200 }
201 #ifdef HAVE_POSIX_FALLOCATE
202 ret = ::posix_fallocate(fd, 0, newsize);
203 if (ret) {
204 derr << "FileJournal::_open_file : unable to preallocation journal to "
205 << newsize << " bytes: " << cpp_strerror(ret) << dendl;
206 return -ret;
207 }
208 max_size = newsize;
209 #elif defined(__APPLE__)
210 fstore_t store;
211 store.fst_flags = F_ALLOCATECONTIG;
212 store.fst_posmode = F_PEOFPOSMODE;
213 store.fst_offset = 0;
214 store.fst_length = newsize;
215
216 ret = ::fcntl(fd, F_PREALLOCATE, &store);
217 if (ret == -1) {
218 ret = -errno;
219 derr << "FileJournal::_open_file : unable to preallocation journal to "
220 << newsize << " bytes: " << cpp_strerror(ret) << dendl;
221 return ret;
222 }
223 max_size = newsize;
224 #else
225 # error "Journal pre-allocation not supported on platform."
226 #endif
227 }
228 else {
229 max_size = oldsize;
230 }
231 block_size = cct->_conf->journal_block_size;
232
233 if (create && cct->_conf->journal_zero_on_create) {
234 derr << "FileJournal::_open_file : zeroing journal" << dendl;
235 uint64_t write_size = 1 << 20;
236 char *buf;
237 ret = ::posix_memalign((void **)&buf, block_size, write_size);
238 if (ret != 0) {
239 return -ret;
240 }
241 memset(static_cast<void*>(buf), 0, write_size);
242 uint64_t i = 0;
243 for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
244 ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
245 if (ret < 0) {
246 free(buf);
247 return -errno;
248 }
249 }
250 if (i < (uint64_t)max_size) {
251 ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
252 if (ret < 0) {
253 free(buf);
254 return -errno;
255 }
256 }
257 free(buf);
258 }
259
260
261 dout(10) << "_open journal is not a block device, NOT checking disk "
262 << "write cache on '" << fn << "'" << dendl;
263
264 return 0;
265 }
266
267 // This can not be used on an active journal
268 int FileJournal::check()
269 {
270 int ret;
271
272 assert(fd == -1);
273 ret = _open(false, false);
274 if (ret)
275 return ret;
276
277 ret = read_header(&header);
278 if (ret < 0)
279 goto done;
280
281 if (header.fsid != fsid) {
282 derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
283 << ", invalid (someone else's?) journal" << dendl;
284 ret = -EINVAL;
285 goto done;
286 }
287
288 dout(1) << "check: header looks ok" << dendl;
289 ret = 0;
290
291 done:
292 close();
293 return ret;
294 }
295
296
297 int FileJournal::create()
298 {
299 void *buf = 0;
300 int64_t needed_space;
301 int ret;
302 buffer::ptr bp;
303 dout(2) << "create " << fn << " fsid " << fsid << dendl;
304
305 ret = _open(true, true);
306 if (ret)
307 goto done;
308
309 // write empty header
310 header = header_t();
311 header.flags = header_t::FLAG_CRC; // enable crcs on any new journal.
312 header.fsid = fsid;
313 header.max_size = max_size;
314 header.block_size = block_size;
315 if (cct->_conf->journal_block_align || directio)
316 header.alignment = block_size;
317 else
318 header.alignment = 16; // at least stay word aligned on 64bit machines...
319
320 header.start = get_top();
321 header.start_seq = 0;
322
323 print_header(header);
324
325 // static zeroed buffer for alignment padding
326 delete [] zero_buf;
327 zero_buf = new char[header.alignment];
328 memset(zero_buf, 0, header.alignment);
329
330 bp = prepare_header();
331 if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) {
332 ret = -errno;
333 derr << "FileJournal::create : create write header error "
334 << cpp_strerror(ret) << dendl;
335 goto close_fd;
336 }
337
338 // zero first little bit, too.
339 ret = posix_memalign(&buf, block_size, block_size);
340 if (ret) {
341 ret = -ret;
342 derr << "FileJournal::create: failed to allocate " << block_size
343 << " bytes of memory: " << cpp_strerror(ret) << dendl;
344 goto close_fd;
345 }
346 memset(buf, 0, block_size);
347 if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) {
348 ret = -errno;
349 derr << "FileJournal::create: error zeroing first " << block_size
350 << " bytes " << cpp_strerror(ret) << dendl;
351 goto free_buf;
352 }
353
354 needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20;
355 needed_space += (2 * sizeof(entry_header_t)) + get_top();
356 if (header.max_size - header.start < needed_space) {
357 derr << "FileJournal::create: OSD journal is not large enough to hold "
358 << "osd_max_write_size bytes!" << dendl;
359 ret = -ENOSPC;
360 goto free_buf;
361 }
362
363 dout(2) << "create done" << dendl;
364 ret = 0;
365
366 free_buf:
367 free(buf);
368 buf = 0;
369 close_fd:
370 if (TEMP_FAILURE_RETRY(::close(fd)) < 0) {
371 ret = -errno;
372 derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret)
373 << dendl;
374 }
375 done:
376 fd = -1;
377 return ret;
378 }
379
380 // This can not be used on an active journal
381 int FileJournal::peek_fsid(uuid_d& fsid)
382 {
383 assert(fd == -1);
384 int r = _open(false, false);
385 if (r)
386 return r;
387 r = read_header(&header);
388 if (r < 0)
389 goto out;
390 fsid = header.fsid;
391 out:
392 close();
393 return r;
394 }
395
396 int FileJournal::open(uint64_t fs_op_seq)
397 {
398 dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
399
400 uint64_t next_seq = fs_op_seq + 1;
401
402 int err = _open(false);
403 if (err)
404 return err;
405
406 // assume writeable, unless...
407 read_pos = 0;
408 write_pos = get_top();
409
410 // read header?
411 err = read_header(&header);
412 if (err < 0)
413 return err;
414
415 // static zeroed buffer for alignment padding
416 delete [] zero_buf;
417 zero_buf = new char[header.alignment];
418 memset(zero_buf, 0, header.alignment);
419
420 dout(10) << "open header.fsid = " << header.fsid
421 //<< " vs expected fsid = " << fsid
422 << dendl;
423 if (header.fsid != fsid) {
424 derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
425 << ", invalid (someone else's?) journal" << dendl;
426 return -EINVAL;
427 }
428 if (header.max_size > max_size) {
429 dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
430 return -EINVAL;
431 }
432 if (header.block_size != block_size) {
433 dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
434 return -EINVAL;
435 }
436 if (header.max_size % header.block_size) {
437 dout(2) << "open journal max size " << header.max_size
438 << " not a multiple of block size " << header.block_size << dendl;
439 return -EINVAL;
440 }
441 if (header.alignment != block_size && directio) {
442 dout(0) << "open journal alignment " << header.alignment << " does not match block size "
443 << block_size << " (required for direct_io journal mode)" << dendl;
444 return -EINVAL;
445 }
446 if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
447 dout(0) << "open journal alignment " << header.alignment
448 << " is not multiple of minimum directio alignment "
449 << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
450 << dendl;
451 return -EINVAL;
452 }
453
454 // looks like a valid header.
455 write_pos = 0; // not writeable yet
456
457 journaled_seq = header.committed_up_to;
458
459 // find next entry
460 read_pos = header.start;
461 uint64_t seq = header.start_seq;
462
463 // last_committed_seq is 1 before the start of the journal or
464 // 0 if the start is 0
465 last_committed_seq = seq > 0 ? seq - 1 : seq;
466 if (last_committed_seq < fs_op_seq) {
467 dout(2) << "open advancing committed_seq " << last_committed_seq
468 << " to fs op_seq " << fs_op_seq << dendl;
469 last_committed_seq = fs_op_seq;
470 }
471
472 while (1) {
473 bufferlist bl;
474 off64_t old_pos = read_pos;
475 if (!read_entry(bl, seq)) {
476 dout(10) << "open reached end of journal." << dendl;
477 break;
478 }
479 if (seq > next_seq) {
480 dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq
481 << ", ignoring journal contents"
482 << dendl;
483 read_pos = -1;
484 last_committed_seq = 0;
485 return 0;
486 }
487 if (seq == next_seq) {
488 dout(10) << "open reached seq " << seq << dendl;
489 read_pos = old_pos;
490 break;
491 }
492 seq++; // next event should follow.
493 }
494
495 return 0;
496 }
497
498 void FileJournal::_close(int fd) const
499 {
500 VOID_TEMP_FAILURE_RETRY(::close(fd));
501 }
502
503 void FileJournal::close()
504 {
505 dout(1) << "close " << fn << dendl;
506
507 // stop writer thread
508 stop_writer();
509
510 // close
511 assert(writeq_empty());
512 assert(!must_write_header);
513 assert(fd >= 0);
514 _close(fd);
515 fd = -1;
516 }
517
518
519 int FileJournal::dump(ostream& out)
520 {
521 return _dump(out, false);
522 }
523
524 int FileJournal::simple_dump(ostream& out)
525 {
526 return _dump(out, true);
527 }
528
529 int FileJournal::_dump(ostream& out, bool simple)
530 {
531 JSONFormatter f(true);
532 int ret = _fdump(f, simple);
533 f.flush(out);
534 return ret;
535 }
536
537 int FileJournal::_fdump(Formatter &f, bool simple)
538 {
539 dout(10) << "_fdump" << dendl;
540
541 assert(fd == -1);
542 int err = _open(false, false);
543 if (err)
544 return err;
545
546 err = read_header(&header);
547 if (err < 0) {
548 close();
549 return err;
550 }
551
552 off64_t next_pos = header.start;
553
554 f.open_object_section("journal");
555
556 f.open_object_section("header");
557 f.dump_unsigned("flags", header.flags);
558 ostringstream os;
559 os << header.fsid;
560 f.dump_string("fsid", os.str());
561 f.dump_unsigned("block_size", header.block_size);
562 f.dump_unsigned("alignment", header.alignment);
563 f.dump_int("max_size", header.max_size);
564 f.dump_int("start", header.start);
565 f.dump_unsigned("committed_up_to", header.committed_up_to);
566 f.dump_unsigned("start_seq", header.start_seq);
567 f.close_section();
568
569 f.open_array_section("entries");
570 uint64_t seq = header.start_seq;
571 while (1) {
572 bufferlist bl;
573 off64_t pos = next_pos;
574
575 if (!pos) {
576 dout(2) << "_dump -- not readable" << dendl;
577 err = -EINVAL;
578 break;
579 }
580 stringstream ss;
581 read_entry_result result = do_read_entry(
582 pos,
583 &next_pos,
584 &bl,
585 &seq,
586 &ss);
587 if (result != SUCCESS) {
588 if (seq < header.committed_up_to) {
589 dout(2) << "Unable to read past sequence " << seq
590 << " but header indicates the journal has committed up through "
591 << header.committed_up_to << ", journal is corrupt" << dendl;
592 err = -EINVAL;
593 }
594 dout(25) << ss.str() << dendl;
595 dout(25) << "No further valid entries found, journal is most likely valid"
596 << dendl;
597 break;
598 }
599
600 f.open_object_section("entry");
601 f.dump_unsigned("offset", pos);
602 f.dump_unsigned("seq", seq);
603 if (simple) {
604 f.dump_unsigned("bl.length", bl.length());
605 } else {
606 f.open_array_section("transactions");
607 bufferlist::iterator p = bl.begin();
608 int trans_num = 0;
609 while (!p.end()) {
610 ObjectStore::Transaction t(p);
611 f.open_object_section("transaction");
612 f.dump_unsigned("trans_num", trans_num);
613 t.dump(&f);
614 f.close_section();
615 trans_num++;
616 }
617 f.close_section();
618 }
619 f.close_section();
620 }
621
622 f.close_section();
623 f.close_section();
624 dout(10) << "dump finish" << dendl;
625
626 close();
627 return err;
628 }
629
630
631 void FileJournal::start_writer()
632 {
633 write_stop = false;
634 aio_stop = false;
635 write_thread.create("journal_write");
636 #ifdef HAVE_LIBAIO
637 if (aio)
638 write_finish_thread.create("journal_wrt_fin");
639 #endif
640 }
641
642 void FileJournal::stop_writer()
643 {
644 // Do nothing if writer already stopped or never started
645 if (!write_stop)
646 {
647 {
648 Mutex::Locker l(write_lock);
649 Mutex::Locker p(writeq_lock);
650 write_stop = true;
651 writeq_cond.Signal();
652 // Doesn't hurt to signal commit_cond in case thread is waiting there
653 // and caller didn't use committed_thru() first.
654 commit_cond.Signal();
655 }
656 write_thread.join();
657
658 // write journal header now so that we have less to replay on remount
659 write_header_sync();
660 }
661
662 #ifdef HAVE_LIBAIO
663 // stop aio completeion thread *after* writer thread has stopped
664 // and has submitted all of its io
665 if (aio && !aio_stop) {
666 aio_lock.Lock();
667 aio_stop = true;
668 aio_cond.Signal();
669 write_finish_cond.Signal();
670 aio_lock.Unlock();
671 write_finish_thread.join();
672 }
673 #endif
674 }
675
676
677
678 void FileJournal::print_header(const header_t &header) const
679 {
680 dout(10) << "header: block_size " << header.block_size
681 << " alignment " << header.alignment
682 << " max_size " << header.max_size
683 << dendl;
684 dout(10) << "header: start " << header.start << dendl;
685 dout(10) << " write_pos " << write_pos << dendl;
686 }
687
688 int FileJournal::read_header(header_t *hdr) const
689 {
690 dout(10) << "read_header" << dendl;
691 bufferlist bl;
692
693 buffer::ptr bp = buffer::create_page_aligned(block_size);
694 char* bpdata = bp.c_str();
695 int r = ::pread(fd, bpdata, bp.length(), 0);
696
697 if (r < 0) {
698 int err = errno;
699 dout(0) << "read_header got " << cpp_strerror(err) << dendl;
700 return -err;
701 }
702
703 // don't use bp.zero() here, because it also invalidates
704 // crc cache (which is not yet populated anyway)
705 if (bp.length() != (size_t)r) {
706 // r will be always less or equal than bp.length
707 bpdata += r;
708 memset(bpdata, 0, bp.length() - r);
709 }
710
711 bl.push_back(std::move(bp));
712
713 try {
714 bufferlist::iterator p = bl.begin();
715 ::decode(*hdr, p);
716 }
717 catch (buffer::error& e) {
718 derr << "read_header error decoding journal header" << dendl;
719 return -EINVAL;
720 }
721
722
723 /*
724 * Unfortunately we weren't initializing the flags field for new
725 * journals! Aie. This is safe(ish) now that we have only one
726 * flag. Probably around when we add the next flag we need to
727 * remove this or else this (eventually old) code will clobber newer
728 * code's flags.
729 */
730 if (hdr->flags > 3) {
731 derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
732 hdr->flags = 0;
733 }
734
735 print_header(*hdr);
736
737 return 0;
738 }
739
740 bufferptr FileJournal::prepare_header()
741 {
742 bufferlist bl;
743 {
744 Mutex::Locker l(finisher_lock);
745 header.committed_up_to = journaled_seq;
746 }
747 ::encode(header, bl);
748 bufferptr bp = buffer::create_page_aligned(get_top());
749 // don't use bp.zero() here, because it also invalidates
750 // crc cache (which is not yet populated anyway)
751 char* data = bp.c_str();
752 memcpy(data, bl.c_str(), bl.length());
753 data += bl.length();
754 memset(data, 0, bp.length()-bl.length());
755 return bp;
756 }
757
758 void FileJournal::write_header_sync()
759 {
760 Mutex::Locker locker(write_lock);
761 must_write_header = true;
762 bufferlist bl;
763 do_write(bl);
764 dout(20) << __func__ << " finish" << dendl;
765 }
766
767 int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
768 {
769 // already full?
770 if (full_state != FULL_NOTFULL)
771 return -ENOSPC;
772
773 // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
774 off64_t room;
775 if (pos >= header.start)
776 room = (header.max_size - pos) + (header.start - get_top()) - 1;
777 else
778 room = header.start - pos - 1;
779 dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
780 << " top " << get_top() << dendl;
781
782 if (do_sync_cond) {
783 if (room >= (header.max_size >> 1) &&
784 room - size < (header.max_size >> 1)) {
785 dout(10) << " passing half full mark, triggering commit" << dendl;
786 do_sync_cond->SloppySignal(); // initiate a real commit so we can trim
787 }
788 }
789
790 if (room >= size) {
791 dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
792 if (pos + size > header.max_size)
793 must_write_header = true;
794 return 0;
795 }
796
797 // full
798 dout(1) << "check_for_full at " << pos << " : JOURNAL FULL "
799 << pos << " >= " << room
800 << " (max_size " << header.max_size << " start " << header.start << ")"
801 << dendl;
802
803 off64_t max = header.max_size - get_top();
804 if (size > max)
805 dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl;
806
807 return -ENOSPC;
808 }
809
/*
 * Drain queued writes into bl, preparing each entry at the next journal
 * position, until the queue is empty, journal_max_write_entries or
 * journal_max_write_bytes is reached, or the journal runs out of room.
 *
 * @param bl         receives the prepared entries
 * @param orig_ops   incremented per prepared entry (by prepare_single_write)
 * @param orig_bytes incremented by each entry's orig_len
 * @return 0 when bl holds something to write (possibly empty if the queue
 *         was empty). -ENOSPC if the journal is already full or even the
 *         first entry does not fit. In the latter case, unless
 *         wait_on_full is set, the journal is flagged FULL_FULL and all
 *         queued writes are completed with a result of 1 and dropped.
 */
int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes)
{
  // gather queued writes
  off64_t queue_pos = write_pos;

  int eleft = cct->_conf->journal_max_write_entries;
  unsigned bmax = cct->_conf->journal_max_write_bytes;

  if (full_state != FULL_NOTFULL)
    return -ENOSPC;

  while (!writeq_empty()) {
    list<write_item> items;
    batch_pop_write(items);
    list<write_item>::iterator it = items.begin();
    while (it != items.end()) {
      // Capture the length first: prepare_single_write() claim_append()s
      // the item's bl into our bl, leaving it->bl empty.
      uint64_t bytes = it->bl.length();
      int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
      if (r == 0) { // prepare ok, delete it
        items.erase(it++);
#ifdef HAVE_LIBAIO
        {
          Mutex::Locker locker(aio_lock);
          assert(aio_write_queue_ops > 0);
          aio_write_queue_ops--;
          assert(aio_write_queue_bytes >= bytes);
          aio_write_queue_bytes -= bytes;
        }
#else
        (void)bytes;
#endif
      }
      // NOTE(review): prepare_single_write() only fails with check_for_full()'s
      // -ENOSPC; any other non-zero r would leave `it` un-advanced here.
      if (r == -ENOSPC) {
        // the journal maybe full, insert the left item to writeq
        batch_unpop_write(items);
        if (orig_ops)
          goto out; // commit what we have

        if (logger)
          logger->inc(l_filestore_journal_full);

        if (wait_on_full) {
          dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
        } else {
          dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;

          // throw out what we have so far
          full_state = FULL_FULL;
          while (!writeq_empty()) {
            complete_write(1, peek_write().orig_len);
            pop_write();
          }
          print_header(header);
        }

        return -ENOSPC; // hrm, full on first op
      }
      // journal_max_write_entries == 0 means "no entry limit".
      if (eleft) {
        if (--eleft == 0) {
          dout(20) << "prepare_multi_write hit max events per write "
                   << cct->_conf->journal_max_write_entries << dendl;
          batch_unpop_write(items);
          goto out;
        }
      }
      // journal_max_write_bytes == 0 means "no byte limit".
      if (bmax) {
        if (bl.length() >= bmax) {
          dout(20) << "prepare_multi_write hit max write size "
                   << cct->_conf->journal_max_write_bytes << dendl;
          batch_unpop_write(items);
          goto out;
        }
      }
    }
  }

out:
  dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
  // queue_pos must equal write_pos advanced by bl, with or without a wrap.
  assert((write_pos + bl.length() == queue_pos) ||
         (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
  return 0;
}
892
893 /*
894 void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
895 {
896 writing_seq.push_back(seq);
897 if (!waiting_for_notfull.empty()) {
898 // make sure previously unjournaled stuff waiting for UNFULL triggers
899 // _before_ newly journaled stuff does
900 dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
901 << " until after UNFULL" << dendl;
902 C_Gather *g = new C_Gather(writeq.front().fin);
903 writing_fin.push_back(g->new_sub());
904 waiting_for_notfull.push_back(g->new_sub());
905 } else {
906 writing_fin.push_back(writeq.front().fin);
907 dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
908 }
909 }
910 */
911
912 void FileJournal::queue_completions_thru(uint64_t seq)
913 {
914 assert(finisher_lock.is_locked());
915 utime_t now = ceph_clock_now();
916 list<completion_item> items;
917 batch_pop_completions(items);
918 list<completion_item>::iterator it = items.begin();
919 while (it != items.end()) {
920 completion_item& next = *it;
921 if (next.seq > seq)
922 break;
923 utime_t lat = now;
924 lat -= next.start;
925 dout(10) << "queue_completions_thru seq " << seq
926 << " queueing seq " << next.seq
927 << " " << next.finish
928 << " lat " << lat << dendl;
929 if (logger) {
930 logger->tinc(l_filestore_journal_latency, lat);
931 }
932 if (next.finish)
933 finisher->queue(next.finish);
934 if (next.tracked_op) {
935 next.tracked_op->mark_event("journaled_completion_queued");
936 next.tracked_op->journal_trace.event("queued completion");
937 next.tracked_op->journal_trace.keyval("completed through", seq);
938 }
939 items.erase(it++);
940 }
941 batch_unpop_completions(items);
942 finisher_cond.Signal();
943 }
944
945
946 int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
947 {
948 uint64_t seq = next_write.seq;
949 bufferlist &ebl = next_write.bl;
950 off64_t size = ebl.length();
951
952 int r = check_for_full(seq, queue_pos, size);
953 if (r < 0)
954 return r; // ENOSPC or EAGAIN
955
956 uint32_t orig_len = next_write.orig_len;
957 orig_bytes += orig_len;
958 orig_ops++;
959
960 // add to write buffer
961 dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
962 << " len " << orig_len << " -> " << size << dendl;
963
964 unsigned seq_offset = offsetof(entry_header_t, seq);
965 unsigned magic1_offset = offsetof(entry_header_t, magic1);
966 unsigned magic2_offset = offsetof(entry_header_t, magic2);
967
968 bufferptr headerptr = ebl.buffers().front();
969 uint64_t _seq = seq;
970 uint64_t _queue_pos = queue_pos;
971 uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
972 headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
973 headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
974 headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
975
976 bufferptr footerptr = ebl.buffers().back();
977 unsigned post_offset = footerptr.length() - sizeof(entry_header_t);
978 footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
979 footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
980 footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
981
982 bl.claim_append(ebl);
983 if (next_write.tracked_op) {
984 next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
985 next_write.tracked_op->journal_trace.event("prepare_single_write");
986 }
987
988 journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
989 writing_seq = seq;
990
991 queue_pos += size;
992 if (queue_pos >= header.max_size)
993 queue_pos = queue_pos + get_top() - header.max_size;
994
995 return 0;
996 }
997
998 void FileJournal::check_align(off64_t pos, bufferlist& bl)
999 {
1000 // make sure list segments are page aligned
1001 if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
1002 assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1003 assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1004 assert(0 == "bl was not aligned");
1005 }
1006 }
1007
1008 int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
1009 {
1010 int ret;
1011
1012 off64_t spos = ::lseek64(fd, pos, SEEK_SET);
1013 if (spos < 0) {
1014 ret = -errno;
1015 derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl;
1016 return ret;
1017 }
1018 ret = bl.write_fd(fd);
1019 if (ret) {
1020 derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl;
1021 return ret;
1022 }
1023 pos += bl.length();
1024 if (pos == header.max_size)
1025 pos = get_top();
1026 return 0;
1027 }
1028
1029 void FileJournal::do_write(bufferlist& bl)
1030 {
1031 // nothing to do?
1032 if (bl.length() == 0 && !must_write_header)
1033 return;
1034
1035 buffer::ptr hbp;
1036 if (cct->_conf->journal_write_header_frequency &&
1037 (((++journaled_since_start) %
1038 cct->_conf->journal_write_header_frequency) == 0)) {
1039 must_write_header = true;
1040 }
1041
1042 if (must_write_header) {
1043 must_write_header = false;
1044 hbp = prepare_header();
1045 }
1046
1047 dout(15) << "do_write writing " << write_pos << "~" << bl.length()
1048 << (hbp.length() ? " + header":"")
1049 << dendl;
1050
1051 utime_t from = ceph_clock_now();
1052
1053 // entry
1054 off64_t pos = write_pos;
1055
1056 // Adjust write_pos
1057 write_pos += bl.length();
1058 if (write_pos >= header.max_size)
1059 write_pos = write_pos - header.max_size + get_top();
1060
1061 write_lock.Unlock();
1062
1063 // split?
1064 off64_t split = 0;
1065 if (pos + bl.length() > header.max_size) {
1066 bufferlist first, second;
1067 split = header.max_size - pos;
1068 first.substr_of(bl, 0, split);
1069 second.substr_of(bl, split, bl.length() - split);
1070 assert(first.length() + second.length() == bl.length());
1071 dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
1072 << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
1073
1074 //Save pos to write first piece second
1075 off64_t first_pos = pos;
1076 off64_t orig_pos;
1077 pos = get_top();
1078 // header too?
1079 if (hbp.length()) {
1080 // be sneaky: include the header in the second fragment
1081 second.push_front(hbp);
1082 pos = 0; // we included the header
1083 }
1084 // Write the second portion first possible with the header, so
1085 // do_read_entry() won't even get a valid entry_header_t if there
1086 // is a crash between the two writes.
1087 orig_pos = pos;
1088 if (write_bl(pos, second)) {
1089 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1090 << ") failed" << dendl;
1091 check_align(pos, second);
1092 ceph_abort();
1093 }
1094 orig_pos = first_pos;
1095 if (write_bl(first_pos, first)) {
1096 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1097 << ") failed" << dendl;
1098 check_align(first_pos, first);
1099 ceph_abort();
1100 }
1101 assert(first_pos == get_top());
1102 } else {
1103 // header too?
1104 if (hbp.length()) {
1105 if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
1106 int err = errno;
1107 derr << "FileJournal::do_write: pwrite(fd=" << fd
1108 << ", hbp.length=" << hbp.length() << ") failed :"
1109 << cpp_strerror(err) << dendl;
1110 ceph_abort();
1111 }
1112 }
1113
1114 if (write_bl(pos, bl)) {
1115 derr << "FileJournal::do_write: write_bl(pos=" << pos
1116 << ") failed" << dendl;
1117 check_align(pos, bl);
1118 ceph_abort();
1119 }
1120 }
1121
1122 if (!directio) {
1123 dout(20) << "do_write fsync" << dendl;
1124
1125 /*
1126 * We'd really love to have a fsync_range or fdatasync_range and do a:
1127 *
1128 * if (split) {
1129 * ::fsync_range(fd, header.max_size - split, split)l
1130 * ::fsync_range(fd, get_top(), bl.length() - split);
1131 * else
1132 * ::fsync_range(fd, write_pos, bl.length())
1133 *
1134 * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
1135 * too hard given all the underlying infrastructure already exist.
1136 *
1137 * NOTE: using sync_file_range here would not be safe as it does not
1138 * flush disk caches or commits any sort of metadata.
1139 */
1140 int ret = 0;
1141 #if defined(DARWIN) || defined(__FreeBSD__)
1142 ret = ::fsync(fd);
1143 #else
1144 ret = ::fdatasync(fd);
1145 #endif
1146 if (ret < 0) {
1147 derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl;
1148 ceph_abort();
1149 }
1150 #ifdef HAVE_POSIX_FADVISE
1151 if (cct->_conf->filestore_fadvise)
1152 posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
1153 #endif
1154 }
1155
1156 utime_t lat = ceph_clock_now() - from;
1157 dout(20) << "do_write latency " << lat << dendl;
1158
1159 write_lock.Lock();
1160
1161 assert(write_pos == pos);
1162 assert(write_pos % header.alignment == 0);
1163
1164 {
1165 Mutex::Locker locker(finisher_lock);
1166 journaled_seq = writing_seq;
1167
1168 // kick finisher?
1169 // only if we haven't filled up recently!
1170 if (full_state != FULL_NOTFULL) {
1171 dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
1172 << ", full_commit_seq|full_restart_seq" << dendl;
1173 } else {
1174 if (plug_journal_completions) {
1175 dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
1176 << " due to completion plug" << dendl;
1177 } else {
1178 dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
1179 queue_completions_thru(journaled_seq);
1180 }
1181 }
1182 }
1183 }
1184
1185 void FileJournal::flush()
1186 {
1187 dout(10) << "waiting for completions to empty" << dendl;
1188 {
1189 Mutex::Locker l(finisher_lock);
1190 while (!completions_empty())
1191 finisher_cond.Wait(finisher_lock);
1192 }
1193 dout(10) << "flush waiting for finisher" << dendl;
1194 finisher->wait_for_empty();
1195 dout(10) << "flush done" << dendl;
1196 }
1197
1198
// Main loop of the journal write thread.
//
// Each iteration:
//  - waits on writeq_cond until writeq is non-empty or a header write is
//    pending;
//  - in libaio builds with aio enabled, may back off while many aios are in
//    flight;
//  - batches queued entries into one bufferlist with prepare_multi_write()
//    under write_lock;
//  - writes the batch with do_aio_write() or do_write().
// The loop exits only when write_stop is set and there is nothing left to
// write.
1199 void FileJournal::write_thread_entry()
1200 {
1201 dout(10) << "write_thread_entry start" << dendl;
1202 while (1) {
1203 {
// Idle check under writeq_lock: exit if stopping, otherwise sleep until
// signalled and re-check from the top of the loop.
1204 Mutex::Locker locker(writeq_lock);
1205 if (writeq.empty() && !must_write_header) {
1206 if (write_stop)
1207 break;
1208 dout(20) << "write_thread_entry going to sleep" << dendl;
1209 writeq_cond.Wait(writeq_lock);
1210 dout(20) << "write_thread_entry woke up" << dendl;
1211 continue;
1212 }
1213 }
1214
1215 #ifdef HAVE_LIBAIO
1216 if (aio) {
1217 Mutex::Locker locker(aio_lock);
1218 // should we back off to limit aios in flight? try to do this
1219 // adaptively so that we submit larger aios once we have lots of
1220 // them in flight.
1221 //
1222 // NOTE: our condition here is based on aio_num (protected by
1223 // aio_lock) and throttle_bytes (part of the write queue). when
1224 // we sleep, we *only* wait for aio_num to change, and do not
1225 // wake when more data is queued. this is not strictly correct,
1226 // but should be fine given that we will have plenty of aios in
1227 // flight if we hit this limit to ensure we keep the device
1228 // saturated.
1229 while (aio_num > 0) {
// With aio_num aios in flight, require at least 2^min(2*aio_num, 24)
// bytes in aio_write_queue_bytes before building another aio.
1230 int exp = MIN(aio_num * 2, 24);
1231 long unsigned min_new = 1ull << exp;
1232 uint64_t cur = aio_write_queue_bytes;
1233 dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
1234 << " ... exp " << exp << " min_new " << min_new
1235 << " ... pending " << cur << dendl;
1236 if (cur >= min_new)
1237 break;
1238 dout(20) << "write_thread_entry deferring until more aios complete: "
1239 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
1240 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
// check_aio_completion() (and submit_entry()) signal aio_cond.
1241 aio_cond.Wait(aio_lock);
1242 dout(20) << "write_thread_entry woke up" << dendl;
1243 }
1244 }
1245 #endif
1246
// write_lock is held for the rest of this iteration, including the write
// itself. commit_cond.Wait() below releases it while sleeping.
1247 Mutex::Locker locker(write_lock);
1248 uint64_t orig_ops = 0;
1249 uint64_t orig_bytes = 0;
1250
1251 bufferlist bl;
// NOTE(review): prepare_multi_write() is defined elsewhere. It presumably
// moves entries from writeq into bl, reports the op/byte totals it took
// through orig_ops/orig_bytes, and returns -ENOSPC when the journal is
// full. Confirm against its definition.
1252 int r = prepare_multi_write(bl, orig_ops, orig_bytes);
1253 // Don't care about journal full if stoppping, so drop queue and
1254 // possibly let header get written and loop above to notice stop
1255 if (r == -ENOSPC) {
1256 if (write_stop) {
1257 dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
1258 while (!writeq_empty()) {
1259 complete_write(1, peek_write().orig_len);
1260 pop_write();
1261 }
1262 print_header(header);
1263 r = 0;
1264 } else {
1265 dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
// committed_thru() signals commit_cond after it has trimmed the journal.
1266 commit_cond.Wait(write_lock);
1267 dout(20) << "write_thread_entry woke up" << dendl;
1268 continue;
1269 }
1270 }
1271 assert(r == 0);
1272
1273 if (logger) {
1274 logger->inc(l_filestore_journal_wr);
1275 logger->inc(l_filestore_journal_wr_bytes, bl.length());
1276 }
1277
1278 #ifdef HAVE_LIBAIO
1279 if (aio)
1280 do_aio_write(bl);
1281 else
1282 do_write(bl);
1283 #else
1284 do_write(bl);
1285 #endif
1286 complete_write(orig_ops, orig_bytes);
1287 }
1288
1289 dout(10) << "write_thread_entry finish" << dendl;
1290 }
1291
1292 #ifdef HAVE_LIBAIO
// Submit the batched entries in bl to the journal using libaio.
//
// The journal header is prepended when must_write_header is set. It is also
// forced every journal_write_header_frequency calls, if that option is
// non-zero.
//
// A write that would run past header.max_size is split:
//  - the first piece is written up to the end of the ring;
//  - the remainder is written at get_top() (or at 0, together with the
//    header).
//
// Only the aio covering the end of bl is tagged with writing_seq. Because
// check_aio_completion() retires aios strictly in queue order,
// journaled_seq advances only after every aio of this batch has completed.
// write_pos is advanced past the data, wrapping to get_top() when it lands
// exactly on header.max_size.
1293 void FileJournal::do_aio_write(bufferlist& bl)
1294 {
1295
1296 if (cct->_conf->journal_write_header_frequency &&
1297 (((++journaled_since_start) %
1298 cct->_conf->journal_write_header_frequency) == 0)) {
1299 must_write_header = true;
1300 }
1301
1302 // nothing to do?
1303 if (bl.length() == 0 && !must_write_header)
1304 return;
1305
1306 buffer::ptr hbp;
1307 if (must_write_header) {
1308 must_write_header = false;
1309 hbp = prepare_header();
1310 }
1311
1312 // entry
1313 off64_t pos = write_pos;
1314
1315 dout(15) << "do_aio_write writing " << pos << "~" << bl.length()
1316 << (hbp.length() ? " + header":"")
1317 << dendl;
1318
1319 // split?
1320 off64_t split = 0;
1321 if (pos + bl.length() > header.max_size) {
1322 bufferlist first, second;
1323 split = header.max_size - pos;
1324 first.substr_of(bl, 0, split);
1325 second.substr_of(bl, split, bl.length() - split);
1326 assert(first.length() + second.length() == bl.length());
1327 dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
1328
// First piece: [pos, header.max_size). It is untagged (seq 0).
// write_aio_bl() advances pos to header.max_size.
1329 if (write_aio_bl(pos, first, 0)) {
1330 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1331 << ") failed" << dendl;
1332 ceph_abort();
1333 }
1334 assert(pos == header.max_size);
1335 if (hbp.length()) {
1336 // be sneaky: include the header in the second fragment
1337 second.push_front(hbp);
1338 pos = 0; // we included the header
1339 } else
1340 pos = get_top(); // no header, start after that
1341 if (write_aio_bl(pos, second, writing_seq)) {
1342 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1343 << ") failed" << dendl;
1344 ceph_abort();
1345 }
1346 } else {
1347 // header too?
1348 if (hbp.length()) {
1349 bufferlist hbl;
1350 hbl.push_back(hbp);
// Deliberately a new local that shadows the outer pos.
// write_aio_bl() advances its pos argument, and writing the header at
// offset 0 must not move the entry position held in the outer pos.
1351 loff_t pos = 0;
1352 if (write_aio_bl(pos, hbl, 0)) {
1353 derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl;
1354 ceph_abort();
1355 }
1356 }
1357
1358 if (write_aio_bl(pos, bl, writing_seq)) {
1359 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1360 << ") failed" << dendl;
1361 ceph_abort();
1362 }
1363 }
1364
// pos now points just past the submitted data. An entry that ends exactly
// at the end of the ring wraps the next write to get_top().
1365 write_pos = pos;
1366 if (write_pos == header.max_size)
1367 write_pos = get_top();
1368 assert(write_pos % header.alignment == 0);
1369 }
1370
1371 /**
 * Write a buffer to the journal using aio.
 *
 * bl is consumed in chunks of at most IOV_MAX-1 buffers. Each chunk becomes
 * one aio_info, appended to aio_queue under aio_lock and submitted with
 * io_submit(). pos is advanced by the chunk length after each submission.
 * After the loop (even when bl was empty), write_finish_cond is signalled so
 * the completion thread starts reaping.
 *
 * @param pos  journal offset to write at; on return, the offset just past
 *             the submitted data
 * @param bl   data to write; emptied by this call
 * @param seq  seq to trigger when this aio completes. If 0, do not update
 *             any state on completion. Only the final chunk carries seq;
 *             earlier chunks are queued with 0.
 * @return 0. An io_submit() failure (other than EAGAIN within the retry
 *         budget) trips an assert instead of being returned.
 */
1377 int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
1378 {
1379 dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
1380
1381 while (bl.length() > 0) {
1382 int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
// NOTE(review): ownership of iov passes to the aio_info below
// (aio.iov = iov). It is presumably released by aio_info's destructor;
// confirm in FileJournal.h.
1383 iovec *iov = new iovec[max];
1384 int n = 0;
1385 unsigned len = 0;
1386 for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
1387 n < max;
1388 ++p, ++n) {
1389 assert(p != bl.buffers().end());
1390 iov[n].iov_base = (void *)p->c_str();
1391 iov[n].iov_len = p->length();
1392 len += p->length();
1393 }
1394
1395 bufferlist tbl;
1396 bl.splice(0, len, &tbl); // move bytes from bl -> tbl
1397
1398 // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
1399 // modified in check_aio_completion
1400 aio_lock.Lock();
// bl is non-empty here only if more chunks follow, so only the final
// chunk is tagged with seq.
1401 aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
1402 aio_info& aio = aio_queue.back();
1403 aio.iov = iov;
1404
1405 io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
1406
1407 dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
1408 << " in " << n << dendl;
1409
1410 aio_num++;
1411 aio_bytes += aio.len;
1412
1413 // need to save current aio len to update write_pos later because current
1414 // aio could be ereased from aio_queue once it is done
1415 uint64_t cur_len = aio.len;
1416 // unlock aio_lock because following io_submit might take time to return
1417 aio_lock.Unlock();
1418
// aio has not been submitted yet, so it cannot be marked done or erased.
// The reference is still valid here.
1419 iocb *piocb = &aio.iocb;
1420
1421 // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
// Retry EAGAIN up to 16 times, doubling the sleep from 125us each time.
// Any other error, or running out of retries, asserts.
1422 int attempts = 16;
1423 int delay = 125;
1424 do {
1425 int r = io_submit(aio_ctx, 1, &piocb);
1426 dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
1427 if (r < 0) {
1428 derr << "io_submit to " << aio.off << "~" << cur_len
1429 << " got " << cpp_strerror(r) << dendl;
1430 if (r == -EAGAIN && attempts-- > 0) {
1431 usleep(delay);
1432 delay *= 2;
1433 continue;
1434 }
1435 check_align(pos, tbl);
1436 assert(0 == "io_submit got unexpected error");
1437 } else {
1438 break;
1439 }
1440 } while (true);
1441 pos += cur_len;
1442 }
1443 aio_lock.Lock();
1444 write_finish_cond.Signal();
1445 aio_lock.Unlock();
1446 return 0;
1447 }
1448 #endif
1449
// Main loop of the aio completion thread. It is an empty function unless
// built with HAVE_LIBAIO.
//
// The loop sleeps on write_finish_cond while aio_queue is empty, and exits
// once aio_stop is set and the queue is empty. Otherwise it waits for and
// reaps up to 16 completion events at a time with io_getevents(). Each
// completed aio must report exactly its full length, or the thread asserts.
// Completed entries are marked done, and check_aio_completion() then retires
// them in queue order.
1450 void FileJournal::write_finish_thread_entry()
1451 {
1452 #ifdef HAVE_LIBAIO
1453 dout(10) << "write_finish_thread_entry enter" << dendl;
1454 while (true) {
1455 {
1456 Mutex::Locker locker(aio_lock);
1457 if (aio_queue.empty()) {
1458 if (aio_stop)
1459 break;
1460 dout(20) << "write_finish_thread_entry sleeping" << dendl;
1461 write_finish_cond.Wait(aio_lock);
1462 continue;
1463 }
1464 }
1465
1466 dout(20) << "write_finish_thread_entry waiting for aio(s)" << dendl;
1467 io_event event[16];
// A NULL timeout blocks until at least one event is available.
// EINTR is retried; any other error asserts.
1468 int r = io_getevents(aio_ctx, 1, 16, event, NULL);
1469 if (r < 0) {
1470 if (r == -EINTR) {
1471 dout(0) << "io_getevents got " << cpp_strerror(r) << dendl;
1472 continue;
1473 }
1474 derr << "io_getevents got " << cpp_strerror(r) << dendl;
1475 assert(0 == "got unexpected error from io_getevents");
1476 }
1477
1478 {
1479 Mutex::Locker locker(aio_lock);
1480 for (int i=0; i<r; i++) {
// NOTE(review): obj is the iocb pointer submitted in write_aio_bl()
// (&aio.iocb). This cast assumes the iocb sits at the start of aio_info;
// confirm the struct layout in FileJournal.h.
1481 aio_info *ai = (aio_info *)event[i].obj;
1482 if (event[i].res != ai->len) {
// NOTE(review): res is truncated to int for this log line only.
1483 derr << "aio to " << ai->off << "~" << ai->len
1484 << " returned: " << (int)event[i].res << dendl;
1485 assert(0 == "unexpected aio error");
1486 }
1487 dout(10) << "write_finish_thread_entry aio " << ai->off
1488 << "~" << ai->len << " done" << dendl;
1489 ai->done = true;
1490 }
1491 check_aio_completion();
1492 }
1493 }
1494 dout(10) << "write_finish_thread_entry exit" << dendl;
1495 #endif
1496 }
1497
1498 #ifdef HAVE_LIBAIO
1499 /**
1500 * check aio_wait for completed aio, and update state appropriately.
1501 */
1502 void FileJournal::check_aio_completion()
1503 {
1504 assert(aio_lock.is_locked());
1505 dout(20) << "check_aio_completion" << dendl;
1506
1507 bool completed_something = false, signal = false;
1508 uint64_t new_journaled_seq = 0;
1509
1510 list<aio_info>::iterator p = aio_queue.begin();
1511 while (p != aio_queue.end() && p->done) {
1512 dout(20) << "check_aio_completion completed seq " << p->seq << " "
1513 << p->off << "~" << p->len << dendl;
1514 if (p->seq) {
1515 new_journaled_seq = p->seq;
1516 completed_something = true;
1517 }
1518 aio_num--;
1519 aio_bytes -= p->len;
1520 aio_queue.erase(p++);
1521 signal = true;
1522 }
1523
1524 if (completed_something) {
1525 // kick finisher?
1526 // only if we haven't filled up recently!
1527 Mutex::Locker locker(finisher_lock);
1528 journaled_seq = new_journaled_seq;
1529 if (full_state != FULL_NOTFULL) {
1530 dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
1531 << ", full_commit_seq|full_restart_seq" << dendl;
1532 } else {
1533 if (plug_journal_completions) {
1534 dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
1535 << " due to completion plug" << dendl;
1536 } else {
1537 dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl;
1538 queue_completions_thru(journaled_seq);
1539 }
1540 }
1541 }
1542 if (signal) {
1543 // maybe write queue was waiting for aio count to drop?
1544 aio_cond.Signal();
1545 }
1546 }
1547 #endif
1548
// Encode a group of transactions into a single journal entry.
//
// The entry placed in *tbl has the layout
//   entry_header_t | pre_pad zeros | payload | post_pad zeros | entry_header_t
// and its total size is rounded up to header.alignment. The payload is the
// encoding of every transaction in tls, followed by whatever *tbl held on
// entry. Its crc32c is stored in the header, and the trailing footer is an
// identical copy of the header.
//
// When a transaction's data length exceeds journal_align_min_size - 1, the
// alignment of the largest such transaction drives pre_pad. Otherwise
// data_align stays -1 and no pre_pad is added.
// NOTE(review): the pre_pad arithmetic appears intended to place that
// transaction's data on a page boundary when the entry itself starts
// page-aligned. Inferred from the masking with ~CEPH_PAGE_MASK; confirm.
// NOTE(review): seq and magic fields are left zero by the memset here and
// are presumably filled in later, before the entry is written.
//
// With directio, the result is rebuilt into CEPH_DIRECTIO_ALIGNMENT-aligned
// buffers.
//
// @param tls  transactions to encode
// @param tbl  in: optional extra payload appended after the transactions;
//             out: the complete encoded entry
// @return the payload length (h.len), excluding header, padding and footer
1549 int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) {
1550 dout(10) << "prepare_entry " << tls << dendl;
1551 int data_len = cct->_conf->journal_align_min_size - 1;
1552 int data_align = -1; // -1 indicates that we don't care about the alignment
1553 bufferlist bl;
1554 for (vector<ObjectStore::Transaction>::iterator p = tls.begin();
1555 p != tls.end(); ++p) {
// Track the transaction with the most data. Its alignment is taken
// relative to where its encoding starts within bl.
1556 if ((int)(*p).get_data_length() > data_len) {
1557 data_len = (*p).get_data_length();
1558 data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
1559 }
1560 ::encode(*p, bl);
1561 }
1562 if (tbl->length()) {
1563 bl.claim_append(*tbl);
1564 }
1565 // add it this entry
1566 entry_header_t h;
1567 unsigned head_size = sizeof(entry_header_t);
// Header + footer + payload, before any padding.
1568 off64_t base_size = 2*head_size + bl.length();
1569 memset(&h, 0, sizeof(h));
1570 if (data_align >= 0)
1571 h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
// Round the whole entry up to the journal alignment. post_pad absorbs the
// difference.
1572 off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment);
1573 unsigned post_pad = size - base_size - h.pre_pad;
1574 h.len = bl.length();
1575 h.post_pad = post_pad;
1576 h.crc32c = bl.crc32c(0);
1577 dout(10) << " len " << bl.length() << " -> " << size
1578 << " (head " << head_size << " pre_pad " << h.pre_pad
1579 << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
1580 << " (bl alignment " << data_align << ")"
1581 << dendl;
1582 bufferlist ebl;
1583 // header
1584 ebl.append((const char*)&h, sizeof(h));
1585 if (h.pre_pad) {
1586 ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
1587 }
1588 // payload
1589 ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
1590 if (h.post_pad) {
1591 ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
1592 }
1593 // footer
1594 ebl.append((const char*)&h, sizeof(h));
1595 if (directio)
1596 ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
1597 tbl->claim(ebl);
1598 return h.len;
1599 }
1600
1601 void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
1602 Context *oncommit, TrackedOpRef osd_op)
1603 {
1604 // dump on queue
1605 dout(5) << "submit_entry seq " << seq
1606 << " len " << e.length()
1607 << " (" << oncommit << ")" << dendl;
1608 assert(e.length() > 0);
1609 assert(e.length() < header.max_size);
1610
1611 if (osd_op)
1612 osd_op->mark_event("commit_queued_for_journal_write");
1613 if (logger) {
1614 logger->inc(l_filestore_journal_queue_bytes, orig_len);
1615 logger->inc(l_filestore_journal_queue_ops, 1);
1616 }
1617
1618 throttle.register_throttle_seq(seq, e.length());
1619 if (logger) {
1620 logger->inc(l_filestore_journal_ops, 1);
1621 logger->inc(l_filestore_journal_bytes, e.length());
1622 }
1623
1624 if (osd_op) {
1625 osd_op->mark_event("commit_queued_for_journal_write");
1626 if (osd_op->store_trace) {
1627 osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace);
1628 osd_op->journal_trace.event("submit_entry");
1629 osd_op->journal_trace.keyval("seq", seq);
1630 }
1631 }
1632 {
1633 Mutex::Locker l1(writeq_lock);
1634 #ifdef HAVE_LIBAIO
1635 Mutex::Locker l2(aio_lock);
1636 #endif
1637 Mutex::Locker l3(completions_lock);
1638
1639 #ifdef HAVE_LIBAIO
1640 aio_write_queue_ops++;
1641 aio_write_queue_bytes += e.length();
1642 aio_cond.Signal();
1643 #endif
1644
1645 completions.push_back(
1646 completion_item(
1647 seq, oncommit, ceph_clock_now(), osd_op));
1648 if (writeq.empty())
1649 writeq_cond.Signal();
1650 writeq.push_back(write_item(seq, e, orig_len, osd_op));
1651 if (osd_op)
1652 osd_op->journal_trace.keyval("queue depth", writeq.size());
1653 }
1654 }
1655
1656 bool FileJournal::writeq_empty()
1657 {
1658 Mutex::Locker locker(writeq_lock);
1659 return writeq.empty();
1660 }
1661
1662 FileJournal::write_item &FileJournal::peek_write()
1663 {
1664 assert(write_lock.is_locked());
1665 Mutex::Locker locker(writeq_lock);
1666 return writeq.front();
1667 }
1668
1669 void FileJournal::pop_write()
1670 {
1671 assert(write_lock.is_locked());
1672 Mutex::Locker locker(writeq_lock);
1673 if (logger) {
1674 logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
1675 logger->dec(l_filestore_journal_queue_ops, 1);
1676 }
1677 writeq.pop_front();
1678 }
1679
1680 void FileJournal::batch_pop_write(list<write_item> &items)
1681 {
1682 assert(write_lock.is_locked());
1683 {
1684 Mutex::Locker locker(writeq_lock);
1685 writeq.swap(items);
1686 }
1687 for (auto &&i : items) {
1688 if (logger) {
1689 logger->dec(l_filestore_journal_queue_bytes, i.orig_len);
1690 logger->dec(l_filestore_journal_queue_ops, 1);
1691 }
1692 }
1693 }
1694
1695 void FileJournal::batch_unpop_write(list<write_item> &items)
1696 {
1697 assert(write_lock.is_locked());
1698 for (auto &&i : items) {
1699 if (logger) {
1700 logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
1701 logger->inc(l_filestore_journal_queue_ops, 1);
1702 }
1703 }
1704 Mutex::Locker locker(writeq_lock);
1705 writeq.splice(writeq.begin(), items);
1706 }
1707
1708 void FileJournal::commit_start(uint64_t seq)
1709 {
1710 dout(10) << "commit_start" << dendl;
1711
1712 // was full?
1713 switch (full_state) {
1714 case FULL_NOTFULL:
1715 break; // all good
1716
1717 case FULL_FULL:
1718 if (seq >= journaled_seq) {
1719 dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq "
1720 << seq << " > journaled_seq " << journaled_seq
1721 << ", moving to FULL_WAIT."
1722 << dendl;
1723 full_state = FULL_WAIT;
1724 } else {
1725 dout(1) << "FULL_FULL commit_start on seq "
1726 << seq << " < journaled_seq " << journaled_seq
1727 << ", remaining in FULL_FULL"
1728 << dendl;
1729 }
1730 break;
1731
1732 case FULL_WAIT:
1733 dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl;
1734 full_state = FULL_NOTFULL;
1735 plug_journal_completions = true;
1736 break;
1737 }
1738 }
1739
1740 /*
1741 *send discard command to joural block deivce
1742 */
1743 void FileJournal::do_discard(int64_t offset, int64_t end)
1744 {
1745 dout(10) << __func__ << "trim(" << offset << ", " << end << dendl;
1746
1747 offset = ROUND_UP_TO(offset, block_size);
1748 if (offset >= end)
1749 return;
1750 end = ROUND_UP_TO(end - block_size, block_size);
1751 assert(end >= offset);
1752 if (offset < end)
1753 if (block_device_discard(fd, offset, end - offset) < 0)
1754 dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
1755 }
1756
1757 void FileJournal::committed_thru(uint64_t seq)
1758 {
1759 Mutex::Locker locker(write_lock);
1760
1761 auto released = throttle.flush(seq);
1762 if (logger) {
1763 logger->dec(l_filestore_journal_ops, released.first);
1764 logger->dec(l_filestore_journal_bytes, released.second);
1765 }
1766
1767 if (seq < last_committed_seq) {
1768 dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
1769 assert(seq >= last_committed_seq);
1770 return;
1771 }
1772 if (seq == last_committed_seq) {
1773 dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
1774 return;
1775 }
1776
1777 dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
1778 last_committed_seq = seq;
1779
1780 // completions!
1781 {
1782 Mutex::Locker locker(finisher_lock);
1783 queue_completions_thru(seq);
1784 if (plug_journal_completions && seq >= header.start_seq) {
1785 dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
1786 plug_journal_completions = false;
1787 queue_completions_thru(journaled_seq);
1788 }
1789 }
1790
1791 // adjust start pointer
1792 while (!journalq.empty() && journalq.front().first <= seq) {
1793 journalq.pop_front();
1794 }
1795
1796 int64_t old_start = header.start;
1797 if (!journalq.empty()) {
1798 header.start = journalq.front().second;
1799 header.start_seq = journalq.front().first;
1800 } else {
1801 header.start = write_pos;
1802 header.start_seq = seq + 1;
1803 }
1804
1805 if (discard) {
1806 dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl;
1807 if (old_start < header.start)
1808 do_discard(old_start, header.start - 1);
1809 else {
1810 do_discard(old_start, header.max_size - 1);
1811 do_discard(get_top(), header.start - 1);
1812 }
1813 }
1814
1815 must_write_header = true;
1816 print_header(header);
1817
1818 // committed but unjournaled items
1819 while (!writeq_empty() && peek_write().seq <= seq) {
1820 dout(15) << " dropping committed but unwritten seq " << peek_write().seq
1821 << " len " << peek_write().bl.length()
1822 << dendl;
1823 complete_write(1, peek_write().orig_len);
1824 pop_write();
1825 }
1826
1827 commit_cond.Signal();
1828
1829 dout(10) << "committed_thru done" << dendl;
1830 }
1831
1832
1833 void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
1834 {
1835 dout(5) << __func__ << " finished " << ops << " ops and "
1836 << bytes << " bytes" << dendl;
1837 }
1838
1839 int FileJournal::make_writeable()
1840 {
1841 dout(10) << __func__ << dendl;
1842 int r = set_throttle_params();
1843 if (r < 0)
1844 return r;
1845
1846 r = _open(true);
1847 if (r < 0)
1848 return r;
1849
1850 if (read_pos > 0)
1851 write_pos = read_pos;
1852 else
1853 write_pos = get_top();
1854 read_pos = 0;
1855
1856 must_write_header = true;
1857
1858 start_writer();
1859 return 0;
1860 }
1861
1862 int FileJournal::set_throttle_params()
1863 {
1864 stringstream ss;
1865 bool valid = throttle.set_params(
1866 cct->_conf->journal_throttle_low_threshhold,
1867 cct->_conf->journal_throttle_high_threshhold,
1868 cct->_conf->filestore_expected_throughput_bytes,
1869 cct->_conf->journal_throttle_high_multiple,
1870 cct->_conf->journal_throttle_max_multiple,
1871 header.max_size - get_top(),
1872 &ss);
1873
1874 if (!valid) {
1875 derr << "tried to set invalid params: "
1876 << ss.str()
1877 << dendl;
1878 }
1879 return valid ? 0 : -EINVAL;
1880 }
1881
1882 const char** FileJournal::get_tracked_conf_keys() const
1883 {
1884 static const char *KEYS[] = {
1885 "journal_throttle_low_threshhold",
1886 "journal_throttle_high_threshhold",
1887 "journal_throttle_high_multiple",
1888 "journal_throttle_max_multiple",
1889 "filestore_expected_throughput_bytes",
1890 NULL};
1891 return KEYS;
1892 }
1893
1894 void FileJournal::wrap_read_bl(
1895 off64_t pos,
1896 int64_t olen,
1897 bufferlist* bl,
1898 off64_t *out_pos
1899 ) const
1900 {
1901 while (olen > 0) {
1902 while (pos >= header.max_size)
1903 pos = pos + get_top() - header.max_size;
1904
1905 int64_t len;
1906 if (pos + olen > header.max_size)
1907 len = header.max_size - pos; // partial
1908 else
1909 len = olen; // rest
1910
1911 int64_t actual = ::lseek64(fd, pos, SEEK_SET);
1912 assert(actual == pos);
1913
1914 bufferptr bp = buffer::create(len);
1915 int r = safe_read_exact(fd, bp.c_str(), len);
1916 if (r) {
1917 derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
1918 << r << dendl;
1919 ceph_abort();
1920 }
1921 bl->push_back(std::move(bp));
1922 pos += len;
1923 olen -= len;
1924 }
1925 if (pos >= header.max_size)
1926 pos = pos + get_top() - header.max_size;
1927 if (out_pos)
1928 *out_pos = pos;
1929 }
1930
// Read the entry at read_pos during journal replay.
//
// When do_read_entry() succeeds:
//  - the entry's (seq, pos) is appended to journalq;
//  - the ring space it occupies is charged to the throttle and perf counters;
//  - if the entry's seq is not older than next_seq, read_pos moves past it,
//    next_seq is set to the seq read, journaled_seq is raised if needed, and
//    true is returned.
//
// When it fails, the expected seq (next_seq) is compared against
// header.committed_up_to to detect missing entries. If they are missing, the
// function aborts, or reports corruption when journal_ignore_corruption is
// set.
//
// @param bl        receives the payload on success
// @param next_seq  in: the seq the caller expects next;
//                  out (only when true is returned): the seq that was read
// @param corrupt   if non-null, cleared on entry and set to true only when
//                  corruption is detected and journal_ignore_corruption is on
// @return true if an entry was consumed; false if the journal is not
//         readable (read_pos == 0), the entry is older than next_seq, or no
//         further valid entry exists
1931 bool FileJournal::read_entry(
1932 bufferlist &bl,
1933 uint64_t &next_seq,
1934 bool *corrupt)
1935 {
1936 if (corrupt)
1937 *corrupt = false;
1938 uint64_t seq = next_seq;
1939
1940 if (!read_pos) {
1941 dout(2) << "read_entry -- not readable" << dendl;
1942 return false;
1943 }
1944
1945 off64_t pos = read_pos;
1946 off64_t next_pos = pos;
1947 stringstream ss;
1948 read_entry_result result = do_read_entry(
1949 pos,
1950 &next_pos,
1951 &bl,
1952 &seq,
1953 &ss);
1954 if (result == SUCCESS) {
1955 journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
// Ring bytes the entry occupies. When next_pos is not past pos, the entry
// wrapped: the tail of the ring plus the part after the header.
1956 uint64_t amount_to_take =
1957 next_pos > pos ?
1958 next_pos - pos :
1959 (header.max_size - pos) + (next_pos - get_top());
1960 throttle.take(amount_to_take);
// NOTE(review): this registers the caller's expected next_seq rather than
// the seq just read. The two differ only in the next_seq > seq case below.
1961 throttle.register_throttle_seq(next_seq, amount_to_take);
1962 if (logger) {
1963 logger->inc(l_filestore_journal_ops, 1);
1964 logger->inc(l_filestore_journal_bytes, amount_to_take);
1965 }
// Entry older than expected: read_pos is not advanced. NOTE(review): the
// entry has still been appended to journalq and charged to the throttle.
1966 if (next_seq > seq) {
1967 return false;
1968 } else {
1969 read_pos = next_pos;
1970 next_seq = seq;
1971 if (seq > journaled_seq)
1972 journaled_seq = seq;
1973 return true;
1974 }
1975 }
1976
// do_read_entry() only sets seq on success, so seq is still the expected
// next_seq here. A value below committed_up_to means committed entries are
// missing.
1977 if (seq && seq < header.committed_up_to) {
1978 derr << "Unable to read past sequence " << seq
1979 << " but header indicates the journal has committed up through "
1980 << header.committed_up_to << ", journal is corrupt" << dendl;
1981 if (cct->_conf->journal_ignore_corruption) {
1982 if (corrupt)
1983 *corrupt = true;
1984 return false;
1985 } else {
1986 ceph_abort();
1987 }
1988 }
1989
1990 dout(25) << ss.str() << dendl;
1991 dout(2) << "No further valid entries found, journal is most likely valid"
1992 << dendl;
1993 return false;
1994 }
1995
// Parse and validate the journal entry whose header starts at init_pos.
//
// The entry is read in order: header, pre_pad (skipped), h->len payload
// bytes, post_pad (skipped), footer. Reads wrap around the ring via
// wrap_read_bl().
//
// The entry is accepted only if:
//  - the header magic checks out;
//  - the footer is a byte-for-byte copy of the header;
//  - when the journal header has FLAG_CRC or the entry's crc32c is non-zero,
//    the payload crc32c matches.
//
// @param init_pos  offset of the entry header
// @param next_pos  if non-null: on success, the offset just past the entry;
//                  on failure, a position where a caller may resume scanning
// @param bl        if non-null, receives the payload (otherwise a local
//                  buffer is used and discarded)
// @param seq       if non-null, set to the entry's seq (success only)
// @param ss        if non-null, receives the reason for a validation failure
// @param _h        if non-null, receives a copy of the entry header (success only)
// @return SUCCESS, or MAYBE_CORRUPT if any check fails. This function never
//         returns FAILURE.
1996 FileJournal::read_entry_result FileJournal::do_read_entry(
1997 off64_t init_pos,
1998 off64_t *next_pos,
1999 bufferlist *bl,
2000 uint64_t *seq,
2001 ostream *ss,
2002 entry_header_t *_h) const
2003 {
2004 off64_t cur_pos = init_pos;
2005 bufferlist _bl;
2006 if (!bl)
2007 bl = &_bl;
2008
2009 // header
2010 entry_header_t *h;
2011 bufferlist hbl;
2012 off64_t _next_pos;
2013 wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos);
// NOTE(review): h points into hbl's buffer, so hbl must outlive every use of h.
2014 h = reinterpret_cast<entry_header_t *>(hbl.c_str());
2015
2016 if (!h->check_magic(cur_pos, header.get_fsid64())) {
2017 dout(25) << "read_entry " << init_pos
2018 << " : bad header magic, end of journal" << dendl;
2019 if (ss)
2020 *ss << "bad header magic";
// The 4 KiB skip is not folded against header.max_size here.
// wrap_read_bl() folds the position if a caller reads from it.
2021 if (next_pos)
2022 *next_pos = init_pos + (4<<10); // check 4k ahead
2023 return MAYBE_CORRUPT;
2024 }
2025 cur_pos = _next_pos;
2026
2027 // pad + body + pad
2028 if (h->pre_pad)
2029 cur_pos += h->pre_pad;
2030
2031 bl->clear();
2032 wrap_read_bl(cur_pos, h->len, bl, &cur_pos);
2033
2034 if (h->post_pad)
2035 cur_pos += h->post_pad;
2036
2037 // footer
2038 entry_header_t *f;
2039 bufferlist fbl;
2040 wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos);
2041 f = reinterpret_cast<entry_header_t *>(fbl.c_str());
// A footer that differs from the header indicates a torn/partial write.
2042 if (memcmp(f, h, sizeof(*f))) {
2043 if (ss)
2044 *ss << "bad footer magic, partial entry";
2045 if (next_pos)
2046 *next_pos = cur_pos;
2047 return MAYBE_CORRUPT;
2048 }
2049
2050 if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal)
2051 h->crc32c != 0) { // newer entry in old journal
2052 uint32_t actual_crc = bl->crc32c(0);
2053 if (actual_crc != h->crc32c) {
2054 if (ss)
2055 *ss << "header crc (" << h->crc32c
2056 << ") doesn't match body crc (" << actual_crc << ")";
2057 if (next_pos)
2058 *next_pos = cur_pos;
2059 return MAYBE_CORRUPT;
2060 }
2061 }
2062
2063 // yay!
2064 dout(2) << "read_entry " << init_pos << " : seq " << h->seq
2065 << " " << h->len << " bytes"
2066 << dendl;
2067
2068 // ok!
2069 if (seq)
2070 *seq = h->seq;
2071
2072
2073 if (next_pos)
2074 *next_pos = cur_pos;
2075
2076 if (_h)
2077 *_h = *h;
2078
2079 assert(cur_pos % header.alignment == 0);
2080 return SUCCESS;
2081 }
2082
2083 void FileJournal::reserve_throttle_and_backoff(uint64_t count)
2084 {
2085 throttle.get(count);
2086 }
2087
2088 void FileJournal::get_header(
2089 uint64_t wanted_seq,
2090 off64_t *_pos,
2091 entry_header_t *h)
2092 {
2093 off64_t pos = header.start;
2094 off64_t next_pos = pos;
2095 bufferlist bl;
2096 uint64_t seq = 0;
2097 dout(2) << __func__ << dendl;
2098 while (1) {
2099 bl.clear();
2100 pos = next_pos;
2101 read_entry_result result = do_read_entry(
2102 pos,
2103 &next_pos,
2104 &bl,
2105 &seq,
2106 0,
2107 h);
2108 if (result == FAILURE || result == MAYBE_CORRUPT)
2109 ceph_abort();
2110 if (seq == wanted_seq) {
2111 if (_pos)
2112 *_pos = pos;
2113 return;
2114 }
2115 }
2116 ceph_abort(); // not reachable
2117 }
2118
2119 void FileJournal::corrupt(
2120 int wfd,
2121 off64_t corrupt_at)
2122 {
2123 dout(2) << __func__ << dendl;
2124 if (corrupt_at >= header.max_size)
2125 corrupt_at = corrupt_at + get_top() - header.max_size;
2126
2127 int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
2128 assert(actual == corrupt_at);
2129
2130 char buf[10];
2131 int r = safe_read_exact(fd, buf, 1);
2132 assert(r == 0);
2133
2134 actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
2135 assert(actual == corrupt_at);
2136
2137 buf[0]++;
2138 r = safe_write(wfd, buf, 1);
2139 assert(r == 0);
2140 }
2141
2142 void FileJournal::corrupt_payload(
2143 int wfd,
2144 uint64_t seq)
2145 {
2146 dout(2) << __func__ << dendl;
2147 off64_t pos = 0;
2148 entry_header_t h;
2149 get_header(seq, &pos, &h);
2150 off64_t corrupt_at =
2151 pos + sizeof(entry_header_t) + h.pre_pad;
2152 corrupt(wfd, corrupt_at);
2153 }
2154
2155
2156 void FileJournal::corrupt_footer_magic(
2157 int wfd,
2158 uint64_t seq)
2159 {
2160 dout(2) << __func__ << dendl;
2161 off64_t pos = 0;
2162 entry_header_t h;
2163 get_header(seq, &pos, &h);
2164 off64_t corrupt_at =
2165 pos + sizeof(entry_header_t) + h.pre_pad +
2166 h.len + h.post_pad +
2167 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2168 corrupt(wfd, corrupt_at);
2169 }
2170
2171
2172 void FileJournal::corrupt_header_magic(
2173 int wfd,
2174 uint64_t seq)
2175 {
2176 dout(2) << __func__ << dendl;
2177 off64_t pos = 0;
2178 entry_header_t h;
2179 get_header(seq, &pos, &h);
2180 off64_t corrupt_at =
2181 pos +
2182 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2183 corrupt(wfd, corrupt_at);
2184 }
2185
2186 off64_t FileJournal::get_journal_size_estimate()
2187 {
2188 off64_t size, start = header.start;
2189 if (write_pos < start) {
2190 size = (max_size - start) + write_pos;
2191 } else {
2192 size = write_pos - start;
2193 }
2194 dout(20) << __func__ << " journal size=" << size << dendl;
2195 return size;
2196 }