]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileJournal.cc
update sources to 12.2.7
[ceph.git] / ceph / src / os / filestore / FileJournal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include "acconfig.h"
15
16 #include "common/debug.h"
17 #include "common/errno.h"
18 #include "common/safe_io.h"
19 #include "FileJournal.h"
20 #include "include/color.h"
21 #include "common/perf_counters.h"
22 #include "FileStore.h"
23
24 #include "include/compat.h"
25
26 #include <fcntl.h>
27 #include <limits.h>
28 #include <sstream>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <sys/mount.h>
34
35 #include "common/blkdev.h"
36 #if defined(__linux__)
37 #include "common/linux_version.h"
38 #endif
39
40 #if defined(__FreeBSD__)
41 #define O_DSYNC O_SYNC
42 #endif
43
44 #define dout_context cct
45 #define dout_subsys ceph_subsys_journal
46 #undef dout_prefix
47 #define dout_prefix *_dout << "journal "
48
// One mebibyte, in bytes; used as the minimum acceptable journal size.
static const int64_t ONE_MEG = 1 << 20;
// Alignment checked against the journal header and write buffers when
// direct I/O is in use.
static const int CEPH_DIRECTIO_ALIGNMENT = 4096;
51
52
53 int FileJournal::_open(bool forwrite, bool create)
54 {
55 int flags, ret;
56
57 if (forwrite) {
58 flags = O_RDWR;
59 if (directio)
60 flags |= O_DIRECT | O_DSYNC;
61 } else {
62 flags = O_RDONLY;
63 }
64 if (create)
65 flags |= O_CREAT;
66
67 if (fd >= 0) {
68 if (TEMP_FAILURE_RETRY(::close(fd))) {
69 int err = errno;
70 derr << "FileJournal::_open: error closing old fd: "
71 << cpp_strerror(err) << dendl;
72 }
73 }
74 fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
75 if (fd < 0) {
76 int err = errno;
77 dout(2) << "FileJournal::_open unable to open journal "
78 << fn << ": " << cpp_strerror(err) << dendl;
79 return -err;
80 }
81
82 struct stat st;
83 ret = ::fstat(fd, &st);
84 if (ret) {
85 ret = errno;
86 derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl;
87 ret = -ret;
88 goto out_fd;
89 }
90
91 if (S_ISBLK(st.st_mode)) {
92 ret = _open_block_device();
93 } else if (S_ISREG(st.st_mode)) {
94 if (aio && !force_aio) {
95 derr << "FileJournal::_open: disabling aio for non-block journal. Use "
96 << "journal_force_aio to force use of aio anyway" << dendl;
97 aio = false;
98 }
99 ret = _open_file(st.st_size, st.st_blksize, create);
100 } else {
101 derr << "FileJournal::_open: wrong journal file type: " << st.st_mode
102 << dendl;
103 ret = -EINVAL;
104 }
105
106 if (ret)
107 goto out_fd;
108
109 #ifdef HAVE_LIBAIO
110 if (aio) {
111 aio_ctx = 0;
112 ret = io_setup(128, &aio_ctx);
113 if (ret < 0) {
114 switch (ret) {
115 // Contrary to naive expectations -EAGIAN means ...
116 case -EAGAIN:
117 derr << "FileJournal::_open: user's limit of aio events exceeded. "
118 << "Try increasing /proc/sys/fs/aio-max-nr" << dendl;
119 break;
120 default:
121 derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl;
122 break;
123 }
124 goto out_fd;
125 }
126 }
127 #endif
128
129 /* We really want max_size to be a multiple of block_size. */
130 max_size -= max_size % block_size;
131
132 dout(1) << "_open " << fn << " fd " << fd
133 << ": " << max_size
134 << " bytes, block size " << block_size
135 << " bytes, directio = " << directio
136 << ", aio = " << aio
137 << dendl;
138 return 0;
139
140 out_fd:
141 VOID_TEMP_FAILURE_RETRY(::close(fd));
142 fd = -1;
143 return ret;
144 }
145
146 int FileJournal::_open_block_device()
147 {
148 int64_t bdev_sz = 0;
149 int ret = get_block_device_size(fd, &bdev_sz);
150 if (ret) {
151 dout(0) << __func__ << ": failed to read block device size." << dendl;
152 return -EIO;
153 }
154
155 /* Check for bdev_sz too small */
156 if (bdev_sz < ONE_MEG) {
157 dout(0) << __func__ << ": your block device must be at least "
158 << ONE_MEG << " bytes to be used for a Ceph journal." << dendl;
159 return -EINVAL;
160 }
161
162 dout(10) << __func__ << ": ignoring osd journal size. "
163 << "We'll use the entire block device (size: " << bdev_sz << ")"
164 << dendl;
165 max_size = bdev_sz;
166
167 block_size = cct->_conf->journal_block_size;
168
169 if (cct->_conf->journal_discard) {
170 discard = block_device_support_discard(fn.c_str());
171 dout(10) << fn << " support discard: " << (int)discard << dendl;
172 }
173
174 return 0;
175 }
176
177 int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
178 bool create)
179 {
180 int ret;
181 int64_t conf_journal_sz(cct->_conf->osd_journal_size);
182 conf_journal_sz <<= 20;
183
184 if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) {
185 derr << "I'm sorry, I don't know how large of a journal to create."
186 << "Please specify a block device to use as the journal OR "
187 << "set osd_journal_size in your ceph.conf" << dendl;
188 return -EINVAL;
189 }
190
191 if (create && (oldsize < conf_journal_sz)) {
192 uint64_t newsize(conf_journal_sz);
193 dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl;
194 ret = ::ftruncate(fd, newsize);
195 if (ret < 0) {
196 int err = errno;
197 derr << "FileJournal::_open_file : unable to extend journal to "
198 << newsize << " bytes: " << cpp_strerror(err) << dendl;
199 return -err;
200 }
201 ret = ceph_posix_fallocate(fd, 0, newsize);
202 if (ret) {
203 derr << "FileJournal::_open_file : unable to preallocation journal to "
204 << newsize << " bytes: " << cpp_strerror(ret) << dendl;
205 return -ret;
206 }
207 max_size = newsize;
208 }
209 else {
210 max_size = oldsize;
211 }
212 block_size = cct->_conf->journal_block_size;
213
214 if (create && cct->_conf->journal_zero_on_create) {
215 derr << "FileJournal::_open_file : zeroing journal" << dendl;
216 uint64_t write_size = 1 << 20;
217 char *buf;
218 ret = ::posix_memalign((void **)&buf, block_size, write_size);
219 if (ret != 0) {
220 return -ret;
221 }
222 memset(static_cast<void*>(buf), 0, write_size);
223 uint64_t i = 0;
224 for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
225 ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
226 if (ret < 0) {
227 free(buf);
228 return -errno;
229 }
230 }
231 if (i < (uint64_t)max_size) {
232 ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
233 if (ret < 0) {
234 free(buf);
235 return -errno;
236 }
237 }
238 free(buf);
239 }
240
241
242 dout(10) << "_open journal is not a block device, NOT checking disk "
243 << "write cache on '" << fn << "'" << dendl;
244
245 return 0;
246 }
247
248 // This can not be used on an active journal
249 int FileJournal::check()
250 {
251 int ret;
252
253 assert(fd == -1);
254 ret = _open(false, false);
255 if (ret)
256 return ret;
257
258 ret = read_header(&header);
259 if (ret < 0)
260 goto done;
261
262 if (header.fsid != fsid) {
263 derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
264 << ", invalid (someone else's?) journal" << dendl;
265 ret = -EINVAL;
266 goto done;
267 }
268
269 dout(1) << "check: header looks ok" << dendl;
270 ret = 0;
271
272 done:
273 close();
274 return ret;
275 }
276
277
278 int FileJournal::create()
279 {
280 void *buf = 0;
281 int64_t needed_space;
282 int ret;
283 buffer::ptr bp;
284 dout(2) << "create " << fn << " fsid " << fsid << dendl;
285
286 ret = _open(true, true);
287 if (ret)
288 goto done;
289
290 // write empty header
291 header = header_t();
292 header.flags = header_t::FLAG_CRC; // enable crcs on any new journal.
293 header.fsid = fsid;
294 header.max_size = max_size;
295 header.block_size = block_size;
296 if (cct->_conf->journal_block_align || directio)
297 header.alignment = block_size;
298 else
299 header.alignment = 16; // at least stay word aligned on 64bit machines...
300
301 header.start = get_top();
302 header.start_seq = 0;
303
304 print_header(header);
305
306 // static zeroed buffer for alignment padding
307 delete [] zero_buf;
308 zero_buf = new char[header.alignment];
309 memset(zero_buf, 0, header.alignment);
310
311 bp = prepare_header();
312 if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) {
313 ret = -errno;
314 derr << "FileJournal::create : create write header error "
315 << cpp_strerror(ret) << dendl;
316 goto close_fd;
317 }
318
319 // zero first little bit, too.
320 ret = posix_memalign(&buf, block_size, block_size);
321 if (ret) {
322 ret = -ret;
323 derr << "FileJournal::create: failed to allocate " << block_size
324 << " bytes of memory: " << cpp_strerror(ret) << dendl;
325 goto close_fd;
326 }
327 memset(buf, 0, block_size);
328 if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) {
329 ret = -errno;
330 derr << "FileJournal::create: error zeroing first " << block_size
331 << " bytes " << cpp_strerror(ret) << dendl;
332 goto free_buf;
333 }
334
335 needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20;
336 needed_space += (2 * sizeof(entry_header_t)) + get_top();
337 if (header.max_size - header.start < needed_space) {
338 derr << "FileJournal::create: OSD journal is not large enough to hold "
339 << "osd_max_write_size bytes!" << dendl;
340 ret = -ENOSPC;
341 goto free_buf;
342 }
343
344 dout(2) << "create done" << dendl;
345 ret = 0;
346
347 free_buf:
348 free(buf);
349 buf = 0;
350 close_fd:
351 if (TEMP_FAILURE_RETRY(::close(fd)) < 0) {
352 ret = -errno;
353 derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret)
354 << dendl;
355 }
356 done:
357 fd = -1;
358 return ret;
359 }
360
361 // This can not be used on an active journal
362 int FileJournal::peek_fsid(uuid_d& fsid)
363 {
364 assert(fd == -1);
365 int r = _open(false, false);
366 if (r)
367 return r;
368 r = read_header(&header);
369 if (r < 0)
370 goto out;
371 fsid = header.fsid;
372 out:
373 close();
374 return r;
375 }
376
377 int FileJournal::open(uint64_t fs_op_seq)
378 {
379 dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
380
381 uint64_t next_seq = fs_op_seq + 1;
382 uint64_t seq = -1;
383
384 int err = _open(false);
385 if (err)
386 return err;
387
388 // assume writeable, unless...
389 read_pos = 0;
390 write_pos = get_top();
391
392 // read header?
393 err = read_header(&header);
394 if (err < 0)
395 goto out;
396
397 // static zeroed buffer for alignment padding
398 delete [] zero_buf;
399 zero_buf = new char[header.alignment];
400 memset(zero_buf, 0, header.alignment);
401
402 dout(10) << "open header.fsid = " << header.fsid
403 //<< " vs expected fsid = " << fsid
404 << dendl;
405 if (header.fsid != fsid) {
406 derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
407 << ", invalid (someone else's?) journal" << dendl;
408 err = -EINVAL;
409 goto out;
410 }
411 if (header.max_size > max_size) {
412 dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
413 err = -EINVAL;
414 goto out;
415 }
416 if (header.block_size != block_size) {
417 dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
418 err = -EINVAL;
419 goto out;
420 }
421 if (header.max_size % header.block_size) {
422 dout(2) << "open journal max size " << header.max_size
423 << " not a multiple of block size " << header.block_size << dendl;
424 err = -EINVAL;
425 goto out;
426 }
427 if (header.alignment != block_size && directio) {
428 dout(0) << "open journal alignment " << header.alignment << " does not match block size "
429 << block_size << " (required for direct_io journal mode)" << dendl;
430 err = -EINVAL;
431 goto out;
432 }
433 if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
434 dout(0) << "open journal alignment " << header.alignment
435 << " is not multiple of minimum directio alignment "
436 << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
437 << dendl;
438 err = -EINVAL;
439 goto out;
440 }
441
442 // looks like a valid header.
443 write_pos = 0; // not writeable yet
444
445 journaled_seq = header.committed_up_to;
446
447 // find next entry
448 read_pos = header.start;
449 seq = header.start_seq;
450
451 while (1) {
452 bufferlist bl;
453 off64_t old_pos = read_pos;
454 if (!read_entry(bl, seq)) {
455 dout(10) << "open reached end of journal." << dendl;
456 break;
457 }
458 if (seq > next_seq) {
459 dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq
460 << ", ignoring journal contents"
461 << dendl;
462 read_pos = -1;
463 last_committed_seq = 0;
464 return 0;
465 }
466 if (seq == next_seq) {
467 dout(10) << "open reached seq " << seq << dendl;
468 read_pos = old_pos;
469 break;
470 }
471 seq++; // next event should follow.
472 }
473
474 return 0;
475 out:
476 close();
477 return err;
478 }
479
480 void FileJournal::_close(int fd) const
481 {
482 VOID_TEMP_FAILURE_RETRY(::close(fd));
483 }
484
485 void FileJournal::close()
486 {
487 dout(1) << "close " << fn << dendl;
488
489 // stop writer thread
490 stop_writer();
491
492 // close
493 assert(writeq_empty());
494 assert(!must_write_header);
495 assert(fd >= 0);
496 _close(fd);
497 fd = -1;
498 }
499
500
501 int FileJournal::dump(ostream& out)
502 {
503 return _dump(out, false);
504 }
505
506 int FileJournal::simple_dump(ostream& out)
507 {
508 return _dump(out, true);
509 }
510
511 int FileJournal::_dump(ostream& out, bool simple)
512 {
513 JSONFormatter f(true);
514 int ret = _fdump(f, simple);
515 f.flush(out);
516 return ret;
517 }
518
519 int FileJournal::_fdump(Formatter &f, bool simple)
520 {
521 dout(10) << "_fdump" << dendl;
522
523 assert(fd == -1);
524 int err = _open(false, false);
525 if (err)
526 return err;
527
528 err = read_header(&header);
529 if (err < 0) {
530 close();
531 return err;
532 }
533
534 off64_t next_pos = header.start;
535
536 f.open_object_section("journal");
537
538 f.open_object_section("header");
539 f.dump_unsigned("flags", header.flags);
540 ostringstream os;
541 os << header.fsid;
542 f.dump_string("fsid", os.str());
543 f.dump_unsigned("block_size", header.block_size);
544 f.dump_unsigned("alignment", header.alignment);
545 f.dump_int("max_size", header.max_size);
546 f.dump_int("start", header.start);
547 f.dump_unsigned("committed_up_to", header.committed_up_to);
548 f.dump_unsigned("start_seq", header.start_seq);
549 f.close_section();
550
551 f.open_array_section("entries");
552 uint64_t seq = header.start_seq;
553 while (1) {
554 bufferlist bl;
555 off64_t pos = next_pos;
556
557 if (!pos) {
558 dout(2) << "_dump -- not readable" << dendl;
559 err = -EINVAL;
560 break;
561 }
562 stringstream ss;
563 read_entry_result result = do_read_entry(
564 pos,
565 &next_pos,
566 &bl,
567 &seq,
568 &ss);
569 if (result != SUCCESS) {
570 if (seq < header.committed_up_to) {
571 dout(2) << "Unable to read past sequence " << seq
572 << " but header indicates the journal has committed up through "
573 << header.committed_up_to << ", journal is corrupt" << dendl;
574 err = -EINVAL;
575 }
576 dout(25) << ss.str() << dendl;
577 dout(25) << "No further valid entries found, journal is most likely valid"
578 << dendl;
579 break;
580 }
581
582 f.open_object_section("entry");
583 f.dump_unsigned("offset", pos);
584 f.dump_unsigned("seq", seq);
585 if (simple) {
586 f.dump_unsigned("bl.length", bl.length());
587 } else {
588 f.open_array_section("transactions");
589 bufferlist::iterator p = bl.begin();
590 int trans_num = 0;
591 while (!p.end()) {
592 ObjectStore::Transaction t(p);
593 f.open_object_section("transaction");
594 f.dump_unsigned("trans_num", trans_num);
595 t.dump(&f);
596 f.close_section();
597 trans_num++;
598 }
599 f.close_section();
600 }
601 f.close_section();
602 }
603
604 f.close_section();
605 f.close_section();
606 dout(10) << "dump finish" << dendl;
607
608 close();
609 return err;
610 }
611
612
613 void FileJournal::start_writer()
614 {
615 write_stop = false;
616 aio_stop = false;
617 write_thread.create("journal_write");
618 #ifdef HAVE_LIBAIO
619 if (aio)
620 write_finish_thread.create("journal_wrt_fin");
621 #endif
622 }
623
624 void FileJournal::stop_writer()
625 {
626 // Do nothing if writer already stopped or never started
627 if (!write_stop)
628 {
629 {
630 Mutex::Locker l(write_lock);
631 Mutex::Locker p(writeq_lock);
632 write_stop = true;
633 writeq_cond.Signal();
634 // Doesn't hurt to signal commit_cond in case thread is waiting there
635 // and caller didn't use committed_thru() first.
636 commit_cond.Signal();
637 }
638 write_thread.join();
639
640 // write journal header now so that we have less to replay on remount
641 write_header_sync();
642 }
643
644 #ifdef HAVE_LIBAIO
645 // stop aio completeion thread *after* writer thread has stopped
646 // and has submitted all of its io
647 if (aio && !aio_stop) {
648 aio_lock.Lock();
649 aio_stop = true;
650 aio_cond.Signal();
651 write_finish_cond.Signal();
652 aio_lock.Unlock();
653 write_finish_thread.join();
654 }
655 #endif
656 }
657
658
659
660 void FileJournal::print_header(const header_t &header) const
661 {
662 dout(10) << "header: block_size " << header.block_size
663 << " alignment " << header.alignment
664 << " max_size " << header.max_size
665 << dendl;
666 dout(10) << "header: start " << header.start << dendl;
667 dout(10) << " write_pos " << write_pos << dendl;
668 }
669
670 int FileJournal::read_header(header_t *hdr) const
671 {
672 dout(10) << "read_header" << dendl;
673 bufferlist bl;
674
675 buffer::ptr bp = buffer::create_page_aligned(block_size);
676 char* bpdata = bp.c_str();
677 int r = ::pread(fd, bpdata, bp.length(), 0);
678
679 if (r < 0) {
680 int err = errno;
681 dout(0) << "read_header got " << cpp_strerror(err) << dendl;
682 return -err;
683 }
684
685 // don't use bp.zero() here, because it also invalidates
686 // crc cache (which is not yet populated anyway)
687 if (bp.length() != (size_t)r) {
688 // r will be always less or equal than bp.length
689 bpdata += r;
690 memset(bpdata, 0, bp.length() - r);
691 }
692
693 bl.push_back(std::move(bp));
694
695 try {
696 bufferlist::iterator p = bl.begin();
697 ::decode(*hdr, p);
698 }
699 catch (buffer::error& e) {
700 derr << "read_header error decoding journal header" << dendl;
701 return -EINVAL;
702 }
703
704
705 /*
706 * Unfortunately we weren't initializing the flags field for new
707 * journals! Aie. This is safe(ish) now that we have only one
708 * flag. Probably around when we add the next flag we need to
709 * remove this or else this (eventually old) code will clobber newer
710 * code's flags.
711 */
712 if (hdr->flags > 3) {
713 derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
714 hdr->flags = 0;
715 }
716
717 print_header(*hdr);
718
719 return 0;
720 }
721
722 bufferptr FileJournal::prepare_header()
723 {
724 bufferlist bl;
725 {
726 Mutex::Locker l(finisher_lock);
727 header.committed_up_to = journaled_seq;
728 }
729 ::encode(header, bl);
730 bufferptr bp = buffer::create_page_aligned(get_top());
731 // don't use bp.zero() here, because it also invalidates
732 // crc cache (which is not yet populated anyway)
733 char* data = bp.c_str();
734 memcpy(data, bl.c_str(), bl.length());
735 data += bl.length();
736 memset(data, 0, bp.length()-bl.length());
737 return bp;
738 }
739
740 void FileJournal::write_header_sync()
741 {
742 Mutex::Locker locker(write_lock);
743 must_write_header = true;
744 bufferlist bl;
745 do_write(bl);
746 dout(20) << __func__ << " finish" << dendl;
747 }
748
749 int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
750 {
751 // already full?
752 if (full_state != FULL_NOTFULL)
753 return -ENOSPC;
754
755 // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
756 off64_t room;
757 if (pos >= header.start)
758 room = (header.max_size - pos) + (header.start - get_top()) - 1;
759 else
760 room = header.start - pos - 1;
761 dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
762 << " top " << get_top() << dendl;
763
764 if (do_sync_cond) {
765 if (room >= (header.max_size >> 1) &&
766 room - size < (header.max_size >> 1)) {
767 dout(10) << " passing half full mark, triggering commit" << dendl;
768 do_sync_cond->SloppySignal(); // initiate a real commit so we can trim
769 }
770 }
771
772 if (room >= size) {
773 dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
774 if (pos + size > header.max_size)
775 must_write_header = true;
776 return 0;
777 }
778
779 // full
780 dout(1) << "check_for_full at " << pos << " : JOURNAL FULL "
781 << pos << " >= " << room
782 << " (max_size " << header.max_size << " start " << header.start << ")"
783 << dendl;
784
785 off64_t max = header.max_size - get_top();
786 if (size > max)
787 dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl;
788
789 return -ENOSPC;
790 }
791
792 int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes)
793 {
794 // gather queued writes
795 off64_t queue_pos = write_pos;
796
797 int eleft = cct->_conf->journal_max_write_entries;
798 unsigned bmax = cct->_conf->journal_max_write_bytes;
799
800 if (full_state != FULL_NOTFULL)
801 return -ENOSPC;
802
803 while (!writeq_empty()) {
804 list<write_item> items;
805 batch_pop_write(items);
806 list<write_item>::iterator it = items.begin();
807 while (it != items.end()) {
808 uint64_t bytes = it->bl.length();
809 int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
810 if (r == 0) { // prepare ok, delete it
811 items.erase(it++);
812 #ifdef HAVE_LIBAIO
813 {
814 Mutex::Locker locker(aio_lock);
815 assert(aio_write_queue_ops > 0);
816 aio_write_queue_ops--;
817 assert(aio_write_queue_bytes >= bytes);
818 aio_write_queue_bytes -= bytes;
819 }
820 #else
821 (void)bytes;
822 #endif
823 }
824 if (r == -ENOSPC) {
825 // the journal maybe full, insert the left item to writeq
826 batch_unpop_write(items);
827 if (orig_ops)
828 goto out; // commit what we have
829
830 if (logger)
831 logger->inc(l_filestore_journal_full);
832
833 if (wait_on_full) {
834 dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
835 } else {
836 dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
837
838 // throw out what we have so far
839 full_state = FULL_FULL;
840 while (!writeq_empty()) {
841 complete_write(1, peek_write().orig_len);
842 pop_write();
843 }
844 print_header(header);
845 }
846
847 return -ENOSPC; // hrm, full on first op
848 }
849 if (eleft) {
850 if (--eleft == 0) {
851 dout(20) << "prepare_multi_write hit max events per write "
852 << cct->_conf->journal_max_write_entries << dendl;
853 batch_unpop_write(items);
854 goto out;
855 }
856 }
857 if (bmax) {
858 if (bl.length() >= bmax) {
859 dout(20) << "prepare_multi_write hit max write size "
860 << cct->_conf->journal_max_write_bytes << dendl;
861 batch_unpop_write(items);
862 goto out;
863 }
864 }
865 }
866 }
867
868 out:
869 dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
870 assert((write_pos + bl.length() == queue_pos) ||
871 (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
872 return 0;
873 }
874
875 /*
876 void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
877 {
878 writing_seq.push_back(seq);
879 if (!waiting_for_notfull.empty()) {
880 // make sure previously unjournaled stuff waiting for UNFULL triggers
881 // _before_ newly journaled stuff does
882 dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
883 << " until after UNFULL" << dendl;
884 C_Gather *g = new C_Gather(writeq.front().fin);
885 writing_fin.push_back(g->new_sub());
886 waiting_for_notfull.push_back(g->new_sub());
887 } else {
888 writing_fin.push_back(writeq.front().fin);
889 dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
890 }
891 }
892 */
893
894 void FileJournal::queue_completions_thru(uint64_t seq)
895 {
896 assert(finisher_lock.is_locked());
897 utime_t now = ceph_clock_now();
898 list<completion_item> items;
899 batch_pop_completions(items);
900 list<completion_item>::iterator it = items.begin();
901 while (it != items.end()) {
902 completion_item& next = *it;
903 if (next.seq > seq)
904 break;
905 utime_t lat = now;
906 lat -= next.start;
907 dout(10) << "queue_completions_thru seq " << seq
908 << " queueing seq " << next.seq
909 << " " << next.finish
910 << " lat " << lat << dendl;
911 if (logger) {
912 logger->tinc(l_filestore_journal_latency, lat);
913 }
914 if (next.finish)
915 finisher->queue(next.finish);
916 if (next.tracked_op) {
917 next.tracked_op->mark_event("journaled_completion_queued");
918 next.tracked_op->journal_trace.event("queued completion");
919 next.tracked_op->journal_trace.keyval("completed through", seq);
920 }
921 items.erase(it++);
922 }
923 batch_unpop_completions(items);
924 finisher_cond.Signal();
925 }
926
927
928 int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
929 {
930 uint64_t seq = next_write.seq;
931 bufferlist &ebl = next_write.bl;
932 off64_t size = ebl.length();
933
934 int r = check_for_full(seq, queue_pos, size);
935 if (r < 0)
936 return r; // ENOSPC or EAGAIN
937
938 uint32_t orig_len = next_write.orig_len;
939 orig_bytes += orig_len;
940 orig_ops++;
941
942 // add to write buffer
943 dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
944 << " len " << orig_len << " -> " << size << dendl;
945
946 unsigned seq_offset = offsetof(entry_header_t, seq);
947 unsigned magic1_offset = offsetof(entry_header_t, magic1);
948 unsigned magic2_offset = offsetof(entry_header_t, magic2);
949
950 bufferptr headerptr = ebl.buffers().front();
951 uint64_t _seq = seq;
952 uint64_t _queue_pos = queue_pos;
953 uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
954 headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
955 headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
956 headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
957
958 bufferptr footerptr = ebl.buffers().back();
959 unsigned post_offset = footerptr.length() - sizeof(entry_header_t);
960 footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
961 footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
962 footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
963
964 bl.claim_append(ebl);
965 if (next_write.tracked_op) {
966 next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
967 next_write.tracked_op->journal_trace.event("prepare_single_write");
968 }
969
970 journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
971 writing_seq = seq;
972
973 queue_pos += size;
974 if (queue_pos >= header.max_size)
975 queue_pos = queue_pos + get_top() - header.max_size;
976
977 return 0;
978 }
979
980 void FileJournal::check_align(off64_t pos, bufferlist& bl)
981 {
982 // make sure list segments are page aligned
983 if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
984 assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
985 assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
986 assert(0 == "bl was not aligned");
987 }
988 }
989
990 int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
991 {
992 int ret;
993
994 off64_t spos = ::lseek64(fd, pos, SEEK_SET);
995 if (spos < 0) {
996 ret = -errno;
997 derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl;
998 return ret;
999 }
1000 ret = bl.write_fd(fd);
1001 if (ret) {
1002 derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl;
1003 return ret;
1004 }
1005 pos += bl.length();
1006 if (pos == header.max_size)
1007 pos = get_top();
1008 return 0;
1009 }
1010
1011 void FileJournal::do_write(bufferlist& bl)
1012 {
1013 // nothing to do?
1014 if (bl.length() == 0 && !must_write_header)
1015 return;
1016
1017 buffer::ptr hbp;
1018 if (cct->_conf->journal_write_header_frequency &&
1019 (((++journaled_since_start) %
1020 cct->_conf->journal_write_header_frequency) == 0)) {
1021 must_write_header = true;
1022 }
1023
1024 if (must_write_header) {
1025 must_write_header = false;
1026 hbp = prepare_header();
1027 }
1028
1029 dout(15) << "do_write writing " << write_pos << "~" << bl.length()
1030 << (hbp.length() ? " + header":"")
1031 << dendl;
1032
1033 utime_t from = ceph_clock_now();
1034
1035 // entry
1036 off64_t pos = write_pos;
1037
1038 // Adjust write_pos
1039 write_pos += bl.length();
1040 if (write_pos >= header.max_size)
1041 write_pos = write_pos - header.max_size + get_top();
1042
1043 write_lock.Unlock();
1044
1045 // split?
1046 off64_t split = 0;
1047 if (pos + bl.length() > header.max_size) {
1048 bufferlist first, second;
1049 split = header.max_size - pos;
1050 first.substr_of(bl, 0, split);
1051 second.substr_of(bl, split, bl.length() - split);
1052 assert(first.length() + second.length() == bl.length());
1053 dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
1054 << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
1055
1056 //Save pos to write first piece second
1057 off64_t first_pos = pos;
1058 off64_t orig_pos;
1059 pos = get_top();
1060 // header too?
1061 if (hbp.length()) {
1062 // be sneaky: include the header in the second fragment
1063 second.push_front(hbp);
1064 pos = 0; // we included the header
1065 }
1066 // Write the second portion first possible with the header, so
1067 // do_read_entry() won't even get a valid entry_header_t if there
1068 // is a crash between the two writes.
1069 orig_pos = pos;
1070 if (write_bl(pos, second)) {
1071 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1072 << ") failed" << dendl;
1073 check_align(pos, second);
1074 ceph_abort();
1075 }
1076 orig_pos = first_pos;
1077 if (write_bl(first_pos, first)) {
1078 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1079 << ") failed" << dendl;
1080 check_align(first_pos, first);
1081 ceph_abort();
1082 }
1083 assert(first_pos == get_top());
1084 } else {
1085 // header too?
1086 if (hbp.length()) {
1087 if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
1088 int err = errno;
1089 derr << "FileJournal::do_write: pwrite(fd=" << fd
1090 << ", hbp.length=" << hbp.length() << ") failed :"
1091 << cpp_strerror(err) << dendl;
1092 ceph_abort();
1093 }
1094 }
1095
1096 if (write_bl(pos, bl)) {
1097 derr << "FileJournal::do_write: write_bl(pos=" << pos
1098 << ") failed" << dendl;
1099 check_align(pos, bl);
1100 ceph_abort();
1101 }
1102 }
1103
1104 if (!directio) {
1105 dout(20) << "do_write fsync" << dendl;
1106
1107 /*
1108 * We'd really love to have a fsync_range or fdatasync_range and do a:
1109 *
1110 * if (split) {
1111 * ::fsync_range(fd, header.max_size - split, split)l
1112 * ::fsync_range(fd, get_top(), bl.length() - split);
1113 * else
1114 * ::fsync_range(fd, write_pos, bl.length())
1115 *
1116 * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
1117 * too hard given all the underlying infrastructure already exist.
1118 *
1119 * NOTE: using sync_file_range here would not be safe as it does not
1120 * flush disk caches or commits any sort of metadata.
1121 */
1122 int ret = 0;
1123 #if defined(DARWIN) || defined(__FreeBSD__)
1124 ret = ::fsync(fd);
1125 #else
1126 ret = ::fdatasync(fd);
1127 #endif
1128 if (ret < 0) {
1129 derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl;
1130 ceph_abort();
1131 }
1132 #ifdef HAVE_POSIX_FADVISE
1133 if (cct->_conf->filestore_fadvise)
1134 posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
1135 #endif
1136 }
1137
1138 utime_t lat = ceph_clock_now() - from;
1139 dout(20) << "do_write latency " << lat << dendl;
1140
1141 write_lock.Lock();
1142
1143 assert(write_pos == pos);
1144 assert(write_pos % header.alignment == 0);
1145
1146 {
1147 Mutex::Locker locker(finisher_lock);
1148 journaled_seq = writing_seq;
1149
1150 // kick finisher?
1151 // only if we haven't filled up recently!
1152 if (full_state != FULL_NOTFULL) {
1153 dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
1154 << ", full_commit_seq|full_restart_seq" << dendl;
1155 } else {
1156 if (plug_journal_completions) {
1157 dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
1158 << " due to completion plug" << dendl;
1159 } else {
1160 dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
1161 queue_completions_thru(journaled_seq);
1162 }
1163 }
1164 }
1165 }
1166
1167 void FileJournal::flush()
1168 {
1169 dout(10) << "waiting for completions to empty" << dendl;
1170 {
1171 Mutex::Locker l(finisher_lock);
1172 while (!completions_empty())
1173 finisher_cond.Wait(finisher_lock);
1174 }
1175 dout(10) << "flush waiting for finisher" << dendl;
1176 finisher->wait_for_empty();
1177 dout(10) << "flush done" << dendl;
1178 }
1179
1180
1181 void FileJournal::write_thread_entry()
1182 {
1183 dout(10) << "write_thread_entry start" << dendl;
1184 while (1) {
1185 {
1186 Mutex::Locker locker(writeq_lock);
1187 if (writeq.empty() && !must_write_header) {
1188 if (write_stop)
1189 break;
1190 dout(20) << "write_thread_entry going to sleep" << dendl;
1191 writeq_cond.Wait(writeq_lock);
1192 dout(20) << "write_thread_entry woke up" << dendl;
1193 continue;
1194 }
1195 }
1196
1197 #ifdef HAVE_LIBAIO
1198 if (aio) {
1199 Mutex::Locker locker(aio_lock);
1200 // should we back off to limit aios in flight? try to do this
1201 // adaptively so that we submit larger aios once we have lots of
1202 // them in flight.
1203 //
1204 // NOTE: our condition here is based on aio_num (protected by
1205 // aio_lock) and throttle_bytes (part of the write queue). when
1206 // we sleep, we *only* wait for aio_num to change, and do not
1207 // wake when more data is queued. this is not strictly correct,
1208 // but should be fine given that we will have plenty of aios in
1209 // flight if we hit this limit to ensure we keep the device
1210 // saturated.
1211 while (aio_num > 0) {
1212 int exp = MIN(aio_num * 2, 24);
1213 long unsigned min_new = 1ull << exp;
1214 uint64_t cur = aio_write_queue_bytes;
1215 dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
1216 << " ... exp " << exp << " min_new " << min_new
1217 << " ... pending " << cur << dendl;
1218 if (cur >= min_new)
1219 break;
1220 dout(20) << "write_thread_entry deferring until more aios complete: "
1221 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
1222 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
1223 aio_cond.Wait(aio_lock);
1224 dout(20) << "write_thread_entry woke up" << dendl;
1225 }
1226 }
1227 #endif
1228
1229 Mutex::Locker locker(write_lock);
1230 uint64_t orig_ops = 0;
1231 uint64_t orig_bytes = 0;
1232
1233 bufferlist bl;
1234 int r = prepare_multi_write(bl, orig_ops, orig_bytes);
1235 // Don't care about journal full if stoppping, so drop queue and
1236 // possibly let header get written and loop above to notice stop
1237 if (r == -ENOSPC) {
1238 if (write_stop) {
1239 dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
1240 while (!writeq_empty()) {
1241 complete_write(1, peek_write().orig_len);
1242 pop_write();
1243 }
1244 print_header(header);
1245 r = 0;
1246 } else {
1247 dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
1248 commit_cond.Wait(write_lock);
1249 dout(20) << "write_thread_entry woke up" << dendl;
1250 continue;
1251 }
1252 }
1253 assert(r == 0);
1254
1255 if (logger) {
1256 logger->inc(l_filestore_journal_wr);
1257 logger->inc(l_filestore_journal_wr_bytes, bl.length());
1258 }
1259
1260 #ifdef HAVE_LIBAIO
1261 if (aio)
1262 do_aio_write(bl);
1263 else
1264 do_write(bl);
1265 #else
1266 do_write(bl);
1267 #endif
1268 complete_write(orig_ops, orig_bytes);
1269 }
1270
1271 dout(10) << "write_thread_entry finish" << dendl;
1272 }
1273
1274 #ifdef HAVE_LIBAIO
1275 void FileJournal::do_aio_write(bufferlist& bl)
1276 {
1277
1278 if (cct->_conf->journal_write_header_frequency &&
1279 (((++journaled_since_start) %
1280 cct->_conf->journal_write_header_frequency) == 0)) {
1281 must_write_header = true;
1282 }
1283
1284 // nothing to do?
1285 if (bl.length() == 0 && !must_write_header)
1286 return;
1287
1288 buffer::ptr hbp;
1289 if (must_write_header) {
1290 must_write_header = false;
1291 hbp = prepare_header();
1292 }
1293
1294 // entry
1295 off64_t pos = write_pos;
1296
1297 dout(15) << "do_aio_write writing " << pos << "~" << bl.length()
1298 << (hbp.length() ? " + header":"")
1299 << dendl;
1300
1301 // split?
1302 off64_t split = 0;
1303 if (pos + bl.length() > header.max_size) {
1304 bufferlist first, second;
1305 split = header.max_size - pos;
1306 first.substr_of(bl, 0, split);
1307 second.substr_of(bl, split, bl.length() - split);
1308 assert(first.length() + second.length() == bl.length());
1309 dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
1310
1311 if (write_aio_bl(pos, first, 0)) {
1312 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1313 << ") failed" << dendl;
1314 ceph_abort();
1315 }
1316 assert(pos == header.max_size);
1317 if (hbp.length()) {
1318 // be sneaky: include the header in the second fragment
1319 second.push_front(hbp);
1320 pos = 0; // we included the header
1321 } else
1322 pos = get_top(); // no header, start after that
1323 if (write_aio_bl(pos, second, writing_seq)) {
1324 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1325 << ") failed" << dendl;
1326 ceph_abort();
1327 }
1328 } else {
1329 // header too?
1330 if (hbp.length()) {
1331 bufferlist hbl;
1332 hbl.push_back(hbp);
1333 loff_t pos = 0;
1334 if (write_aio_bl(pos, hbl, 0)) {
1335 derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl;
1336 ceph_abort();
1337 }
1338 }
1339
1340 if (write_aio_bl(pos, bl, writing_seq)) {
1341 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1342 << ") failed" << dendl;
1343 ceph_abort();
1344 }
1345 }
1346
1347 write_pos = pos;
1348 if (write_pos == header.max_size)
1349 write_pos = get_top();
1350 assert(write_pos % header.alignment == 0);
1351 }
1352
1353 /**
1354 * write a buffer using aio
1355 *
1356 * @param seq seq to trigger when this aio completes. if 0, do not update any state
1357 * on completion.
1358 */
1359 int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
1360 {
1361 dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
1362
1363 while (bl.length() > 0) {
1364 int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
1365 iovec *iov = new iovec[max];
1366 int n = 0;
1367 unsigned len = 0;
1368 for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
1369 n < max;
1370 ++p, ++n) {
1371 assert(p != bl.buffers().end());
1372 iov[n].iov_base = (void *)p->c_str();
1373 iov[n].iov_len = p->length();
1374 len += p->length();
1375 }
1376
1377 bufferlist tbl;
1378 bl.splice(0, len, &tbl); // move bytes from bl -> tbl
1379
1380 // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
1381 // modified in check_aio_completion
1382 aio_lock.Lock();
1383 aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
1384 aio_info& aio = aio_queue.back();
1385 aio.iov = iov;
1386
1387 io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
1388
1389 dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
1390 << " in " << n << dendl;
1391
1392 aio_num++;
1393 aio_bytes += aio.len;
1394
1395 // need to save current aio len to update write_pos later because current
1396 // aio could be ereased from aio_queue once it is done
1397 uint64_t cur_len = aio.len;
1398 // unlock aio_lock because following io_submit might take time to return
1399 aio_lock.Unlock();
1400
1401 iocb *piocb = &aio.iocb;
1402
1403 // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
1404 int attempts = 16;
1405 int delay = 125;
1406 do {
1407 int r = io_submit(aio_ctx, 1, &piocb);
1408 dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
1409 if (r < 0) {
1410 derr << "io_submit to " << aio.off << "~" << cur_len
1411 << " got " << cpp_strerror(r) << dendl;
1412 if (r == -EAGAIN && attempts-- > 0) {
1413 usleep(delay);
1414 delay *= 2;
1415 continue;
1416 }
1417 check_align(pos, tbl);
1418 assert(0 == "io_submit got unexpected error");
1419 } else {
1420 break;
1421 }
1422 } while (true);
1423 pos += cur_len;
1424 }
1425 aio_lock.Lock();
1426 write_finish_cond.Signal();
1427 aio_lock.Unlock();
1428 return 0;
1429 }
1430 #endif
1431
1432 void FileJournal::write_finish_thread_entry()
1433 {
1434 #ifdef HAVE_LIBAIO
1435 dout(10) << __func__ << " enter" << dendl;
1436 while (true) {
1437 {
1438 Mutex::Locker locker(aio_lock);
1439 if (aio_queue.empty()) {
1440 if (aio_stop)
1441 break;
1442 dout(20) << __func__ << " sleeping" << dendl;
1443 write_finish_cond.Wait(aio_lock);
1444 continue;
1445 }
1446 }
1447
1448 dout(20) << __func__ << " waiting for aio(s)" << dendl;
1449 io_event event[16];
1450 int r = io_getevents(aio_ctx, 1, 16, event, NULL);
1451 if (r < 0) {
1452 if (r == -EINTR) {
1453 dout(0) << "io_getevents got " << cpp_strerror(r) << dendl;
1454 continue;
1455 }
1456 derr << "io_getevents got " << cpp_strerror(r) << dendl;
1457 assert(0 == "got unexpected error from io_getevents");
1458 }
1459
1460 {
1461 Mutex::Locker locker(aio_lock);
1462 for (int i=0; i<r; i++) {
1463 aio_info *ai = (aio_info *)event[i].obj;
1464 if (event[i].res != ai->len) {
1465 derr << "aio to " << ai->off << "~" << ai->len
1466 << " returned: " << (int)event[i].res << dendl;
1467 assert(0 == "unexpected aio error");
1468 }
1469 dout(10) << __func__ << " aio " << ai->off
1470 << "~" << ai->len << " done" << dendl;
1471 ai->done = true;
1472 }
1473 check_aio_completion();
1474 }
1475 }
1476 dout(10) << __func__ << " exit" << dendl;
1477 #endif
1478 }
1479
1480 #ifdef HAVE_LIBAIO
1481 /**
1482 * check aio_wait for completed aio, and update state appropriately.
1483 */
1484 void FileJournal::check_aio_completion()
1485 {
1486 assert(aio_lock.is_locked());
1487 dout(20) << "check_aio_completion" << dendl;
1488
1489 bool completed_something = false, signal = false;
1490 uint64_t new_journaled_seq = 0;
1491
1492 list<aio_info>::iterator p = aio_queue.begin();
1493 while (p != aio_queue.end() && p->done) {
1494 dout(20) << "check_aio_completion completed seq " << p->seq << " "
1495 << p->off << "~" << p->len << dendl;
1496 if (p->seq) {
1497 new_journaled_seq = p->seq;
1498 completed_something = true;
1499 }
1500 aio_num--;
1501 aio_bytes -= p->len;
1502 aio_queue.erase(p++);
1503 signal = true;
1504 }
1505
1506 if (completed_something) {
1507 // kick finisher?
1508 // only if we haven't filled up recently!
1509 Mutex::Locker locker(finisher_lock);
1510 journaled_seq = new_journaled_seq;
1511 if (full_state != FULL_NOTFULL) {
1512 dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
1513 << ", full_commit_seq|full_restart_seq" << dendl;
1514 } else {
1515 if (plug_journal_completions) {
1516 dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
1517 << " due to completion plug" << dendl;
1518 } else {
1519 dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl;
1520 queue_completions_thru(journaled_seq);
1521 }
1522 }
1523 }
1524 if (signal) {
1525 // maybe write queue was waiting for aio count to drop?
1526 aio_cond.Signal();
1527 }
1528 }
1529 #endif
1530
1531 int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) {
1532 dout(10) << "prepare_entry " << tls << dendl;
1533 int data_len = cct->_conf->journal_align_min_size - 1;
1534 int data_align = -1; // -1 indicates that we don't care about the alignment
1535 bufferlist bl;
1536 for (vector<ObjectStore::Transaction>::iterator p = tls.begin();
1537 p != tls.end(); ++p) {
1538 if ((int)(*p).get_data_length() > data_len) {
1539 data_len = (*p).get_data_length();
1540 data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
1541 }
1542 ::encode(*p, bl);
1543 }
1544 if (tbl->length()) {
1545 bl.claim_append(*tbl);
1546 }
1547 // add it this entry
1548 entry_header_t h;
1549 unsigned head_size = sizeof(entry_header_t);
1550 off64_t base_size = 2*head_size + bl.length();
1551 memset(&h, 0, sizeof(h));
1552 if (data_align >= 0)
1553 h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
1554 off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment);
1555 unsigned post_pad = size - base_size - h.pre_pad;
1556 h.len = bl.length();
1557 h.post_pad = post_pad;
1558 h.crc32c = bl.crc32c(0);
1559 dout(10) << " len " << bl.length() << " -> " << size
1560 << " (head " << head_size << " pre_pad " << h.pre_pad
1561 << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
1562 << " (bl alignment " << data_align << ")"
1563 << dendl;
1564 bufferlist ebl;
1565 // header
1566 ebl.append((const char*)&h, sizeof(h));
1567 if (h.pre_pad) {
1568 ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
1569 }
1570 // payload
1571 ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
1572 if (h.post_pad) {
1573 ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
1574 }
1575 // footer
1576 ebl.append((const char*)&h, sizeof(h));
1577 if (directio)
1578 ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
1579 tbl->claim(ebl);
1580 return h.len;
1581 }
1582
1583 void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
1584 Context *oncommit, TrackedOpRef osd_op)
1585 {
1586 // dump on queue
1587 dout(5) << "submit_entry seq " << seq
1588 << " len " << e.length()
1589 << " (" << oncommit << ")" << dendl;
1590 assert(e.length() > 0);
1591 assert(e.length() < header.max_size);
1592
1593 if (osd_op)
1594 osd_op->mark_event("commit_queued_for_journal_write");
1595 if (logger) {
1596 logger->inc(l_filestore_journal_queue_bytes, orig_len);
1597 logger->inc(l_filestore_journal_queue_ops, 1);
1598 }
1599
1600 throttle.register_throttle_seq(seq, e.length());
1601 if (logger) {
1602 logger->inc(l_filestore_journal_ops, 1);
1603 logger->inc(l_filestore_journal_bytes, e.length());
1604 }
1605
1606 if (osd_op) {
1607 osd_op->mark_event("commit_queued_for_journal_write");
1608 if (osd_op->store_trace) {
1609 osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace);
1610 osd_op->journal_trace.event("submit_entry");
1611 osd_op->journal_trace.keyval("seq", seq);
1612 }
1613 }
1614 {
1615 Mutex::Locker l1(writeq_lock);
1616 #ifdef HAVE_LIBAIO
1617 Mutex::Locker l2(aio_lock);
1618 #endif
1619 Mutex::Locker l3(completions_lock);
1620
1621 #ifdef HAVE_LIBAIO
1622 aio_write_queue_ops++;
1623 aio_write_queue_bytes += e.length();
1624 aio_cond.Signal();
1625 #endif
1626
1627 completions.push_back(
1628 completion_item(
1629 seq, oncommit, ceph_clock_now(), osd_op));
1630 if (writeq.empty())
1631 writeq_cond.Signal();
1632 writeq.push_back(write_item(seq, e, orig_len, osd_op));
1633 if (osd_op)
1634 osd_op->journal_trace.keyval("queue depth", writeq.size());
1635 }
1636 }
1637
1638 bool FileJournal::writeq_empty()
1639 {
1640 Mutex::Locker locker(writeq_lock);
1641 return writeq.empty();
1642 }
1643
1644 FileJournal::write_item &FileJournal::peek_write()
1645 {
1646 assert(write_lock.is_locked());
1647 Mutex::Locker locker(writeq_lock);
1648 return writeq.front();
1649 }
1650
1651 void FileJournal::pop_write()
1652 {
1653 assert(write_lock.is_locked());
1654 Mutex::Locker locker(writeq_lock);
1655 if (logger) {
1656 logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
1657 logger->dec(l_filestore_journal_queue_ops, 1);
1658 }
1659 writeq.pop_front();
1660 }
1661
1662 void FileJournal::batch_pop_write(list<write_item> &items)
1663 {
1664 assert(write_lock.is_locked());
1665 {
1666 Mutex::Locker locker(writeq_lock);
1667 writeq.swap(items);
1668 }
1669 for (auto &&i : items) {
1670 if (logger) {
1671 logger->dec(l_filestore_journal_queue_bytes, i.orig_len);
1672 logger->dec(l_filestore_journal_queue_ops, 1);
1673 }
1674 }
1675 }
1676
1677 void FileJournal::batch_unpop_write(list<write_item> &items)
1678 {
1679 assert(write_lock.is_locked());
1680 for (auto &&i : items) {
1681 if (logger) {
1682 logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
1683 logger->inc(l_filestore_journal_queue_ops, 1);
1684 }
1685 }
1686 Mutex::Locker locker(writeq_lock);
1687 writeq.splice(writeq.begin(), items);
1688 }
1689
1690 void FileJournal::commit_start(uint64_t seq)
1691 {
1692 dout(10) << "commit_start" << dendl;
1693
1694 // was full?
1695 switch (full_state) {
1696 case FULL_NOTFULL:
1697 break; // all good
1698
1699 case FULL_FULL:
1700 if (seq >= journaled_seq) {
1701 dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq "
1702 << seq << " > journaled_seq " << journaled_seq
1703 << ", moving to FULL_WAIT."
1704 << dendl;
1705 full_state = FULL_WAIT;
1706 } else {
1707 dout(1) << "FULL_FULL commit_start on seq "
1708 << seq << " < journaled_seq " << journaled_seq
1709 << ", remaining in FULL_FULL"
1710 << dendl;
1711 }
1712 break;
1713
1714 case FULL_WAIT:
1715 dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl;
1716 full_state = FULL_NOTFULL;
1717 plug_journal_completions = true;
1718 break;
1719 }
1720 }
1721
1722 /*
1723 *send discard command to joural block deivce
1724 */
1725 void FileJournal::do_discard(int64_t offset, int64_t end)
1726 {
1727 dout(10) << __func__ << "trim(" << offset << ", " << end << dendl;
1728
1729 offset = ROUND_UP_TO(offset, block_size);
1730 if (offset >= end)
1731 return;
1732 end = ROUND_UP_TO(end - block_size, block_size);
1733 assert(end >= offset);
1734 if (offset < end)
1735 if (block_device_discard(fd, offset, end - offset) < 0)
1736 dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
1737 }
1738
1739 void FileJournal::committed_thru(uint64_t seq)
1740 {
1741 Mutex::Locker locker(write_lock);
1742
1743 auto released = throttle.flush(seq);
1744 if (logger) {
1745 logger->dec(l_filestore_journal_ops, released.first);
1746 logger->dec(l_filestore_journal_bytes, released.second);
1747 }
1748
1749 if (seq < last_committed_seq) {
1750 dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
1751 assert(seq >= last_committed_seq);
1752 return;
1753 }
1754 if (seq == last_committed_seq) {
1755 dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
1756 return;
1757 }
1758
1759 dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
1760 last_committed_seq = seq;
1761
1762 // completions!
1763 {
1764 Mutex::Locker locker(finisher_lock);
1765 queue_completions_thru(seq);
1766 if (plug_journal_completions && seq >= header.start_seq) {
1767 dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
1768 plug_journal_completions = false;
1769 queue_completions_thru(journaled_seq);
1770 }
1771 }
1772
1773 // adjust start pointer
1774 while (!journalq.empty() && journalq.front().first <= seq) {
1775 journalq.pop_front();
1776 }
1777
1778 int64_t old_start = header.start;
1779 if (!journalq.empty()) {
1780 header.start = journalq.front().second;
1781 header.start_seq = journalq.front().first;
1782 } else {
1783 header.start = write_pos;
1784 header.start_seq = seq + 1;
1785 }
1786
1787 if (discard) {
1788 dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl;
1789 if (old_start < header.start)
1790 do_discard(old_start, header.start - 1);
1791 else {
1792 do_discard(old_start, header.max_size - 1);
1793 do_discard(get_top(), header.start - 1);
1794 }
1795 }
1796
1797 must_write_header = true;
1798 print_header(header);
1799
1800 // committed but unjournaled items
1801 while (!writeq_empty() && peek_write().seq <= seq) {
1802 dout(15) << " dropping committed but unwritten seq " << peek_write().seq
1803 << " len " << peek_write().bl.length()
1804 << dendl;
1805 complete_write(1, peek_write().orig_len);
1806 pop_write();
1807 }
1808
1809 commit_cond.Signal();
1810
1811 dout(10) << "committed_thru done" << dendl;
1812 }
1813
1814
1815 void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
1816 {
1817 dout(5) << __func__ << " finished " << ops << " ops and "
1818 << bytes << " bytes" << dendl;
1819 }
1820
1821 int FileJournal::make_writeable()
1822 {
1823 dout(10) << __func__ << dendl;
1824 int r = set_throttle_params();
1825 if (r < 0)
1826 return r;
1827
1828 r = _open(true);
1829 if (r < 0)
1830 return r;
1831
1832 if (read_pos > 0)
1833 write_pos = read_pos;
1834 else
1835 write_pos = get_top();
1836 read_pos = 0;
1837
1838 must_write_header = true;
1839
1840 start_writer();
1841 return 0;
1842 }
1843
1844 int FileJournal::set_throttle_params()
1845 {
1846 stringstream ss;
1847 bool valid = throttle.set_params(
1848 cct->_conf->journal_throttle_low_threshhold,
1849 cct->_conf->journal_throttle_high_threshhold,
1850 cct->_conf->filestore_expected_throughput_bytes,
1851 cct->_conf->journal_throttle_high_multiple,
1852 cct->_conf->journal_throttle_max_multiple,
1853 header.max_size - get_top(),
1854 &ss);
1855
1856 if (!valid) {
1857 derr << "tried to set invalid params: "
1858 << ss.str()
1859 << dendl;
1860 }
1861 return valid ? 0 : -EINVAL;
1862 }
1863
1864 const char** FileJournal::get_tracked_conf_keys() const
1865 {
1866 static const char *KEYS[] = {
1867 "journal_throttle_low_threshhold",
1868 "journal_throttle_high_threshhold",
1869 "journal_throttle_high_multiple",
1870 "journal_throttle_max_multiple",
1871 "filestore_expected_throughput_bytes",
1872 NULL};
1873 return KEYS;
1874 }
1875
1876 void FileJournal::wrap_read_bl(
1877 off64_t pos,
1878 int64_t olen,
1879 bufferlist* bl,
1880 off64_t *out_pos
1881 ) const
1882 {
1883 while (olen > 0) {
1884 while (pos >= header.max_size)
1885 pos = pos + get_top() - header.max_size;
1886
1887 int64_t len;
1888 if (pos + olen > header.max_size)
1889 len = header.max_size - pos; // partial
1890 else
1891 len = olen; // rest
1892
1893 int64_t actual = ::lseek64(fd, pos, SEEK_SET);
1894 assert(actual == pos);
1895
1896 bufferptr bp = buffer::create(len);
1897 int r = safe_read_exact(fd, bp.c_str(), len);
1898 if (r) {
1899 derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
1900 << r << dendl;
1901 ceph_abort();
1902 }
1903 bl->push_back(std::move(bp));
1904 pos += len;
1905 olen -= len;
1906 }
1907 if (pos >= header.max_size)
1908 pos = pos + get_top() - header.max_size;
1909 if (out_pos)
1910 *out_pos = pos;
1911 }
1912
1913 bool FileJournal::read_entry(
1914 bufferlist &bl,
1915 uint64_t &next_seq,
1916 bool *corrupt)
1917 {
1918 if (corrupt)
1919 *corrupt = false;
1920 uint64_t seq = next_seq;
1921
1922 if (!read_pos) {
1923 dout(2) << "read_entry -- not readable" << dendl;
1924 return false;
1925 }
1926
1927 off64_t pos = read_pos;
1928 off64_t next_pos = pos;
1929 stringstream ss;
1930 read_entry_result result = do_read_entry(
1931 pos,
1932 &next_pos,
1933 &bl,
1934 &seq,
1935 &ss);
1936 if (result == SUCCESS) {
1937 journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
1938 uint64_t amount_to_take =
1939 next_pos > pos ?
1940 next_pos - pos :
1941 (header.max_size - pos) + (next_pos - get_top());
1942 throttle.take(amount_to_take);
1943 throttle.register_throttle_seq(next_seq, amount_to_take);
1944 if (logger) {
1945 logger->inc(l_filestore_journal_ops, 1);
1946 logger->inc(l_filestore_journal_bytes, amount_to_take);
1947 }
1948 if (next_seq > seq) {
1949 return false;
1950 } else {
1951 read_pos = next_pos;
1952 next_seq = seq;
1953 if (seq > journaled_seq)
1954 journaled_seq = seq;
1955 return true;
1956 }
1957 } else {
1958 derr << "do_read_entry(" << pos << "): " << ss.str() << dendl;
1959 }
1960
1961 if (seq && seq < header.committed_up_to) {
1962 derr << "Unable to read past sequence " << seq
1963 << " but header indicates the journal has committed up through "
1964 << header.committed_up_to << ", journal is corrupt" << dendl;
1965 if (cct->_conf->journal_ignore_corruption) {
1966 if (corrupt)
1967 *corrupt = true;
1968 return false;
1969 } else {
1970 ceph_abort();
1971 }
1972 }
1973
1974 dout(2) << "No further valid entries found, journal is most likely valid"
1975 << dendl;
1976 return false;
1977 }
1978
1979 FileJournal::read_entry_result FileJournal::do_read_entry(
1980 off64_t init_pos,
1981 off64_t *next_pos,
1982 bufferlist *bl,
1983 uint64_t *seq,
1984 ostream *ss,
1985 entry_header_t *_h) const
1986 {
1987 off64_t cur_pos = init_pos;
1988 bufferlist _bl;
1989 if (!bl)
1990 bl = &_bl;
1991
1992 // header
1993 entry_header_t *h;
1994 bufferlist hbl;
1995 off64_t _next_pos;
1996 wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos);
1997 h = reinterpret_cast<entry_header_t *>(hbl.c_str());
1998
1999 if (!h->check_magic(cur_pos, header.get_fsid64())) {
2000 dout(25) << "read_entry " << init_pos
2001 << " : bad header magic, end of journal" << dendl;
2002 if (ss)
2003 *ss << "bad header magic";
2004 if (next_pos)
2005 *next_pos = init_pos + (4<<10); // check 4k ahead
2006 return MAYBE_CORRUPT;
2007 }
2008 cur_pos = _next_pos;
2009
2010 // pad + body + pad
2011 if (h->pre_pad)
2012 cur_pos += h->pre_pad;
2013
2014 bl->clear();
2015 wrap_read_bl(cur_pos, h->len, bl, &cur_pos);
2016
2017 if (h->post_pad)
2018 cur_pos += h->post_pad;
2019
2020 // footer
2021 entry_header_t *f;
2022 bufferlist fbl;
2023 wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos);
2024 f = reinterpret_cast<entry_header_t *>(fbl.c_str());
2025 if (memcmp(f, h, sizeof(*f))) {
2026 if (ss)
2027 *ss << "bad footer magic, partial entry";
2028 if (next_pos)
2029 *next_pos = cur_pos;
2030 return MAYBE_CORRUPT;
2031 }
2032
2033 if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal)
2034 h->crc32c != 0) { // newer entry in old journal
2035 uint32_t actual_crc = bl->crc32c(0);
2036 if (actual_crc != h->crc32c) {
2037 if (ss)
2038 *ss << "header crc (" << h->crc32c
2039 << ") doesn't match body crc (" << actual_crc << ")";
2040 if (next_pos)
2041 *next_pos = cur_pos;
2042 return MAYBE_CORRUPT;
2043 }
2044 }
2045
2046 // yay!
2047 dout(2) << "read_entry " << init_pos << " : seq " << h->seq
2048 << " " << h->len << " bytes"
2049 << dendl;
2050
2051 // ok!
2052 if (seq)
2053 *seq = h->seq;
2054
2055
2056 if (next_pos)
2057 *next_pos = cur_pos;
2058
2059 if (_h)
2060 *_h = *h;
2061
2062 assert(cur_pos % header.alignment == 0);
2063 return SUCCESS;
2064 }
2065
2066 void FileJournal::reserve_throttle_and_backoff(uint64_t count)
2067 {
2068 throttle.get(count);
2069 }
2070
2071 void FileJournal::get_header(
2072 uint64_t wanted_seq,
2073 off64_t *_pos,
2074 entry_header_t *h)
2075 {
2076 off64_t pos = header.start;
2077 off64_t next_pos = pos;
2078 bufferlist bl;
2079 uint64_t seq = 0;
2080 dout(2) << __func__ << dendl;
2081 while (1) {
2082 bl.clear();
2083 pos = next_pos;
2084 read_entry_result result = do_read_entry(
2085 pos,
2086 &next_pos,
2087 &bl,
2088 &seq,
2089 0,
2090 h);
2091 if (result == FAILURE || result == MAYBE_CORRUPT)
2092 ceph_abort();
2093 if (seq == wanted_seq) {
2094 if (_pos)
2095 *_pos = pos;
2096 return;
2097 }
2098 }
2099 ceph_abort(); // not reachable
2100 }
2101
2102 void FileJournal::corrupt(
2103 int wfd,
2104 off64_t corrupt_at)
2105 {
2106 dout(2) << __func__ << dendl;
2107 if (corrupt_at >= header.max_size)
2108 corrupt_at = corrupt_at + get_top() - header.max_size;
2109
2110 int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
2111 assert(actual == corrupt_at);
2112
2113 char buf[10];
2114 int r = safe_read_exact(fd, buf, 1);
2115 assert(r == 0);
2116
2117 actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
2118 assert(actual == corrupt_at);
2119
2120 buf[0]++;
2121 r = safe_write(wfd, buf, 1);
2122 assert(r == 0);
2123 }
2124
2125 void FileJournal::corrupt_payload(
2126 int wfd,
2127 uint64_t seq)
2128 {
2129 dout(2) << __func__ << dendl;
2130 off64_t pos = 0;
2131 entry_header_t h;
2132 get_header(seq, &pos, &h);
2133 off64_t corrupt_at =
2134 pos + sizeof(entry_header_t) + h.pre_pad;
2135 corrupt(wfd, corrupt_at);
2136 }
2137
2138
2139 void FileJournal::corrupt_footer_magic(
2140 int wfd,
2141 uint64_t seq)
2142 {
2143 dout(2) << __func__ << dendl;
2144 off64_t pos = 0;
2145 entry_header_t h;
2146 get_header(seq, &pos, &h);
2147 off64_t corrupt_at =
2148 pos + sizeof(entry_header_t) + h.pre_pad +
2149 h.len + h.post_pad +
2150 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2151 corrupt(wfd, corrupt_at);
2152 }
2153
2154
2155 void FileJournal::corrupt_header_magic(
2156 int wfd,
2157 uint64_t seq)
2158 {
2159 dout(2) << __func__ << dendl;
2160 off64_t pos = 0;
2161 entry_header_t h;
2162 get_header(seq, &pos, &h);
2163 off64_t corrupt_at =
2164 pos +
2165 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2166 corrupt(wfd, corrupt_at);
2167 }
2168
2169 off64_t FileJournal::get_journal_size_estimate()
2170 {
2171 off64_t size, start = header.start;
2172 if (write_pos < start) {
2173 size = (max_size - start) + write_pos;
2174 } else {
2175 size = write_pos - start;
2176 }
2177 dout(20) << __func__ << " journal size=" << size << dendl;
2178 return size;
2179 }