]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/filestore/FileJournal.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / os / filestore / FileJournal.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "acconfig.h"
15
16#include "common/debug.h"
17#include "common/errno.h"
18#include "common/safe_io.h"
19#include "FileJournal.h"
20#include "include/color.h"
21#include "common/perf_counters.h"
22#include "FileStore.h"
23
24#include "include/compat.h"
25
26#include <fcntl.h>
27#include <limits.h>
28#include <sstream>
29#include <stdio.h>
30#include <stdlib.h>
31#include <sys/types.h>
32#include <sys/stat.h>
33#include <sys/mount.h>
34
35#include "common/blkdev.h"
36#if defined(__linux__)
37#include "common/linux_version.h"
38#endif
39
40#if defined(__FreeBSD__)
41#define O_DSYNC O_SYNC
42#endif
43
44#define dout_context cct
45#define dout_subsys ceph_subsys_journal
46#undef dout_prefix
47#define dout_prefix *_dout << "journal "
48
// Smallest journal we accept, and the minimum alignment O_DIRECT I/O needs.
static constexpr int64_t ONE_MEG = int64_t{1} << 20;
static constexpr int CEPH_DIRECTIO_ALIGNMENT = 4096;
51
52
53int FileJournal::_open(bool forwrite, bool create)
54{
55 int flags, ret;
56
57 if (forwrite) {
58 flags = O_RDWR;
59 if (directio)
60 flags |= O_DIRECT | O_DSYNC;
61 } else {
62 flags = O_RDONLY;
63 }
64 if (create)
65 flags |= O_CREAT;
66
67 if (fd >= 0) {
68 if (TEMP_FAILURE_RETRY(::close(fd))) {
69 int err = errno;
70 derr << "FileJournal::_open: error closing old fd: "
71 << cpp_strerror(err) << dendl;
72 }
73 }
91327a77 74 fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags|O_CLOEXEC, 0644));
7c673cae
FG
75 if (fd < 0) {
76 int err = errno;
77 dout(2) << "FileJournal::_open unable to open journal "
78 << fn << ": " << cpp_strerror(err) << dendl;
79 return -err;
80 }
81
82 struct stat st;
83 ret = ::fstat(fd, &st);
84 if (ret) {
85 ret = errno;
86 derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl;
87 ret = -ret;
88 goto out_fd;
89 }
90
91 if (S_ISBLK(st.st_mode)) {
92 ret = _open_block_device();
93 } else if (S_ISREG(st.st_mode)) {
94 if (aio && !force_aio) {
95 derr << "FileJournal::_open: disabling aio for non-block journal. Use "
96 << "journal_force_aio to force use of aio anyway" << dendl;
97 aio = false;
98 }
99 ret = _open_file(st.st_size, st.st_blksize, create);
100 } else {
101 derr << "FileJournal::_open: wrong journal file type: " << st.st_mode
102 << dendl;
103 ret = -EINVAL;
104 }
105
106 if (ret)
107 goto out_fd;
108
109#ifdef HAVE_LIBAIO
110 if (aio) {
111 aio_ctx = 0;
112 ret = io_setup(128, &aio_ctx);
113 if (ret < 0) {
114 switch (ret) {
115 // Contrary to naive expectations -EAGIAN means ...
116 case -EAGAIN:
117 derr << "FileJournal::_open: user's limit of aio events exceeded. "
118 << "Try increasing /proc/sys/fs/aio-max-nr" << dendl;
119 break;
120 default:
121 derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl;
122 break;
123 }
124 goto out_fd;
125 }
126 }
127#endif
128
129 /* We really want max_size to be a multiple of block_size. */
130 max_size -= max_size % block_size;
131
132 dout(1) << "_open " << fn << " fd " << fd
133 << ": " << max_size
134 << " bytes, block size " << block_size
135 << " bytes, directio = " << directio
136 << ", aio = " << aio
137 << dendl;
138 return 0;
139
140 out_fd:
141 VOID_TEMP_FAILURE_RETRY(::close(fd));
142 fd = -1;
143 return ret;
144}
145
146int FileJournal::_open_block_device()
147{
148 int64_t bdev_sz = 0;
11fdf7f2
TL
149 BlkDev blkdev(fd);
150 int ret = blkdev.get_size(&bdev_sz);
7c673cae
FG
151 if (ret) {
152 dout(0) << __func__ << ": failed to read block device size." << dendl;
153 return -EIO;
154 }
155
156 /* Check for bdev_sz too small */
157 if (bdev_sz < ONE_MEG) {
158 dout(0) << __func__ << ": your block device must be at least "
159 << ONE_MEG << " bytes to be used for a Ceph journal." << dendl;
160 return -EINVAL;
161 }
162
163 dout(10) << __func__ << ": ignoring osd journal size. "
164 << "We'll use the entire block device (size: " << bdev_sz << ")"
165 << dendl;
166 max_size = bdev_sz;
167
168 block_size = cct->_conf->journal_block_size;
169
170 if (cct->_conf->journal_discard) {
11fdf7f2 171 discard = blkdev.support_discard();
7c673cae
FG
172 dout(10) << fn << " support discard: " << (int)discard << dendl;
173 }
174
175 return 0;
176}
177
178int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
179 bool create)
180{
181 int ret;
182 int64_t conf_journal_sz(cct->_conf->osd_journal_size);
183 conf_journal_sz <<= 20;
184
185 if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) {
186 derr << "I'm sorry, I don't know how large of a journal to create."
187 << "Please specify a block device to use as the journal OR "
188 << "set osd_journal_size in your ceph.conf" << dendl;
189 return -EINVAL;
190 }
191
192 if (create && (oldsize < conf_journal_sz)) {
193 uint64_t newsize(conf_journal_sz);
194 dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl;
195 ret = ::ftruncate(fd, newsize);
196 if (ret < 0) {
197 int err = errno;
198 derr << "FileJournal::_open_file : unable to extend journal to "
199 << newsize << " bytes: " << cpp_strerror(err) << dendl;
200 return -err;
201 }
28e407b8 202 ret = ceph_posix_fallocate(fd, 0, newsize);
7c673cae
FG
203 if (ret) {
204 derr << "FileJournal::_open_file : unable to preallocation journal to "
205 << newsize << " bytes: " << cpp_strerror(ret) << dendl;
206 return -ret;
207 }
208 max_size = newsize;
7c673cae
FG
209 }
210 else {
211 max_size = oldsize;
212 }
213 block_size = cct->_conf->journal_block_size;
214
215 if (create && cct->_conf->journal_zero_on_create) {
216 derr << "FileJournal::_open_file : zeroing journal" << dendl;
217 uint64_t write_size = 1 << 20;
218 char *buf;
219 ret = ::posix_memalign((void **)&buf, block_size, write_size);
220 if (ret != 0) {
221 return -ret;
222 }
223 memset(static_cast<void*>(buf), 0, write_size);
224 uint64_t i = 0;
225 for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
226 ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
227 if (ret < 0) {
228 free(buf);
229 return -errno;
230 }
231 }
232 if (i < (uint64_t)max_size) {
233 ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
234 if (ret < 0) {
235 free(buf);
236 return -errno;
237 }
238 }
239 free(buf);
240 }
241
242
243 dout(10) << "_open journal is not a block device, NOT checking disk "
244 << "write cache on '" << fn << "'" << dendl;
245
246 return 0;
247}
248
249// This can not be used on an active journal
250int FileJournal::check()
251{
252 int ret;
253
11fdf7f2 254 ceph_assert(fd == -1);
7c673cae
FG
255 ret = _open(false, false);
256 if (ret)
257 return ret;
258
259 ret = read_header(&header);
260 if (ret < 0)
261 goto done;
262
263 if (header.fsid != fsid) {
264 derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
265 << ", invalid (someone else's?) journal" << dendl;
266 ret = -EINVAL;
267 goto done;
268 }
269
270 dout(1) << "check: header looks ok" << dendl;
271 ret = 0;
272
273 done:
274 close();
275 return ret;
276}
277
278
279int FileJournal::create()
280{
281 void *buf = 0;
282 int64_t needed_space;
283 int ret;
284 buffer::ptr bp;
285 dout(2) << "create " << fn << " fsid " << fsid << dendl;
286
287 ret = _open(true, true);
288 if (ret)
289 goto done;
290
291 // write empty header
292 header = header_t();
293 header.flags = header_t::FLAG_CRC; // enable crcs on any new journal.
294 header.fsid = fsid;
295 header.max_size = max_size;
296 header.block_size = block_size;
297 if (cct->_conf->journal_block_align || directio)
298 header.alignment = block_size;
299 else
300 header.alignment = 16; // at least stay word aligned on 64bit machines...
301
302 header.start = get_top();
303 header.start_seq = 0;
304
305 print_header(header);
306
307 // static zeroed buffer for alignment padding
308 delete [] zero_buf;
309 zero_buf = new char[header.alignment];
310 memset(zero_buf, 0, header.alignment);
311
312 bp = prepare_header();
313 if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) {
314 ret = -errno;
315 derr << "FileJournal::create : create write header error "
316 << cpp_strerror(ret) << dendl;
317 goto close_fd;
318 }
319
320 // zero first little bit, too.
321 ret = posix_memalign(&buf, block_size, block_size);
322 if (ret) {
323 ret = -ret;
324 derr << "FileJournal::create: failed to allocate " << block_size
325 << " bytes of memory: " << cpp_strerror(ret) << dendl;
326 goto close_fd;
327 }
328 memset(buf, 0, block_size);
329 if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) {
330 ret = -errno;
331 derr << "FileJournal::create: error zeroing first " << block_size
332 << " bytes " << cpp_strerror(ret) << dendl;
333 goto free_buf;
334 }
335
11fdf7f2 336 needed_space = cct->_conf->osd_max_write_size << 20;
7c673cae
FG
337 needed_space += (2 * sizeof(entry_header_t)) + get_top();
338 if (header.max_size - header.start < needed_space) {
339 derr << "FileJournal::create: OSD journal is not large enough to hold "
340 << "osd_max_write_size bytes!" << dendl;
341 ret = -ENOSPC;
342 goto free_buf;
343 }
344
345 dout(2) << "create done" << dendl;
346 ret = 0;
347
348free_buf:
349 free(buf);
350 buf = 0;
351close_fd:
352 if (TEMP_FAILURE_RETRY(::close(fd)) < 0) {
353 ret = -errno;
354 derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret)
355 << dendl;
356 }
357done:
358 fd = -1;
359 return ret;
360}
361
362// This can not be used on an active journal
363int FileJournal::peek_fsid(uuid_d& fsid)
364{
11fdf7f2 365 ceph_assert(fd == -1);
7c673cae
FG
366 int r = _open(false, false);
367 if (r)
368 return r;
369 r = read_header(&header);
370 if (r < 0)
371 goto out;
372 fsid = header.fsid;
373out:
374 close();
375 return r;
376}
377
378int FileJournal::open(uint64_t fs_op_seq)
379{
380 dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
381
382 uint64_t next_seq = fs_op_seq + 1;
224ce89b 383 uint64_t seq = -1;
7c673cae
FG
384
385 int err = _open(false);
386 if (err)
387 return err;
388
389 // assume writeable, unless...
390 read_pos = 0;
391 write_pos = get_top();
392
393 // read header?
394 err = read_header(&header);
395 if (err < 0)
224ce89b 396 goto out;
7c673cae
FG
397
398 // static zeroed buffer for alignment padding
399 delete [] zero_buf;
400 zero_buf = new char[header.alignment];
401 memset(zero_buf, 0, header.alignment);
402
403 dout(10) << "open header.fsid = " << header.fsid
404 //<< " vs expected fsid = " << fsid
405 << dendl;
406 if (header.fsid != fsid) {
407 derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
408 << ", invalid (someone else's?) journal" << dendl;
224ce89b
WB
409 err = -EINVAL;
410 goto out;
7c673cae
FG
411 }
412 if (header.max_size > max_size) {
413 dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
224ce89b
WB
414 err = -EINVAL;
415 goto out;
7c673cae
FG
416 }
417 if (header.block_size != block_size) {
418 dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
224ce89b
WB
419 err = -EINVAL;
420 goto out;
7c673cae
FG
421 }
422 if (header.max_size % header.block_size) {
423 dout(2) << "open journal max size " << header.max_size
424 << " not a multiple of block size " << header.block_size << dendl;
224ce89b
WB
425 err = -EINVAL;
426 goto out;
7c673cae
FG
427 }
428 if (header.alignment != block_size && directio) {
429 dout(0) << "open journal alignment " << header.alignment << " does not match block size "
430 << block_size << " (required for direct_io journal mode)" << dendl;
224ce89b
WB
431 err = -EINVAL;
432 goto out;
7c673cae
FG
433 }
434 if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
435 dout(0) << "open journal alignment " << header.alignment
436 << " is not multiple of minimum directio alignment "
437 << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
438 << dendl;
224ce89b
WB
439 err = -EINVAL;
440 goto out;
7c673cae
FG
441 }
442
443 // looks like a valid header.
444 write_pos = 0; // not writeable yet
445
446 journaled_seq = header.committed_up_to;
447
448 // find next entry
449 read_pos = header.start;
224ce89b 450 seq = header.start_seq;
7c673cae
FG
451
452 while (1) {
453 bufferlist bl;
454 off64_t old_pos = read_pos;
455 if (!read_entry(bl, seq)) {
456 dout(10) << "open reached end of journal." << dendl;
457 break;
458 }
459 if (seq > next_seq) {
460 dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq
461 << ", ignoring journal contents"
462 << dendl;
463 read_pos = -1;
464 last_committed_seq = 0;
465 return 0;
466 }
467 if (seq == next_seq) {
468 dout(10) << "open reached seq " << seq << dendl;
469 read_pos = old_pos;
470 break;
471 }
472 seq++; // next event should follow.
473 }
474
475 return 0;
224ce89b
WB
476out:
477 close();
478 return err;
7c673cae
FG
479}
480
481void FileJournal::_close(int fd) const
482{
483 VOID_TEMP_FAILURE_RETRY(::close(fd));
484}
485
486void FileJournal::close()
487{
488 dout(1) << "close " << fn << dendl;
489
490 // stop writer thread
491 stop_writer();
492
493 // close
11fdf7f2
TL
494 ceph_assert(writeq_empty());
495 ceph_assert(!must_write_header);
496 ceph_assert(fd >= 0);
7c673cae
FG
497 _close(fd);
498 fd = -1;
499}
500
501
502int FileJournal::dump(ostream& out)
503{
504 return _dump(out, false);
505}
506
507int FileJournal::simple_dump(ostream& out)
508{
509 return _dump(out, true);
510}
511
512int FileJournal::_dump(ostream& out, bool simple)
513{
514 JSONFormatter f(true);
515 int ret = _fdump(f, simple);
516 f.flush(out);
517 return ret;
518}
519
520int FileJournal::_fdump(Formatter &f, bool simple)
521{
522 dout(10) << "_fdump" << dendl;
523
11fdf7f2 524 ceph_assert(fd == -1);
7c673cae
FG
525 int err = _open(false, false);
526 if (err)
527 return err;
528
529 err = read_header(&header);
530 if (err < 0) {
531 close();
532 return err;
533 }
534
535 off64_t next_pos = header.start;
536
537 f.open_object_section("journal");
538
539 f.open_object_section("header");
540 f.dump_unsigned("flags", header.flags);
541 ostringstream os;
542 os << header.fsid;
543 f.dump_string("fsid", os.str());
544 f.dump_unsigned("block_size", header.block_size);
545 f.dump_unsigned("alignment", header.alignment);
546 f.dump_int("max_size", header.max_size);
547 f.dump_int("start", header.start);
548 f.dump_unsigned("committed_up_to", header.committed_up_to);
549 f.dump_unsigned("start_seq", header.start_seq);
550 f.close_section();
551
552 f.open_array_section("entries");
553 uint64_t seq = header.start_seq;
554 while (1) {
555 bufferlist bl;
556 off64_t pos = next_pos;
557
558 if (!pos) {
559 dout(2) << "_dump -- not readable" << dendl;
560 err = -EINVAL;
561 break;
562 }
563 stringstream ss;
564 read_entry_result result = do_read_entry(
565 pos,
566 &next_pos,
567 &bl,
568 &seq,
569 &ss);
570 if (result != SUCCESS) {
571 if (seq < header.committed_up_to) {
572 dout(2) << "Unable to read past sequence " << seq
573 << " but header indicates the journal has committed up through "
574 << header.committed_up_to << ", journal is corrupt" << dendl;
575 err = -EINVAL;
576 }
577 dout(25) << ss.str() << dendl;
578 dout(25) << "No further valid entries found, journal is most likely valid"
579 << dendl;
580 break;
581 }
582
583 f.open_object_section("entry");
584 f.dump_unsigned("offset", pos);
585 f.dump_unsigned("seq", seq);
586 if (simple) {
587 f.dump_unsigned("bl.length", bl.length());
588 } else {
589 f.open_array_section("transactions");
11fdf7f2 590 auto p = bl.cbegin();
7c673cae
FG
591 int trans_num = 0;
592 while (!p.end()) {
593 ObjectStore::Transaction t(p);
594 f.open_object_section("transaction");
595 f.dump_unsigned("trans_num", trans_num);
596 t.dump(&f);
597 f.close_section();
598 trans_num++;
599 }
600 f.close_section();
601 }
602 f.close_section();
603 }
604
605 f.close_section();
606 f.close_section();
607 dout(10) << "dump finish" << dendl;
608
609 close();
610 return err;
611}
612
613
614void FileJournal::start_writer()
615{
616 write_stop = false;
617 aio_stop = false;
618 write_thread.create("journal_write");
619#ifdef HAVE_LIBAIO
620 if (aio)
621 write_finish_thread.create("journal_wrt_fin");
622#endif
623}
624
625void FileJournal::stop_writer()
626{
627 // Do nothing if writer already stopped or never started
628 if (!write_stop)
629 {
630 {
9f95a23c
TL
631 std::lock_guard l{write_lock};
632 std::lock_guard p{writeq_lock};
7c673cae 633 write_stop = true;
9f95a23c 634 writeq_cond.notify_all();
7c673cae
FG
635 // Doesn't hurt to signal commit_cond in case thread is waiting there
636 // and caller didn't use committed_thru() first.
9f95a23c 637 commit_cond.notify_all();
7c673cae
FG
638 }
639 write_thread.join();
640
641 // write journal header now so that we have less to replay on remount
642 write_header_sync();
643 }
644
645#ifdef HAVE_LIBAIO
646 // stop aio completeion thread *after* writer thread has stopped
647 // and has submitted all of its io
648 if (aio && !aio_stop) {
9f95a23c 649 aio_lock.lock();
7c673cae 650 aio_stop = true;
9f95a23c
TL
651 aio_cond.notify_all();
652 write_finish_cond.notify_all();
653 aio_lock.unlock();
7c673cae
FG
654 write_finish_thread.join();
655 }
656#endif
657}
658
659
660
661void FileJournal::print_header(const header_t &header) const
662{
663 dout(10) << "header: block_size " << header.block_size
664 << " alignment " << header.alignment
665 << " max_size " << header.max_size
666 << dendl;
667 dout(10) << "header: start " << header.start << dendl;
668 dout(10) << " write_pos " << write_pos << dendl;
669}
670
671int FileJournal::read_header(header_t *hdr) const
672{
673 dout(10) << "read_header" << dendl;
674 bufferlist bl;
675
11fdf7f2 676 buffer::ptr bp = buffer::create_small_page_aligned(block_size);
7c673cae
FG
677 char* bpdata = bp.c_str();
678 int r = ::pread(fd, bpdata, bp.length(), 0);
679
680 if (r < 0) {
681 int err = errno;
682 dout(0) << "read_header got " << cpp_strerror(err) << dendl;
683 return -err;
684 }
685
686 // don't use bp.zero() here, because it also invalidates
687 // crc cache (which is not yet populated anyway)
688 if (bp.length() != (size_t)r) {
689 // r will be always less or equal than bp.length
690 bpdata += r;
691 memset(bpdata, 0, bp.length() - r);
692 }
693
694 bl.push_back(std::move(bp));
695
696 try {
11fdf7f2
TL
697 auto p = bl.cbegin();
698 decode(*hdr, p);
7c673cae
FG
699 }
700 catch (buffer::error& e) {
701 derr << "read_header error decoding journal header" << dendl;
702 return -EINVAL;
703 }
704
705
706 /*
707 * Unfortunately we weren't initializing the flags field for new
708 * journals! Aie. This is safe(ish) now that we have only one
709 * flag. Probably around when we add the next flag we need to
710 * remove this or else this (eventually old) code will clobber newer
711 * code's flags.
712 */
713 if (hdr->flags > 3) {
714 derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
715 hdr->flags = 0;
716 }
717
718 print_header(*hdr);
719
720 return 0;
721}
722
723bufferptr FileJournal::prepare_header()
724{
725 bufferlist bl;
726 {
9f95a23c 727 std::lock_guard l{finisher_lock};
7c673cae
FG
728 header.committed_up_to = journaled_seq;
729 }
11fdf7f2
TL
730 encode(header, bl);
731 bufferptr bp = buffer::create_small_page_aligned(get_top());
7c673cae
FG
732 // don't use bp.zero() here, because it also invalidates
733 // crc cache (which is not yet populated anyway)
734 char* data = bp.c_str();
735 memcpy(data, bl.c_str(), bl.length());
736 data += bl.length();
737 memset(data, 0, bp.length()-bl.length());
738 return bp;
739}
740
741void FileJournal::write_header_sync()
742{
9f95a23c 743 std::lock_guard locker{write_lock};
7c673cae
FG
744 must_write_header = true;
745 bufferlist bl;
746 do_write(bl);
747 dout(20) << __func__ << " finish" << dendl;
748}
749
750int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
751{
752 // already full?
753 if (full_state != FULL_NOTFULL)
754 return -ENOSPC;
755
756 // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
757 off64_t room;
758 if (pos >= header.start)
759 room = (header.max_size - pos) + (header.start - get_top()) - 1;
760 else
761 room = header.start - pos - 1;
762 dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
763 << " top " << get_top() << dendl;
764
765 if (do_sync_cond) {
766 if (room >= (header.max_size >> 1) &&
767 room - size < (header.max_size >> 1)) {
768 dout(10) << " passing half full mark, triggering commit" << dendl;
9f95a23c
TL
769#ifdef CEPH_DEBUG_MUTEX
770 do_sync_cond->notify_all(true); // initiate a real commit so we can trim
771#else
772 do_sync_cond->notify_all();
773#endif
7c673cae
FG
774 }
775 }
776
777 if (room >= size) {
778 dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
779 if (pos + size > header.max_size)
780 must_write_header = true;
781 return 0;
782 }
783
784 // full
785 dout(1) << "check_for_full at " << pos << " : JOURNAL FULL "
786 << pos << " >= " << room
787 << " (max_size " << header.max_size << " start " << header.start << ")"
788 << dendl;
789
790 off64_t max = header.max_size - get_top();
791 if (size > max)
792 dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl;
793
794 return -ENOSPC;
795}
796
797int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes)
798{
799 // gather queued writes
800 off64_t queue_pos = write_pos;
801
802 int eleft = cct->_conf->journal_max_write_entries;
803 unsigned bmax = cct->_conf->journal_max_write_bytes;
804
805 if (full_state != FULL_NOTFULL)
806 return -ENOSPC;
807
808 while (!writeq_empty()) {
809 list<write_item> items;
810 batch_pop_write(items);
811 list<write_item>::iterator it = items.begin();
812 while (it != items.end()) {
813 uint64_t bytes = it->bl.length();
814 int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
815 if (r == 0) { // prepare ok, delete it
816 items.erase(it++);
817#ifdef HAVE_LIBAIO
818 {
9f95a23c 819 std::lock_guard locker{aio_lock};
11fdf7f2 820 ceph_assert(aio_write_queue_ops > 0);
7c673cae 821 aio_write_queue_ops--;
11fdf7f2 822 ceph_assert(aio_write_queue_bytes >= bytes);
7c673cae
FG
823 aio_write_queue_bytes -= bytes;
824 }
825#else
826 (void)bytes;
827#endif
828 }
829 if (r == -ENOSPC) {
830 // the journal maybe full, insert the left item to writeq
831 batch_unpop_write(items);
832 if (orig_ops)
833 goto out; // commit what we have
834
835 if (logger)
836 logger->inc(l_filestore_journal_full);
837
838 if (wait_on_full) {
839 dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
840 } else {
841 dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
842
843 // throw out what we have so far
844 full_state = FULL_FULL;
845 while (!writeq_empty()) {
846 complete_write(1, peek_write().orig_len);
847 pop_write();
848 }
849 print_header(header);
850 }
224ce89b 851
7c673cae
FG
852 return -ENOSPC; // hrm, full on first op
853 }
854 if (eleft) {
855 if (--eleft == 0) {
856 dout(20) << "prepare_multi_write hit max events per write "
857 << cct->_conf->journal_max_write_entries << dendl;
858 batch_unpop_write(items);
859 goto out;
860 }
861 }
862 if (bmax) {
863 if (bl.length() >= bmax) {
864 dout(20) << "prepare_multi_write hit max write size "
865 << cct->_conf->journal_max_write_bytes << dendl;
866 batch_unpop_write(items);
867 goto out;
868 }
869 }
870 }
871 }
872
873out:
874 dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
11fdf7f2 875 ceph_assert((write_pos + bl.length() == queue_pos) ||
7c673cae
FG
876 (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
877 return 0;
878}
879
880/*
881void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
882{
883 writing_seq.push_back(seq);
884 if (!waiting_for_notfull.empty()) {
885 // make sure previously unjournaled stuff waiting for UNFULL triggers
886 // _before_ newly journaled stuff does
887 dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
888 << " until after UNFULL" << dendl;
889 C_Gather *g = new C_Gather(writeq.front().fin);
890 writing_fin.push_back(g->new_sub());
891 waiting_for_notfull.push_back(g->new_sub());
892 } else {
893 writing_fin.push_back(writeq.front().fin);
894 dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
895 }
896}
897*/
898
899void FileJournal::queue_completions_thru(uint64_t seq)
900{
9f95a23c 901 ceph_assert(ceph_mutex_is_locked(finisher_lock));
7c673cae
FG
902 utime_t now = ceph_clock_now();
903 list<completion_item> items;
904 batch_pop_completions(items);
905 list<completion_item>::iterator it = items.begin();
906 while (it != items.end()) {
907 completion_item& next = *it;
908 if (next.seq > seq)
909 break;
910 utime_t lat = now;
911 lat -= next.start;
912 dout(10) << "queue_completions_thru seq " << seq
913 << " queueing seq " << next.seq
914 << " " << next.finish
915 << " lat " << lat << dendl;
916 if (logger) {
917 logger->tinc(l_filestore_journal_latency, lat);
918 }
919 if (next.finish)
920 finisher->queue(next.finish);
921 if (next.tracked_op) {
922 next.tracked_op->mark_event("journaled_completion_queued");
923 next.tracked_op->journal_trace.event("queued completion");
924 next.tracked_op->journal_trace.keyval("completed through", seq);
925 }
926 items.erase(it++);
927 }
928 batch_unpop_completions(items);
9f95a23c 929 finisher_cond.notify_all();
7c673cae
FG
930}
931
932
933int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
934{
935 uint64_t seq = next_write.seq;
936 bufferlist &ebl = next_write.bl;
937 off64_t size = ebl.length();
938
939 int r = check_for_full(seq, queue_pos, size);
940 if (r < 0)
941 return r; // ENOSPC or EAGAIN
942
943 uint32_t orig_len = next_write.orig_len;
944 orig_bytes += orig_len;
945 orig_ops++;
946
947 // add to write buffer
948 dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
949 << " len " << orig_len << " -> " << size << dendl;
950
951 unsigned seq_offset = offsetof(entry_header_t, seq);
952 unsigned magic1_offset = offsetof(entry_header_t, magic1);
953 unsigned magic2_offset = offsetof(entry_header_t, magic2);
954
955 bufferptr headerptr = ebl.buffers().front();
956 uint64_t _seq = seq;
957 uint64_t _queue_pos = queue_pos;
958 uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
959 headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
960 headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
961 headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
962
963 bufferptr footerptr = ebl.buffers().back();
964 unsigned post_offset = footerptr.length() - sizeof(entry_header_t);
965 footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
966 footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
967 footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
968
969 bl.claim_append(ebl);
970 if (next_write.tracked_op) {
971 next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
972 next_write.tracked_op->journal_trace.event("prepare_single_write");
973 }
974
975 journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
976 writing_seq = seq;
977
978 queue_pos += size;
979 if (queue_pos >= header.max_size)
980 queue_pos = queue_pos + get_top() - header.max_size;
981
982 return 0;
983}
984
985void FileJournal::check_align(off64_t pos, bufferlist& bl)
986{
987 // make sure list segments are page aligned
988 if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
11fdf7f2
TL
989 ceph_assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
990 ceph_assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
991 ceph_abort_msg("bl was not aligned");
7c673cae
FG
992 }
993}
994
995int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
996{
997 int ret;
998
999 off64_t spos = ::lseek64(fd, pos, SEEK_SET);
1000 if (spos < 0) {
1001 ret = -errno;
1002 derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl;
1003 return ret;
1004 }
1005 ret = bl.write_fd(fd);
1006 if (ret) {
1007 derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl;
1008 return ret;
1009 }
1010 pos += bl.length();
1011 if (pos == header.max_size)
1012 pos = get_top();
1013 return 0;
1014}
1015
1016void FileJournal::do_write(bufferlist& bl)
1017{
1018 // nothing to do?
1019 if (bl.length() == 0 && !must_write_header)
1020 return;
1021
1022 buffer::ptr hbp;
1023 if (cct->_conf->journal_write_header_frequency &&
1024 (((++journaled_since_start) %
1025 cct->_conf->journal_write_header_frequency) == 0)) {
1026 must_write_header = true;
1027 }
1028
1029 if (must_write_header) {
1030 must_write_header = false;
1031 hbp = prepare_header();
1032 }
1033
1034 dout(15) << "do_write writing " << write_pos << "~" << bl.length()
1035 << (hbp.length() ? " + header":"")
1036 << dendl;
1037
1038 utime_t from = ceph_clock_now();
1039
1040 // entry
1041 off64_t pos = write_pos;
1042
1043 // Adjust write_pos
1044 write_pos += bl.length();
1045 if (write_pos >= header.max_size)
1046 write_pos = write_pos - header.max_size + get_top();
1047
9f95a23c 1048 write_lock.unlock();
7c673cae
FG
1049
1050 // split?
1051 off64_t split = 0;
1052 if (pos + bl.length() > header.max_size) {
1053 bufferlist first, second;
1054 split = header.max_size - pos;
1055 first.substr_of(bl, 0, split);
1056 second.substr_of(bl, split, bl.length() - split);
11fdf7f2 1057 ceph_assert(first.length() + second.length() == bl.length());
7c673cae
FG
1058 dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
1059 << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
1060
1061 //Save pos to write first piece second
1062 off64_t first_pos = pos;
1063 off64_t orig_pos;
1064 pos = get_top();
1065 // header too?
1066 if (hbp.length()) {
1067 // be sneaky: include the header in the second fragment
11fdf7f2
TL
1068 bufferlist tmp;
1069 tmp.push_back(hbp);
1070 tmp.claim_append(second);
1071 second.swap(tmp);
7c673cae
FG
1072 pos = 0; // we included the header
1073 }
1074 // Write the second portion first possible with the header, so
1075 // do_read_entry() won't even get a valid entry_header_t if there
1076 // is a crash between the two writes.
1077 orig_pos = pos;
1078 if (write_bl(pos, second)) {
1079 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1080 << ") failed" << dendl;
1081 check_align(pos, second);
1082 ceph_abort();
1083 }
1084 orig_pos = first_pos;
1085 if (write_bl(first_pos, first)) {
1086 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1087 << ") failed" << dendl;
1088 check_align(first_pos, first);
1089 ceph_abort();
1090 }
11fdf7f2 1091 ceph_assert(first_pos == get_top());
7c673cae
FG
1092 } else {
1093 // header too?
1094 if (hbp.length()) {
1095 if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
1096 int err = errno;
1097 derr << "FileJournal::do_write: pwrite(fd=" << fd
1098 << ", hbp.length=" << hbp.length() << ") failed :"
1099 << cpp_strerror(err) << dendl;
1100 ceph_abort();
1101 }
1102 }
1103
1104 if (write_bl(pos, bl)) {
1105 derr << "FileJournal::do_write: write_bl(pos=" << pos
1106 << ") failed" << dendl;
1107 check_align(pos, bl);
1108 ceph_abort();
1109 }
1110 }
1111
1112 if (!directio) {
1113 dout(20) << "do_write fsync" << dendl;
1114
1115 /*
1116 * We'd really love to have a fsync_range or fdatasync_range and do a:
1117 *
1118 * if (split) {
1119 * ::fsync_range(fd, header.max_size - split, split)l
1120 * ::fsync_range(fd, get_top(), bl.length() - split);
1121 * else
1122 * ::fsync_range(fd, write_pos, bl.length())
1123 *
1124 * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
1125 * too hard given all the underlying infrastructure already exist.
1126 *
1127 * NOTE: using sync_file_range here would not be safe as it does not
1128 * flush disk caches or commits any sort of metadata.
1129 */
1130 int ret = 0;
11fdf7f2 1131#if defined(__APPLE__) || defined(__FreeBSD__)
7c673cae
FG
1132 ret = ::fsync(fd);
1133#else
1134 ret = ::fdatasync(fd);
1135#endif
1136 if (ret < 0) {
1137 derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl;
1138 ceph_abort();
1139 }
1140#ifdef HAVE_POSIX_FADVISE
1141 if (cct->_conf->filestore_fadvise)
1142 posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
1143#endif
1144 }
1145
1146 utime_t lat = ceph_clock_now() - from;
1147 dout(20) << "do_write latency " << lat << dendl;
1148
9f95a23c 1149 write_lock.lock();
7c673cae 1150
11fdf7f2
TL
1151 ceph_assert(write_pos == pos);
1152 ceph_assert(write_pos % header.alignment == 0);
7c673cae
FG
1153
1154 {
9f95a23c 1155 std::lock_guard locker{finisher_lock};
7c673cae
FG
1156 journaled_seq = writing_seq;
1157
1158 // kick finisher?
1159 // only if we haven't filled up recently!
1160 if (full_state != FULL_NOTFULL) {
1161 dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
1162 << ", full_commit_seq|full_restart_seq" << dendl;
1163 } else {
1164 if (plug_journal_completions) {
1165 dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
1166 << " due to completion plug" << dendl;
1167 } else {
1168 dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
1169 queue_completions_thru(journaled_seq);
1170 }
1171 }
1172 }
1173}
1174
1175void FileJournal::flush()
1176{
1177 dout(10) << "waiting for completions to empty" << dendl;
1178 {
9f95a23c
TL
1179 std::unique_lock l{finisher_lock};
1180 finisher_cond.wait(l, [this] { return completions_empty(); });
7c673cae
FG
1181 }
1182 dout(10) << "flush waiting for finisher" << dendl;
1183 finisher->wait_for_empty();
1184 dout(10) << "flush done" << dendl;
1185}
1186
1187
1188void FileJournal::write_thread_entry()
1189{
1190 dout(10) << "write_thread_entry start" << dendl;
1191 while (1) {
1192 {
9f95a23c 1193 std::unique_lock locker{writeq_lock};
7c673cae
FG
1194 if (writeq.empty() && !must_write_header) {
1195 if (write_stop)
1196 break;
1197 dout(20) << "write_thread_entry going to sleep" << dendl;
9f95a23c 1198 writeq_cond.wait(locker);
7c673cae
FG
1199 dout(20) << "write_thread_entry woke up" << dendl;
1200 continue;
1201 }
1202 }
1203
1204#ifdef HAVE_LIBAIO
1205 if (aio) {
9f95a23c 1206 std::unique_lock locker{aio_lock};
7c673cae
FG
1207 // should we back off to limit aios in flight? try to do this
1208 // adaptively so that we submit larger aios once we have lots of
1209 // them in flight.
1210 //
1211 // NOTE: our condition here is based on aio_num (protected by
1212 // aio_lock) and throttle_bytes (part of the write queue). when
1213 // we sleep, we *only* wait for aio_num to change, and do not
1214 // wake when more data is queued. this is not strictly correct,
1215 // but should be fine given that we will have plenty of aios in
1216 // flight if we hit this limit to ensure we keep the device
1217 // saturated.
1218 while (aio_num > 0) {
11fdf7f2 1219 int exp = std::min<int>(aio_num * 2, 24);
7c673cae
FG
1220 long unsigned min_new = 1ull << exp;
1221 uint64_t cur = aio_write_queue_bytes;
1222 dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
1223 << " ... exp " << exp << " min_new " << min_new
1224 << " ... pending " << cur << dendl;
1225 if (cur >= min_new)
1226 break;
1227 dout(20) << "write_thread_entry deferring until more aios complete: "
1228 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
1229 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
9f95a23c 1230 aio_cond.wait(locker);
7c673cae
FG
1231 dout(20) << "write_thread_entry woke up" << dendl;
1232 }
1233 }
1234#endif
1235
9f95a23c 1236 std::unique_lock locker{write_lock};
7c673cae
FG
1237 uint64_t orig_ops = 0;
1238 uint64_t orig_bytes = 0;
1239
1240 bufferlist bl;
1241 int r = prepare_multi_write(bl, orig_ops, orig_bytes);
1242 // Don't care about journal full if stoppping, so drop queue and
1243 // possibly let header get written and loop above to notice stop
1244 if (r == -ENOSPC) {
1245 if (write_stop) {
1246 dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
1247 while (!writeq_empty()) {
1248 complete_write(1, peek_write().orig_len);
1249 pop_write();
1250 }
1251 print_header(header);
1252 r = 0;
1253 } else {
1254 dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
9f95a23c 1255 commit_cond.wait(locker);
7c673cae
FG
1256 dout(20) << "write_thread_entry woke up" << dendl;
1257 continue;
1258 }
1259 }
11fdf7f2 1260 ceph_assert(r == 0);
7c673cae
FG
1261
1262 if (logger) {
1263 logger->inc(l_filestore_journal_wr);
1264 logger->inc(l_filestore_journal_wr_bytes, bl.length());
1265 }
1266
1267#ifdef HAVE_LIBAIO
1268 if (aio)
1269 do_aio_write(bl);
1270 else
1271 do_write(bl);
1272#else
1273 do_write(bl);
1274#endif
1275 complete_write(orig_ops, orig_bytes);
1276 }
1277
1278 dout(10) << "write_thread_entry finish" << dendl;
1279}
1280
1281#ifdef HAVE_LIBAIO
1282void FileJournal::do_aio_write(bufferlist& bl)
1283{
1284
1285 if (cct->_conf->journal_write_header_frequency &&
1286 (((++journaled_since_start) %
1287 cct->_conf->journal_write_header_frequency) == 0)) {
1288 must_write_header = true;
1289 }
1290
1291 // nothing to do?
1292 if (bl.length() == 0 && !must_write_header)
1293 return;
1294
1295 buffer::ptr hbp;
1296 if (must_write_header) {
1297 must_write_header = false;
1298 hbp = prepare_header();
1299 }
1300
1301 // entry
1302 off64_t pos = write_pos;
1303
1304 dout(15) << "do_aio_write writing " << pos << "~" << bl.length()
1305 << (hbp.length() ? " + header":"")
1306 << dendl;
1307
1308 // split?
1309 off64_t split = 0;
1310 if (pos + bl.length() > header.max_size) {
1311 bufferlist first, second;
1312 split = header.max_size - pos;
1313 first.substr_of(bl, 0, split);
1314 second.substr_of(bl, split, bl.length() - split);
11fdf7f2 1315 ceph_assert(first.length() + second.length() == bl.length());
7c673cae
FG
1316 dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
1317
1318 if (write_aio_bl(pos, first, 0)) {
1319 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1320 << ") failed" << dendl;
1321 ceph_abort();
1322 }
11fdf7f2 1323 ceph_assert(pos == header.max_size);
7c673cae
FG
1324 if (hbp.length()) {
1325 // be sneaky: include the header in the second fragment
11fdf7f2
TL
1326 bufferlist tmp;
1327 tmp.push_back(hbp);
1328 tmp.claim_append(second);
1329 second.swap(tmp);
7c673cae
FG
1330 pos = 0; // we included the header
1331 } else
1332 pos = get_top(); // no header, start after that
1333 if (write_aio_bl(pos, second, writing_seq)) {
1334 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1335 << ") failed" << dendl;
1336 ceph_abort();
1337 }
1338 } else {
1339 // header too?
1340 if (hbp.length()) {
1341 bufferlist hbl;
1342 hbl.push_back(hbp);
1343 loff_t pos = 0;
1344 if (write_aio_bl(pos, hbl, 0)) {
1345 derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl;
1346 ceph_abort();
1347 }
1348 }
1349
1350 if (write_aio_bl(pos, bl, writing_seq)) {
1351 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1352 << ") failed" << dendl;
1353 ceph_abort();
1354 }
1355 }
1356
1357 write_pos = pos;
1358 if (write_pos == header.max_size)
1359 write_pos = get_top();
11fdf7f2 1360 ceph_assert(write_pos % header.alignment == 0);
7c673cae
FG
1361}
1362
1363/**
1364 * write a buffer using aio
1365 *
1366 * @param seq seq to trigger when this aio completes. if 0, do not update any state
1367 * on completion.
1368 */
1369int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
1370{
1371 dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
1372
1373 while (bl.length() > 0) {
11fdf7f2 1374 int max = std::min<int>(bl.get_num_buffers(), IOV_MAX-1);
7c673cae
FG
1375 iovec *iov = new iovec[max];
1376 int n = 0;
1377 unsigned len = 0;
11fdf7f2
TL
1378 for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) {
1379 ceph_assert(p != std::cend(bl.buffers()));
1380 iov[n].iov_base = const_cast<void*>(static_cast<const void*>(p->c_str()));
7c673cae
FG
1381 iov[n].iov_len = p->length();
1382 len += p->length();
1383 }
1384
1385 bufferlist tbl;
1386 bl.splice(0, len, &tbl); // move bytes from bl -> tbl
1387
1388 // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
1389 // modified in check_aio_completion
9f95a23c 1390 aio_lock.lock();
7c673cae
FG
1391 aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
1392 aio_info& aio = aio_queue.back();
1393 aio.iov = iov;
1394
1395 io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
1396
1397 dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
1398 << " in " << n << dendl;
1399
1400 aio_num++;
1401 aio_bytes += aio.len;
1402
1403 // need to save current aio len to update write_pos later because current
1404 // aio could be ereased from aio_queue once it is done
1405 uint64_t cur_len = aio.len;
1406 // unlock aio_lock because following io_submit might take time to return
9f95a23c 1407 aio_lock.unlock();
7c673cae
FG
1408
1409 iocb *piocb = &aio.iocb;
224ce89b 1410
7c673cae
FG
1411 // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
1412 int attempts = 16;
1413 int delay = 125;
1414 do {
1415 int r = io_submit(aio_ctx, 1, &piocb);
1416 dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
1417 if (r < 0) {
1418 derr << "io_submit to " << aio.off << "~" << cur_len
1419 << " got " << cpp_strerror(r) << dendl;
1420 if (r == -EAGAIN && attempts-- > 0) {
1421 usleep(delay);
1422 delay *= 2;
1423 continue;
1424 }
1425 check_align(pos, tbl);
11fdf7f2 1426 ceph_abort_msg("io_submit got unexpected error");
7c673cae
FG
1427 } else {
1428 break;
1429 }
1430 } while (true);
1431 pos += cur_len;
1432 }
9f95a23c
TL
1433 aio_lock.lock();
1434 write_finish_cond.notify_all();
1435 aio_lock.unlock();
7c673cae
FG
1436 return 0;
1437}
1438#endif
1439
1440void FileJournal::write_finish_thread_entry()
1441{
1442#ifdef HAVE_LIBAIO
b32b8144 1443 dout(10) << __func__ << " enter" << dendl;
7c673cae
FG
1444 while (true) {
1445 {
9f95a23c 1446 std::unique_lock locker{aio_lock};
7c673cae
FG
1447 if (aio_queue.empty()) {
1448 if (aio_stop)
1449 break;
b32b8144 1450 dout(20) << __func__ << " sleeping" << dendl;
9f95a23c 1451 write_finish_cond.wait(locker);
7c673cae
FG
1452 continue;
1453 }
1454 }
1455
b32b8144 1456 dout(20) << __func__ << " waiting for aio(s)" << dendl;
7c673cae
FG
1457 io_event event[16];
1458 int r = io_getevents(aio_ctx, 1, 16, event, NULL);
1459 if (r < 0) {
1460 if (r == -EINTR) {
1461 dout(0) << "io_getevents got " << cpp_strerror(r) << dendl;
1462 continue;
1463 }
1464 derr << "io_getevents got " << cpp_strerror(r) << dendl;
11fdf7f2
TL
1465 if (r == -EIO) {
1466 note_io_error_event(devname.c_str(), fn.c_str(), -EIO, 0, 0, 0);
1467 }
1468 ceph_abort_msg("got unexpected error from io_getevents");
7c673cae
FG
1469 }
1470
1471 {
9f95a23c 1472 std::lock_guard locker{aio_lock};
7c673cae
FG
1473 for (int i=0; i<r; i++) {
1474 aio_info *ai = (aio_info *)event[i].obj;
1475 if (event[i].res != ai->len) {
1476 derr << "aio to " << ai->off << "~" << ai->len
1477 << " returned: " << (int)event[i].res << dendl;
11fdf7f2 1478 ceph_abort_msg("unexpected aio error");
7c673cae 1479 }
b32b8144 1480 dout(10) << __func__ << " aio " << ai->off
7c673cae
FG
1481 << "~" << ai->len << " done" << dendl;
1482 ai->done = true;
1483 }
1484 check_aio_completion();
1485 }
1486 }
b32b8144 1487 dout(10) << __func__ << " exit" << dendl;
7c673cae
FG
1488#endif
1489}
1490
#ifdef HAVE_LIBAIO
/**
 * Retire the leading run of completed aios from aio_queue, in submission
 * order, and update state accordingly. Caller must hold aio_lock.
 *
 * journaled_seq becomes the seq of the last retired aio that carried a
 * non-zero seq. Finishers are queued through it, unless the journal is in a
 * full state or completions are plugged. aio_cond is notified whenever
 * anything was retired.
 */
