]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/filestore/FileJournal.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / os / filestore / FileJournal.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "acconfig.h"
15
16#include "common/debug.h"
17#include "common/errno.h"
18#include "common/safe_io.h"
19#include "FileJournal.h"
20#include "include/color.h"
21#include "common/perf_counters.h"
22#include "FileStore.h"
23
24#include "include/compat.h"
25
26#include <fcntl.h>
27#include <limits.h>
28#include <sstream>
29#include <stdio.h>
30#include <stdlib.h>
31#include <sys/types.h>
32#include <sys/stat.h>
33#include <sys/mount.h>
34
35#include "common/blkdev.h"
36#if defined(__linux__)
37#include "common/linux_version.h"
38#endif
39
40#if defined(__FreeBSD__)
41#define O_DSYNC O_SYNC
42#endif
43
44#define dout_context cct
45#define dout_subsys ceph_subsys_journal
46#undef dout_prefix
47#define dout_prefix *_dout << "journal "
48
f67539c2
TL
49using std::list;
50using std::map;
51using std::ostream;
52using std::ostringstream;
53using std::pair;
54using std::set;
55using std::string;
56using std::stringstream;
57using std::vector;
58
59using ceph::bufferlist;
60using ceph::bufferptr;
61using ceph::Formatter;
62using ceph::JSONFormatter;
63
7c673cae
FG
// Minimum size accepted for a block-device journal; a file journal smaller
// than this also requires osd_journal_size to be configured.
static constexpr int64_t ONE_MEG = 1 << 20;
// Minimum buffer/offset alignment enforced for direct-I/O journals.
static constexpr int CEPH_DIRECTIO_ALIGNMENT = 4096;
66
67
68int FileJournal::_open(bool forwrite, bool create)
69{
70 int flags, ret;
71
72 if (forwrite) {
73 flags = O_RDWR;
74 if (directio)
75 flags |= O_DIRECT | O_DSYNC;
76 } else {
77 flags = O_RDONLY;
78 }
79 if (create)
80 flags |= O_CREAT;
81
82 if (fd >= 0) {
83 if (TEMP_FAILURE_RETRY(::close(fd))) {
84 int err = errno;
85 derr << "FileJournal::_open: error closing old fd: "
86 << cpp_strerror(err) << dendl;
87 }
88 }
91327a77 89 fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags|O_CLOEXEC, 0644));
7c673cae
FG
90 if (fd < 0) {
91 int err = errno;
92 dout(2) << "FileJournal::_open unable to open journal "
93 << fn << ": " << cpp_strerror(err) << dendl;
94 return -err;
95 }
96
97 struct stat st;
98 ret = ::fstat(fd, &st);
99 if (ret) {
100 ret = errno;
101 derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl;
102 ret = -ret;
103 goto out_fd;
104 }
105
106 if (S_ISBLK(st.st_mode)) {
107 ret = _open_block_device();
108 } else if (S_ISREG(st.st_mode)) {
109 if (aio && !force_aio) {
110 derr << "FileJournal::_open: disabling aio for non-block journal. Use "
111 << "journal_force_aio to force use of aio anyway" << dendl;
112 aio = false;
113 }
114 ret = _open_file(st.st_size, st.st_blksize, create);
115 } else {
116 derr << "FileJournal::_open: wrong journal file type: " << st.st_mode
117 << dendl;
118 ret = -EINVAL;
119 }
120
121 if (ret)
122 goto out_fd;
123
124#ifdef HAVE_LIBAIO
125 if (aio) {
126 aio_ctx = 0;
127 ret = io_setup(128, &aio_ctx);
128 if (ret < 0) {
129 switch (ret) {
130 // Contrary to naive expectations -EAGIAN means ...
131 case -EAGAIN:
132 derr << "FileJournal::_open: user's limit of aio events exceeded. "
133 << "Try increasing /proc/sys/fs/aio-max-nr" << dendl;
134 break;
135 default:
136 derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl;
137 break;
138 }
139 goto out_fd;
140 }
141 }
142#endif
143
144 /* We really want max_size to be a multiple of block_size. */
145 max_size -= max_size % block_size;
146
147 dout(1) << "_open " << fn << " fd " << fd
148 << ": " << max_size
149 << " bytes, block size " << block_size
150 << " bytes, directio = " << directio
151 << ", aio = " << aio
152 << dendl;
153 return 0;
154
155 out_fd:
156 VOID_TEMP_FAILURE_RETRY(::close(fd));
157 fd = -1;
158 return ret;
159}
160
161int FileJournal::_open_block_device()
162{
163 int64_t bdev_sz = 0;
11fdf7f2
TL
164 BlkDev blkdev(fd);
165 int ret = blkdev.get_size(&bdev_sz);
7c673cae
FG
166 if (ret) {
167 dout(0) << __func__ << ": failed to read block device size." << dendl;
168 return -EIO;
169 }
170
171 /* Check for bdev_sz too small */
172 if (bdev_sz < ONE_MEG) {
173 dout(0) << __func__ << ": your block device must be at least "
174 << ONE_MEG << " bytes to be used for a Ceph journal." << dendl;
175 return -EINVAL;
176 }
177
178 dout(10) << __func__ << ": ignoring osd journal size. "
179 << "We'll use the entire block device (size: " << bdev_sz << ")"
180 << dendl;
181 max_size = bdev_sz;
182
183 block_size = cct->_conf->journal_block_size;
184
185 if (cct->_conf->journal_discard) {
11fdf7f2 186 discard = blkdev.support_discard();
7c673cae
FG
187 dout(10) << fn << " support discard: " << (int)discard << dendl;
188 }
189
190 return 0;
191}
192
193int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
194 bool create)
195{
196 int ret;
197 int64_t conf_journal_sz(cct->_conf->osd_journal_size);
198 conf_journal_sz <<= 20;
199
200 if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) {
201 derr << "I'm sorry, I don't know how large of a journal to create."
202 << "Please specify a block device to use as the journal OR "
203 << "set osd_journal_size in your ceph.conf" << dendl;
204 return -EINVAL;
205 }
206
207 if (create && (oldsize < conf_journal_sz)) {
208 uint64_t newsize(conf_journal_sz);
209 dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl;
210 ret = ::ftruncate(fd, newsize);
211 if (ret < 0) {
212 int err = errno;
213 derr << "FileJournal::_open_file : unable to extend journal to "
214 << newsize << " bytes: " << cpp_strerror(err) << dendl;
215 return -err;
216 }
28e407b8 217 ret = ceph_posix_fallocate(fd, 0, newsize);
7c673cae
FG
218 if (ret) {
219 derr << "FileJournal::_open_file : unable to preallocation journal to "
220 << newsize << " bytes: " << cpp_strerror(ret) << dendl;
221 return -ret;
222 }
223 max_size = newsize;
7c673cae
FG
224 }
225 else {
226 max_size = oldsize;
227 }
228 block_size = cct->_conf->journal_block_size;
229
230 if (create && cct->_conf->journal_zero_on_create) {
231 derr << "FileJournal::_open_file : zeroing journal" << dendl;
232 uint64_t write_size = 1 << 20;
233 char *buf;
234 ret = ::posix_memalign((void **)&buf, block_size, write_size);
235 if (ret != 0) {
236 return -ret;
237 }
238 memset(static_cast<void*>(buf), 0, write_size);
239 uint64_t i = 0;
240 for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
241 ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
242 if (ret < 0) {
f67539c2 243 aligned_free(buf);
7c673cae
FG
244 return -errno;
245 }
246 }
247 if (i < (uint64_t)max_size) {
248 ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
249 if (ret < 0) {
f67539c2 250 aligned_free(buf);
7c673cae
FG
251 return -errno;
252 }
253 }
f67539c2 254 aligned_free(buf);
7c673cae
FG
255 }
256
257
258 dout(10) << "_open journal is not a block device, NOT checking disk "
259 << "write cache on '" << fn << "'" << dendl;
260
261 return 0;
262}
263
264// This can not be used on an active journal
265int FileJournal::check()
266{
267 int ret;
268
11fdf7f2 269 ceph_assert(fd == -1);
7c673cae
FG
270 ret = _open(false, false);
271 if (ret)
272 return ret;
273
274 ret = read_header(&header);
275 if (ret < 0)
276 goto done;
277
278 if (header.fsid != fsid) {
279 derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
280 << ", invalid (someone else's?) journal" << dendl;
281 ret = -EINVAL;
282 goto done;
283 }
284
285 dout(1) << "check: header looks ok" << dendl;
286 ret = 0;
287
288 done:
289 close();
290 return ret;
291}
292
293
294int FileJournal::create()
295{
296 void *buf = 0;
297 int64_t needed_space;
298 int ret;
f67539c2 299 ceph::buffer::ptr bp;
7c673cae
FG
300 dout(2) << "create " << fn << " fsid " << fsid << dendl;
301
302 ret = _open(true, true);
303 if (ret)
304 goto done;
305
306 // write empty header
307 header = header_t();
308 header.flags = header_t::FLAG_CRC; // enable crcs on any new journal.
309 header.fsid = fsid;
310 header.max_size = max_size;
311 header.block_size = block_size;
312 if (cct->_conf->journal_block_align || directio)
313 header.alignment = block_size;
314 else
315 header.alignment = 16; // at least stay word aligned on 64bit machines...
316
317 header.start = get_top();
318 header.start_seq = 0;
319
320 print_header(header);
321
322 // static zeroed buffer for alignment padding
323 delete [] zero_buf;
324 zero_buf = new char[header.alignment];
325 memset(zero_buf, 0, header.alignment);
326
327 bp = prepare_header();
328 if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) {
329 ret = -errno;
330 derr << "FileJournal::create : create write header error "
331 << cpp_strerror(ret) << dendl;
332 goto close_fd;
333 }
334
335 // zero first little bit, too.
336 ret = posix_memalign(&buf, block_size, block_size);
337 if (ret) {
338 ret = -ret;
339 derr << "FileJournal::create: failed to allocate " << block_size
340 << " bytes of memory: " << cpp_strerror(ret) << dendl;
341 goto close_fd;
342 }
343 memset(buf, 0, block_size);
344 if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) {
345 ret = -errno;
346 derr << "FileJournal::create: error zeroing first " << block_size
347 << " bytes " << cpp_strerror(ret) << dendl;
348 goto free_buf;
349 }
350
11fdf7f2 351 needed_space = cct->_conf->osd_max_write_size << 20;
7c673cae
FG
352 needed_space += (2 * sizeof(entry_header_t)) + get_top();
353 if (header.max_size - header.start < needed_space) {
354 derr << "FileJournal::create: OSD journal is not large enough to hold "
355 << "osd_max_write_size bytes!" << dendl;
356 ret = -ENOSPC;
357 goto free_buf;
358 }
359
360 dout(2) << "create done" << dendl;
361 ret = 0;
362
363free_buf:
364 free(buf);
365 buf = 0;
366close_fd:
367 if (TEMP_FAILURE_RETRY(::close(fd)) < 0) {
368 ret = -errno;
369 derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret)
370 << dendl;
371 }
372done:
373 fd = -1;
374 return ret;
375}
376
377// This can not be used on an active journal
378int FileJournal::peek_fsid(uuid_d& fsid)
379{
11fdf7f2 380 ceph_assert(fd == -1);
7c673cae
FG
381 int r = _open(false, false);
382 if (r)
383 return r;
384 r = read_header(&header);
385 if (r < 0)
386 goto out;
387 fsid = header.fsid;
388out:
389 close();
390 return r;
391}
392
393int FileJournal::open(uint64_t fs_op_seq)
394{
395 dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
396
397 uint64_t next_seq = fs_op_seq + 1;
224ce89b 398 uint64_t seq = -1;
7c673cae
FG
399
400 int err = _open(false);
401 if (err)
402 return err;
403
404 // assume writeable, unless...
405 read_pos = 0;
406 write_pos = get_top();
407
408 // read header?
409 err = read_header(&header);
410 if (err < 0)
224ce89b 411 goto out;
7c673cae
FG
412
413 // static zeroed buffer for alignment padding
414 delete [] zero_buf;
415 zero_buf = new char[header.alignment];
416 memset(zero_buf, 0, header.alignment);
417
418 dout(10) << "open header.fsid = " << header.fsid
419 //<< " vs expected fsid = " << fsid
420 << dendl;
421 if (header.fsid != fsid) {
422 derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
423 << ", invalid (someone else's?) journal" << dendl;
224ce89b
WB
424 err = -EINVAL;
425 goto out;
7c673cae
FG
426 }
427 if (header.max_size > max_size) {
428 dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
224ce89b
WB
429 err = -EINVAL;
430 goto out;
7c673cae
FG
431 }
432 if (header.block_size != block_size) {
433 dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
224ce89b
WB
434 err = -EINVAL;
435 goto out;
7c673cae
FG
436 }
437 if (header.max_size % header.block_size) {
438 dout(2) << "open journal max size " << header.max_size
439 << " not a multiple of block size " << header.block_size << dendl;
224ce89b
WB
440 err = -EINVAL;
441 goto out;
7c673cae
FG
442 }
443 if (header.alignment != block_size && directio) {
444 dout(0) << "open journal alignment " << header.alignment << " does not match block size "
445 << block_size << " (required for direct_io journal mode)" << dendl;
224ce89b
WB
446 err = -EINVAL;
447 goto out;
7c673cae
FG
448 }
449 if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
450 dout(0) << "open journal alignment " << header.alignment
451 << " is not multiple of minimum directio alignment "
452 << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
453 << dendl;
224ce89b
WB
454 err = -EINVAL;
455 goto out;
7c673cae
FG
456 }
457
458 // looks like a valid header.
459 write_pos = 0; // not writeable yet
460
461 journaled_seq = header.committed_up_to;
462
463 // find next entry
464 read_pos = header.start;
224ce89b 465 seq = header.start_seq;
7c673cae
FG
466
467 while (1) {
468 bufferlist bl;
469 off64_t old_pos = read_pos;
470 if (!read_entry(bl, seq)) {
471 dout(10) << "open reached end of journal." << dendl;
472 break;
473 }
474 if (seq > next_seq) {
475 dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq
476 << ", ignoring journal contents"
477 << dendl;
478 read_pos = -1;
479 last_committed_seq = 0;
480 return 0;
481 }
482 if (seq == next_seq) {
483 dout(10) << "open reached seq " << seq << dendl;
484 read_pos = old_pos;
485 break;
486 }
487 seq++; // next event should follow.
488 }
489
490 return 0;
224ce89b
WB
491out:
492 close();
493 return err;
7c673cae
FG
494}
495
496void FileJournal::_close(int fd) const
497{
498 VOID_TEMP_FAILURE_RETRY(::close(fd));
499}
500
501void FileJournal::close()
502{
503 dout(1) << "close " << fn << dendl;
504
505 // stop writer thread
506 stop_writer();
507
508 // close
11fdf7f2
TL
509 ceph_assert(writeq_empty());
510 ceph_assert(!must_write_header);
511 ceph_assert(fd >= 0);
7c673cae
FG
512 _close(fd);
513 fd = -1;
514}
515
516
517int FileJournal::dump(ostream& out)
518{
519 return _dump(out, false);
520}
521
522int FileJournal::simple_dump(ostream& out)
523{
524 return _dump(out, true);
525}
526
527int FileJournal::_dump(ostream& out, bool simple)
528{
529 JSONFormatter f(true);
530 int ret = _fdump(f, simple);
531 f.flush(out);
532 return ret;
533}
534
535int FileJournal::_fdump(Formatter &f, bool simple)
536{
537 dout(10) << "_fdump" << dendl;
538
11fdf7f2 539 ceph_assert(fd == -1);
7c673cae
FG
540 int err = _open(false, false);
541 if (err)
542 return err;
543
544 err = read_header(&header);
545 if (err < 0) {
546 close();
547 return err;
548 }
549
550 off64_t next_pos = header.start;
551
552 f.open_object_section("journal");
553
554 f.open_object_section("header");
555 f.dump_unsigned("flags", header.flags);
556 ostringstream os;
557 os << header.fsid;
558 f.dump_string("fsid", os.str());
559 f.dump_unsigned("block_size", header.block_size);
560 f.dump_unsigned("alignment", header.alignment);
561 f.dump_int("max_size", header.max_size);
562 f.dump_int("start", header.start);
563 f.dump_unsigned("committed_up_to", header.committed_up_to);
564 f.dump_unsigned("start_seq", header.start_seq);
565 f.close_section();
566
567 f.open_array_section("entries");
568 uint64_t seq = header.start_seq;
569 while (1) {
570 bufferlist bl;
571 off64_t pos = next_pos;
572
573 if (!pos) {
574 dout(2) << "_dump -- not readable" << dendl;
575 err = -EINVAL;
576 break;
577 }
578 stringstream ss;
579 read_entry_result result = do_read_entry(
580 pos,
581 &next_pos,
582 &bl,
583 &seq,
584 &ss);
585 if (result != SUCCESS) {
586 if (seq < header.committed_up_to) {
587 dout(2) << "Unable to read past sequence " << seq
588 << " but header indicates the journal has committed up through "
589 << header.committed_up_to << ", journal is corrupt" << dendl;
590 err = -EINVAL;
591 }
592 dout(25) << ss.str() << dendl;
593 dout(25) << "No further valid entries found, journal is most likely valid"
594 << dendl;
595 break;
596 }
597
598 f.open_object_section("entry");
599 f.dump_unsigned("offset", pos);
600 f.dump_unsigned("seq", seq);
601 if (simple) {
602 f.dump_unsigned("bl.length", bl.length());
603 } else {
604 f.open_array_section("transactions");
11fdf7f2 605 auto p = bl.cbegin();
7c673cae
FG
606 int trans_num = 0;
607 while (!p.end()) {
608 ObjectStore::Transaction t(p);
609 f.open_object_section("transaction");
610 f.dump_unsigned("trans_num", trans_num);
611 t.dump(&f);
612 f.close_section();
613 trans_num++;
614 }
615 f.close_section();
616 }
617 f.close_section();
618 }
619
620 f.close_section();
621 f.close_section();
622 dout(10) << "dump finish" << dendl;
623
624 close();
625 return err;
626}
627
628
629void FileJournal::start_writer()
630{
631 write_stop = false;
632 aio_stop = false;
633 write_thread.create("journal_write");
634#ifdef HAVE_LIBAIO
635 if (aio)
636 write_finish_thread.create("journal_wrt_fin");
637#endif
638}
639
640void FileJournal::stop_writer()
641{
642 // Do nothing if writer already stopped or never started
643 if (!write_stop)
644 {
645 {
9f95a23c
TL
646 std::lock_guard l{write_lock};
647 std::lock_guard p{writeq_lock};
7c673cae 648 write_stop = true;
9f95a23c 649 writeq_cond.notify_all();
7c673cae
FG
650 // Doesn't hurt to signal commit_cond in case thread is waiting there
651 // and caller didn't use committed_thru() first.
9f95a23c 652 commit_cond.notify_all();
7c673cae
FG
653 }
654 write_thread.join();
655
656 // write journal header now so that we have less to replay on remount
657 write_header_sync();
658 }
659
660#ifdef HAVE_LIBAIO
661 // stop aio completeion thread *after* writer thread has stopped
662 // and has submitted all of its io
663 if (aio && !aio_stop) {
9f95a23c 664 aio_lock.lock();
7c673cae 665 aio_stop = true;
9f95a23c
TL
666 aio_cond.notify_all();
667 write_finish_cond.notify_all();
668 aio_lock.unlock();
7c673cae
FG
669 write_finish_thread.join();
670 }
671#endif
672}
673
674
675
676void FileJournal::print_header(const header_t &header) const
677{
678 dout(10) << "header: block_size " << header.block_size
679 << " alignment " << header.alignment
680 << " max_size " << header.max_size
681 << dendl;
682 dout(10) << "header: start " << header.start << dendl;
683 dout(10) << " write_pos " << write_pos << dendl;
684}
685
686int FileJournal::read_header(header_t *hdr) const
687{
688 dout(10) << "read_header" << dendl;
689 bufferlist bl;
690
f67539c2 691 ceph::buffer::ptr bp = ceph::buffer::create_small_page_aligned(block_size);
7c673cae
FG
692 char* bpdata = bp.c_str();
693 int r = ::pread(fd, bpdata, bp.length(), 0);
694
695 if (r < 0) {
696 int err = errno;
697 dout(0) << "read_header got " << cpp_strerror(err) << dendl;
698 return -err;
699 }
700
701 // don't use bp.zero() here, because it also invalidates
702 // crc cache (which is not yet populated anyway)
703 if (bp.length() != (size_t)r) {
704 // r will be always less or equal than bp.length
705 bpdata += r;
706 memset(bpdata, 0, bp.length() - r);
707 }
708
709 bl.push_back(std::move(bp));
710
711 try {
11fdf7f2
TL
712 auto p = bl.cbegin();
713 decode(*hdr, p);
7c673cae 714 }
f67539c2 715 catch (ceph::buffer::error& e) {
7c673cae
FG
716 derr << "read_header error decoding journal header" << dendl;
717 return -EINVAL;
718 }
719
720
721 /*
722 * Unfortunately we weren't initializing the flags field for new
723 * journals! Aie. This is safe(ish) now that we have only one
724 * flag. Probably around when we add the next flag we need to
725 * remove this or else this (eventually old) code will clobber newer
726 * code's flags.
727 */
728 if (hdr->flags > 3) {
729 derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
730 hdr->flags = 0;
731 }
732
733 print_header(*hdr);
734
735 return 0;
736}
737
738bufferptr FileJournal::prepare_header()
739{
740 bufferlist bl;
741 {
9f95a23c 742 std::lock_guard l{finisher_lock};
7c673cae
FG
743 header.committed_up_to = journaled_seq;
744 }
11fdf7f2 745 encode(header, bl);
f67539c2 746 bufferptr bp = ceph::buffer::create_small_page_aligned(get_top());
7c673cae
FG
747 // don't use bp.zero() here, because it also invalidates
748 // crc cache (which is not yet populated anyway)
749 char* data = bp.c_str();
750 memcpy(data, bl.c_str(), bl.length());
751 data += bl.length();
752 memset(data, 0, bp.length()-bl.length());
753 return bp;
754}
755
756void FileJournal::write_header_sync()
757{
9f95a23c 758 std::lock_guard locker{write_lock};
7c673cae
FG
759 must_write_header = true;
760 bufferlist bl;
761 do_write(bl);
762 dout(20) << __func__ << " finish" << dendl;
763}
764
765int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
766{
767 // already full?
768 if (full_state != FULL_NOTFULL)
769 return -ENOSPC;
770
771 // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
772 off64_t room;
773 if (pos >= header.start)
774 room = (header.max_size - pos) + (header.start - get_top()) - 1;
775 else
776 room = header.start - pos - 1;
777 dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
778 << " top " << get_top() << dendl;
779
780 if (do_sync_cond) {
781 if (room >= (header.max_size >> 1) &&
782 room - size < (header.max_size >> 1)) {
783 dout(10) << " passing half full mark, triggering commit" << dendl;
9f95a23c
TL
784#ifdef CEPH_DEBUG_MUTEX
785 do_sync_cond->notify_all(true); // initiate a real commit so we can trim
786#else
787 do_sync_cond->notify_all();
788#endif
7c673cae
FG
789 }
790 }
791
792 if (room >= size) {
793 dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
794 if (pos + size > header.max_size)
795 must_write_header = true;
796 return 0;
797 }
798
799 // full
800 dout(1) << "check_for_full at " << pos << " : JOURNAL FULL "
801 << pos << " >= " << room
802 << " (max_size " << header.max_size << " start " << header.start << ")"
803 << dendl;
804
805 off64_t max = header.max_size - get_top();
806 if (size > max)
807 dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl;
808
809 return -ENOSPC;
810}
811
812int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes)
813{
814 // gather queued writes
815 off64_t queue_pos = write_pos;
816
817 int eleft = cct->_conf->journal_max_write_entries;
818 unsigned bmax = cct->_conf->journal_max_write_bytes;
819
820 if (full_state != FULL_NOTFULL)
821 return -ENOSPC;
822
823 while (!writeq_empty()) {
824 list<write_item> items;
825 batch_pop_write(items);
826 list<write_item>::iterator it = items.begin();
827 while (it != items.end()) {
828 uint64_t bytes = it->bl.length();
829 int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
830 if (r == 0) { // prepare ok, delete it
831 items.erase(it++);
832#ifdef HAVE_LIBAIO
833 {
9f95a23c 834 std::lock_guard locker{aio_lock};
11fdf7f2 835 ceph_assert(aio_write_queue_ops > 0);
7c673cae 836 aio_write_queue_ops--;
11fdf7f2 837 ceph_assert(aio_write_queue_bytes >= bytes);
7c673cae
FG
838 aio_write_queue_bytes -= bytes;
839 }
840#else
841 (void)bytes;
842#endif
843 }
844 if (r == -ENOSPC) {
845 // the journal maybe full, insert the left item to writeq
846 batch_unpop_write(items);
847 if (orig_ops)
848 goto out; // commit what we have
849
850 if (logger)
851 logger->inc(l_filestore_journal_full);
852
853 if (wait_on_full) {
854 dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
855 } else {
856 dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
857
858 // throw out what we have so far
859 full_state = FULL_FULL;
860 while (!writeq_empty()) {
861 complete_write(1, peek_write().orig_len);
862 pop_write();
863 }
864 print_header(header);
865 }
224ce89b 866
7c673cae
FG
867 return -ENOSPC; // hrm, full on first op
868 }
869 if (eleft) {
870 if (--eleft == 0) {
871 dout(20) << "prepare_multi_write hit max events per write "
872 << cct->_conf->journal_max_write_entries << dendl;
873 batch_unpop_write(items);
874 goto out;
875 }
876 }
877 if (bmax) {
878 if (bl.length() >= bmax) {
879 dout(20) << "prepare_multi_write hit max write size "
880 << cct->_conf->journal_max_write_bytes << dendl;
881 batch_unpop_write(items);
882 goto out;
883 }
884 }
885 }
886 }
887
888out:
889 dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
11fdf7f2 890 ceph_assert((write_pos + bl.length() == queue_pos) ||
7c673cae
FG
891 (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
892 return 0;
893}
894
895/*
896void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
897{
898 writing_seq.push_back(seq);
899 if (!waiting_for_notfull.empty()) {
900 // make sure previously unjournaled stuff waiting for UNFULL triggers
901 // _before_ newly journaled stuff does
902 dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
903 << " until after UNFULL" << dendl;
904 C_Gather *g = new C_Gather(writeq.front().fin);
905 writing_fin.push_back(g->new_sub());
906 waiting_for_notfull.push_back(g->new_sub());
907 } else {
908 writing_fin.push_back(writeq.front().fin);
909 dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
910 }
911}
912*/
913
914void FileJournal::queue_completions_thru(uint64_t seq)
915{
9f95a23c 916 ceph_assert(ceph_mutex_is_locked(finisher_lock));
7c673cae
FG
917 utime_t now = ceph_clock_now();
918 list<completion_item> items;
919 batch_pop_completions(items);
920 list<completion_item>::iterator it = items.begin();
921 while (it != items.end()) {
922 completion_item& next = *it;
923 if (next.seq > seq)
924 break;
925 utime_t lat = now;
926 lat -= next.start;
927 dout(10) << "queue_completions_thru seq " << seq
928 << " queueing seq " << next.seq
929 << " " << next.finish
930 << " lat " << lat << dendl;
931 if (logger) {
932 logger->tinc(l_filestore_journal_latency, lat);
933 }
934 if (next.finish)
935 finisher->queue(next.finish);
936 if (next.tracked_op) {
937 next.tracked_op->mark_event("journaled_completion_queued");
938 next.tracked_op->journal_trace.event("queued completion");
939 next.tracked_op->journal_trace.keyval("completed through", seq);
940 }
941 items.erase(it++);
942 }
943 batch_unpop_completions(items);
9f95a23c 944 finisher_cond.notify_all();
7c673cae
FG
945}
946
947
948int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
949{
950 uint64_t seq = next_write.seq;
951 bufferlist &ebl = next_write.bl;
952 off64_t size = ebl.length();
953
954 int r = check_for_full(seq, queue_pos, size);
955 if (r < 0)
956 return r; // ENOSPC or EAGAIN
957
958 uint32_t orig_len = next_write.orig_len;
959 orig_bytes += orig_len;
960 orig_ops++;
961
962 // add to write buffer
963 dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
964 << " len " << orig_len << " -> " << size << dendl;
965
966 unsigned seq_offset = offsetof(entry_header_t, seq);
967 unsigned magic1_offset = offsetof(entry_header_t, magic1);
968 unsigned magic2_offset = offsetof(entry_header_t, magic2);
969
970 bufferptr headerptr = ebl.buffers().front();
971 uint64_t _seq = seq;
972 uint64_t _queue_pos = queue_pos;
973 uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
974 headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
975 headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
976 headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
977
978 bufferptr footerptr = ebl.buffers().back();
979 unsigned post_offset = footerptr.length() - sizeof(entry_header_t);
980 footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
981 footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
982 footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
983
984 bl.claim_append(ebl);
985 if (next_write.tracked_op) {
986 next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
987 next_write.tracked_op->journal_trace.event("prepare_single_write");
988 }
989
990 journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
991 writing_seq = seq;
992
993 queue_pos += size;
994 if (queue_pos >= header.max_size)
995 queue_pos = queue_pos + get_top() - header.max_size;
996
997 return 0;
998}
999
1000void FileJournal::check_align(off64_t pos, bufferlist& bl)
1001{
1002 // make sure list segments are page aligned
1003 if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
11fdf7f2
TL
1004 ceph_assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1005 ceph_assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1006 ceph_abort_msg("bl was not aligned");
7c673cae
FG
1007 }
1008}
1009
1010int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
1011{
1012 int ret;
1013
1014 off64_t spos = ::lseek64(fd, pos, SEEK_SET);
1015 if (spos < 0) {
1016 ret = -errno;
1017 derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl;
1018 return ret;
1019 }
1020 ret = bl.write_fd(fd);
1021 if (ret) {
1022 derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl;
1023 return ret;
1024 }
1025 pos += bl.length();
1026 if (pos == header.max_size)
1027 pos = get_top();
1028 return 0;
1029}
1030
1031void FileJournal::do_write(bufferlist& bl)
1032{
1033 // nothing to do?
1034 if (bl.length() == 0 && !must_write_header)
1035 return;
1036
f67539c2 1037 ceph::buffer::ptr hbp;
7c673cae
FG
1038 if (cct->_conf->journal_write_header_frequency &&
1039 (((++journaled_since_start) %
1040 cct->_conf->journal_write_header_frequency) == 0)) {
1041 must_write_header = true;
1042 }
1043
1044 if (must_write_header) {
1045 must_write_header = false;
1046 hbp = prepare_header();
1047 }
1048
1049 dout(15) << "do_write writing " << write_pos << "~" << bl.length()
1050 << (hbp.length() ? " + header":"")
1051 << dendl;
1052
1053 utime_t from = ceph_clock_now();
1054
1055 // entry
1056 off64_t pos = write_pos;
1057
1058 // Adjust write_pos
1059 write_pos += bl.length();
1060 if (write_pos >= header.max_size)
1061 write_pos = write_pos - header.max_size + get_top();
1062
9f95a23c 1063 write_lock.unlock();
7c673cae
FG
1064
1065 // split?
1066 off64_t split = 0;
1067 if (pos + bl.length() > header.max_size) {
1068 bufferlist first, second;
1069 split = header.max_size - pos;
1070 first.substr_of(bl, 0, split);
1071 second.substr_of(bl, split, bl.length() - split);
11fdf7f2 1072 ceph_assert(first.length() + second.length() == bl.length());
7c673cae
FG
1073 dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
1074 << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
1075
1076 //Save pos to write first piece second
1077 off64_t first_pos = pos;
1078 off64_t orig_pos;
1079 pos = get_top();
1080 // header too?
1081 if (hbp.length()) {
1082 // be sneaky: include the header in the second fragment
11fdf7f2
TL
1083 bufferlist tmp;
1084 tmp.push_back(hbp);
1085 tmp.claim_append(second);
1086 second.swap(tmp);
7c673cae
FG
1087 pos = 0; // we included the header
1088 }
1089 // Write the second portion first possible with the header, so
1090 // do_read_entry() won't even get a valid entry_header_t if there
1091 // is a crash between the two writes.
1092 orig_pos = pos;
1093 if (write_bl(pos, second)) {
1094 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1095 << ") failed" << dendl;
1096 check_align(pos, second);
1097 ceph_abort();
1098 }
1099 orig_pos = first_pos;
1100 if (write_bl(first_pos, first)) {
1101 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1102 << ") failed" << dendl;
1103 check_align(first_pos, first);
1104 ceph_abort();
1105 }
11fdf7f2 1106 ceph_assert(first_pos == get_top());
7c673cae
FG
1107 } else {
1108 // header too?
1109 if (hbp.length()) {
1110 if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
1111 int err = errno;
1112 derr << "FileJournal::do_write: pwrite(fd=" << fd
1113 << ", hbp.length=" << hbp.length() << ") failed :"
1114 << cpp_strerror(err) << dendl;
1115 ceph_abort();
1116 }
1117 }
1118
1119 if (write_bl(pos, bl)) {
1120 derr << "FileJournal::do_write: write_bl(pos=" << pos
1121 << ") failed" << dendl;
1122 check_align(pos, bl);
1123 ceph_abort();
1124 }
1125 }
1126
1127 if (!directio) {
1128 dout(20) << "do_write fsync" << dendl;
1129
1130 /*
1131 * We'd really love to have a fsync_range or fdatasync_range and do a:
1132 *
1133 * if (split) {
1134 * ::fsync_range(fd, header.max_size - split, split)l
1135 * ::fsync_range(fd, get_top(), bl.length() - split);
1136 * else
1137 * ::fsync_range(fd, write_pos, bl.length())
1138 *
1139 * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
1140 * too hard given all the underlying infrastructure already exist.
1141 *
1142 * NOTE: using sync_file_range here would not be safe as it does not
1143 * flush disk caches or commits any sort of metadata.
1144 */
1145 int ret = 0;
11fdf7f2 1146#if defined(__APPLE__) || defined(__FreeBSD__)
7c673cae
FG
1147 ret = ::fsync(fd);
1148#else
1149 ret = ::fdatasync(fd);
1150#endif
1151 if (ret < 0) {
1152 derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl;
1153 ceph_abort();
1154 }
1155#ifdef HAVE_POSIX_FADVISE
1156 if (cct->_conf->filestore_fadvise)
1157 posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
1158#endif
1159 }
1160
1161 utime_t lat = ceph_clock_now() - from;
1162 dout(20) << "do_write latency " << lat << dendl;
1163
9f95a23c 1164 write_lock.lock();
7c673cae 1165
11fdf7f2
TL
1166 ceph_assert(write_pos == pos);
1167 ceph_assert(write_pos % header.alignment == 0);
7c673cae
FG
1168
1169 {
9f95a23c 1170 std::lock_guard locker{finisher_lock};
7c673cae
FG
1171 journaled_seq = writing_seq;
1172
1173 // kick finisher?
1174 // only if we haven't filled up recently!
1175 if (full_state != FULL_NOTFULL) {
1176 dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
1177 << ", full_commit_seq|full_restart_seq" << dendl;
1178 } else {
1179 if (plug_journal_completions) {
1180 dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
1181 << " due to completion plug" << dendl;
1182 } else {
1183 dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
1184 queue_completions_thru(journaled_seq);
1185 }
1186 }
1187 }
1188}
1189
1190void FileJournal::flush()
1191{
1192 dout(10) << "waiting for completions to empty" << dendl;
1193 {
9f95a23c
TL
1194 std::unique_lock l{finisher_lock};
1195 finisher_cond.wait(l, [this] { return completions_empty(); });
7c673cae
FG
1196 }
1197 dout(10) << "flush waiting for finisher" << dendl;
1198 finisher->wait_for_empty();
1199 dout(10) << "flush done" << dendl;
1200}
1201
1202
1203void FileJournal::write_thread_entry()
1204{
1205 dout(10) << "write_thread_entry start" << dendl;
1206 while (1) {
1207 {
9f95a23c 1208 std::unique_lock locker{writeq_lock};
7c673cae
FG
1209 if (writeq.empty() && !must_write_header) {
1210 if (write_stop)
1211 break;
1212 dout(20) << "write_thread_entry going to sleep" << dendl;
9f95a23c 1213 writeq_cond.wait(locker);
7c673cae
FG
1214 dout(20) << "write_thread_entry woke up" << dendl;
1215 continue;
1216 }
1217 }
1218
1219#ifdef HAVE_LIBAIO
1220 if (aio) {
9f95a23c 1221 std::unique_lock locker{aio_lock};
7c673cae
FG
1222 // should we back off to limit aios in flight? try to do this
1223 // adaptively so that we submit larger aios once we have lots of
1224 // them in flight.
1225 //
1226 // NOTE: our condition here is based on aio_num (protected by
1227 // aio_lock) and throttle_bytes (part of the write queue). when
1228 // we sleep, we *only* wait for aio_num to change, and do not
1229 // wake when more data is queued. this is not strictly correct,
1230 // but should be fine given that we will have plenty of aios in
1231 // flight if we hit this limit to ensure we keep the device
1232 // saturated.
1233 while (aio_num > 0) {
11fdf7f2 1234 int exp = std::min<int>(aio_num * 2, 24);
7c673cae
FG
1235 long unsigned min_new = 1ull << exp;
1236 uint64_t cur = aio_write_queue_bytes;
1237 dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
1238 << " ... exp " << exp << " min_new " << min_new
1239 << " ... pending " << cur << dendl;
1240 if (cur >= min_new)
1241 break;
1242 dout(20) << "write_thread_entry deferring until more aios complete: "
1243 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
1244 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
9f95a23c 1245 aio_cond.wait(locker);
7c673cae
FG
1246 dout(20) << "write_thread_entry woke up" << dendl;
1247 }
1248 }
1249#endif
1250
9f95a23c 1251 std::unique_lock locker{write_lock};
7c673cae
FG
1252 uint64_t orig_ops = 0;
1253 uint64_t orig_bytes = 0;
1254
1255 bufferlist bl;
1256 int r = prepare_multi_write(bl, orig_ops, orig_bytes);
1257 // Don't care about journal full if stoppping, so drop queue and
1258 // possibly let header get written and loop above to notice stop
1259 if (r == -ENOSPC) {
1260 if (write_stop) {
1261 dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
1262 while (!writeq_empty()) {
1263 complete_write(1, peek_write().orig_len);
1264 pop_write();
1265 }
1266 print_header(header);
1267 r = 0;
1268 } else {
1269 dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
9f95a23c 1270 commit_cond.wait(locker);
7c673cae
FG
1271 dout(20) << "write_thread_entry woke up" << dendl;
1272 continue;
1273 }
1274 }
11fdf7f2 1275 ceph_assert(r == 0);
7c673cae
FG
1276
1277 if (logger) {
1278 logger->inc(l_filestore_journal_wr);
1279 logger->inc(l_filestore_journal_wr_bytes, bl.length());
1280 }
1281
1282#ifdef HAVE_LIBAIO
1283 if (aio)
1284 do_aio_write(bl);
1285 else
1286 do_write(bl);
1287#else
1288 do_write(bl);
1289#endif
1290 complete_write(orig_ops, orig_bytes);
1291 }
1292
1293 dout(10) << "write_thread_entry finish" << dendl;
1294}
1295
1296#ifdef HAVE_LIBAIO
1297void FileJournal::do_aio_write(bufferlist& bl)
1298{
1299
1300 if (cct->_conf->journal_write_header_frequency &&
1301 (((++journaled_since_start) %
1302 cct->_conf->journal_write_header_frequency) == 0)) {
1303 must_write_header = true;
1304 }
1305
1306 // nothing to do?
1307 if (bl.length() == 0 && !must_write_header)
1308 return;
1309
f67539c2 1310 ceph::buffer::ptr hbp;
7c673cae
FG
1311 if (must_write_header) {
1312 must_write_header = false;
1313 hbp = prepare_header();
1314 }
1315
1316 // entry
1317 off64_t pos = write_pos;
1318
1319 dout(15) << "do_aio_write writing " << pos << "~" << bl.length()
1320 << (hbp.length() ? " + header":"")
1321 << dendl;
1322
1323 // split?
1324 off64_t split = 0;
1325 if (pos + bl.length() > header.max_size) {
1326 bufferlist first, second;
1327 split = header.max_size - pos;
1328 first.substr_of(bl, 0, split);
1329 second.substr_of(bl, split, bl.length() - split);
11fdf7f2 1330 ceph_assert(first.length() + second.length() == bl.length());
7c673cae
FG
1331 dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
1332
1333 if (write_aio_bl(pos, first, 0)) {
1334 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1335 << ") failed" << dendl;
1336 ceph_abort();
1337 }
11fdf7f2 1338 ceph_assert(pos == header.max_size);
7c673cae
FG
1339 if (hbp.length()) {
1340 // be sneaky: include the header in the second fragment
11fdf7f2
TL
1341 bufferlist tmp;
1342 tmp.push_back(hbp);
1343 tmp.claim_append(second);
1344 second.swap(tmp);
7c673cae
FG
1345 pos = 0; // we included the header
1346 } else
1347 pos = get_top(); // no header, start after that
1348 if (write_aio_bl(pos, second, writing_seq)) {
1349 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1350 << ") failed" << dendl;
1351 ceph_abort();
1352 }
1353 } else {
1354 // header too?
1355 if (hbp.length()) {
1356 bufferlist hbl;
1357 hbl.push_back(hbp);
1358 loff_t pos = 0;
1359 if (write_aio_bl(pos, hbl, 0)) {
1360 derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl;
1361 ceph_abort();
1362 }
1363 }
1364
1365 if (write_aio_bl(pos, bl, writing_seq)) {
1366 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1367 << ") failed" << dendl;
1368 ceph_abort();
1369 }
1370 }
1371
1372 write_pos = pos;
1373 if (write_pos == header.max_size)
1374 write_pos = get_top();
11fdf7f2 1375 ceph_assert(write_pos % header.alignment == 0);
7c673cae
FG
1376}
1377
1378/**
1379 * write a buffer using aio
1380 *
1381 * @param seq seq to trigger when this aio completes. if 0, do not update any state
1382 * on completion.
1383 */
1384int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
1385{
1386 dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
1387
1388 while (bl.length() > 0) {
11fdf7f2 1389 int max = std::min<int>(bl.get_num_buffers(), IOV_MAX-1);
7c673cae
FG
1390 iovec *iov = new iovec[max];
1391 int n = 0;
1392 unsigned len = 0;
11fdf7f2
TL
1393 for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) {
1394 ceph_assert(p != std::cend(bl.buffers()));
1395 iov[n].iov_base = const_cast<void*>(static_cast<const void*>(p->c_str()));
7c673cae
FG
1396 iov[n].iov_len = p->length();
1397 len += p->length();
1398 }
1399
1400 bufferlist tbl;
1401 bl.splice(0, len, &tbl); // move bytes from bl -> tbl
1402
1403 // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
1404 // modified in check_aio_completion
9f95a23c 1405 aio_lock.lock();
7c673cae
FG
1406 aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
1407 aio_info& aio = aio_queue.back();
1408 aio.iov = iov;
1409
1410 io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
1411
1412 dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
1413 << " in " << n << dendl;
1414
1415 aio_num++;
1416 aio_bytes += aio.len;
1417
1418 // need to save current aio len to update write_pos later because current
1419 // aio could be ereased from aio_queue once it is done
1420 uint64_t cur_len = aio.len;
1421 // unlock aio_lock because following io_submit might take time to return
9f95a23c 1422 aio_lock.unlock();
7c673cae
FG
1423
1424 iocb *piocb = &aio.iocb;
224ce89b 1425
7c673cae
FG
1426 // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
1427 int attempts = 16;
1428 int delay = 125;
1429 do {
1430 int r = io_submit(aio_ctx, 1, &piocb);
1431 dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
1432 if (r < 0) {
1433 derr << "io_submit to " << aio.off << "~" << cur_len
1434 << " got " << cpp_strerror(r) << dendl;
1435 if (r == -EAGAIN && attempts-- > 0) {
1436 usleep(delay);
1437 delay *= 2;
1438 continue;
1439 }
1440 check_align(pos, tbl);
11fdf7f2 1441 ceph_abort_msg("io_submit got unexpected error");
7c673cae
FG
1442 } else {
1443 break;
1444 }
1445 } while (true);
1446 pos += cur_len;
1447 }
9f95a23c
TL
1448 aio_lock.lock();
1449 write_finish_cond.notify_all();
1450 aio_lock.unlock();
7c673cae
FG
1451 return 0;
1452}
1453#endif
1454
1455void FileJournal::write_finish_thread_entry()
1456{
1457#ifdef HAVE_LIBAIO
b32b8144 1458 dout(10) << __func__ << " enter" << dendl;
7c673cae
FG
1459 while (true) {
1460 {
9f95a23c 1461 std::unique_lock locker{aio_lock};
7c673cae
FG
1462 if (aio_queue.empty()) {
1463 if (aio_stop)
1464 break;
b32b8144 1465 dout(20) << __func__ << " sleeping" << dendl;
9f95a23c 1466 write_finish_cond.wait(locker);
7c673cae
FG
1467 continue;
1468 }
1469 }
1470
b32b8144 1471 dout(20) << __func__ << " waiting for aio(s)" << dendl;
7c673cae
FG
1472 io_event event[16];
1473 int r = io_getevents(aio_ctx, 1, 16, event, NULL);
1474 if (r < 0) {
1475 if (r == -EINTR) {
1476 dout(0) << "io_getevents got " << cpp_strerror(r) << dendl;
1477 continue;
1478 }
1479 derr << "io_getevents got " << cpp_strerror(r) << dendl;
11fdf7f2
TL
1480 if (r == -EIO) {
1481 note_io_error_event(devname.c_str(), fn.c_str(), -EIO, 0, 0, 0);
1482 }
1483 ceph_abort_msg("got unexpected error from io_getevents");
7c673cae
FG
1484 }
1485
1486 {
9f95a23c 1487 std::lock_guard locker{aio_lock};
7c673cae
FG
1488 for (int i=0; i<r; i++) {
1489 aio_info *ai = (aio_info *)event[i].obj;
1490 if (event[i].res != ai->len) {
1491 derr << "aio to " << ai->off << "~" << ai->len
1492 << " returned: " << (int)event[i].res << dendl;
11fdf7f2 1493 ceph_abort_msg("unexpected aio error");
7c673cae 1494 }
b32b8144 1495 dout(10) << __func__ << " aio " << ai->off
7c673cae
FG
1496 << "~" << ai->len << " done" << dendl;
1497 ai->done = true;
1498 }
1499 check_aio_completion();
1500 }
1501 }
b32b8144 1502 dout(10) << __func__ << " exit" << dendl;
7c673cae
FG
1503#endif
1504}
1505
1506#ifdef HAVE_LIBAIO
1507/**
1508 * check aio_wait for completed aio, and update state appropriately.
1509 */
1510void FileJournal::check_aio_completion()
1511{
9f95a23c 1512 ceph_assert(ceph_mutex_is_locked(aio_lock));
7c673cae
FG
1513 dout(20) << "check_aio_completion" << dendl;
1514
1515 bool completed_something = false, signal = false;
1516 uint64_t new_journaled_seq = 0;
1517
1518 list<aio_info>::iterator p = aio_queue.begin();
1519 while (p != aio_queue.end() && p->done) {
1520 dout(20) << "check_aio_completion completed seq " << p->seq << " "
1521 << p->off << "~" << p->len << dendl;
1522 if (p->seq) {
1523 new_journaled_seq = p->seq;
1524 completed_something = true;
1525 }
1526 aio_num--;
1527 aio_bytes -= p->len;
1528 aio_queue.erase(p++);
1529 signal = true;
1530 }
1531
1532 if (completed_something) {
1533 // kick finisher?
1534 // only if we haven't filled up recently!
9f95a23c 1535 std::lock_guard locker{finisher_lock};
7c673cae
FG
1536 journaled_seq = new_journaled_seq;
1537 if (full_state != FULL_NOTFULL) {
1538 dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
1539 << ", full_commit_seq|full_restart_seq" << dendl;
1540 } else {
1541 if (plug_journal_completions) {
1542 dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
1543 << " due to completion plug" << dendl;
1544 } else {
1545 dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl;
1546 queue_completions_thru(journaled_seq);
1547 }
1548 }
1549 }
1550 if (signal) {
1551 // maybe write queue was waiting for aio count to drop?
9f95a23c 1552 aio_cond.notify_all();
7c673cae
FG
1553 }
1554}
1555#endif
1556
1557int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) {
1558 dout(10) << "prepare_entry " << tls << dendl;
1559 int data_len = cct->_conf->journal_align_min_size - 1;
1560 int data_align = -1; // -1 indicates that we don't care about the alignment
1561 bufferlist bl;
1562 for (vector<ObjectStore::Transaction>::iterator p = tls.begin();
1563 p != tls.end(); ++p) {
1564 if ((int)(*p).get_data_length() > data_len) {
1565 data_len = (*p).get_data_length();
1566 data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
1567 }
11fdf7f2 1568 encode(*p, bl);
7c673cae
FG
1569 }
1570 if (tbl->length()) {
1571 bl.claim_append(*tbl);
1572 }
1573 // add it this entry
1574 entry_header_t h;
1575 unsigned head_size = sizeof(entry_header_t);
1576 off64_t base_size = 2*head_size + bl.length();
1577 memset(&h, 0, sizeof(h));
1578 if (data_align >= 0)
1579 h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
11fdf7f2 1580 off64_t size = round_up_to(base_size + h.pre_pad, header.alignment);
7c673cae
FG
1581 unsigned post_pad = size - base_size - h.pre_pad;
1582 h.len = bl.length();
1583 h.post_pad = post_pad;
1584 h.crc32c = bl.crc32c(0);
1585 dout(10) << " len " << bl.length() << " -> " << size
1586 << " (head " << head_size << " pre_pad " << h.pre_pad
1587 << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
1588 << " (bl alignment " << data_align << ")"
1589 << dendl;
1590 bufferlist ebl;
1591 // header
1592 ebl.append((const char*)&h, sizeof(h));
1593 if (h.pre_pad) {
f67539c2 1594 ebl.push_back(ceph::buffer::create_static(h.pre_pad, zero_buf));
7c673cae
FG
1595 }
1596 // payload
9f95a23c 1597 ebl.claim_append(bl);
7c673cae 1598 if (h.post_pad) {
f67539c2 1599 ebl.push_back(ceph::buffer::create_static(h.post_pad, zero_buf));
7c673cae
FG
1600 }
1601 // footer
1602 ebl.append((const char*)&h, sizeof(h));
1603 if (directio)
1604 ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
f67539c2 1605 *tbl = std::move(ebl);
7c673cae
FG
1606 return h.len;
1607}
1608
1609void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
1610 Context *oncommit, TrackedOpRef osd_op)
1611{
1612 // dump on queue
1613 dout(5) << "submit_entry seq " << seq
1614 << " len " << e.length()
1615 << " (" << oncommit << ")" << dendl;
11fdf7f2
TL
1616 ceph_assert(e.length() > 0);
1617 ceph_assert(e.length() < header.max_size);
7c673cae 1618
7c673cae
FG
1619 if (logger) {
1620 logger->inc(l_filestore_journal_queue_bytes, orig_len);
1621 logger->inc(l_filestore_journal_queue_ops, 1);
1622 }
1623
1624 throttle.register_throttle_seq(seq, e.length());
1625 if (logger) {
1626 logger->inc(l_filestore_journal_ops, 1);
1627 logger->inc(l_filestore_journal_bytes, e.length());
1628 }
1629
1630 if (osd_op) {
1631 osd_op->mark_event("commit_queued_for_journal_write");
1632 if (osd_op->store_trace) {
1633 osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace);
1634 osd_op->journal_trace.event("submit_entry");
1635 osd_op->journal_trace.keyval("seq", seq);
1636 }
1637 }
1638 {
9f95a23c 1639 std::lock_guard l1{writeq_lock};
7c673cae 1640#ifdef HAVE_LIBAIO
9f95a23c 1641 std::lock_guard l2{aio_lock};
7c673cae 1642#endif
9f95a23c 1643 std::lock_guard l3{completions_lock};
7c673cae
FG
1644
1645#ifdef HAVE_LIBAIO
1646 aio_write_queue_ops++;
1647 aio_write_queue_bytes += e.length();
9f95a23c 1648 aio_cond.notify_all();
7c673cae
FG
1649#endif
1650
1651 completions.push_back(
1652 completion_item(
1653 seq, oncommit, ceph_clock_now(), osd_op));
1654 if (writeq.empty())
9f95a23c 1655 writeq_cond.notify_all();
7c673cae
FG
1656 writeq.push_back(write_item(seq, e, orig_len, osd_op));
1657 if (osd_op)
1658 osd_op->journal_trace.keyval("queue depth", writeq.size());
1659 }
1660}
1661
1662bool FileJournal::writeq_empty()
1663{
9f95a23c 1664 std::lock_guard locker{writeq_lock};
7c673cae
FG
1665 return writeq.empty();
1666}
1667
1668FileJournal::write_item &FileJournal::peek_write()
1669{
9f95a23c
TL
1670 ceph_assert(ceph_mutex_is_locked(write_lock));
1671 std::lock_guard locker{writeq_lock};
7c673cae
FG
1672 return writeq.front();
1673}
1674
1675void FileJournal::pop_write()
1676{
9f95a23c
TL
1677 ceph_assert(ceph_mutex_is_locked(write_lock));
1678 std::lock_guard locker{writeq_lock};
7c673cae
FG
1679 if (logger) {
1680 logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
1681 logger->dec(l_filestore_journal_queue_ops, 1);
1682 }
1683 writeq.pop_front();
1684}
1685
1686void FileJournal::batch_pop_write(list<write_item> &items)
1687{
9f95a23c 1688 ceph_assert(ceph_mutex_is_locked(write_lock));
7c673cae 1689 {
9f95a23c 1690 std::lock_guard locker{writeq_lock};
7c673cae
FG
1691 writeq.swap(items);
1692 }
1693 for (auto &&i : items) {
1694 if (logger) {
1695 logger->dec(l_filestore_journal_queue_bytes, i.orig_len);
1696 logger->dec(l_filestore_journal_queue_ops, 1);
1697 }
1698 }
1699}
1700
1701void FileJournal::batch_unpop_write(list<write_item> &items)
1702{
9f95a23c 1703 ceph_assert(ceph_mutex_is_locked(write_lock));
7c673cae
FG
1704 for (auto &&i : items) {
1705 if (logger) {
1706 logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
1707 logger->inc(l_filestore_journal_queue_ops, 1);
1708 }
1709 }
9f95a23c 1710 std::lock_guard locker{writeq_lock};
7c673cae
FG
1711 writeq.splice(writeq.begin(), items);
1712}
1713
1714void FileJournal::commit_start(uint64_t seq)
1715{
1716 dout(10) << "commit_start" << dendl;
1717
1718 // was full?
1719 switch (full_state) {
1720 case FULL_NOTFULL:
1721 break; // all good
1722
1723 case FULL_FULL:
1724 if (seq >= journaled_seq) {
1725 dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq "
1726 << seq << " > journaled_seq " << journaled_seq
1727 << ", moving to FULL_WAIT."
1728 << dendl;
1729 full_state = FULL_WAIT;
1730 } else {
1731 dout(1) << "FULL_FULL commit_start on seq "
1732 << seq << " < journaled_seq " << journaled_seq
1733 << ", remaining in FULL_FULL"
1734 << dendl;
1735 }
1736 break;
1737
1738 case FULL_WAIT:
1739 dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl;
1740 full_state = FULL_NOTFULL;
1741 plug_journal_completions = true;
1742 break;
1743 }
1744}
1745
1746/*
1747 *send discard command to joural block deivce
1748 */
1749void FileJournal::do_discard(int64_t offset, int64_t end)
1750{
11fdf7f2 1751 dout(10) << __func__ << " trim(" << offset << ", " << end << dendl;
7c673cae 1752
11fdf7f2 1753 offset = round_up_to(offset, block_size);
7c673cae
FG
1754 if (offset >= end)
1755 return;
11fdf7f2
TL
1756 end = round_up_to(end - block_size, block_size);
1757 ceph_assert(end >= offset);
1758 if (offset < end) {
1759 BlkDev blkdev(fd);
1760 if (blkdev.discard(offset, end - offset) < 0) {
7c673cae 1761 dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
11fdf7f2
TL
1762 }
1763 }
7c673cae
FG
1764}
1765
1766void FileJournal::committed_thru(uint64_t seq)
1767{
9f95a23c 1768 std::lock_guard locker{write_lock};
7c673cae
FG
1769
1770 auto released = throttle.flush(seq);
1771 if (logger) {
1772 logger->dec(l_filestore_journal_ops, released.first);
1773 logger->dec(l_filestore_journal_bytes, released.second);
1774 }
1775
1776 if (seq < last_committed_seq) {
1777 dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
11fdf7f2 1778 ceph_assert(seq >= last_committed_seq);
7c673cae
FG
1779 return;
1780 }
1781 if (seq == last_committed_seq) {
1782 dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
1783 return;
1784 }
1785
1786 dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
1787 last_committed_seq = seq;
1788
1789 // completions!
1790 {
9f95a23c 1791 std::lock_guard locker{finisher_lock};
7c673cae
FG
1792 queue_completions_thru(seq);
1793 if (plug_journal_completions && seq >= header.start_seq) {
1794 dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
1795 plug_journal_completions = false;
1796 queue_completions_thru(journaled_seq);
1797 }
1798 }
1799
1800 // adjust start pointer
1801 while (!journalq.empty() && journalq.front().first <= seq) {
1802 journalq.pop_front();
1803 }
1804
1805 int64_t old_start = header.start;
1806 if (!journalq.empty()) {
1807 header.start = journalq.front().second;
1808 header.start_seq = journalq.front().first;
1809 } else {
1810 header.start = write_pos;
1811 header.start_seq = seq + 1;
1812 }
1813
1814 if (discard) {
1815 dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl;
1816 if (old_start < header.start)
1817 do_discard(old_start, header.start - 1);
1818 else {
1819 do_discard(old_start, header.max_size - 1);
1820 do_discard(get_top(), header.start - 1);
1821 }
1822 }
1823
1824 must_write_header = true;
1825 print_header(header);
1826
1827 // committed but unjournaled items
1828 while (!writeq_empty() && peek_write().seq <= seq) {
1829 dout(15) << " dropping committed but unwritten seq " << peek_write().seq
1830 << " len " << peek_write().bl.length()
1831 << dendl;
1832 complete_write(1, peek_write().orig_len);
1833 pop_write();
1834 }
1835
9f95a23c 1836 commit_cond.notify_all();
7c673cae
FG
1837
1838 dout(10) << "committed_thru done" << dendl;
1839}
1840
1841
1842void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
1843{
1844 dout(5) << __func__ << " finished " << ops << " ops and "
1845 << bytes << " bytes" << dendl;
1846}
1847
1848int FileJournal::make_writeable()
1849{
1850 dout(10) << __func__ << dendl;
1851 int r = set_throttle_params();
1852 if (r < 0)
1853 return r;
1854
1855 r = _open(true);
1856 if (r < 0)
1857 return r;
1858
1859 if (read_pos > 0)
1860 write_pos = read_pos;
1861 else
1862 write_pos = get_top();
1863 read_pos = 0;
1864
1865 must_write_header = true;
1866
1867 start_writer();
1868 return 0;
1869}
1870
1871int FileJournal::set_throttle_params()
1872{
1873 stringstream ss;
1874 bool valid = throttle.set_params(
1875 cct->_conf->journal_throttle_low_threshhold,
1876 cct->_conf->journal_throttle_high_threshhold,
1877 cct->_conf->filestore_expected_throughput_bytes,
1878 cct->_conf->journal_throttle_high_multiple,
1879 cct->_conf->journal_throttle_max_multiple,
1880 header.max_size - get_top(),
1881 &ss);
1882
1883 if (!valid) {
1884 derr << "tried to set invalid params: "
1885 << ss.str()
1886 << dendl;
1887 }
1888 return valid ? 0 : -EINVAL;
1889}
1890
1891const char** FileJournal::get_tracked_conf_keys() const
1892{
1893 static const char *KEYS[] = {
1894 "journal_throttle_low_threshhold",
1895 "journal_throttle_high_threshhold",
1896 "journal_throttle_high_multiple",
1897 "journal_throttle_max_multiple",
1898 "filestore_expected_throughput_bytes",
1899 NULL};
1900 return KEYS;
1901}
1902
1903void FileJournal::wrap_read_bl(
1904 off64_t pos,
1905 int64_t olen,
1906 bufferlist* bl,
1907 off64_t *out_pos
1908 ) const
1909{
1910 while (olen > 0) {
1911 while (pos >= header.max_size)
1912 pos = pos + get_top() - header.max_size;
1913
1914 int64_t len;
1915 if (pos + olen > header.max_size)
1916 len = header.max_size - pos; // partial
1917 else
1918 len = olen; // rest
1919
1920 int64_t actual = ::lseek64(fd, pos, SEEK_SET);
11fdf7f2 1921 ceph_assert(actual == pos);
7c673cae 1922
f67539c2 1923 bufferptr bp = ceph::buffer::create(len);
7c673cae
FG
1924 int r = safe_read_exact(fd, bp.c_str(), len);
1925 if (r) {
1926 derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
11fdf7f2 1927 << cpp_strerror(r) << dendl;
7c673cae
FG
1928 ceph_abort();
1929 }
1930 bl->push_back(std::move(bp));
1931 pos += len;
1932 olen -= len;
1933 }
1934 if (pos >= header.max_size)
1935 pos = pos + get_top() - header.max_size;
1936 if (out_pos)
1937 *out_pos = pos;
1938}
1939
1940bool FileJournal::read_entry(
1941 bufferlist &bl,
1942 uint64_t &next_seq,
1943 bool *corrupt)
1944{
1945 if (corrupt)
1946 *corrupt = false;
1947 uint64_t seq = next_seq;
1948
1949 if (!read_pos) {
1950 dout(2) << "read_entry -- not readable" << dendl;
1951 return false;
1952 }
1953
1954 off64_t pos = read_pos;
1955 off64_t next_pos = pos;
1956 stringstream ss;
1957 read_entry_result result = do_read_entry(
1958 pos,
1959 &next_pos,
1960 &bl,
1961 &seq,
1962 &ss);
1963 if (result == SUCCESS) {
1964 journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
1965 uint64_t amount_to_take =
1966 next_pos > pos ?
1967 next_pos - pos :
1968 (header.max_size - pos) + (next_pos - get_top());
1969 throttle.take(amount_to_take);
1970 throttle.register_throttle_seq(next_seq, amount_to_take);
1971 if (logger) {
1972 logger->inc(l_filestore_journal_ops, 1);
1973 logger->inc(l_filestore_journal_bytes, amount_to_take);
1974 }
1975 if (next_seq > seq) {
1976 return false;
1977 } else {
1978 read_pos = next_pos;
1979 next_seq = seq;
1980 if (seq > journaled_seq)
1981 journaled_seq = seq;
1982 return true;
1983 }
3efd9988
FG
1984 } else {
1985 derr << "do_read_entry(" << pos << "): " << ss.str() << dendl;
7c673cae
FG
1986 }
1987
1988 if (seq && seq < header.committed_up_to) {
1989 derr << "Unable to read past sequence " << seq
1990 << " but header indicates the journal has committed up through "
1991 << header.committed_up_to << ", journal is corrupt" << dendl;
1992 if (cct->_conf->journal_ignore_corruption) {
1993 if (corrupt)
1994 *corrupt = true;
1995 return false;
1996 } else {
1997 ceph_abort();
1998 }
1999 }
2000
7c673cae
FG
2001 dout(2) << "No further valid entries found, journal is most likely valid"
2002 << dendl;
2003 return false;
2004}
2005
2006FileJournal::read_entry_result FileJournal::do_read_entry(
2007 off64_t init_pos,
2008 off64_t *next_pos,
2009 bufferlist *bl,
2010 uint64_t *seq,
2011 ostream *ss,
2012 entry_header_t *_h) const
2013{
2014 off64_t cur_pos = init_pos;
2015 bufferlist _bl;
2016 if (!bl)
2017 bl = &_bl;
2018
2019 // header
2020 entry_header_t *h;
2021 bufferlist hbl;
2022 off64_t _next_pos;
2023 wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos);
2024 h = reinterpret_cast<entry_header_t *>(hbl.c_str());
2025
2026 if (!h->check_magic(cur_pos, header.get_fsid64())) {
2027 dout(25) << "read_entry " << init_pos
2028 << " : bad header magic, end of journal" << dendl;
2029 if (ss)
2030 *ss << "bad header magic";
2031 if (next_pos)
2032 *next_pos = init_pos + (4<<10); // check 4k ahead
2033 return MAYBE_CORRUPT;
2034 }
2035 cur_pos = _next_pos;
2036
2037 // pad + body + pad
2038 if (h->pre_pad)
2039 cur_pos += h->pre_pad;
2040
2041 bl->clear();
2042 wrap_read_bl(cur_pos, h->len, bl, &cur_pos);
2043
2044 if (h->post_pad)
2045 cur_pos += h->post_pad;
2046
2047 // footer
2048 entry_header_t *f;
2049 bufferlist fbl;
2050 wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos);
2051 f = reinterpret_cast<entry_header_t *>(fbl.c_str());
2052 if (memcmp(f, h, sizeof(*f))) {
2053 if (ss)
2054 *ss << "bad footer magic, partial entry";
2055 if (next_pos)
2056 *next_pos = cur_pos;
2057 return MAYBE_CORRUPT;
2058 }
2059
2060 if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal)
2061 h->crc32c != 0) { // newer entry in old journal
2062 uint32_t actual_crc = bl->crc32c(0);
2063 if (actual_crc != h->crc32c) {
2064 if (ss)
2065 *ss << "header crc (" << h->crc32c
2066 << ") doesn't match body crc (" << actual_crc << ")";
2067 if (next_pos)
2068 *next_pos = cur_pos;
2069 return MAYBE_CORRUPT;
2070 }
2071 }
2072
2073 // yay!
2074 dout(2) << "read_entry " << init_pos << " : seq " << h->seq
2075 << " " << h->len << " bytes"
2076 << dendl;
2077
2078 // ok!
2079 if (seq)
2080 *seq = h->seq;
2081
2082
2083 if (next_pos)
2084 *next_pos = cur_pos;
2085
2086 if (_h)
2087 *_h = *h;
2088
11fdf7f2 2089 ceph_assert(cur_pos % header.alignment == 0);
7c673cae
FG
2090 return SUCCESS;
2091}
2092
2093void FileJournal::reserve_throttle_and_backoff(uint64_t count)
2094{
2095 throttle.get(count);
2096}
2097
2098void FileJournal::get_header(
2099 uint64_t wanted_seq,
2100 off64_t *_pos,
2101 entry_header_t *h)
2102{
2103 off64_t pos = header.start;
2104 off64_t next_pos = pos;
2105 bufferlist bl;
2106 uint64_t seq = 0;
2107 dout(2) << __func__ << dendl;
2108 while (1) {
2109 bl.clear();
2110 pos = next_pos;
2111 read_entry_result result = do_read_entry(
2112 pos,
2113 &next_pos,
2114 &bl,
2115 &seq,
2116 0,
2117 h);
2118 if (result == FAILURE || result == MAYBE_CORRUPT)
2119 ceph_abort();
2120 if (seq == wanted_seq) {
2121 if (_pos)
2122 *_pos = pos;
2123 return;
2124 }
2125 }
2126 ceph_abort(); // not reachable
2127}
2128
2129void FileJournal::corrupt(
2130 int wfd,
2131 off64_t corrupt_at)
2132{
2133 dout(2) << __func__ << dendl;
2134 if (corrupt_at >= header.max_size)
2135 corrupt_at = corrupt_at + get_top() - header.max_size;
2136
2137 int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
11fdf7f2 2138 ceph_assert(actual == corrupt_at);
7c673cae
FG
2139
2140 char buf[10];
2141 int r = safe_read_exact(fd, buf, 1);
11fdf7f2 2142 ceph_assert(r == 0);
7c673cae
FG
2143
2144 actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
11fdf7f2 2145 ceph_assert(actual == corrupt_at);
7c673cae
FG
2146
2147 buf[0]++;
2148 r = safe_write(wfd, buf, 1);
11fdf7f2 2149 ceph_assert(r == 0);
7c673cae
FG
2150}
2151
2152void FileJournal::corrupt_payload(
2153 int wfd,
2154 uint64_t seq)
2155{
2156 dout(2) << __func__ << dendl;
2157 off64_t pos = 0;
2158 entry_header_t h;
2159 get_header(seq, &pos, &h);
2160 off64_t corrupt_at =
2161 pos + sizeof(entry_header_t) + h.pre_pad;
2162 corrupt(wfd, corrupt_at);
2163}
2164
2165
2166void FileJournal::corrupt_footer_magic(
2167 int wfd,
2168 uint64_t seq)
2169{
2170 dout(2) << __func__ << dendl;
2171 off64_t pos = 0;
2172 entry_header_t h;
2173 get_header(seq, &pos, &h);
2174 off64_t corrupt_at =
2175 pos + sizeof(entry_header_t) + h.pre_pad +
2176 h.len + h.post_pad +
2177 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2178 corrupt(wfd, corrupt_at);
2179}
2180
2181
2182void FileJournal::corrupt_header_magic(
2183 int wfd,
2184 uint64_t seq)
2185{
2186 dout(2) << __func__ << dendl;
2187 off64_t pos = 0;
2188 entry_header_t h;
2189 get_header(seq, &pos, &h);
2190 off64_t corrupt_at =
2191 pos +
2192 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2193 corrupt(wfd, corrupt_at);
2194}
2195
2196off64_t FileJournal::get_journal_size_estimate()
2197{
2198 off64_t size, start = header.start;
2199 if (write_pos < start) {
2200 size = (max_size - start) + write_pos;
2201 } else {
2202 size = write_pos - start;
2203 }
2204 dout(20) << __func__ << " journal size=" << size << dendl;
2205 return size;
2206}
11fdf7f2
TL
2207
2208void FileJournal::get_devices(set<string> *ls)
2209{
2210 string dev_node;
2211 BlkDev blkdev(fd);
2212 if (int rc = blkdev.wholedisk(&dev_node); rc) {
2213 return;
2214 }
2215 get_raw_devices(dev_node, ls);
2216}
2217
2218void FileJournal::collect_metadata(map<string,string> *pm)
2219{
2220 BlkDev blkdev(fd);
2221 char partition_path[PATH_MAX];
2222 char dev_node[PATH_MAX];
2223 if (blkdev.partition(partition_path, PATH_MAX)) {
2224 (*pm)["backend_filestore_journal_partition_path"] = "unknown";
2225 } else {
2226 (*pm)["backend_filestore_journal_partition_path"] = string(partition_path);
2227 }
2228 if (blkdev.wholedisk(dev_node, PATH_MAX)) {
2229 (*pm)["backend_filestore_journal_dev_node"] = "unknown";
2230 } else {
2231 (*pm)["backend_filestore_journal_dev_node"] = string(dev_node);
2232 devname = dev_node;
2233 }
2234}