]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | #include "acconfig.h" | |
15 | ||
16 | #include "common/debug.h" | |
17 | #include "common/errno.h" | |
18 | #include "common/safe_io.h" | |
19 | #include "FileJournal.h" | |
20 | #include "include/color.h" | |
21 | #include "common/perf_counters.h" | |
22 | #include "FileStore.h" | |
23 | ||
24 | #include "include/compat.h" | |
25 | ||
26 | #include <fcntl.h> | |
27 | #include <limits.h> | |
28 | #include <sstream> | |
29 | #include <stdio.h> | |
30 | #include <stdlib.h> | |
31 | #include <sys/types.h> | |
32 | #include <sys/stat.h> | |
33 | #include <sys/mount.h> | |
34 | ||
35 | #include "common/blkdev.h" | |
36 | #if defined(__linux__) | |
37 | #include "common/linux_version.h" | |
38 | #endif | |
39 | ||
40 | #if defined(__FreeBSD__) | |
41 | #define O_DSYNC O_SYNC | |
42 | #endif | |
43 | ||
44 | #define dout_context cct | |
45 | #define dout_subsys ceph_subsys_journal | |
46 | #undef dout_prefix | |
47 | #define dout_prefix *_dout << "journal " | |
48 | ||
// Size thresholds used throughout this file.
static const int64_t ONE_MEG = 1 << 20;            // 1 MiB, in bytes
static const int CEPH_DIRECTIO_ALIGNMENT = 4096;   // minimum alignment accepted for direct-io journals
51 | ||
52 | ||
53 | int FileJournal::_open(bool forwrite, bool create) | |
54 | { | |
55 | int flags, ret; | |
56 | ||
57 | if (forwrite) { | |
58 | flags = O_RDWR; | |
59 | if (directio) | |
60 | flags |= O_DIRECT | O_DSYNC; | |
61 | } else { | |
62 | flags = O_RDONLY; | |
63 | } | |
64 | if (create) | |
65 | flags |= O_CREAT; | |
66 | ||
67 | if (fd >= 0) { | |
68 | if (TEMP_FAILURE_RETRY(::close(fd))) { | |
69 | int err = errno; | |
70 | derr << "FileJournal::_open: error closing old fd: " | |
71 | << cpp_strerror(err) << dendl; | |
72 | } | |
73 | } | |
91327a77 | 74 | fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags|O_CLOEXEC, 0644)); |
7c673cae FG |
75 | if (fd < 0) { |
76 | int err = errno; | |
77 | dout(2) << "FileJournal::_open unable to open journal " | |
78 | << fn << ": " << cpp_strerror(err) << dendl; | |
79 | return -err; | |
80 | } | |
81 | ||
82 | struct stat st; | |
83 | ret = ::fstat(fd, &st); | |
84 | if (ret) { | |
85 | ret = errno; | |
86 | derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl; | |
87 | ret = -ret; | |
88 | goto out_fd; | |
89 | } | |
90 | ||
91 | if (S_ISBLK(st.st_mode)) { | |
92 | ret = _open_block_device(); | |
93 | } else if (S_ISREG(st.st_mode)) { | |
94 | if (aio && !force_aio) { | |
95 | derr << "FileJournal::_open: disabling aio for non-block journal. Use " | |
96 | << "journal_force_aio to force use of aio anyway" << dendl; | |
97 | aio = false; | |
98 | } | |
99 | ret = _open_file(st.st_size, st.st_blksize, create); | |
100 | } else { | |
101 | derr << "FileJournal::_open: wrong journal file type: " << st.st_mode | |
102 | << dendl; | |
103 | ret = -EINVAL; | |
104 | } | |
105 | ||
106 | if (ret) | |
107 | goto out_fd; | |
108 | ||
109 | #ifdef HAVE_LIBAIO | |
110 | if (aio) { | |
111 | aio_ctx = 0; | |
112 | ret = io_setup(128, &aio_ctx); | |
113 | if (ret < 0) { | |
114 | switch (ret) { | |
115 | // Contrary to naive expectations -EAGIAN means ... | |
116 | case -EAGAIN: | |
117 | derr << "FileJournal::_open: user's limit of aio events exceeded. " | |
118 | << "Try increasing /proc/sys/fs/aio-max-nr" << dendl; | |
119 | break; | |
120 | default: | |
121 | derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl; | |
122 | break; | |
123 | } | |
124 | goto out_fd; | |
125 | } | |
126 | } | |
127 | #endif | |
128 | ||
129 | /* We really want max_size to be a multiple of block_size. */ | |
130 | max_size -= max_size % block_size; | |
131 | ||
132 | dout(1) << "_open " << fn << " fd " << fd | |
133 | << ": " << max_size | |
134 | << " bytes, block size " << block_size | |
135 | << " bytes, directio = " << directio | |
136 | << ", aio = " << aio | |
137 | << dendl; | |
138 | return 0; | |
139 | ||
140 | out_fd: | |
141 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
142 | fd = -1; | |
143 | return ret; | |
144 | } | |
145 | ||
146 | int FileJournal::_open_block_device() | |
147 | { | |
148 | int64_t bdev_sz = 0; | |
11fdf7f2 TL |
149 | BlkDev blkdev(fd); |
150 | int ret = blkdev.get_size(&bdev_sz); | |
7c673cae FG |
151 | if (ret) { |
152 | dout(0) << __func__ << ": failed to read block device size." << dendl; | |
153 | return -EIO; | |
154 | } | |
155 | ||
156 | /* Check for bdev_sz too small */ | |
157 | if (bdev_sz < ONE_MEG) { | |
158 | dout(0) << __func__ << ": your block device must be at least " | |
159 | << ONE_MEG << " bytes to be used for a Ceph journal." << dendl; | |
160 | return -EINVAL; | |
161 | } | |
162 | ||
163 | dout(10) << __func__ << ": ignoring osd journal size. " | |
164 | << "We'll use the entire block device (size: " << bdev_sz << ")" | |
165 | << dendl; | |
166 | max_size = bdev_sz; | |
167 | ||
168 | block_size = cct->_conf->journal_block_size; | |
169 | ||
170 | if (cct->_conf->journal_discard) { | |
11fdf7f2 | 171 | discard = blkdev.support_discard(); |
7c673cae FG |
172 | dout(10) << fn << " support discard: " << (int)discard << dendl; |
173 | } | |
174 | ||
175 | return 0; | |
176 | } | |
177 | ||
178 | int FileJournal::_open_file(int64_t oldsize, blksize_t blksize, | |
179 | bool create) | |
180 | { | |
181 | int ret; | |
182 | int64_t conf_journal_sz(cct->_conf->osd_journal_size); | |
183 | conf_journal_sz <<= 20; | |
184 | ||
185 | if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) { | |
186 | derr << "I'm sorry, I don't know how large of a journal to create." | |
187 | << "Please specify a block device to use as the journal OR " | |
188 | << "set osd_journal_size in your ceph.conf" << dendl; | |
189 | return -EINVAL; | |
190 | } | |
191 | ||
192 | if (create && (oldsize < conf_journal_sz)) { | |
193 | uint64_t newsize(conf_journal_sz); | |
194 | dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl; | |
195 | ret = ::ftruncate(fd, newsize); | |
196 | if (ret < 0) { | |
197 | int err = errno; | |
198 | derr << "FileJournal::_open_file : unable to extend journal to " | |
199 | << newsize << " bytes: " << cpp_strerror(err) << dendl; | |
200 | return -err; | |
201 | } | |
28e407b8 | 202 | ret = ceph_posix_fallocate(fd, 0, newsize); |
7c673cae FG |
203 | if (ret) { |
204 | derr << "FileJournal::_open_file : unable to preallocation journal to " | |
205 | << newsize << " bytes: " << cpp_strerror(ret) << dendl; | |
206 | return -ret; | |
207 | } | |
208 | max_size = newsize; | |
7c673cae FG |
209 | } |
210 | else { | |
211 | max_size = oldsize; | |
212 | } | |
213 | block_size = cct->_conf->journal_block_size; | |
214 | ||
215 | if (create && cct->_conf->journal_zero_on_create) { | |
216 | derr << "FileJournal::_open_file : zeroing journal" << dendl; | |
217 | uint64_t write_size = 1 << 20; | |
218 | char *buf; | |
219 | ret = ::posix_memalign((void **)&buf, block_size, write_size); | |
220 | if (ret != 0) { | |
221 | return -ret; | |
222 | } | |
223 | memset(static_cast<void*>(buf), 0, write_size); | |
224 | uint64_t i = 0; | |
225 | for (; (i + write_size) <= (uint64_t)max_size; i += write_size) { | |
226 | ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i); | |
227 | if (ret < 0) { | |
228 | free(buf); | |
229 | return -errno; | |
230 | } | |
231 | } | |
232 | if (i < (uint64_t)max_size) { | |
233 | ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i); | |
234 | if (ret < 0) { | |
235 | free(buf); | |
236 | return -errno; | |
237 | } | |
238 | } | |
239 | free(buf); | |
240 | } | |
241 | ||
242 | ||
243 | dout(10) << "_open journal is not a block device, NOT checking disk " | |
244 | << "write cache on '" << fn << "'" << dendl; | |
245 | ||
246 | return 0; | |
247 | } | |
248 | ||
249 | // This can not be used on an active journal | |
250 | int FileJournal::check() | |
251 | { | |
252 | int ret; | |
253 | ||
11fdf7f2 | 254 | ceph_assert(fd == -1); |
7c673cae FG |
255 | ret = _open(false, false); |
256 | if (ret) | |
257 | return ret; | |
258 | ||
259 | ret = read_header(&header); | |
260 | if (ret < 0) | |
261 | goto done; | |
262 | ||
263 | if (header.fsid != fsid) { | |
264 | derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
265 | << ", invalid (someone else's?) journal" << dendl; | |
266 | ret = -EINVAL; | |
267 | goto done; | |
268 | } | |
269 | ||
270 | dout(1) << "check: header looks ok" << dendl; | |
271 | ret = 0; | |
272 | ||
273 | done: | |
274 | close(); | |
275 | return ret; | |
276 | } | |
277 | ||
278 | ||
279 | int FileJournal::create() | |
280 | { | |
281 | void *buf = 0; | |
282 | int64_t needed_space; | |
283 | int ret; | |
284 | buffer::ptr bp; | |
285 | dout(2) << "create " << fn << " fsid " << fsid << dendl; | |
286 | ||
287 | ret = _open(true, true); | |
288 | if (ret) | |
289 | goto done; | |
290 | ||
291 | // write empty header | |
292 | header = header_t(); | |
293 | header.flags = header_t::FLAG_CRC; // enable crcs on any new journal. | |
294 | header.fsid = fsid; | |
295 | header.max_size = max_size; | |
296 | header.block_size = block_size; | |
297 | if (cct->_conf->journal_block_align || directio) | |
298 | header.alignment = block_size; | |
299 | else | |
300 | header.alignment = 16; // at least stay word aligned on 64bit machines... | |
301 | ||
302 | header.start = get_top(); | |
303 | header.start_seq = 0; | |
304 | ||
305 | print_header(header); | |
306 | ||
307 | // static zeroed buffer for alignment padding | |
308 | delete [] zero_buf; | |
309 | zero_buf = new char[header.alignment]; | |
310 | memset(zero_buf, 0, header.alignment); | |
311 | ||
312 | bp = prepare_header(); | |
313 | if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) { | |
314 | ret = -errno; | |
315 | derr << "FileJournal::create : create write header error " | |
316 | << cpp_strerror(ret) << dendl; | |
317 | goto close_fd; | |
318 | } | |
319 | ||
320 | // zero first little bit, too. | |
321 | ret = posix_memalign(&buf, block_size, block_size); | |
322 | if (ret) { | |
323 | ret = -ret; | |
324 | derr << "FileJournal::create: failed to allocate " << block_size | |
325 | << " bytes of memory: " << cpp_strerror(ret) << dendl; | |
326 | goto close_fd; | |
327 | } | |
328 | memset(buf, 0, block_size); | |
329 | if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) { | |
330 | ret = -errno; | |
331 | derr << "FileJournal::create: error zeroing first " << block_size | |
332 | << " bytes " << cpp_strerror(ret) << dendl; | |
333 | goto free_buf; | |
334 | } | |
335 | ||
11fdf7f2 | 336 | needed_space = cct->_conf->osd_max_write_size << 20; |
7c673cae FG |
337 | needed_space += (2 * sizeof(entry_header_t)) + get_top(); |
338 | if (header.max_size - header.start < needed_space) { | |
339 | derr << "FileJournal::create: OSD journal is not large enough to hold " | |
340 | << "osd_max_write_size bytes!" << dendl; | |
341 | ret = -ENOSPC; | |
342 | goto free_buf; | |
343 | } | |
344 | ||
345 | dout(2) << "create done" << dendl; | |
346 | ret = 0; | |
347 | ||
348 | free_buf: | |
349 | free(buf); | |
350 | buf = 0; | |
351 | close_fd: | |
352 | if (TEMP_FAILURE_RETRY(::close(fd)) < 0) { | |
353 | ret = -errno; | |
354 | derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret) | |
355 | << dendl; | |
356 | } | |
357 | done: | |
358 | fd = -1; | |
359 | return ret; | |
360 | } | |
361 | ||
362 | // This can not be used on an active journal | |
363 | int FileJournal::peek_fsid(uuid_d& fsid) | |
364 | { | |
11fdf7f2 | 365 | ceph_assert(fd == -1); |
7c673cae FG |
366 | int r = _open(false, false); |
367 | if (r) | |
368 | return r; | |
369 | r = read_header(&header); | |
370 | if (r < 0) | |
371 | goto out; | |
372 | fsid = header.fsid; | |
373 | out: | |
374 | close(); | |
375 | return r; | |
376 | } | |
377 | ||
378 | int FileJournal::open(uint64_t fs_op_seq) | |
379 | { | |
380 | dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl; | |
381 | ||
382 | uint64_t next_seq = fs_op_seq + 1; | |
224ce89b | 383 | uint64_t seq = -1; |
7c673cae FG |
384 | |
385 | int err = _open(false); | |
386 | if (err) | |
387 | return err; | |
388 | ||
389 | // assume writeable, unless... | |
390 | read_pos = 0; | |
391 | write_pos = get_top(); | |
392 | ||
393 | // read header? | |
394 | err = read_header(&header); | |
395 | if (err < 0) | |
224ce89b | 396 | goto out; |
7c673cae FG |
397 | |
398 | // static zeroed buffer for alignment padding | |
399 | delete [] zero_buf; | |
400 | zero_buf = new char[header.alignment]; | |
401 | memset(zero_buf, 0, header.alignment); | |
402 | ||
403 | dout(10) << "open header.fsid = " << header.fsid | |
404 | //<< " vs expected fsid = " << fsid | |
405 | << dendl; | |
406 | if (header.fsid != fsid) { | |
407 | derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
408 | << ", invalid (someone else's?) journal" << dendl; | |
224ce89b WB |
409 | err = -EINVAL; |
410 | goto out; | |
7c673cae FG |
411 | } |
412 | if (header.max_size > max_size) { | |
413 | dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl; | |
224ce89b WB |
414 | err = -EINVAL; |
415 | goto out; | |
7c673cae FG |
416 | } |
417 | if (header.block_size != block_size) { | |
418 | dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl; | |
224ce89b WB |
419 | err = -EINVAL; |
420 | goto out; | |
7c673cae FG |
421 | } |
422 | if (header.max_size % header.block_size) { | |
423 | dout(2) << "open journal max size " << header.max_size | |
424 | << " not a multiple of block size " << header.block_size << dendl; | |
224ce89b WB |
425 | err = -EINVAL; |
426 | goto out; | |
7c673cae FG |
427 | } |
428 | if (header.alignment != block_size && directio) { | |
429 | dout(0) << "open journal alignment " << header.alignment << " does not match block size " | |
430 | << block_size << " (required for direct_io journal mode)" << dendl; | |
224ce89b WB |
431 | err = -EINVAL; |
432 | goto out; | |
7c673cae FG |
433 | } |
434 | if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) { | |
435 | dout(0) << "open journal alignment " << header.alignment | |
436 | << " is not multiple of minimum directio alignment " | |
437 | << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)" | |
438 | << dendl; | |
224ce89b WB |
439 | err = -EINVAL; |
440 | goto out; | |
7c673cae FG |
441 | } |
442 | ||
443 | // looks like a valid header. | |
444 | write_pos = 0; // not writeable yet | |
445 | ||
446 | journaled_seq = header.committed_up_to; | |
447 | ||
448 | // find next entry | |
449 | read_pos = header.start; | |
224ce89b | 450 | seq = header.start_seq; |
7c673cae FG |
451 | |
452 | while (1) { | |
453 | bufferlist bl; | |
454 | off64_t old_pos = read_pos; | |
455 | if (!read_entry(bl, seq)) { | |
456 | dout(10) << "open reached end of journal." << dendl; | |
457 | break; | |
458 | } | |
459 | if (seq > next_seq) { | |
460 | dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq | |
461 | << ", ignoring journal contents" | |
462 | << dendl; | |
463 | read_pos = -1; | |
464 | last_committed_seq = 0; | |
465 | return 0; | |
466 | } | |
467 | if (seq == next_seq) { | |
468 | dout(10) << "open reached seq " << seq << dendl; | |
469 | read_pos = old_pos; | |
470 | break; | |
471 | } | |
472 | seq++; // next event should follow. | |
473 | } | |
474 | ||
475 | return 0; | |
224ce89b WB |
476 | out: |
477 | close(); | |
478 | return err; | |
7c673cae FG |
479 | } |
480 | ||
481 | void FileJournal::_close(int fd) const | |
482 | { | |
483 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
484 | } | |
485 | ||
486 | void FileJournal::close() | |
487 | { | |
488 | dout(1) << "close " << fn << dendl; | |
489 | ||
490 | // stop writer thread | |
491 | stop_writer(); | |
492 | ||
493 | // close | |
11fdf7f2 TL |
494 | ceph_assert(writeq_empty()); |
495 | ceph_assert(!must_write_header); | |
496 | ceph_assert(fd >= 0); | |
7c673cae FG |
497 | _close(fd); |
498 | fd = -1; | |
499 | } | |
500 | ||
501 | ||
502 | int FileJournal::dump(ostream& out) | |
503 | { | |
504 | return _dump(out, false); | |
505 | } | |
506 | ||
507 | int FileJournal::simple_dump(ostream& out) | |
508 | { | |
509 | return _dump(out, true); | |
510 | } | |
511 | ||
512 | int FileJournal::_dump(ostream& out, bool simple) | |
513 | { | |
514 | JSONFormatter f(true); | |
515 | int ret = _fdump(f, simple); | |
516 | f.flush(out); | |
517 | return ret; | |
518 | } | |
519 | ||
520 | int FileJournal::_fdump(Formatter &f, bool simple) | |
521 | { | |
522 | dout(10) << "_fdump" << dendl; | |
523 | ||
11fdf7f2 | 524 | ceph_assert(fd == -1); |
7c673cae FG |
525 | int err = _open(false, false); |
526 | if (err) | |
527 | return err; | |
528 | ||
529 | err = read_header(&header); | |
530 | if (err < 0) { | |
531 | close(); | |
532 | return err; | |
533 | } | |
534 | ||
535 | off64_t next_pos = header.start; | |
536 | ||
537 | f.open_object_section("journal"); | |
538 | ||
539 | f.open_object_section("header"); | |
540 | f.dump_unsigned("flags", header.flags); | |
541 | ostringstream os; | |
542 | os << header.fsid; | |
543 | f.dump_string("fsid", os.str()); | |
544 | f.dump_unsigned("block_size", header.block_size); | |
545 | f.dump_unsigned("alignment", header.alignment); | |
546 | f.dump_int("max_size", header.max_size); | |
547 | f.dump_int("start", header.start); | |
548 | f.dump_unsigned("committed_up_to", header.committed_up_to); | |
549 | f.dump_unsigned("start_seq", header.start_seq); | |
550 | f.close_section(); | |
551 | ||
552 | f.open_array_section("entries"); | |
553 | uint64_t seq = header.start_seq; | |
554 | while (1) { | |
555 | bufferlist bl; | |
556 | off64_t pos = next_pos; | |
557 | ||
558 | if (!pos) { | |
559 | dout(2) << "_dump -- not readable" << dendl; | |
560 | err = -EINVAL; | |
561 | break; | |
562 | } | |
563 | stringstream ss; | |
564 | read_entry_result result = do_read_entry( | |
565 | pos, | |
566 | &next_pos, | |
567 | &bl, | |
568 | &seq, | |
569 | &ss); | |
570 | if (result != SUCCESS) { | |
571 | if (seq < header.committed_up_to) { | |
572 | dout(2) << "Unable to read past sequence " << seq | |
573 | << " but header indicates the journal has committed up through " | |
574 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
575 | err = -EINVAL; | |
576 | } | |
577 | dout(25) << ss.str() << dendl; | |
578 | dout(25) << "No further valid entries found, journal is most likely valid" | |
579 | << dendl; | |
580 | break; | |
581 | } | |
582 | ||
583 | f.open_object_section("entry"); | |
584 | f.dump_unsigned("offset", pos); | |
585 | f.dump_unsigned("seq", seq); | |
586 | if (simple) { | |
587 | f.dump_unsigned("bl.length", bl.length()); | |
588 | } else { | |
589 | f.open_array_section("transactions"); | |
11fdf7f2 | 590 | auto p = bl.cbegin(); |
7c673cae FG |
591 | int trans_num = 0; |
592 | while (!p.end()) { | |
593 | ObjectStore::Transaction t(p); | |
594 | f.open_object_section("transaction"); | |
595 | f.dump_unsigned("trans_num", trans_num); | |
596 | t.dump(&f); | |
597 | f.close_section(); | |
598 | trans_num++; | |
599 | } | |
600 | f.close_section(); | |
601 | } | |
602 | f.close_section(); | |
603 | } | |
604 | ||
605 | f.close_section(); | |
606 | f.close_section(); | |
607 | dout(10) << "dump finish" << dendl; | |
608 | ||
609 | close(); | |
610 | return err; | |
611 | } | |
612 | ||
613 | ||
614 | void FileJournal::start_writer() | |
615 | { | |
616 | write_stop = false; | |
617 | aio_stop = false; | |
618 | write_thread.create("journal_write"); | |
619 | #ifdef HAVE_LIBAIO | |
620 | if (aio) | |
621 | write_finish_thread.create("journal_wrt_fin"); | |
622 | #endif | |
623 | } | |
624 | ||
625 | void FileJournal::stop_writer() | |
626 | { | |
627 | // Do nothing if writer already stopped or never started | |
628 | if (!write_stop) | |
629 | { | |
630 | { | |
9f95a23c TL |
631 | std::lock_guard l{write_lock}; |
632 | std::lock_guard p{writeq_lock}; | |
7c673cae | 633 | write_stop = true; |
9f95a23c | 634 | writeq_cond.notify_all(); |
7c673cae FG |
635 | // Doesn't hurt to signal commit_cond in case thread is waiting there |
636 | // and caller didn't use committed_thru() first. | |
9f95a23c | 637 | commit_cond.notify_all(); |
7c673cae FG |
638 | } |
639 | write_thread.join(); | |
640 | ||
641 | // write journal header now so that we have less to replay on remount | |
642 | write_header_sync(); | |
643 | } | |
644 | ||
645 | #ifdef HAVE_LIBAIO | |
646 | // stop aio completeion thread *after* writer thread has stopped | |
647 | // and has submitted all of its io | |
648 | if (aio && !aio_stop) { | |
9f95a23c | 649 | aio_lock.lock(); |
7c673cae | 650 | aio_stop = true; |
9f95a23c TL |
651 | aio_cond.notify_all(); |
652 | write_finish_cond.notify_all(); | |
653 | aio_lock.unlock(); | |
7c673cae FG |
654 | write_finish_thread.join(); |
655 | } | |
656 | #endif | |
657 | } | |
658 | ||
659 | ||
660 | ||
661 | void FileJournal::print_header(const header_t &header) const | |
662 | { | |
663 | dout(10) << "header: block_size " << header.block_size | |
664 | << " alignment " << header.alignment | |
665 | << " max_size " << header.max_size | |
666 | << dendl; | |
667 | dout(10) << "header: start " << header.start << dendl; | |
668 | dout(10) << " write_pos " << write_pos << dendl; | |
669 | } | |
670 | ||
671 | int FileJournal::read_header(header_t *hdr) const | |
672 | { | |
673 | dout(10) << "read_header" << dendl; | |
674 | bufferlist bl; | |
675 | ||
11fdf7f2 | 676 | buffer::ptr bp = buffer::create_small_page_aligned(block_size); |
7c673cae FG |
677 | char* bpdata = bp.c_str(); |
678 | int r = ::pread(fd, bpdata, bp.length(), 0); | |
679 | ||
680 | if (r < 0) { | |
681 | int err = errno; | |
682 | dout(0) << "read_header got " << cpp_strerror(err) << dendl; | |
683 | return -err; | |
684 | } | |
685 | ||
686 | // don't use bp.zero() here, because it also invalidates | |
687 | // crc cache (which is not yet populated anyway) | |
688 | if (bp.length() != (size_t)r) { | |
689 | // r will be always less or equal than bp.length | |
690 | bpdata += r; | |
691 | memset(bpdata, 0, bp.length() - r); | |
692 | } | |
693 | ||
694 | bl.push_back(std::move(bp)); | |
695 | ||
696 | try { | |
11fdf7f2 TL |
697 | auto p = bl.cbegin(); |
698 | decode(*hdr, p); | |
7c673cae FG |
699 | } |
700 | catch (buffer::error& e) { | |
701 | derr << "read_header error decoding journal header" << dendl; | |
702 | return -EINVAL; | |
703 | } | |
704 | ||
705 | ||
706 | /* | |
707 | * Unfortunately we weren't initializing the flags field for new | |
708 | * journals! Aie. This is safe(ish) now that we have only one | |
709 | * flag. Probably around when we add the next flag we need to | |
710 | * remove this or else this (eventually old) code will clobber newer | |
711 | * code's flags. | |
712 | */ | |
713 | if (hdr->flags > 3) { | |
714 | derr << "read_header appears to have gibberish flags; assuming 0" << dendl; | |
715 | hdr->flags = 0; | |
716 | } | |
717 | ||
718 | print_header(*hdr); | |
719 | ||
720 | return 0; | |
721 | } | |
722 | ||
723 | bufferptr FileJournal::prepare_header() | |
724 | { | |
725 | bufferlist bl; | |
726 | { | |
9f95a23c | 727 | std::lock_guard l{finisher_lock}; |
7c673cae FG |
728 | header.committed_up_to = journaled_seq; |
729 | } | |
11fdf7f2 TL |
730 | encode(header, bl); |
731 | bufferptr bp = buffer::create_small_page_aligned(get_top()); | |
7c673cae FG |
732 | // don't use bp.zero() here, because it also invalidates |
733 | // crc cache (which is not yet populated anyway) | |
734 | char* data = bp.c_str(); | |
735 | memcpy(data, bl.c_str(), bl.length()); | |
736 | data += bl.length(); | |
737 | memset(data, 0, bp.length()-bl.length()); | |
738 | return bp; | |
739 | } | |
740 | ||
741 | void FileJournal::write_header_sync() | |
742 | { | |
9f95a23c | 743 | std::lock_guard locker{write_lock}; |
7c673cae FG |
744 | must_write_header = true; |
745 | bufferlist bl; | |
746 | do_write(bl); | |
747 | dout(20) << __func__ << " finish" << dendl; | |
748 | } | |
749 | ||
750 | int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size) | |
751 | { | |
752 | // already full? | |
753 | if (full_state != FULL_NOTFULL) | |
754 | return -ENOSPC; | |
755 | ||
756 | // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL. | |
757 | off64_t room; | |
758 | if (pos >= header.start) | |
759 | room = (header.max_size - pos) + (header.start - get_top()) - 1; | |
760 | else | |
761 | room = header.start - pos - 1; | |
762 | dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start | |
763 | << " top " << get_top() << dendl; | |
764 | ||
765 | if (do_sync_cond) { | |
766 | if (room >= (header.max_size >> 1) && | |
767 | room - size < (header.max_size >> 1)) { | |
768 | dout(10) << " passing half full mark, triggering commit" << dendl; | |
9f95a23c TL |
769 | #ifdef CEPH_DEBUG_MUTEX |
770 | do_sync_cond->notify_all(true); // initiate a real commit so we can trim | |
771 | #else | |
772 | do_sync_cond->notify_all(); | |
773 | #endif | |
7c673cae FG |
774 | } |
775 | } | |
776 | ||
777 | if (room >= size) { | |
778 | dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl; | |
779 | if (pos + size > header.max_size) | |
780 | must_write_header = true; | |
781 | return 0; | |
782 | } | |
783 | ||
784 | // full | |
785 | dout(1) << "check_for_full at " << pos << " : JOURNAL FULL " | |
786 | << pos << " >= " << room | |
787 | << " (max_size " << header.max_size << " start " << header.start << ")" | |
788 | << dendl; | |
789 | ||
790 | off64_t max = header.max_size - get_top(); | |
791 | if (size > max) | |
792 | dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl; | |
793 | ||
794 | return -ENOSPC; | |
795 | } | |
796 | ||
797 | int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes) | |
798 | { | |
799 | // gather queued writes | |
800 | off64_t queue_pos = write_pos; | |
801 | ||
802 | int eleft = cct->_conf->journal_max_write_entries; | |
803 | unsigned bmax = cct->_conf->journal_max_write_bytes; | |
804 | ||
805 | if (full_state != FULL_NOTFULL) | |
806 | return -ENOSPC; | |
807 | ||
808 | while (!writeq_empty()) { | |
809 | list<write_item> items; | |
810 | batch_pop_write(items); | |
811 | list<write_item>::iterator it = items.begin(); | |
812 | while (it != items.end()) { | |
813 | uint64_t bytes = it->bl.length(); | |
814 | int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes); | |
815 | if (r == 0) { // prepare ok, delete it | |
816 | items.erase(it++); | |
817 | #ifdef HAVE_LIBAIO | |
818 | { | |
9f95a23c | 819 | std::lock_guard locker{aio_lock}; |
11fdf7f2 | 820 | ceph_assert(aio_write_queue_ops > 0); |
7c673cae | 821 | aio_write_queue_ops--; |
11fdf7f2 | 822 | ceph_assert(aio_write_queue_bytes >= bytes); |
7c673cae FG |
823 | aio_write_queue_bytes -= bytes; |
824 | } | |
825 | #else | |
826 | (void)bytes; | |
827 | #endif | |
828 | } | |
829 | if (r == -ENOSPC) { | |
830 | // the journal maybe full, insert the left item to writeq | |
831 | batch_unpop_write(items); | |
832 | if (orig_ops) | |
833 | goto out; // commit what we have | |
834 | ||
835 | if (logger) | |
836 | logger->inc(l_filestore_journal_full); | |
837 | ||
838 | if (wait_on_full) { | |
839 | dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl; | |
840 | } else { | |
841 | dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl; | |
842 | ||
843 | // throw out what we have so far | |
844 | full_state = FULL_FULL; | |
845 | while (!writeq_empty()) { | |
846 | complete_write(1, peek_write().orig_len); | |
847 | pop_write(); | |
848 | } | |
849 | print_header(header); | |
850 | } | |
224ce89b | 851 | |
7c673cae FG |
852 | return -ENOSPC; // hrm, full on first op |
853 | } | |
854 | if (eleft) { | |
855 | if (--eleft == 0) { | |
856 | dout(20) << "prepare_multi_write hit max events per write " | |
857 | << cct->_conf->journal_max_write_entries << dendl; | |
858 | batch_unpop_write(items); | |
859 | goto out; | |
860 | } | |
861 | } | |
862 | if (bmax) { | |
863 | if (bl.length() >= bmax) { | |
864 | dout(20) << "prepare_multi_write hit max write size " | |
865 | << cct->_conf->journal_max_write_bytes << dendl; | |
866 | batch_unpop_write(items); | |
867 | goto out; | |
868 | } | |
869 | } | |
870 | } | |
871 | } | |
872 | ||
873 | out: | |
874 | dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl; | |
11fdf7f2 | 875 | ceph_assert((write_pos + bl.length() == queue_pos) || |
7c673cae FG |
876 | (write_pos + bl.length() - header.max_size + get_top() == queue_pos)); |
877 | return 0; | |
878 | } | |
879 | ||
880 | /* | |
881 | void FileJournal::queue_write_fin(uint64_t seq, Context *fin) | |
882 | { | |
883 | writing_seq.push_back(seq); | |
884 | if (!waiting_for_notfull.empty()) { | |
885 | // make sure previously unjournaled stuff waiting for UNFULL triggers | |
886 | // _before_ newly journaled stuff does | |
887 | dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin | |
888 | << " until after UNFULL" << dendl; | |
889 | C_Gather *g = new C_Gather(writeq.front().fin); | |
890 | writing_fin.push_back(g->new_sub()); | |
891 | waiting_for_notfull.push_back(g->new_sub()); | |
892 | } else { | |
893 | writing_fin.push_back(writeq.front().fin); | |
894 | dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl; | |
895 | } | |
896 | } | |
897 | */ | |
898 | ||
899 | void FileJournal::queue_completions_thru(uint64_t seq) | |
900 | { | |
9f95a23c | 901 | ceph_assert(ceph_mutex_is_locked(finisher_lock)); |
7c673cae FG |
902 | utime_t now = ceph_clock_now(); |
903 | list<completion_item> items; | |
904 | batch_pop_completions(items); | |
905 | list<completion_item>::iterator it = items.begin(); | |
906 | while (it != items.end()) { | |
907 | completion_item& next = *it; | |
908 | if (next.seq > seq) | |
909 | break; | |
910 | utime_t lat = now; | |
911 | lat -= next.start; | |
912 | dout(10) << "queue_completions_thru seq " << seq | |
913 | << " queueing seq " << next.seq | |
914 | << " " << next.finish | |
915 | << " lat " << lat << dendl; | |
916 | if (logger) { | |
917 | logger->tinc(l_filestore_journal_latency, lat); | |
918 | } | |
919 | if (next.finish) | |
920 | finisher->queue(next.finish); | |
921 | if (next.tracked_op) { | |
922 | next.tracked_op->mark_event("journaled_completion_queued"); | |
923 | next.tracked_op->journal_trace.event("queued completion"); | |
924 | next.tracked_op->journal_trace.keyval("completed through", seq); | |
925 | } | |
926 | items.erase(it++); | |
927 | } | |
928 | batch_unpop_completions(items); | |
9f95a23c | 929 | finisher_cond.notify_all(); |
7c673cae FG |
930 | } |
931 | ||
932 | ||
933 | int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes) | |
934 | { | |
935 | uint64_t seq = next_write.seq; | |
936 | bufferlist &ebl = next_write.bl; | |
937 | off64_t size = ebl.length(); | |
938 | ||
939 | int r = check_for_full(seq, queue_pos, size); | |
940 | if (r < 0) | |
941 | return r; // ENOSPC or EAGAIN | |
942 | ||
943 | uint32_t orig_len = next_write.orig_len; | |
944 | orig_bytes += orig_len; | |
945 | orig_ops++; | |
946 | ||
947 | // add to write buffer | |
948 | dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq | |
949 | << " len " << orig_len << " -> " << size << dendl; | |
950 | ||
951 | unsigned seq_offset = offsetof(entry_header_t, seq); | |
952 | unsigned magic1_offset = offsetof(entry_header_t, magic1); | |
953 | unsigned magic2_offset = offsetof(entry_header_t, magic2); | |
954 | ||
955 | bufferptr headerptr = ebl.buffers().front(); | |
956 | uint64_t _seq = seq; | |
957 | uint64_t _queue_pos = queue_pos; | |
958 | uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64()); | |
959 | headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq); | |
960 | headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
961 | headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
962 | ||
963 | bufferptr footerptr = ebl.buffers().back(); | |
964 | unsigned post_offset = footerptr.length() - sizeof(entry_header_t); | |
965 | footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq); | |
966 | footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
967 | footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
968 | ||
969 | bl.claim_append(ebl); | |
970 | if (next_write.tracked_op) { | |
971 | next_write.tracked_op->mark_event("write_thread_in_journal_buffer"); | |
972 | next_write.tracked_op->journal_trace.event("prepare_single_write"); | |
973 | } | |
974 | ||
975 | journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos)); | |
976 | writing_seq = seq; | |
977 | ||
978 | queue_pos += size; | |
979 | if (queue_pos >= header.max_size) | |
980 | queue_pos = queue_pos + get_top() - header.max_size; | |
981 | ||
982 | return 0; | |
983 | } | |
984 | ||
985 | void FileJournal::check_align(off64_t pos, bufferlist& bl) | |
986 | { | |
987 | // make sure list segments are page aligned | |
988 | if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) { | |
11fdf7f2 TL |
989 | ceph_assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); |
990 | ceph_assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); | |
991 | ceph_abort_msg("bl was not aligned"); | |
7c673cae FG |
992 | } |
993 | } | |
994 | ||
995 | int FileJournal::write_bl(off64_t& pos, bufferlist& bl) | |
996 | { | |
997 | int ret; | |
998 | ||
999 | off64_t spos = ::lseek64(fd, pos, SEEK_SET); | |
1000 | if (spos < 0) { | |
1001 | ret = -errno; | |
1002 | derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl; | |
1003 | return ret; | |
1004 | } | |
1005 | ret = bl.write_fd(fd); | |
1006 | if (ret) { | |
1007 | derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl; | |
1008 | return ret; | |
1009 | } | |
1010 | pos += bl.length(); | |
1011 | if (pos == header.max_size) | |
1012 | pos = get_top(); | |
1013 | return 0; | |
1014 | } | |
1015 | ||
// Synchronously write one batch of journal entries (and, when one is due,
// the journal header) to the journal file.
//
// Locking, as visible in this function: write_lock is released before the
// I/O starts and re-acquired once it has finished, so the caller is expected
// to hold it on entry and holds it again on return.  write_pos is advanced
// (wrapping past header.max_size back to get_top()) *before* the lock is
// dropped; the local `pos` follows the bytes actually written and is asserted
// to equal write_pos afterwards.
//
// Every write or sync failure is fatal: it is logged and ceph_abort() is
// called.
//
// @param bl  batch to write; may be empty when only the header is pending.
1016 | void FileJournal::do_write(bufferlist& bl) | |
1017 | { | |
1018 | // nothing to do? | |
1019 | if (bl.length() == 0 && !must_write_header) | |
1020 | return; | |
1021 | | |
1022 | buffer::ptr hbp; | |
// Force a header rewrite on every journal_write_header_frequency-th call that
// gets this far (a frequency of 0 disables this).
1023 | if (cct->_conf->journal_write_header_frequency && | |
1024 | (((++journaled_since_start) % | |
1025 | cct->_conf->journal_write_header_frequency) == 0)) { | |
1026 | must_write_header = true; | |
1027 | } | |
1028 | | |
1029 | if (must_write_header) { | |
1030 | must_write_header = false; | |
1031 | hbp = prepare_header(); | |
1032 | } | |
1033 | | |
1034 | dout(15) << "do_write writing " << write_pos << "~" << bl.length() | |
1035 | << (hbp.length() ? " + header":"") | |
1036 | << dendl; | |
1037 | | |
1038 | utime_t from = ceph_clock_now(); | |
1039 | | |
1040 | // entry | |
1041 | off64_t pos = write_pos; | |
1042 | | |
1043 | // Adjust write_pos | |
1044 | write_pos += bl.length(); | |
1045 | if (write_pos >= header.max_size) | |
1046 | write_pos = write_pos - header.max_size + get_top(); | |
1047 | | |
9f95a23c | 1048 | write_lock.unlock(); |
7c673cae FG |
1049 | |
1050 | // split? | |
1051 | off64_t split = 0; | |
1052 | if (pos + bl.length() > header.max_size) { | |
1053 | bufferlist first, second; | |
1054 | split = header.max_size - pos; | |
1055 | first.substr_of(bl, 0, split); | |
1056 | second.substr_of(bl, split, bl.length() - split); | |
11fdf7f2 | 1057 | ceph_assert(first.length() + second.length() == bl.length()); |
7c673cae FG |
1058 | dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length() |
1059 | << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl; | |
1060 | | |
1061 | //Save pos to write first piece second | |
1062 | off64_t first_pos = pos; | |
1063 | off64_t orig_pos; | |
1064 | pos = get_top(); | |
1065 | // header too? | |
1066 | if (hbp.length()) { | |
1067 | // be sneaky: include the header in the second fragment | |
11fdf7f2 TL |
1068 | bufferlist tmp; |
1069 | tmp.push_back(hbp); | |
1070 | tmp.claim_append(second); | |
1071 | second.swap(tmp); | |
7c673cae FG |
1072 | pos = 0; // we included the header |
1073 | } | |
1074 | // Write the second portion first possible with the header, so | |
1075 | // do_read_entry() won't even get a valid entry_header_t if there | |
1076 | // is a crash between the two writes. | |
1077 | orig_pos = pos; | |
1078 | if (write_bl(pos, second)) { | |
1079 | derr << "FileJournal::do_write: write_bl(pos=" << orig_pos | |
1080 | << ") failed" << dendl; | |
1081 | check_align(pos, second); | |
1082 | ceph_abort(); | |
1083 | } | |
1084 | orig_pos = first_pos; | |
1085 | if (write_bl(first_pos, first)) { | |
1086 | derr << "FileJournal::do_write: write_bl(pos=" << orig_pos | |
1087 | << ") failed" << dendl; | |
1088 | check_align(first_pos, first); | |
1089 | ceph_abort(); | |
1090 | } | |
11fdf7f2 | 1091 | ceph_assert(first_pos == get_top()); |
7c673cae FG |
1092 | } else { |
1093 | // header too? | |
1094 | if (hbp.length()) { | |
1095 | if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) { | |
1096 | int err = errno; | |
1097 | derr << "FileJournal::do_write: pwrite(fd=" << fd | |
1098 | << ", hbp.length=" << hbp.length() << ") failed :" | |
1099 | << cpp_strerror(err) << dendl; | |
1100 | ceph_abort(); | |
1101 | } | |
1102 | } | |
1103 | | |
1104 | if (write_bl(pos, bl)) { | |
1105 | derr << "FileJournal::do_write: write_bl(pos=" << pos | |
1106 | << ") failed" << dendl; | |
1107 | check_align(pos, bl); | |
1108 | ceph_abort(); | |
1109 | } | |
1110 | } | |
1111 | | |
// When directio is off the fd was opened without O_DIRECT|O_DSYNC (see
// _open()), so the data has to be flushed explicitly here.
1112 | if (!directio) { | |
1113 | dout(20) << "do_write fsync" << dendl; | |
1114 | | |
1115 | /* | |
1116 | * We'd really love to have a fsync_range or fdatasync_range and do a: | |
1117 | * | |
1118 | * if (split) { | |
1119 | * ::fsync_range(fd, header.max_size - split, split); | |
1120 | * ::fsync_range(fd, get_top(), bl.length() - split); | |
1121 | * else | |
1122 | * ::fsync_range(fd, write_pos, bl.length()) | |
1123 | * | |
1124 | * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be | |
1125 | * too hard given all the underlying infrastructure already exists. | |
1126 | * | |
1127 | * NOTE: using sync_file_range here would not be safe as it does not | |
1128 | * flush disk caches or commits any sort of metadata. | |
1129 | */ | |
1130 | int ret = 0; | |
11fdf7f2 | 1131 | #if defined(__APPLE__) || defined(__FreeBSD__) |
7c673cae FG |
1132 | ret = ::fsync(fd); |
1133 | #else | |
1134 | ret = ::fdatasync(fd); | |
1135 | #endif | |
1136 | if (ret < 0) { | |
1137 | derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl; | |
1138 | ceph_abort(); | |
1139 | } | |
1140 | #ifdef HAVE_POSIX_FADVISE | |
1141 | if (cct->_conf->filestore_fadvise) | |
1142 | posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); | |
1143 | #endif | |
1144 | } | |
1145 | | |
1146 | utime_t lat = ceph_clock_now() - from; | |
1147 | dout(20) << "do_write latency " << lat << dendl; | |
1148 | | |
9f95a23c | 1149 | write_lock.lock(); |
7c673cae | 1150 | |
11fdf7f2 TL |
1151 | ceph_assert(write_pos == pos); |
1152 | ceph_assert(write_pos % header.alignment == 0); | |
7c673cae FG |
1153 | |
// Publish the new journaled_seq under finisher_lock and, unless the journal
// is not in FULL_NOTFULL state or completions are plugged, queue the
// completions for everything written so far.
1154 | { | |
9f95a23c | 1155 | std::lock_guard locker{finisher_lock}; |
7c673cae FG |
1156 | journaled_seq = writing_seq; |
1157 | | |
1158 | // kick finisher? | |
1159 | // only if we haven't filled up recently! | |
1160 | if (full_state != FULL_NOTFULL) { | |
1161 | dout(10) << "do_write NOT queueing finisher seq " << journaled_seq | |
1162 | << ", full_commit_seq|full_restart_seq" << dendl; | |
1163 | } else { | |
1164 | if (plug_journal_completions) { | |
1165 | dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq | |
1166 | << " due to completion plug" << dendl; | |
1167 | } else { | |
1168 | dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl; | |
1169 | queue_completions_thru(journaled_seq); | |
1170 | } | |
1171 | } | |
1172 | } | |
1173 | } | |
1174 | ||
1175 | void FileJournal::flush() | |
1176 | { | |
1177 | dout(10) << "waiting for completions to empty" << dendl; | |
1178 | { | |
9f95a23c TL |
1179 | std::unique_lock l{finisher_lock}; |
1180 | finisher_cond.wait(l, [this] { return completions_empty(); }); | |
7c673cae FG |
1181 | } |
1182 | dout(10) << "flush waiting for finisher" << dendl; | |
1183 | finisher->wait_for_empty(); | |
1184 | dout(10) << "flush done" << dendl; | |
1185 | } | |
1186 | ||
1187 | ||
// Main loop of the journal writer thread.
//
// Each iteration:
//   1. sleeps on writeq_cond while the write queue is empty and no header
//      write is pending, leaving the loop instead if write_stop is set;
//   2. when aio is enabled, waits on aio_cond until enough bytes are queued
//      relative to the number of aios already in flight;
//   3. takes write_lock, gathers a batch with prepare_multi_write() and
//      submits it through do_aio_write() or do_write();
//   4. reports the batch through complete_write().
//
// If prepare_multi_write() returns -ENOSPC the thread waits on commit_cond
// and retries, unless write_stop is set, in which case the queued writes are
// dropped so that the loop can notice the stop request.
1188 | void FileJournal::write_thread_entry() | |
1189 | { | |
1190 | dout(10) << "write_thread_entry start" << dendl; | |
1191 | while (1) { | |
1192 | { | |
9f95a23c | 1193 | std::unique_lock locker{writeq_lock}; |
7c673cae FG |
1194 | if (writeq.empty() && !must_write_header) { |
1195 | if (write_stop) | |
1196 | break; | |
1197 | dout(20) << "write_thread_entry going to sleep" << dendl; | |
9f95a23c | 1198 | writeq_cond.wait(locker); |
7c673cae FG |
1199 | dout(20) << "write_thread_entry woke up" << dendl; |
1200 | continue; | |
1201 | } | |
1202 | } | |
1203 | | |
1204 | #ifdef HAVE_LIBAIO | |
1205 | if (aio) { | |
9f95a23c | 1206 | std::unique_lock locker{aio_lock}; |
7c673cae FG |
1207 | // should we back off to limit aios in flight? try to do this |
1208 | // adaptively so that we submit larger aios once we have lots of | |
1209 | // them in flight. | |
1210 | // | |
1211 | // NOTE: our condition here is based on aio_num (protected by | |
1212 | // aio_lock) and throttle_bytes (part of the write queue). when | |
1213 | // we sleep, we *only* wait for aio_num to change, and do not | |
1214 | // wake when more data is queued. this is not strictly correct, | |
1215 | // but should be fine given that we will have plenty of aios in | |
1216 | // flight if we hit this limit to ensure we keep the device | |
1217 | // saturated. | |
1218 | while (aio_num > 0) { | |
11fdf7f2 | 1219 | int exp = std::min<int>(aio_num * 2, 24); |
7c673cae FG |
1220 | long unsigned min_new = 1ull << exp; |
1221 | uint64_t cur = aio_write_queue_bytes; | |
1222 | dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes | |
1223 | << " ... exp " << exp << " min_new " << min_new | |
1224 | << " ... pending " << cur << dendl; | |
1225 | if (cur >= min_new) | |
1226 | break; | |
1227 | dout(20) << "write_thread_entry deferring until more aios complete: " | |
1228 | << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new | |
1229 | << " bytes to start a new aio (currently " << cur << " pending)" << dendl; | |
9f95a23c | 1230 | aio_cond.wait(locker); |
7c673cae FG |
1231 | dout(20) << "write_thread_entry woke up" << dendl; |
1232 | } | |
1233 | } | |
1234 | #endif | |
1235 | | |
9f95a23c | 1236 | std::unique_lock locker{write_lock}; |
7c673cae FG |
1237 | uint64_t orig_ops = 0; |
1238 | uint64_t orig_bytes = 0; | |
1239 | | |
1240 | bufferlist bl; | |
1241 | int r = prepare_multi_write(bl, orig_ops, orig_bytes); | |
1242 | // Don't care about journal full if stopping, so drop queue and | |
1243 | // possibly let header get written and loop above to notice stop | |
1244 | if (r == -ENOSPC) { | |
1245 | if (write_stop) { | |
1246 | dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl; | |
1247 | while (!writeq_empty()) { | |
1248 | complete_write(1, peek_write().orig_len); | |
1249 | pop_write(); | |
1250 | } | |
1251 | print_header(header); | |
1252 | r = 0; | |
1253 | } else { | |
1254 | dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl; | |
9f95a23c | 1255 | commit_cond.wait(locker); |
7c673cae FG |
1256 | dout(20) << "write_thread_entry woke up" << dendl; |
1257 | continue; | |
1258 | } | |
1259 | } | |
11fdf7f2 | 1260 | ceph_assert(r == 0); |
7c673cae FG |
1261 | |
1262 | if (logger) { | |
1263 | logger->inc(l_filestore_journal_wr); | |
1264 | logger->inc(l_filestore_journal_wr_bytes, bl.length()); | |
1265 | } | |
1266 | | |
1267 | #ifdef HAVE_LIBAIO | |
1268 | if (aio) | |
1269 | do_aio_write(bl); | |
1270 | else | |
1271 | do_write(bl); | |
1272 | #else | |
1273 | do_write(bl); | |
1274 | #endif | |
1275 | complete_write(orig_ops, orig_bytes); | |
1276 | } | |
1277 | | |
1278 | dout(10) << "write_thread_entry finish" << dendl; | |
1279 | } | |
1280 | ||
1281 | #ifdef HAVE_LIBAIO | |
// Asynchronous (libaio) write path: queue the batch `bl`, plus the journal
// header when one is due, through write_aio_bl().
//
// If the batch would run past header.max_size it is split in two: the first
// part is written up to the end of the journal and the remainder wraps
// around.  A pending header is prepended to the wrapped part, which is then
// written from offset 0; without a header the wrapped part starts at
// get_top().  When no split is needed a pending header is queued as its own
// aio at offset 0.
//
// The final write_aio_bl() call is passed writing_seq; earlier calls pass 0
// so that their completion does not update any state.  write_pos is updated
// afterwards from the position handed back through `pos`.
//
// A non-zero return from write_aio_bl() is fatal (ceph_abort()).
//
// @param bl  batch to write; may be empty when only the header is pending.
1282 | void FileJournal::do_aio_write(bufferlist& bl) | |
1283 | { | |
1284 | | |
1285 | if (cct->_conf->journal_write_header_frequency && | |
1286 | (((++journaled_since_start) % | |
1287 | cct->_conf->journal_write_header_frequency) == 0)) { | |
1288 | must_write_header = true; | |
1289 | } | |
1290 | | |
1291 | // nothing to do? | |
1292 | if (bl.length() == 0 && !must_write_header) | |
1293 | return; | |
1294 | | |
1295 | buffer::ptr hbp; | |
1296 | if (must_write_header) { | |
1297 | must_write_header = false; | |
1298 | hbp = prepare_header(); | |
1299 | } | |
1300 | | |
1301 | // entry | |
1302 | off64_t pos = write_pos; | |
1303 | | |
1304 | dout(15) << "do_aio_write writing " << pos << "~" << bl.length() | |
1305 | << (hbp.length() ? " + header":"") | |
1306 | << dendl; | |
1307 | | |
1308 | // split? | |
1309 | off64_t split = 0; | |
1310 | if (pos + bl.length() > header.max_size) { | |
1311 | bufferlist first, second; | |
1312 | split = header.max_size - pos; | |
1313 | first.substr_of(bl, 0, split); | |
1314 | second.substr_of(bl, split, bl.length() - split); | |
11fdf7f2 | 1315 | ceph_assert(first.length() + second.length() == bl.length()); |
7c673cae FG |
1316 | dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl; |
1317 | | |
1318 | if (write_aio_bl(pos, first, 0)) { | |
1319 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1320 | << ") failed" << dendl; | |
1321 | ceph_abort(); | |
1322 | } | |
11fdf7f2 | 1323 | ceph_assert(pos == header.max_size); |
7c673cae FG |
1324 | if (hbp.length()) { |
1325 | // be sneaky: include the header in the second fragment | |
11fdf7f2 TL |
1326 | bufferlist tmp; |
1327 | tmp.push_back(hbp); | |
1328 | tmp.claim_append(second); | |
1329 | second.swap(tmp); | |
7c673cae FG |
1330 | pos = 0; // we included the header |
1331 | } else | |
1332 | pos = get_top(); // no header, start after that | |
1333 | if (write_aio_bl(pos, second, writing_seq)) { | |
1334 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1335 | << ") failed" << dendl; | |
1336 | ceph_abort(); | |
1337 | } | |
1338 | } else { | |
1339 | // header too? | |
1340 | if (hbp.length()) { | |
1341 | bufferlist hbl; | |
1342 | hbl.push_back(hbp); | |
1343 | loff_t pos = 0; | |
1344 | if (write_aio_bl(pos, hbl, 0)) { | |
1345 | derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl; | |
1346 | ceph_abort(); | |
1347 | } | |
1348 | } | |
1349 | | |
1350 | if (write_aio_bl(pos, bl, writing_seq)) { | |
1351 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1352 | << ") failed" << dendl; | |
1353 | ceph_abort(); | |
1354 | } | |
1355 | } | |
1356 | | |
1357 | write_pos = pos; | |
1358 | if (write_pos == header.max_size) | |
1359 | write_pos = get_top(); | |
11fdf7f2 | 1360 | ceph_assert(write_pos % header.alignment == 0); |
7c673cae FG |
1361 | } |
1362 | ||
1363 | /** | |
1364 | * write a buffer using aio | |
1365 | * | |
 * The buffer list is consumed in chunks of at most IOV_MAX-1 buffers.  Each
 * chunk is spliced out of bl into its own aio_info, appended to aio_queue
 * under aio_lock, and submitted with io_submit() after the lock has been
 * released.  An -EAGAIN from io_submit() is retried up to 16 times with a
 * sleep that starts at 125us and doubles each time; any other error, or
 * running out of retries, aborts the process.  write_finish_cond is
 * notified once everything has been submitted.
 *
 * @param pos in: file offset of the first byte; out: offset just past the
 * last byte submitted.
 * @param bl data to write; it is emptied as chunks are spliced out.
1366 | * @param seq seq to trigger when this aio completes. if 0, do not update any state | |
1367 | * on completion. | |
 * Only the last chunk carries seq; earlier chunks are queued with 0.
 * @return always 0 (failures abort instead of returning).
1368 | */ | |
1369 | int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq) | |
1370 | { | |
1371 | dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl; | |
1372 | | |
1373 | while (bl.length() > 0) { | |
11fdf7f2 | 1374 | int max = std::min<int>(bl.get_num_buffers(), IOV_MAX-1); |
7c673cae FG |
1375 | iovec *iov = new iovec[max]; |
1376 | int n = 0; | |
1377 | unsigned len = 0; | |
11fdf7f2 TL |
1378 | for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) { |
1379 | ceph_assert(p != std::cend(bl.buffers())); | |
1380 | iov[n].iov_base = const_cast<void*>(static_cast<const void*>(p->c_str())); | |
7c673cae FG |
1381 | iov[n].iov_len = p->length(); |
1382 | len += p->length(); | |
1383 | } | |
1384 | | |
1385 | bufferlist tbl; | |
1386 | bl.splice(0, len, &tbl); // move bytes from bl -> tbl | |
1387 | | |
1388 | // lock only aio_queue, current aio, aio_num, aio_bytes, which may be | |
1389 | // modified in check_aio_completion | |
9f95a23c | 1390 | aio_lock.lock(); |
7c673cae FG |
1391 | aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq)); |
1392 | aio_info& aio = aio_queue.back(); | |
1393 | aio.iov = iov; | |
1394 | | |
1395 | io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos); | |
1396 | | |
1397 | dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len | |
1398 | << " in " << n << dendl; | |
1399 | | |
1400 | aio_num++; | |
1401 | aio_bytes += aio.len; | |
1402 | | |
1403 | // need to save current aio len to update write_pos later because current | |
1404 | // aio could be erased from aio_queue once it is done | |
1405 | uint64_t cur_len = aio.len; | |
1406 | // unlock aio_lock because following io_submit might take time to return | |
9f95a23c | 1407 | aio_lock.unlock(); |
7c673cae FG |
1408 | |
1409 | iocb *piocb = &aio.iocb; | |
224ce89b | 1410 | |
7c673cae FG |
1411 | // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds |
1412 | int attempts = 16; | |
1413 | int delay = 125; | |
1414 | do { | |
1415 | int r = io_submit(aio_ctx, 1, &piocb); | |
1416 | dout(20) << "write_aio_bl io_submit return value: " << r << dendl; | |
1417 | if (r < 0) { | |
1418 | derr << "io_submit to " << aio.off << "~" << cur_len | |
1419 | << " got " << cpp_strerror(r) << dendl; | |
1420 | if (r == -EAGAIN && attempts-- > 0) { | |
1421 | usleep(delay); | |
1422 | delay *= 2; | |
1423 | continue; | |
1424 | } | |
1425 | check_align(pos, tbl); | |
11fdf7f2 | 1426 | ceph_abort_msg("io_submit got unexpected error"); |
7c673cae FG |
1427 | } else { |
1428 | break; | |
1429 | } | |
1430 | } while (true); | |
1431 | pos += cur_len; | |
1432 | } | |
9f95a23c TL |
1433 | aio_lock.lock(); |
1434 | write_finish_cond.notify_all(); | |
1435 | aio_lock.unlock(); | |
7c673cae FG |
1436 | return 0; |
1437 | } | |
1438 | #endif | |
1439 | ||
1440 | void FileJournal::write_finish_thread_entry() | |
1441 | { | |
1442 | #ifdef HAVE_LIBAIO | |
b32b8144 | 1443 | dout(10) << __func__ << " enter" << dendl; |
7c673cae FG |
1444 | while (true) { |
1445 | { | |
9f95a23c | 1446 | std::unique_lock locker{aio_lock}; |
7c673cae FG |
1447 | if (aio_queue.empty()) { |
1448 | if (aio_stop) | |
1449 | break; | |
b32b8144 | 1450 | dout(20) << __func__ << " sleeping" << dendl; |
9f95a23c | 1451 | write_finish_cond.wait(locker); |
7c673cae FG |
1452 | continue; |
1453 | } | |
1454 | } | |
1455 | ||
b32b8144 | 1456 | dout(20) << __func__ << " waiting for aio(s)" << dendl; |
7c673cae FG |
1457 | io_event event[16]; |
1458 | int r = io_getevents(aio_ctx, 1, 16, event, NULL); | |
1459 | if (r < 0) { | |
1460 | if (r == -EINTR) { | |
1461 | dout(0) << "io_getevents got " << cpp_strerror(r) << dendl; | |
1462 | continue; | |
1463 | } | |
1464 | derr << "io_getevents got " << cpp_strerror(r) << dendl; | |
11fdf7f2 TL |
1465 | if (r == -EIO) { |
1466 | note_io_error_event(devname.c_str(), fn.c_str(), -EIO, 0, 0, 0); | |
1467 | } | |
1468 | ceph_abort_msg("got unexpected error from io_getevents"); | |
7c673cae FG |
1469 | } |
1470 | ||
1471 | { | |
9f95a23c | 1472 | std::lock_guard locker{aio_lock}; |
7c673cae FG |
1473 | for (int i=0; i<r; i++) { |
1474 | aio_info *ai = (aio_info *)event[i].obj; | |
1475 | if (event[i].res != ai->len) { | |
1476 | derr << "aio to " << ai->off << "~" << ai->len | |
1477 | << " returned: " << (int)event[i].res << dendl; | |
11fdf7f2 | 1478 | ceph_abort_msg("unexpected aio error"); |
7c673cae | 1479 | } |
b32b8144 | 1480 | dout(10) << __func__ << " aio " << ai->off |
7c673cae FG |
1481 | << "~" << ai->len << " done" << dendl; |
1482 | ai->done = true; | |
1483 | } | |
1484 | check_aio_completion(); | |
1485 | } | |
1486 | } | |
b32b8144 | 1487 | dout(10) << __func__ << " exit" << dendl; |
7c673cae FG |
1488 | #endif |
1489 | } | |
1490 | ||
1491 | #ifdef HAVE_LIBAIO | |
1492 | /** | |
1493 | * check aio_wait for completed aio, and update state appropriately. | |
1494 | */ | |
1495 | void FileJournal::check_aio_completion() | |
1496 | { | |
9f95a23c | 1497 | ceph_assert(ceph_mutex_is_locked(aio_lock)); |
7c673cae FG |
1498 | dout(20) << "check_aio_completion" << dendl; |
1499 | ||
1500 | bool completed_something = false, signal = false; | |
1501 | uint64_t new_journaled_seq = 0; | |
1502 | ||
1503 | list<aio_info>::iterator p = aio_queue.begin(); | |
1504 | while (p != aio_queue.end() && p->done) { | |
1505 | dout(20) << "check_aio_completion completed seq " << p->seq << " " | |
1506 | << p->off << "~" << p->len << dendl; | |
1507 | if (p->seq) { | |
1508 | new_journaled_seq = p->seq; | |
1509 | completed_something = true; | |
1510 | } | |
1511 | aio_num--; | |
1512 | aio_bytes -= p->len; | |
1513 | aio_queue.erase(p++); | |
1514 | signal = true; | |
1515 | } | |
1516 | ||
1517 | if (completed_something) { | |
1518 | // kick finisher? | |
1519 | // only if we haven't filled up recently! | |
9f95a23c | 1520 | std::lock_guard locker{finisher_lock}; |
7c673cae FG |
1521 | journaled_seq = new_journaled_seq; |
1522 | if (full_state != FULL_NOTFULL) { | |
1523 | dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq | |
1524 | << ", full_commit_seq|full_restart_seq" << dendl; | |
1525 | } else { | |
1526 | if (plug_journal_completions) { | |
1527 | dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq | |
1528 | << " due to completion plug" << dendl; | |
1529 | } else { | |
1530 | dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl; | |
1531 | queue_completions_thru(journaled_seq); | |
1532 | } | |
1533 | } | |
1534 | } | |
1535 | if (signal) { | |
1536 | // maybe write queue was waiting for aio count to drop? | |
9f95a23c | 1537 | aio_cond.notify_all(); |
7c673cae FG |
1538 | } |
1539 | } | |
1540 | #endif | |
1541 | ||
1542 | int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) { | |
1543 | dout(10) << "prepare_entry " << tls << dendl; | |
1544 | int data_len = cct->_conf->journal_align_min_size - 1; | |
1545 | int data_align = -1; // -1 indicates that we don't care about the alignment | |
1546 | bufferlist bl; | |
1547 | for (vector<ObjectStore::Transaction>::iterator p = tls.begin(); | |
1548 | p != tls.end(); ++p) { | |
1549 | if ((int)(*p).get_data_length() > data_len) { | |
1550 | data_len = (*p).get_data_length(); | |
1551 | data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; | |
1552 | } | |
11fdf7f2 | 1553 | encode(*p, bl); |
7c673cae FG |
1554 | } |
1555 | if (tbl->length()) { | |
1556 | bl.claim_append(*tbl); | |
1557 | } | |
1558 | // add it this entry | |
1559 | entry_header_t h; | |
1560 | unsigned head_size = sizeof(entry_header_t); | |
1561 | off64_t base_size = 2*head_size + bl.length(); | |
1562 | memset(&h, 0, sizeof(h)); | |
1563 | if (data_align >= 0) | |
1564 | h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; | |
11fdf7f2 | 1565 | off64_t size = round_up_to(base_size + h.pre_pad, header.alignment); |
7c673cae FG |
1566 | unsigned post_pad = size - base_size - h.pre_pad; |
1567 | h.len = bl.length(); | |
1568 | h.post_pad = post_pad; | |
1569 | h.crc32c = bl.crc32c(0); | |
1570 | dout(10) << " len " << bl.length() << " -> " << size | |
1571 | << " (head " << head_size << " pre_pad " << h.pre_pad | |
1572 | << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")" | |
1573 | << " (bl alignment " << data_align << ")" | |
1574 | << dendl; | |
1575 | bufferlist ebl; | |
1576 | // header | |
1577 | ebl.append((const char*)&h, sizeof(h)); | |
1578 | if (h.pre_pad) { | |
1579 | ebl.push_back(buffer::create_static(h.pre_pad, zero_buf)); | |
1580 | } | |
1581 | // payload | |
9f95a23c | 1582 | ebl.claim_append(bl); |
7c673cae FG |
1583 | if (h.post_pad) { |
1584 | ebl.push_back(buffer::create_static(h.post_pad, zero_buf)); | |
1585 | } | |
1586 | // footer | |
1587 | ebl.append((const char*)&h, sizeof(h)); | |
1588 | if (directio) | |
1589 | ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT); | |
1590 | tbl->claim(ebl); | |
1591 | return h.len; | |
1592 | } | |
1593 | ||
1594 | void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, | |
1595 | Context *oncommit, TrackedOpRef osd_op) | |
1596 | { | |
1597 | // dump on queue | |
1598 | dout(5) << "submit_entry seq " << seq | |
1599 | << " len " << e.length() | |
1600 | << " (" << oncommit << ")" << dendl; | |
11fdf7f2 TL |
1601 | ceph_assert(e.length() > 0); |
1602 | ceph_assert(e.length() < header.max_size); | |
7c673cae | 1603 | |
7c673cae FG |
1604 | if (logger) { |
1605 | logger->inc(l_filestore_journal_queue_bytes, orig_len); | |
1606 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1607 | } | |
1608 | ||
1609 | throttle.register_throttle_seq(seq, e.length()); | |
1610 | if (logger) { | |
1611 | logger->inc(l_filestore_journal_ops, 1); | |
1612 | logger->inc(l_filestore_journal_bytes, e.length()); | |
1613 | } | |
1614 | ||
1615 | if (osd_op) { | |
1616 | osd_op->mark_event("commit_queued_for_journal_write"); | |
1617 | if (osd_op->store_trace) { | |
1618 | osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace); | |
1619 | osd_op->journal_trace.event("submit_entry"); | |
1620 | osd_op->journal_trace.keyval("seq", seq); | |
1621 | } | |
1622 | } | |
1623 | { | |
9f95a23c | 1624 | std::lock_guard l1{writeq_lock}; |
7c673cae | 1625 | #ifdef HAVE_LIBAIO |
9f95a23c | 1626 | std::lock_guard l2{aio_lock}; |
7c673cae | 1627 | #endif |
9f95a23c | 1628 | std::lock_guard l3{completions_lock}; |
7c673cae FG |
1629 | |
1630 | #ifdef HAVE_LIBAIO | |
1631 | aio_write_queue_ops++; | |
1632 | aio_write_queue_bytes += e.length(); | |
9f95a23c | 1633 | aio_cond.notify_all(); |
7c673cae FG |
1634 | #endif |
1635 | ||
1636 | completions.push_back( | |
1637 | completion_item( | |
1638 | seq, oncommit, ceph_clock_now(), osd_op)); | |
1639 | if (writeq.empty()) | |
9f95a23c | 1640 | writeq_cond.notify_all(); |
7c673cae FG |
1641 | writeq.push_back(write_item(seq, e, orig_len, osd_op)); |
1642 | if (osd_op) | |
1643 | osd_op->journal_trace.keyval("queue depth", writeq.size()); | |
1644 | } | |
1645 | } | |
1646 | ||
1647 | bool FileJournal::writeq_empty() | |
1648 | { | |
9f95a23c | 1649 | std::lock_guard locker{writeq_lock}; |
7c673cae FG |
1650 | return writeq.empty(); |
1651 | } | |
1652 | ||
1653 | FileJournal::write_item &FileJournal::peek_write() | |
1654 | { | |
9f95a23c TL |
1655 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
1656 | std::lock_guard locker{writeq_lock}; | |
7c673cae FG |
1657 | return writeq.front(); |
1658 | } | |
1659 | ||
1660 | void FileJournal::pop_write() | |
1661 | { | |
9f95a23c TL |
1662 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
1663 | std::lock_guard locker{writeq_lock}; | |
7c673cae FG |
1664 | if (logger) { |
1665 | logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len); | |
1666 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1667 | } | |
1668 | writeq.pop_front(); | |
1669 | } | |
1670 | ||
1671 | void FileJournal::batch_pop_write(list<write_item> &items) | |
1672 | { | |
9f95a23c | 1673 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
7c673cae | 1674 | { |
9f95a23c | 1675 | std::lock_guard locker{writeq_lock}; |
7c673cae FG |
1676 | writeq.swap(items); |
1677 | } | |
1678 | for (auto &&i : items) { | |
1679 | if (logger) { | |
1680 | logger->dec(l_filestore_journal_queue_bytes, i.orig_len); | |
1681 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1682 | } | |
1683 | } | |
1684 | } | |
1685 | ||
1686 | void FileJournal::batch_unpop_write(list<write_item> &items) | |
1687 | { | |
9f95a23c | 1688 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
7c673cae FG |
1689 | for (auto &&i : items) { |
1690 | if (logger) { | |
1691 | logger->inc(l_filestore_journal_queue_bytes, i.orig_len); | |
1692 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1693 | } | |
1694 | } | |
9f95a23c | 1695 | std::lock_guard locker{writeq_lock}; |
7c673cae FG |
1696 | writeq.splice(writeq.begin(), items); |
1697 | } | |
1698 | ||
1699 | void FileJournal::commit_start(uint64_t seq) | |
1700 | { | |
1701 | dout(10) << "commit_start" << dendl; | |
1702 | ||
1703 | // was full? | |
1704 | switch (full_state) { | |
1705 | case FULL_NOTFULL: | |
1706 | break; // all good | |
1707 | ||
1708 | case FULL_FULL: | |
1709 | if (seq >= journaled_seq) { | |
1710 | dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq " | |
1711 | << seq << " > journaled_seq " << journaled_seq | |
1712 | << ", moving to FULL_WAIT." | |
1713 | << dendl; | |
1714 | full_state = FULL_WAIT; | |
1715 | } else { | |
1716 | dout(1) << "FULL_FULL commit_start on seq " | |
1717 | << seq << " < journaled_seq " << journaled_seq | |
1718 | << ", remaining in FULL_FULL" | |
1719 | << dendl; | |
1720 | } | |
1721 | break; | |
1722 | ||
1723 | case FULL_WAIT: | |
1724 | dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl; | |
1725 | full_state = FULL_NOTFULL; | |
1726 | plug_journal_completions = true; | |
1727 | break; | |
1728 | } | |
1729 | } | |
1730 | ||
1731 | /* | |
1732 | *send discard command to joural block deivce | |
1733 | */ | |
1734 | void FileJournal::do_discard(int64_t offset, int64_t end) | |
1735 | { | |
11fdf7f2 | 1736 | dout(10) << __func__ << " trim(" << offset << ", " << end << dendl; |
7c673cae | 1737 | |
11fdf7f2 | 1738 | offset = round_up_to(offset, block_size); |
7c673cae FG |
1739 | if (offset >= end) |
1740 | return; | |
11fdf7f2 TL |
1741 | end = round_up_to(end - block_size, block_size); |
1742 | ceph_assert(end >= offset); | |
1743 | if (offset < end) { | |
1744 | BlkDev blkdev(fd); | |
1745 | if (blkdev.discard(offset, end - offset) < 0) { | |
7c673cae | 1746 | dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl; |
11fdf7f2 TL |
1747 | } |
1748 | } | |
7c673cae FG |
1749 | } |
1750 | ||
1751 | void FileJournal::committed_thru(uint64_t seq) | |
1752 | { | |
9f95a23c | 1753 | std::lock_guard locker{write_lock}; |
7c673cae FG |
1754 | |
1755 | auto released = throttle.flush(seq); | |
1756 | if (logger) { | |
1757 | logger->dec(l_filestore_journal_ops, released.first); | |
1758 | logger->dec(l_filestore_journal_bytes, released.second); | |
1759 | } | |
1760 | ||
1761 | if (seq < last_committed_seq) { | |
1762 | dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl; | |
11fdf7f2 | 1763 | ceph_assert(seq >= last_committed_seq); |
7c673cae FG |
1764 | return; |
1765 | } | |
1766 | if (seq == last_committed_seq) { | |
1767 | dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl; | |
1768 | return; | |
1769 | } | |
1770 | ||
1771 | dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl; | |
1772 | last_committed_seq = seq; | |
1773 | ||
1774 | // completions! | |
1775 | { | |
9f95a23c | 1776 | std::lock_guard locker{finisher_lock}; |
7c673cae FG |
1777 | queue_completions_thru(seq); |
1778 | if (plug_journal_completions && seq >= header.start_seq) { | |
1779 | dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl; | |
1780 | plug_journal_completions = false; | |
1781 | queue_completions_thru(journaled_seq); | |
1782 | } | |
1783 | } | |
1784 | ||
1785 | // adjust start pointer | |
1786 | while (!journalq.empty() && journalq.front().first <= seq) { | |
1787 | journalq.pop_front(); | |
1788 | } | |
1789 | ||
1790 | int64_t old_start = header.start; | |
1791 | if (!journalq.empty()) { | |
1792 | header.start = journalq.front().second; | |
1793 | header.start_seq = journalq.front().first; | |
1794 | } else { | |
1795 | header.start = write_pos; | |
1796 | header.start_seq = seq + 1; | |
1797 | } | |
1798 | ||
1799 | if (discard) { | |
1800 | dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl; | |
1801 | if (old_start < header.start) | |
1802 | do_discard(old_start, header.start - 1); | |
1803 | else { | |
1804 | do_discard(old_start, header.max_size - 1); | |
1805 | do_discard(get_top(), header.start - 1); | |
1806 | } | |
1807 | } | |
1808 | ||
1809 | must_write_header = true; | |
1810 | print_header(header); | |
1811 | ||
1812 | // committed but unjournaled items | |
1813 | while (!writeq_empty() && peek_write().seq <= seq) { | |
1814 | dout(15) << " dropping committed but unwritten seq " << peek_write().seq | |
1815 | << " len " << peek_write().bl.length() | |
1816 | << dendl; | |
1817 | complete_write(1, peek_write().orig_len); | |
1818 | pop_write(); | |
1819 | } | |
1820 | ||
9f95a23c | 1821 | commit_cond.notify_all(); |
7c673cae FG |
1822 | |
1823 | dout(10) << "committed_thru done" << dendl; | |
1824 | } | |
1825 | ||
1826 | ||
1827 | void FileJournal::complete_write(uint64_t ops, uint64_t bytes) | |
1828 | { | |
1829 | dout(5) << __func__ << " finished " << ops << " ops and " | |
1830 | << bytes << " bytes" << dendl; | |
1831 | } | |
1832 | ||
1833 | int FileJournal::make_writeable() | |
1834 | { | |
1835 | dout(10) << __func__ << dendl; | |
1836 | int r = set_throttle_params(); | |
1837 | if (r < 0) | |
1838 | return r; | |
1839 | ||
1840 | r = _open(true); | |
1841 | if (r < 0) | |
1842 | return r; | |
1843 | ||
1844 | if (read_pos > 0) | |
1845 | write_pos = read_pos; | |
1846 | else | |
1847 | write_pos = get_top(); | |
1848 | read_pos = 0; | |
1849 | ||
1850 | must_write_header = true; | |
1851 | ||
1852 | start_writer(); | |
1853 | return 0; | |
1854 | } | |
1855 | ||
1856 | int FileJournal::set_throttle_params() | |
1857 | { | |
1858 | stringstream ss; | |
1859 | bool valid = throttle.set_params( | |
1860 | cct->_conf->journal_throttle_low_threshhold, | |
1861 | cct->_conf->journal_throttle_high_threshhold, | |
1862 | cct->_conf->filestore_expected_throughput_bytes, | |
1863 | cct->_conf->journal_throttle_high_multiple, | |
1864 | cct->_conf->journal_throttle_max_multiple, | |
1865 | header.max_size - get_top(), | |
1866 | &ss); | |
1867 | ||
1868 | if (!valid) { | |
1869 | derr << "tried to set invalid params: " | |
1870 | << ss.str() | |
1871 | << dendl; | |
1872 | } | |
1873 | return valid ? 0 : -EINVAL; | |
1874 | } | |
1875 | ||
1876 | const char** FileJournal::get_tracked_conf_keys() const | |
1877 | { | |
1878 | static const char *KEYS[] = { | |
1879 | "journal_throttle_low_threshhold", | |
1880 | "journal_throttle_high_threshhold", | |
1881 | "journal_throttle_high_multiple", | |
1882 | "journal_throttle_max_multiple", | |
1883 | "filestore_expected_throughput_bytes", | |
1884 | NULL}; | |
1885 | return KEYS; | |
1886 | } | |
1887 | ||
1888 | void FileJournal::wrap_read_bl( | |
1889 | off64_t pos, | |
1890 | int64_t olen, | |
1891 | bufferlist* bl, | |
1892 | off64_t *out_pos | |
1893 | ) const | |
1894 | { | |
1895 | while (olen > 0) { | |
1896 | while (pos >= header.max_size) | |
1897 | pos = pos + get_top() - header.max_size; | |
1898 | ||
1899 | int64_t len; | |
1900 | if (pos + olen > header.max_size) | |
1901 | len = header.max_size - pos; // partial | |
1902 | else | |
1903 | len = olen; // rest | |
1904 | ||
1905 | int64_t actual = ::lseek64(fd, pos, SEEK_SET); | |
11fdf7f2 | 1906 | ceph_assert(actual == pos); |
7c673cae FG |
1907 | |
1908 | bufferptr bp = buffer::create(len); | |
1909 | int r = safe_read_exact(fd, bp.c_str(), len); | |
1910 | if (r) { | |
1911 | derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned " | |
11fdf7f2 | 1912 | << cpp_strerror(r) << dendl; |
7c673cae FG |
1913 | ceph_abort(); |
1914 | } | |
1915 | bl->push_back(std::move(bp)); | |
1916 | pos += len; | |
1917 | olen -= len; | |
1918 | } | |
1919 | if (pos >= header.max_size) | |
1920 | pos = pos + get_top() - header.max_size; | |
1921 | if (out_pos) | |
1922 | *out_pos = pos; | |
1923 | } | |
1924 | ||
1925 | bool FileJournal::read_entry( | |
1926 | bufferlist &bl, | |
1927 | uint64_t &next_seq, | |
1928 | bool *corrupt) | |
1929 | { | |
1930 | if (corrupt) | |
1931 | *corrupt = false; | |
1932 | uint64_t seq = next_seq; | |
1933 | ||
1934 | if (!read_pos) { | |
1935 | dout(2) << "read_entry -- not readable" << dendl; | |
1936 | return false; | |
1937 | } | |
1938 | ||
1939 | off64_t pos = read_pos; | |
1940 | off64_t next_pos = pos; | |
1941 | stringstream ss; | |
1942 | read_entry_result result = do_read_entry( | |
1943 | pos, | |
1944 | &next_pos, | |
1945 | &bl, | |
1946 | &seq, | |
1947 | &ss); | |
1948 | if (result == SUCCESS) { | |
1949 | journalq.push_back( pair<uint64_t,off64_t>(seq, pos)); | |
1950 | uint64_t amount_to_take = | |
1951 | next_pos > pos ? | |
1952 | next_pos - pos : | |
1953 | (header.max_size - pos) + (next_pos - get_top()); | |
1954 | throttle.take(amount_to_take); | |
1955 | throttle.register_throttle_seq(next_seq, amount_to_take); | |
1956 | if (logger) { | |
1957 | logger->inc(l_filestore_journal_ops, 1); | |
1958 | logger->inc(l_filestore_journal_bytes, amount_to_take); | |
1959 | } | |
1960 | if (next_seq > seq) { | |
1961 | return false; | |
1962 | } else { | |
1963 | read_pos = next_pos; | |
1964 | next_seq = seq; | |
1965 | if (seq > journaled_seq) | |
1966 | journaled_seq = seq; | |
1967 | return true; | |
1968 | } | |
3efd9988 FG |
1969 | } else { |
1970 | derr << "do_read_entry(" << pos << "): " << ss.str() << dendl; | |
7c673cae FG |
1971 | } |
1972 | ||
1973 | if (seq && seq < header.committed_up_to) { | |
1974 | derr << "Unable to read past sequence " << seq | |
1975 | << " but header indicates the journal has committed up through " | |
1976 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
1977 | if (cct->_conf->journal_ignore_corruption) { | |
1978 | if (corrupt) | |
1979 | *corrupt = true; | |
1980 | return false; | |
1981 | } else { | |
1982 | ceph_abort(); | |
1983 | } | |
1984 | } | |
1985 | ||
7c673cae FG |
1986 | dout(2) << "No further valid entries found, journal is most likely valid" |
1987 | << dendl; | |
1988 | return false; | |
1989 | } | |
1990 | ||
1991 | FileJournal::read_entry_result FileJournal::do_read_entry( | |
1992 | off64_t init_pos, | |
1993 | off64_t *next_pos, | |
1994 | bufferlist *bl, | |
1995 | uint64_t *seq, | |
1996 | ostream *ss, | |
1997 | entry_header_t *_h) const | |
1998 | { | |
1999 | off64_t cur_pos = init_pos; | |
2000 | bufferlist _bl; | |
2001 | if (!bl) | |
2002 | bl = &_bl; | |
2003 | ||
2004 | // header | |
2005 | entry_header_t *h; | |
2006 | bufferlist hbl; | |
2007 | off64_t _next_pos; | |
2008 | wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos); | |
2009 | h = reinterpret_cast<entry_header_t *>(hbl.c_str()); | |
2010 | ||
2011 | if (!h->check_magic(cur_pos, header.get_fsid64())) { | |
2012 | dout(25) << "read_entry " << init_pos | |
2013 | << " : bad header magic, end of journal" << dendl; | |
2014 | if (ss) | |
2015 | *ss << "bad header magic"; | |
2016 | if (next_pos) | |
2017 | *next_pos = init_pos + (4<<10); // check 4k ahead | |
2018 | return MAYBE_CORRUPT; | |
2019 | } | |
2020 | cur_pos = _next_pos; | |
2021 | ||
2022 | // pad + body + pad | |
2023 | if (h->pre_pad) | |
2024 | cur_pos += h->pre_pad; | |
2025 | ||
2026 | bl->clear(); | |
2027 | wrap_read_bl(cur_pos, h->len, bl, &cur_pos); | |
2028 | ||
2029 | if (h->post_pad) | |
2030 | cur_pos += h->post_pad; | |
2031 | ||
2032 | // footer | |
2033 | entry_header_t *f; | |
2034 | bufferlist fbl; | |
2035 | wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos); | |
2036 | f = reinterpret_cast<entry_header_t *>(fbl.c_str()); | |
2037 | if (memcmp(f, h, sizeof(*f))) { | |
2038 | if (ss) | |
2039 | *ss << "bad footer magic, partial entry"; | |
2040 | if (next_pos) | |
2041 | *next_pos = cur_pos; | |
2042 | return MAYBE_CORRUPT; | |
2043 | } | |
2044 | ||
2045 | if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal) | |
2046 | h->crc32c != 0) { // newer entry in old journal | |
2047 | uint32_t actual_crc = bl->crc32c(0); | |
2048 | if (actual_crc != h->crc32c) { | |
2049 | if (ss) | |
2050 | *ss << "header crc (" << h->crc32c | |
2051 | << ") doesn't match body crc (" << actual_crc << ")"; | |
2052 | if (next_pos) | |
2053 | *next_pos = cur_pos; | |
2054 | return MAYBE_CORRUPT; | |
2055 | } | |
2056 | } | |
2057 | ||
2058 | // yay! | |
2059 | dout(2) << "read_entry " << init_pos << " : seq " << h->seq | |
2060 | << " " << h->len << " bytes" | |
2061 | << dendl; | |
2062 | ||
2063 | // ok! | |
2064 | if (seq) | |
2065 | *seq = h->seq; | |
2066 | ||
2067 | ||
2068 | if (next_pos) | |
2069 | *next_pos = cur_pos; | |
2070 | ||
2071 | if (_h) | |
2072 | *_h = *h; | |
2073 | ||
11fdf7f2 | 2074 | ceph_assert(cur_pos % header.alignment == 0); |
7c673cae FG |
2075 | return SUCCESS; |
2076 | } | |
2077 | ||
2078 | void FileJournal::reserve_throttle_and_backoff(uint64_t count) | |
2079 | { | |
2080 | throttle.get(count); | |
2081 | } | |
2082 | ||
2083 | void FileJournal::get_header( | |
2084 | uint64_t wanted_seq, | |
2085 | off64_t *_pos, | |
2086 | entry_header_t *h) | |
2087 | { | |
2088 | off64_t pos = header.start; | |
2089 | off64_t next_pos = pos; | |
2090 | bufferlist bl; | |
2091 | uint64_t seq = 0; | |
2092 | dout(2) << __func__ << dendl; | |
2093 | while (1) { | |
2094 | bl.clear(); | |
2095 | pos = next_pos; | |
2096 | read_entry_result result = do_read_entry( | |
2097 | pos, | |
2098 | &next_pos, | |
2099 | &bl, | |
2100 | &seq, | |
2101 | 0, | |
2102 | h); | |
2103 | if (result == FAILURE || result == MAYBE_CORRUPT) | |
2104 | ceph_abort(); | |
2105 | if (seq == wanted_seq) { | |
2106 | if (_pos) | |
2107 | *_pos = pos; | |
2108 | return; | |
2109 | } | |
2110 | } | |
2111 | ceph_abort(); // not reachable | |
2112 | } | |
2113 | ||
2114 | void FileJournal::corrupt( | |
2115 | int wfd, | |
2116 | off64_t corrupt_at) | |
2117 | { | |
2118 | dout(2) << __func__ << dendl; | |
2119 | if (corrupt_at >= header.max_size) | |
2120 | corrupt_at = corrupt_at + get_top() - header.max_size; | |
2121 | ||
2122 | int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET); | |
11fdf7f2 | 2123 | ceph_assert(actual == corrupt_at); |
7c673cae FG |
2124 | |
2125 | char buf[10]; | |
2126 | int r = safe_read_exact(fd, buf, 1); | |
11fdf7f2 | 2127 | ceph_assert(r == 0); |
7c673cae FG |
2128 | |
2129 | actual = ::lseek64(wfd, corrupt_at, SEEK_SET); | |
11fdf7f2 | 2130 | ceph_assert(actual == corrupt_at); |
7c673cae FG |
2131 | |
2132 | buf[0]++; | |
2133 | r = safe_write(wfd, buf, 1); | |
11fdf7f2 | 2134 | ceph_assert(r == 0); |
7c673cae FG |
2135 | } |
2136 | ||
2137 | void FileJournal::corrupt_payload( | |
2138 | int wfd, | |
2139 | uint64_t seq) | |
2140 | { | |
2141 | dout(2) << __func__ << dendl; | |
2142 | off64_t pos = 0; | |
2143 | entry_header_t h; | |
2144 | get_header(seq, &pos, &h); | |
2145 | off64_t corrupt_at = | |
2146 | pos + sizeof(entry_header_t) + h.pre_pad; | |
2147 | corrupt(wfd, corrupt_at); | |
2148 | } | |
2149 | ||
2150 | ||
2151 | void FileJournal::corrupt_footer_magic( | |
2152 | int wfd, | |
2153 | uint64_t seq) | |
2154 | { | |
2155 | dout(2) << __func__ << dendl; | |
2156 | off64_t pos = 0; | |
2157 | entry_header_t h; | |
2158 | get_header(seq, &pos, &h); | |
2159 | off64_t corrupt_at = | |
2160 | pos + sizeof(entry_header_t) + h.pre_pad + | |
2161 | h.len + h.post_pad + | |
2162 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2163 | corrupt(wfd, corrupt_at); | |
2164 | } | |
2165 | ||
2166 | ||
2167 | void FileJournal::corrupt_header_magic( | |
2168 | int wfd, | |
2169 | uint64_t seq) | |
2170 | { | |
2171 | dout(2) << __func__ << dendl; | |
2172 | off64_t pos = 0; | |
2173 | entry_header_t h; | |
2174 | get_header(seq, &pos, &h); | |
2175 | off64_t corrupt_at = | |
2176 | pos + | |
2177 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2178 | corrupt(wfd, corrupt_at); | |
2179 | } | |
2180 | ||
2181 | off64_t FileJournal::get_journal_size_estimate() | |
2182 | { | |
2183 | off64_t size, start = header.start; | |
2184 | if (write_pos < start) { | |
2185 | size = (max_size - start) + write_pos; | |
2186 | } else { | |
2187 | size = write_pos - start; | |
2188 | } | |
2189 | dout(20) << __func__ << " journal size=" << size << dendl; | |
2190 | return size; | |
2191 | } | |
11fdf7f2 TL |
2192 | |
2193 | void FileJournal::get_devices(set<string> *ls) | |
2194 | { | |
2195 | string dev_node; | |
2196 | BlkDev blkdev(fd); | |
2197 | if (int rc = blkdev.wholedisk(&dev_node); rc) { | |
2198 | return; | |
2199 | } | |
2200 | get_raw_devices(dev_node, ls); | |
2201 | } | |
2202 | ||
2203 | void FileJournal::collect_metadata(map<string,string> *pm) | |
2204 | { | |
2205 | BlkDev blkdev(fd); | |
2206 | char partition_path[PATH_MAX]; | |
2207 | char dev_node[PATH_MAX]; | |
2208 | if (blkdev.partition(partition_path, PATH_MAX)) { | |
2209 | (*pm)["backend_filestore_journal_partition_path"] = "unknown"; | |
2210 | } else { | |
2211 | (*pm)["backend_filestore_journal_partition_path"] = string(partition_path); | |
2212 | } | |
2213 | if (blkdev.wholedisk(dev_node, PATH_MAX)) { | |
2214 | (*pm)["backend_filestore_journal_dev_node"] = "unknown"; | |
2215 | } else { | |
2216 | (*pm)["backend_filestore_journal_dev_node"] = string(dev_node); | |
2217 | devname = dev_node; | |
2218 | } | |
2219 | } |