void FileJournal::check_aio_completion()
{
  ceph_assert(ceph_mutex_is_locked(aio_lock));
  dout(20) << "check_aio_completion" << dendl;

  bool retired_any = false;
  bool have_seq = false;
  uint64_t last_seq = 0;

  while (!aio_queue.empty() && aio_queue.front().done) {
    const aio_info &head = aio_queue.front();
    dout(20) << "check_aio_completion completed seq " << head.seq << " "
             << head.off << "~" << head.len << dendl;
    if (head.seq) {
      last_seq = head.seq;
      have_seq = true;
    }
    aio_num--;
    aio_bytes -= head.len;
    aio_queue.pop_front();
    retired_any = true;
  }

  if (have_seq) {
    // kick finisher -- but only if we haven't filled up recently
    std::lock_guard finisher_guard{finisher_lock};
    journaled_seq = last_seq;
    if (full_state != FULL_NOTFULL) {
      dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
               << ", full_commit_seq|full_restart_seq" << dendl;
    } else if (plug_journal_completions) {
      dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
               << " due to completion plug" << dendl;
    } else {
      dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl;
      queue_completions_thru(journaled_seq);
    }
  }

  if (retired_any) {
    // the writer thread may be throttled waiting for aio_num to drop
    aio_cond.notify_all();
  }
}
#endif
1541
1542int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) {
1543 dout(10) << "prepare_entry " << tls << dendl;
1544 int data_len = cct->_conf->journal_align_min_size - 1;
1545 int data_align = -1; // -1 indicates that we don't care about the alignment
1546 bufferlist bl;
1547 for (vector<ObjectStore::Transaction>::iterator p = tls.begin();
1548 p != tls.end(); ++p) {
1549 if ((int)(*p).get_data_length() > data_len) {
1550 data_len = (*p).get_data_length();
1551 data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
1552 }
11fdf7f2 1553 encode(*p, bl);
7c673cae
FG
1554 }
1555 if (tbl->length()) {
1556 bl.claim_append(*tbl);
1557 }
1558 // add it this entry
1559 entry_header_t h;
1560 unsigned head_size = sizeof(entry_header_t);
1561 off64_t base_size = 2*head_size + bl.length();
1562 memset(&h, 0, sizeof(h));
1563 if (data_align >= 0)
1564 h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
11fdf7f2 1565 off64_t size = round_up_to(base_size + h.pre_pad, header.alignment);
7c673cae
FG
1566 unsigned post_pad = size - base_size - h.pre_pad;
1567 h.len = bl.length();
1568 h.post_pad = post_pad;
1569 h.crc32c = bl.crc32c(0);
1570 dout(10) << " len " << bl.length() << " -> " << size
1571 << " (head " << head_size << " pre_pad " << h.pre_pad
1572 << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
1573 << " (bl alignment " << data_align << ")"
1574 << dendl;
1575 bufferlist ebl;
1576 // header
1577 ebl.append((const char*)&h, sizeof(h));
1578 if (h.pre_pad) {
1579 ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
1580 }
1581 // payload
9f95a23c 1582 ebl.claim_append(bl);
7c673cae
FG
1583 if (h.post_pad) {
1584 ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
1585 }
1586 // footer
1587 ebl.append((const char*)&h, sizeof(h));
1588 if (directio)
1589 ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
1590 tbl->claim(ebl);
1591 return h.len;
1592}
1593
1594void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
1595 Context *oncommit, TrackedOpRef osd_op)
1596{
1597 // dump on queue
1598 dout(5) << "submit_entry seq " << seq
1599 << " len " << e.length()
1600 << " (" << oncommit << ")" << dendl;
11fdf7f2
TL
1601 ceph_assert(e.length() > 0);
1602 ceph_assert(e.length() < header.max_size);
7c673cae 1603
7c673cae
FG
1604 if (logger) {
1605 logger->inc(l_filestore_journal_queue_bytes, orig_len);
1606 logger->inc(l_filestore_journal_queue_ops, 1);
1607 }
1608
1609 throttle.register_throttle_seq(seq, e.length());
1610 if (logger) {
1611 logger->inc(l_filestore_journal_ops, 1);
1612 logger->inc(l_filestore_journal_bytes, e.length());
1613 }
1614
1615 if (osd_op) {
1616 osd_op->mark_event("commit_queued_for_journal_write");
1617 if (osd_op->store_trace) {
1618 osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace);
1619 osd_op->journal_trace.event("submit_entry");
1620 osd_op->journal_trace.keyval("seq", seq);
1621 }
1622 }
1623 {
9f95a23c 1624 std::lock_guard l1{writeq_lock};
7c673cae 1625#ifdef HAVE_LIBAIO
9f95a23c 1626 std::lock_guard l2{aio_lock};
7c673cae 1627#endif
9f95a23c 1628 std::lock_guard l3{completions_lock};
7c673cae
FG
1629
1630#ifdef HAVE_LIBAIO
1631 aio_write_queue_ops++;
1632 aio_write_queue_bytes += e.length();
9f95a23c 1633 aio_cond.notify_all();
7c673cae
FG
1634#endif
1635
1636 completions.push_back(
1637 completion_item(
1638 seq, oncommit, ceph_clock_now(), osd_op));
1639 if (writeq.empty())
9f95a23c 1640 writeq_cond.notify_all();
7c673cae
FG
1641 writeq.push_back(write_item(seq, e, orig_len, osd_op));
1642 if (osd_op)
1643 osd_op->journal_trace.keyval("queue depth", writeq.size());
1644 }
1645}
1646
1647bool FileJournal::writeq_empty()
1648{
9f95a23c 1649 std::lock_guard locker{writeq_lock};
7c673cae
FG
1650 return writeq.empty();
1651}
1652
1653FileJournal::write_item &FileJournal::peek_write()
1654{
9f95a23c
TL
1655 ceph_assert(ceph_mutex_is_locked(write_lock));
1656 std::lock_guard locker{writeq_lock};
7c673cae
FG
1657 return writeq.front();
1658}
1659
1660void FileJournal::pop_write()
1661{
9f95a23c
TL
1662 ceph_assert(ceph_mutex_is_locked(write_lock));
1663 std::lock_guard locker{writeq_lock};
7c673cae
FG
1664 if (logger) {
1665 logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
1666 logger->dec(l_filestore_journal_queue_ops, 1);
1667 }
1668 writeq.pop_front();
1669}
1670
1671void FileJournal::batch_pop_write(list<write_item> &items)
1672{
9f95a23c 1673 ceph_assert(ceph_mutex_is_locked(write_lock));
7c673cae 1674 {
9f95a23c 1675 std::lock_guard locker{writeq_lock};
7c673cae
FG
1676 writeq.swap(items);
1677 }
1678 for (auto &&i : items) {
1679 if (logger) {
1680 logger->dec(l_filestore_journal_queue_bytes, i.orig_len);
1681 logger->dec(l_filestore_journal_queue_ops, 1);
1682 }
1683 }
1684}
1685
1686void FileJournal::batch_unpop_write(list<write_item> &items)
1687{
9f95a23c 1688 ceph_assert(ceph_mutex_is_locked(write_lock));
7c673cae
FG
1689 for (auto &&i : items) {
1690 if (logger) {
1691 logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
1692 logger->inc(l_filestore_journal_queue_ops, 1);
1693 }
1694 }
9f95a23c 1695 std::lock_guard locker{writeq_lock};
7c673cae
FG
1696 writeq.splice(writeq.begin(), items);
1697}
1698
1699void FileJournal::commit_start(uint64_t seq)
1700{
1701 dout(10) << "commit_start" << dendl;
1702
1703 // was full?
1704 switch (full_state) {
1705 case FULL_NOTFULL:
1706 break; // all good
1707
1708 case FULL_FULL:
1709 if (seq >= journaled_seq) {
1710 dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq "
1711 << seq << " > journaled_seq " << journaled_seq
1712 << ", moving to FULL_WAIT."
1713 << dendl;
1714 full_state = FULL_WAIT;
1715 } else {
1716 dout(1) << "FULL_FULL commit_start on seq "
1717 << seq << " < journaled_seq " << journaled_seq
1718 << ", remaining in FULL_FULL"
1719 << dendl;
1720 }
1721 break;
1722
1723 case FULL_WAIT:
1724 dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl;
1725 full_state = FULL_NOTFULL;
1726 plug_journal_completions = true;
1727 break;
1728 }
1729}
1730
1731/*
1732 *send discard command to joural block deivce
1733 */
1734void FileJournal::do_discard(int64_t offset, int64_t end)
1735{
11fdf7f2 1736 dout(10) << __func__ << " trim(" << offset << ", " << end << dendl;
7c673cae 1737
11fdf7f2 1738 offset = round_up_to(offset, block_size);
7c673cae
FG
1739 if (offset >= end)
1740 return;
11fdf7f2
TL
1741 end = round_up_to(end - block_size, block_size);
1742 ceph_assert(end >= offset);
1743 if (offset < end) {
1744 BlkDev blkdev(fd);
1745 if (blkdev.discard(offset, end - offset) < 0) {
7c673cae 1746 dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
11fdf7f2
TL
1747 }
1748 }
7c673cae
FG
1749}
1750
1751void FileJournal::committed_thru(uint64_t seq)
1752{
9f95a23c 1753 std::lock_guard locker{write_lock};
7c673cae
FG
1754
1755 auto released = throttle.flush(seq);
1756 if (logger) {
1757 logger->dec(l_filestore_journal_ops, released.first);
1758 logger->dec(l_filestore_journal_bytes, released.second);
1759 }
1760
1761 if (seq < last_committed_seq) {
1762 dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
11fdf7f2 1763 ceph_assert(seq >= last_committed_seq);
7c673cae
FG
1764 return;
1765 }
1766 if (seq == last_committed_seq) {
1767 dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
1768 return;
1769 }
1770
1771 dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
1772 last_committed_seq = seq;
1773
1774 // completions!
1775 {
9f95a23c 1776 std::lock_guard locker{finisher_lock};
7c673cae
FG
1777 queue_completions_thru(seq);
1778 if (plug_journal_completions && seq >= header.start_seq) {
1779 dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
1780 plug_journal_completions = false;
1781 queue_completions_thru(journaled_seq);
1782 }
1783 }
1784
1785 // adjust start pointer
1786 while (!journalq.empty() && journalq.front().first <= seq) {
1787 journalq.pop_front();
1788 }
1789
1790 int64_t old_start = header.start;
1791 if (!journalq.empty()) {
1792 header.start = journalq.front().second;
1793 header.start_seq = journalq.front().first;
1794 } else {
1795 header.start = write_pos;
1796 header.start_seq = seq + 1;
1797 }
1798
1799 if (discard) {
1800 dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl;
1801 if (old_start < header.start)
1802 do_discard(old_start, header.start - 1);
1803 else {
1804 do_discard(old_start, header.max_size - 1);
1805 do_discard(get_top(), header.start - 1);
1806 }
1807 }
1808
1809 must_write_header = true;
1810 print_header(header);
1811
1812 // committed but unjournaled items
1813 while (!writeq_empty() && peek_write().seq <= seq) {
1814 dout(15) << " dropping committed but unwritten seq " << peek_write().seq
1815 << " len " << peek_write().bl.length()
1816 << dendl;
1817 complete_write(1, peek_write().orig_len);
1818 pop_write();
1819 }
1820
9f95a23c 1821 commit_cond.notify_all();
7c673cae
FG
1822
1823 dout(10) << "committed_thru done" << dendl;
1824}
1825
1826
1827void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
1828{
1829 dout(5) << __func__ << " finished " << ops << " ops and "
1830 << bytes << " bytes" << dendl;
1831}
1832
1833int FileJournal::make_writeable()
1834{
1835 dout(10) << __func__ << dendl;
1836 int r = set_throttle_params();
1837 if (r < 0)
1838 return r;
1839
1840 r = _open(true);
1841 if (r < 0)
1842 return r;
1843
1844 if (read_pos > 0)
1845 write_pos = read_pos;
1846 else
1847 write_pos = get_top();
1848 read_pos = 0;
1849
1850 must_write_header = true;
1851
1852 start_writer();
1853 return 0;
1854}
1855
1856int FileJournal::set_throttle_params()
1857{
1858 stringstream ss;
1859 bool valid = throttle.set_params(
1860 cct->_conf->journal_throttle_low_threshhold,
1861 cct->_conf->journal_throttle_high_threshhold,
1862 cct->_conf->filestore_expected_throughput_bytes,
1863 cct->_conf->journal_throttle_high_multiple,
1864 cct->_conf->journal_throttle_max_multiple,
1865 header.max_size - get_top(),
1866 &ss);
1867
1868 if (!valid) {
1869 derr << "tried to set invalid params: "
1870 << ss.str()
1871 << dendl;
1872 }
1873 return valid ? 0 : -EINVAL;
1874}
1875
1876const char** FileJournal::get_tracked_conf_keys() const
1877{
1878 static const char *KEYS[] = {
1879 "journal_throttle_low_threshhold",
1880 "journal_throttle_high_threshhold",
1881 "journal_throttle_high_multiple",
1882 "journal_throttle_max_multiple",
1883 "filestore_expected_throughput_bytes",
1884 NULL};
1885 return KEYS;
1886}
1887
1888void FileJournal::wrap_read_bl(
1889 off64_t pos,
1890 int64_t olen,
1891 bufferlist* bl,
1892 off64_t *out_pos
1893 ) const
1894{
1895 while (olen > 0) {
1896 while (pos >= header.max_size)
1897 pos = pos + get_top() - header.max_size;
1898
1899 int64_t len;
1900 if (pos + olen > header.max_size)
1901 len = header.max_size - pos; // partial
1902 else
1903 len = olen; // rest
1904
1905 int64_t actual = ::lseek64(fd, pos, SEEK_SET);
11fdf7f2 1906 ceph_assert(actual == pos);
7c673cae
FG
1907
1908 bufferptr bp = buffer::create(len);
1909 int r = safe_read_exact(fd, bp.c_str(), len);
1910 if (r) {
1911 derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
11fdf7f2 1912 << cpp_strerror(r) << dendl;
7c673cae
FG
1913 ceph_abort();
1914 }
1915 bl->push_back(std::move(bp));
1916 pos += len;
1917 olen -= len;
1918 }
1919 if (pos >= header.max_size)
1920 pos = pos + get_top() - header.max_size;
1921 if (out_pos)
1922 *out_pos = pos;
1923}
1924
1925bool FileJournal::read_entry(
1926 bufferlist &bl,
1927 uint64_t &next_seq,
1928 bool *corrupt)
1929{
1930 if (corrupt)
1931 *corrupt = false;
1932 uint64_t seq = next_seq;
1933
1934 if (!read_pos) {
1935 dout(2) << "read_entry -- not readable" << dendl;
1936 return false;
1937 }
1938
1939 off64_t pos = read_pos;
1940 off64_t next_pos = pos;
1941 stringstream ss;
1942 read_entry_result result = do_read_entry(
1943 pos,
1944 &next_pos,
1945 &bl,
1946 &seq,
1947 &ss);
1948 if (result == SUCCESS) {
1949 journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
1950 uint64_t amount_to_take =
1951 next_pos > pos ?
1952 next_pos - pos :
1953 (header.max_size - pos) + (next_pos - get_top());
1954 throttle.take(amount_to_take);
1955 throttle.register_throttle_seq(next_seq, amount_to_take);
1956 if (logger) {
1957 logger->inc(l_filestore_journal_ops, 1);
1958 logger->inc(l_filestore_journal_bytes, amount_to_take);
1959 }
1960 if (next_seq > seq) {
1961 return false;
1962 } else {
1963 read_pos = next_pos;
1964 next_seq = seq;
1965 if (seq > journaled_seq)
1966 journaled_seq = seq;
1967 return true;
1968 }
3efd9988
FG
1969 } else {
1970 derr << "do_read_entry(" << pos << "): " << ss.str() << dendl;
7c673cae
FG
1971 }
1972
1973 if (seq && seq < header.committed_up_to) {
1974 derr << "Unable to read past sequence " << seq
1975 << " but header indicates the journal has committed up through "
1976 << header.committed_up_to << ", journal is corrupt" << dendl;
1977 if (cct->_conf->journal_ignore_corruption) {
1978 if (corrupt)
1979 *corrupt = true;
1980 return false;
1981 } else {
1982 ceph_abort();
1983 }
1984 }
1985
7c673cae
FG
1986 dout(2) << "No further valid entries found, journal is most likely valid"
1987 << dendl;
1988 return false;
1989}
1990
1991FileJournal::read_entry_result FileJournal::do_read_entry(
1992 off64_t init_pos,
1993 off64_t *next_pos,
1994 bufferlist *bl,
1995 uint64_t *seq,
1996 ostream *ss,
1997 entry_header_t *_h) const
1998{
1999 off64_t cur_pos = init_pos;
2000 bufferlist _bl;
2001 if (!bl)
2002 bl = &_bl;
2003
2004 // header
2005 entry_header_t *h;
2006 bufferlist hbl;
2007 off64_t _next_pos;
2008 wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos);
2009 h = reinterpret_cast<entry_header_t *>(hbl.c_str());
2010
2011 if (!h->check_magic(cur_pos, header.get_fsid64())) {
2012 dout(25) << "read_entry " << init_pos
2013 << " : bad header magic, end of journal" << dendl;
2014 if (ss)
2015 *ss << "bad header magic";
2016 if (next_pos)
2017 *next_pos = init_pos + (4<<10); // check 4k ahead
2018 return MAYBE_CORRUPT;
2019 }
2020 cur_pos = _next_pos;
2021
2022 // pad + body + pad
2023 if (h->pre_pad)
2024 cur_pos += h->pre_pad;
2025
2026 bl->clear();
2027 wrap_read_bl(cur_pos, h->len, bl, &cur_pos);
2028
2029 if (h->post_pad)
2030 cur_pos += h->post_pad;
2031
2032 // footer
2033 entry_header_t *f;
2034 bufferlist fbl;
2035 wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos);
2036 f = reinterpret_cast<entry_header_t *>(fbl.c_str());
2037 if (memcmp(f, h, sizeof(*f))) {
2038 if (ss)
2039 *ss << "bad footer magic, partial entry";
2040 if (next_pos)
2041 *next_pos = cur_pos;
2042 return MAYBE_CORRUPT;
2043 }
2044
2045 if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal)
2046 h->crc32c != 0) { // newer entry in old journal
2047 uint32_t actual_crc = bl->crc32c(0);
2048 if (actual_crc != h->crc32c) {
2049 if (ss)
2050 *ss << "header crc (" << h->crc32c
2051 << ") doesn't match body crc (" << actual_crc << ")";
2052 if (next_pos)
2053 *next_pos = cur_pos;
2054 return MAYBE_CORRUPT;
2055 }
2056 }
2057
2058 // yay!
2059 dout(2) << "read_entry " << init_pos << " : seq " << h->seq
2060 << " " << h->len << " bytes"
2061 << dendl;
2062
2063 // ok!
2064 if (seq)
2065 *seq = h->seq;
2066
2067
2068 if (next_pos)
2069 *next_pos = cur_pos;
2070
2071 if (_h)
2072 *_h = *h;
2073
11fdf7f2 2074 ceph_assert(cur_pos % header.alignment == 0);
7c673cae
FG
2075 return SUCCESS;
2076}
2077
2078void FileJournal::reserve_throttle_and_backoff(uint64_t count)
2079{
2080 throttle.get(count);
2081}
2082
2083void FileJournal::get_header(
2084 uint64_t wanted_seq,
2085 off64_t *_pos,
2086 entry_header_t *h)
2087{
2088 off64_t pos = header.start;
2089 off64_t next_pos = pos;
2090 bufferlist bl;
2091 uint64_t seq = 0;
2092 dout(2) << __func__ << dendl;
2093 while (1) {
2094 bl.clear();
2095 pos = next_pos;
2096 read_entry_result result = do_read_entry(
2097 pos,
2098 &next_pos,
2099 &bl,
2100 &seq,
2101 0,
2102 h);
2103 if (result == FAILURE || result == MAYBE_CORRUPT)
2104 ceph_abort();
2105 if (seq == wanted_seq) {
2106 if (_pos)
2107 *_pos = pos;
2108 return;
2109 }
2110 }
2111 ceph_abort(); // not reachable
2112}
2113
2114void FileJournal::corrupt(
2115 int wfd,
2116 off64_t corrupt_at)
2117{
2118 dout(2) << __func__ << dendl;
2119 if (corrupt_at >= header.max_size)
2120 corrupt_at = corrupt_at + get_top() - header.max_size;
2121
2122 int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
11fdf7f2 2123 ceph_assert(actual == corrupt_at);
7c673cae
FG
2124
2125 char buf[10];
2126 int r = safe_read_exact(fd, buf, 1);
11fdf7f2 2127 ceph_assert(r == 0);
7c673cae
FG
2128
2129 actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
11fdf7f2 2130 ceph_assert(actual == corrupt_at);
7c673cae
FG
2131
2132 buf[0]++;
2133 r = safe_write(wfd, buf, 1);
11fdf7f2 2134 ceph_assert(r == 0);
7c673cae
FG
2135}
2136
2137void FileJournal::corrupt_payload(
2138 int wfd,
2139 uint64_t seq)
2140{
2141 dout(2) << __func__ << dendl;
2142 off64_t pos = 0;
2143 entry_header_t h;
2144 get_header(seq, &pos, &h);
2145 off64_t corrupt_at =
2146 pos + sizeof(entry_header_t) + h.pre_pad;
2147 corrupt(wfd, corrupt_at);
2148}
2149
2150
2151void FileJournal::corrupt_footer_magic(
2152 int wfd,
2153 uint64_t seq)
2154{
2155 dout(2) << __func__ << dendl;
2156 off64_t pos = 0;
2157 entry_header_t h;
2158 get_header(seq, &pos, &h);
2159 off64_t corrupt_at =
2160 pos + sizeof(entry_header_t) + h.pre_pad +
2161 h.len + h.post_pad +
2162 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2163 corrupt(wfd, corrupt_at);
2164}
2165
2166
2167void FileJournal::corrupt_header_magic(
2168 int wfd,
2169 uint64_t seq)
2170{
2171 dout(2) << __func__ << dendl;
2172 off64_t pos = 0;
2173 entry_header_t h;
2174 get_header(seq, &pos, &h);
2175 off64_t corrupt_at =
2176 pos +
2177 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2178 corrupt(wfd, corrupt_at);
2179}
2180
2181off64_t FileJournal::get_journal_size_estimate()
2182{
2183 off64_t size, start = header.start;
2184 if (write_pos < start) {
2185 size = (max_size - start) + write_pos;
2186 } else {
2187 size = write_pos - start;
2188 }
2189 dout(20) << __func__ << " journal size=" << size << dendl;
2190 return size;
2191}
11fdf7f2
TL
2192
2193void FileJournal::get_devices(set<string> *ls)
2194{
2195 string dev_node;
2196 BlkDev blkdev(fd);
2197 if (int rc = blkdev.wholedisk(&dev_node); rc) {
2198 return;
2199 }
2200 get_raw_devices(dev_node, ls);
2201}
2202
2203void FileJournal::collect_metadata(map<string,string> *pm)
2204{
2205 BlkDev blkdev(fd);
2206 char partition_path[PATH_MAX];
2207 char dev_node[PATH_MAX];
2208 if (blkdev.partition(partition_path, PATH_MAX)) {
2209 (*pm)["backend_filestore_journal_partition_path"] = "unknown";
2210 } else {
2211 (*pm)["backend_filestore_journal_partition_path"] = string(partition_path);
2212 }
2213 if (blkdev.wholedisk(dev_node, PATH_MAX)) {
2214 (*pm)["backend_filestore_journal_dev_node"] = "unknown";
2215 } else {
2216 (*pm)["backend_filestore_journal_dev_node"] = string(dev_node);
2217 devname = dev_node;
2218 }
2219}