]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | #include "acconfig.h" | |
15 | ||
16 | #include "common/debug.h" | |
17 | #include "common/errno.h" | |
18 | #include "common/safe_io.h" | |
19 | #include "FileJournal.h" | |
20 | #include "include/color.h" | |
21 | #include "common/perf_counters.h" | |
22 | #include "FileStore.h" | |
23 | ||
24 | #include "include/compat.h" | |
25 | ||
26 | #include <fcntl.h> | |
27 | #include <limits.h> | |
28 | #include <sstream> | |
29 | #include <stdio.h> | |
30 | #include <stdlib.h> | |
31 | #include <sys/types.h> | |
32 | #include <sys/stat.h> | |
33 | #include <sys/mount.h> | |
34 | ||
35 | #include "common/blkdev.h" | |
36 | #if defined(__linux__) | |
37 | #include "common/linux_version.h" | |
38 | #endif | |
39 | ||
40 | #if defined(__FreeBSD__) | |
41 | #define O_DSYNC O_SYNC | |
42 | #endif | |
43 | ||
44 | #define dout_context cct | |
45 | #define dout_subsys ceph_subsys_journal | |
46 | #undef dout_prefix | |
47 | #define dout_prefix *_dout << "journal " | |
48 | ||
// One mebibyte in bytes; used below as the minimum acceptable journal size.
static const int64_t ONE_MEG = 1 << 20;
// Alignment (in bytes) that header.alignment must be a multiple of when the
// journal is opened in direct-io mode.
static const int CEPH_DIRECTIO_ALIGNMENT = 4096;
51 | ||
52 | ||
53 | int FileJournal::_open(bool forwrite, bool create) | |
54 | { | |
55 | int flags, ret; | |
56 | ||
57 | if (forwrite) { | |
58 | flags = O_RDWR; | |
59 | if (directio) | |
60 | flags |= O_DIRECT | O_DSYNC; | |
61 | } else { | |
62 | flags = O_RDONLY; | |
63 | } | |
64 | if (create) | |
65 | flags |= O_CREAT; | |
66 | ||
67 | if (fd >= 0) { | |
68 | if (TEMP_FAILURE_RETRY(::close(fd))) { | |
69 | int err = errno; | |
70 | derr << "FileJournal::_open: error closing old fd: " | |
71 | << cpp_strerror(err) << dendl; | |
72 | } | |
73 | } | |
74 | fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644)); | |
75 | if (fd < 0) { | |
76 | int err = errno; | |
77 | dout(2) << "FileJournal::_open unable to open journal " | |
78 | << fn << ": " << cpp_strerror(err) << dendl; | |
79 | return -err; | |
80 | } | |
81 | ||
82 | struct stat st; | |
83 | ret = ::fstat(fd, &st); | |
84 | if (ret) { | |
85 | ret = errno; | |
86 | derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl; | |
87 | ret = -ret; | |
88 | goto out_fd; | |
89 | } | |
90 | ||
91 | if (S_ISBLK(st.st_mode)) { | |
92 | ret = _open_block_device(); | |
93 | } else if (S_ISREG(st.st_mode)) { | |
94 | if (aio && !force_aio) { | |
95 | derr << "FileJournal::_open: disabling aio for non-block journal. Use " | |
96 | << "journal_force_aio to force use of aio anyway" << dendl; | |
97 | aio = false; | |
98 | } | |
99 | ret = _open_file(st.st_size, st.st_blksize, create); | |
100 | } else { | |
101 | derr << "FileJournal::_open: wrong journal file type: " << st.st_mode | |
102 | << dendl; | |
103 | ret = -EINVAL; | |
104 | } | |
105 | ||
106 | if (ret) | |
107 | goto out_fd; | |
108 | ||
109 | #ifdef HAVE_LIBAIO | |
110 | if (aio) { | |
111 | aio_ctx = 0; | |
112 | ret = io_setup(128, &aio_ctx); | |
113 | if (ret < 0) { | |
114 | switch (ret) { | |
115 | // Contrary to naive expectations -EAGIAN means ... | |
116 | case -EAGAIN: | |
117 | derr << "FileJournal::_open: user's limit of aio events exceeded. " | |
118 | << "Try increasing /proc/sys/fs/aio-max-nr" << dendl; | |
119 | break; | |
120 | default: | |
121 | derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl; | |
122 | break; | |
123 | } | |
124 | goto out_fd; | |
125 | } | |
126 | } | |
127 | #endif | |
128 | ||
129 | /* We really want max_size to be a multiple of block_size. */ | |
130 | max_size -= max_size % block_size; | |
131 | ||
132 | dout(1) << "_open " << fn << " fd " << fd | |
133 | << ": " << max_size | |
134 | << " bytes, block size " << block_size | |
135 | << " bytes, directio = " << directio | |
136 | << ", aio = " << aio | |
137 | << dendl; | |
138 | return 0; | |
139 | ||
140 | out_fd: | |
141 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
142 | fd = -1; | |
143 | return ret; | |
144 | } | |
145 | ||
146 | int FileJournal::_open_block_device() | |
147 | { | |
148 | int64_t bdev_sz = 0; | |
149 | int ret = get_block_device_size(fd, &bdev_sz); | |
150 | if (ret) { | |
151 | dout(0) << __func__ << ": failed to read block device size." << dendl; | |
152 | return -EIO; | |
153 | } | |
154 | ||
155 | /* Check for bdev_sz too small */ | |
156 | if (bdev_sz < ONE_MEG) { | |
157 | dout(0) << __func__ << ": your block device must be at least " | |
158 | << ONE_MEG << " bytes to be used for a Ceph journal." << dendl; | |
159 | return -EINVAL; | |
160 | } | |
161 | ||
162 | dout(10) << __func__ << ": ignoring osd journal size. " | |
163 | << "We'll use the entire block device (size: " << bdev_sz << ")" | |
164 | << dendl; | |
165 | max_size = bdev_sz; | |
166 | ||
167 | block_size = cct->_conf->journal_block_size; | |
168 | ||
169 | if (cct->_conf->journal_discard) { | |
170 | discard = block_device_support_discard(fn.c_str()); | |
171 | dout(10) << fn << " support discard: " << (int)discard << dendl; | |
172 | } | |
173 | ||
174 | return 0; | |
175 | } | |
176 | ||
177 | int FileJournal::_open_file(int64_t oldsize, blksize_t blksize, | |
178 | bool create) | |
179 | { | |
180 | int ret; | |
181 | int64_t conf_journal_sz(cct->_conf->osd_journal_size); | |
182 | conf_journal_sz <<= 20; | |
183 | ||
184 | if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) { | |
185 | derr << "I'm sorry, I don't know how large of a journal to create." | |
186 | << "Please specify a block device to use as the journal OR " | |
187 | << "set osd_journal_size in your ceph.conf" << dendl; | |
188 | return -EINVAL; | |
189 | } | |
190 | ||
191 | if (create && (oldsize < conf_journal_sz)) { | |
192 | uint64_t newsize(conf_journal_sz); | |
193 | dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl; | |
194 | ret = ::ftruncate(fd, newsize); | |
195 | if (ret < 0) { | |
196 | int err = errno; | |
197 | derr << "FileJournal::_open_file : unable to extend journal to " | |
198 | << newsize << " bytes: " << cpp_strerror(err) << dendl; | |
199 | return -err; | |
200 | } | |
201 | #ifdef HAVE_POSIX_FALLOCATE | |
202 | ret = ::posix_fallocate(fd, 0, newsize); | |
203 | if (ret) { | |
204 | derr << "FileJournal::_open_file : unable to preallocation journal to " | |
205 | << newsize << " bytes: " << cpp_strerror(ret) << dendl; | |
206 | return -ret; | |
207 | } | |
208 | max_size = newsize; | |
209 | #elif defined(__APPLE__) | |
210 | fstore_t store; | |
211 | store.fst_flags = F_ALLOCATECONTIG; | |
212 | store.fst_posmode = F_PEOFPOSMODE; | |
213 | store.fst_offset = 0; | |
214 | store.fst_length = newsize; | |
215 | ||
216 | ret = ::fcntl(fd, F_PREALLOCATE, &store); | |
217 | if (ret == -1) { | |
218 | ret = -errno; | |
219 | derr << "FileJournal::_open_file : unable to preallocation journal to " | |
220 | << newsize << " bytes: " << cpp_strerror(ret) << dendl; | |
221 | return ret; | |
222 | } | |
223 | max_size = newsize; | |
224 | #else | |
225 | # error "Journal pre-allocation not supported on platform." | |
226 | #endif | |
227 | } | |
228 | else { | |
229 | max_size = oldsize; | |
230 | } | |
231 | block_size = cct->_conf->journal_block_size; | |
232 | ||
233 | if (create && cct->_conf->journal_zero_on_create) { | |
234 | derr << "FileJournal::_open_file : zeroing journal" << dendl; | |
235 | uint64_t write_size = 1 << 20; | |
236 | char *buf; | |
237 | ret = ::posix_memalign((void **)&buf, block_size, write_size); | |
238 | if (ret != 0) { | |
239 | return -ret; | |
240 | } | |
241 | memset(static_cast<void*>(buf), 0, write_size); | |
242 | uint64_t i = 0; | |
243 | for (; (i + write_size) <= (uint64_t)max_size; i += write_size) { | |
244 | ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i); | |
245 | if (ret < 0) { | |
246 | free(buf); | |
247 | return -errno; | |
248 | } | |
249 | } | |
250 | if (i < (uint64_t)max_size) { | |
251 | ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i); | |
252 | if (ret < 0) { | |
253 | free(buf); | |
254 | return -errno; | |
255 | } | |
256 | } | |
257 | free(buf); | |
258 | } | |
259 | ||
260 | ||
261 | dout(10) << "_open journal is not a block device, NOT checking disk " | |
262 | << "write cache on '" << fn << "'" << dendl; | |
263 | ||
264 | return 0; | |
265 | } | |
266 | ||
267 | // This can not be used on an active journal | |
268 | int FileJournal::check() | |
269 | { | |
270 | int ret; | |
271 | ||
272 | assert(fd == -1); | |
273 | ret = _open(false, false); | |
274 | if (ret) | |
275 | return ret; | |
276 | ||
277 | ret = read_header(&header); | |
278 | if (ret < 0) | |
279 | goto done; | |
280 | ||
281 | if (header.fsid != fsid) { | |
282 | derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
283 | << ", invalid (someone else's?) journal" << dendl; | |
284 | ret = -EINVAL; | |
285 | goto done; | |
286 | } | |
287 | ||
288 | dout(1) << "check: header looks ok" << dendl; | |
289 | ret = 0; | |
290 | ||
291 | done: | |
292 | close(); | |
293 | return ret; | |
294 | } | |
295 | ||
296 | ||
297 | int FileJournal::create() | |
298 | { | |
299 | void *buf = 0; | |
300 | int64_t needed_space; | |
301 | int ret; | |
302 | buffer::ptr bp; | |
303 | dout(2) << "create " << fn << " fsid " << fsid << dendl; | |
304 | ||
305 | ret = _open(true, true); | |
306 | if (ret) | |
307 | goto done; | |
308 | ||
309 | // write empty header | |
310 | header = header_t(); | |
311 | header.flags = header_t::FLAG_CRC; // enable crcs on any new journal. | |
312 | header.fsid = fsid; | |
313 | header.max_size = max_size; | |
314 | header.block_size = block_size; | |
315 | if (cct->_conf->journal_block_align || directio) | |
316 | header.alignment = block_size; | |
317 | else | |
318 | header.alignment = 16; // at least stay word aligned on 64bit machines... | |
319 | ||
320 | header.start = get_top(); | |
321 | header.start_seq = 0; | |
322 | ||
323 | print_header(header); | |
324 | ||
325 | // static zeroed buffer for alignment padding | |
326 | delete [] zero_buf; | |
327 | zero_buf = new char[header.alignment]; | |
328 | memset(zero_buf, 0, header.alignment); | |
329 | ||
330 | bp = prepare_header(); | |
331 | if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) { | |
332 | ret = -errno; | |
333 | derr << "FileJournal::create : create write header error " | |
334 | << cpp_strerror(ret) << dendl; | |
335 | goto close_fd; | |
336 | } | |
337 | ||
338 | // zero first little bit, too. | |
339 | ret = posix_memalign(&buf, block_size, block_size); | |
340 | if (ret) { | |
341 | ret = -ret; | |
342 | derr << "FileJournal::create: failed to allocate " << block_size | |
343 | << " bytes of memory: " << cpp_strerror(ret) << dendl; | |
344 | goto close_fd; | |
345 | } | |
346 | memset(buf, 0, block_size); | |
347 | if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) { | |
348 | ret = -errno; | |
349 | derr << "FileJournal::create: error zeroing first " << block_size | |
350 | << " bytes " << cpp_strerror(ret) << dendl; | |
351 | goto free_buf; | |
352 | } | |
353 | ||
354 | needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20; | |
355 | needed_space += (2 * sizeof(entry_header_t)) + get_top(); | |
356 | if (header.max_size - header.start < needed_space) { | |
357 | derr << "FileJournal::create: OSD journal is not large enough to hold " | |
358 | << "osd_max_write_size bytes!" << dendl; | |
359 | ret = -ENOSPC; | |
360 | goto free_buf; | |
361 | } | |
362 | ||
363 | dout(2) << "create done" << dendl; | |
364 | ret = 0; | |
365 | ||
366 | free_buf: | |
367 | free(buf); | |
368 | buf = 0; | |
369 | close_fd: | |
370 | if (TEMP_FAILURE_RETRY(::close(fd)) < 0) { | |
371 | ret = -errno; | |
372 | derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret) | |
373 | << dendl; | |
374 | } | |
375 | done: | |
376 | fd = -1; | |
377 | return ret; | |
378 | } | |
379 | ||
380 | // This can not be used on an active journal | |
381 | int FileJournal::peek_fsid(uuid_d& fsid) | |
382 | { | |
383 | assert(fd == -1); | |
384 | int r = _open(false, false); | |
385 | if (r) | |
386 | return r; | |
387 | r = read_header(&header); | |
388 | if (r < 0) | |
389 | goto out; | |
390 | fsid = header.fsid; | |
391 | out: | |
392 | close(); | |
393 | return r; | |
394 | } | |
395 | ||
396 | int FileJournal::open(uint64_t fs_op_seq) | |
397 | { | |
398 | dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl; | |
399 | ||
400 | uint64_t next_seq = fs_op_seq + 1; | |
224ce89b | 401 | uint64_t seq = -1; |
7c673cae FG |
402 | |
403 | int err = _open(false); | |
404 | if (err) | |
405 | return err; | |
406 | ||
407 | // assume writeable, unless... | |
408 | read_pos = 0; | |
409 | write_pos = get_top(); | |
410 | ||
411 | // read header? | |
412 | err = read_header(&header); | |
413 | if (err < 0) | |
224ce89b | 414 | goto out; |
7c673cae FG |
415 | |
416 | // static zeroed buffer for alignment padding | |
417 | delete [] zero_buf; | |
418 | zero_buf = new char[header.alignment]; | |
419 | memset(zero_buf, 0, header.alignment); | |
420 | ||
421 | dout(10) << "open header.fsid = " << header.fsid | |
422 | //<< " vs expected fsid = " << fsid | |
423 | << dendl; | |
424 | if (header.fsid != fsid) { | |
425 | derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
426 | << ", invalid (someone else's?) journal" << dendl; | |
224ce89b WB |
427 | err = -EINVAL; |
428 | goto out; | |
7c673cae FG |
429 | } |
430 | if (header.max_size > max_size) { | |
431 | dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl; | |
224ce89b WB |
432 | err = -EINVAL; |
433 | goto out; | |
7c673cae FG |
434 | } |
435 | if (header.block_size != block_size) { | |
436 | dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl; | |
224ce89b WB |
437 | err = -EINVAL; |
438 | goto out; | |
7c673cae FG |
439 | } |
440 | if (header.max_size % header.block_size) { | |
441 | dout(2) << "open journal max size " << header.max_size | |
442 | << " not a multiple of block size " << header.block_size << dendl; | |
224ce89b WB |
443 | err = -EINVAL; |
444 | goto out; | |
7c673cae FG |
445 | } |
446 | if (header.alignment != block_size && directio) { | |
447 | dout(0) << "open journal alignment " << header.alignment << " does not match block size " | |
448 | << block_size << " (required for direct_io journal mode)" << dendl; | |
224ce89b WB |
449 | err = -EINVAL; |
450 | goto out; | |
7c673cae FG |
451 | } |
452 | if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) { | |
453 | dout(0) << "open journal alignment " << header.alignment | |
454 | << " is not multiple of minimum directio alignment " | |
455 | << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)" | |
456 | << dendl; | |
224ce89b WB |
457 | err = -EINVAL; |
458 | goto out; | |
7c673cae FG |
459 | } |
460 | ||
461 | // looks like a valid header. | |
462 | write_pos = 0; // not writeable yet | |
463 | ||
464 | journaled_seq = header.committed_up_to; | |
465 | ||
466 | // find next entry | |
467 | read_pos = header.start; | |
224ce89b | 468 | seq = header.start_seq; |
7c673cae FG |
469 | |
470 | while (1) { | |
471 | bufferlist bl; | |
472 | off64_t old_pos = read_pos; | |
473 | if (!read_entry(bl, seq)) { | |
474 | dout(10) << "open reached end of journal." << dendl; | |
475 | break; | |
476 | } | |
477 | if (seq > next_seq) { | |
478 | dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq | |
479 | << ", ignoring journal contents" | |
480 | << dendl; | |
481 | read_pos = -1; | |
482 | last_committed_seq = 0; | |
483 | return 0; | |
484 | } | |
485 | if (seq == next_seq) { | |
486 | dout(10) << "open reached seq " << seq << dendl; | |
487 | read_pos = old_pos; | |
488 | break; | |
489 | } | |
490 | seq++; // next event should follow. | |
491 | } | |
492 | ||
493 | return 0; | |
224ce89b WB |
494 | out: |
495 | close(); | |
496 | return err; | |
7c673cae FG |
497 | } |
498 | ||
499 | void FileJournal::_close(int fd) const | |
500 | { | |
501 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
502 | } | |
503 | ||
504 | void FileJournal::close() | |
505 | { | |
506 | dout(1) << "close " << fn << dendl; | |
507 | ||
508 | // stop writer thread | |
509 | stop_writer(); | |
510 | ||
511 | // close | |
512 | assert(writeq_empty()); | |
513 | assert(!must_write_header); | |
514 | assert(fd >= 0); | |
515 | _close(fd); | |
516 | fd = -1; | |
517 | } | |
518 | ||
519 | ||
520 | int FileJournal::dump(ostream& out) | |
521 | { | |
522 | return _dump(out, false); | |
523 | } | |
524 | ||
525 | int FileJournal::simple_dump(ostream& out) | |
526 | { | |
527 | return _dump(out, true); | |
528 | } | |
529 | ||
530 | int FileJournal::_dump(ostream& out, bool simple) | |
531 | { | |
532 | JSONFormatter f(true); | |
533 | int ret = _fdump(f, simple); | |
534 | f.flush(out); | |
535 | return ret; | |
536 | } | |
537 | ||
538 | int FileJournal::_fdump(Formatter &f, bool simple) | |
539 | { | |
540 | dout(10) << "_fdump" << dendl; | |
541 | ||
542 | assert(fd == -1); | |
543 | int err = _open(false, false); | |
544 | if (err) | |
545 | return err; | |
546 | ||
547 | err = read_header(&header); | |
548 | if (err < 0) { | |
549 | close(); | |
550 | return err; | |
551 | } | |
552 | ||
553 | off64_t next_pos = header.start; | |
554 | ||
555 | f.open_object_section("journal"); | |
556 | ||
557 | f.open_object_section("header"); | |
558 | f.dump_unsigned("flags", header.flags); | |
559 | ostringstream os; | |
560 | os << header.fsid; | |
561 | f.dump_string("fsid", os.str()); | |
562 | f.dump_unsigned("block_size", header.block_size); | |
563 | f.dump_unsigned("alignment", header.alignment); | |
564 | f.dump_int("max_size", header.max_size); | |
565 | f.dump_int("start", header.start); | |
566 | f.dump_unsigned("committed_up_to", header.committed_up_to); | |
567 | f.dump_unsigned("start_seq", header.start_seq); | |
568 | f.close_section(); | |
569 | ||
570 | f.open_array_section("entries"); | |
571 | uint64_t seq = header.start_seq; | |
572 | while (1) { | |
573 | bufferlist bl; | |
574 | off64_t pos = next_pos; | |
575 | ||
576 | if (!pos) { | |
577 | dout(2) << "_dump -- not readable" << dendl; | |
578 | err = -EINVAL; | |
579 | break; | |
580 | } | |
581 | stringstream ss; | |
582 | read_entry_result result = do_read_entry( | |
583 | pos, | |
584 | &next_pos, | |
585 | &bl, | |
586 | &seq, | |
587 | &ss); | |
588 | if (result != SUCCESS) { | |
589 | if (seq < header.committed_up_to) { | |
590 | dout(2) << "Unable to read past sequence " << seq | |
591 | << " but header indicates the journal has committed up through " | |
592 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
593 | err = -EINVAL; | |
594 | } | |
595 | dout(25) << ss.str() << dendl; | |
596 | dout(25) << "No further valid entries found, journal is most likely valid" | |
597 | << dendl; | |
598 | break; | |
599 | } | |
600 | ||
601 | f.open_object_section("entry"); | |
602 | f.dump_unsigned("offset", pos); | |
603 | f.dump_unsigned("seq", seq); | |
604 | if (simple) { | |
605 | f.dump_unsigned("bl.length", bl.length()); | |
606 | } else { | |
607 | f.open_array_section("transactions"); | |
608 | bufferlist::iterator p = bl.begin(); | |
609 | int trans_num = 0; | |
610 | while (!p.end()) { | |
611 | ObjectStore::Transaction t(p); | |
612 | f.open_object_section("transaction"); | |
613 | f.dump_unsigned("trans_num", trans_num); | |
614 | t.dump(&f); | |
615 | f.close_section(); | |
616 | trans_num++; | |
617 | } | |
618 | f.close_section(); | |
619 | } | |
620 | f.close_section(); | |
621 | } | |
622 | ||
623 | f.close_section(); | |
624 | f.close_section(); | |
625 | dout(10) << "dump finish" << dendl; | |
626 | ||
627 | close(); | |
628 | return err; | |
629 | } | |
630 | ||
631 | ||
632 | void FileJournal::start_writer() | |
633 | { | |
634 | write_stop = false; | |
635 | aio_stop = false; | |
636 | write_thread.create("journal_write"); | |
637 | #ifdef HAVE_LIBAIO | |
638 | if (aio) | |
639 | write_finish_thread.create("journal_wrt_fin"); | |
640 | #endif | |
641 | } | |
642 | ||
643 | void FileJournal::stop_writer() | |
644 | { | |
645 | // Do nothing if writer already stopped or never started | |
646 | if (!write_stop) | |
647 | { | |
648 | { | |
649 | Mutex::Locker l(write_lock); | |
650 | Mutex::Locker p(writeq_lock); | |
651 | write_stop = true; | |
652 | writeq_cond.Signal(); | |
653 | // Doesn't hurt to signal commit_cond in case thread is waiting there | |
654 | // and caller didn't use committed_thru() first. | |
655 | commit_cond.Signal(); | |
656 | } | |
657 | write_thread.join(); | |
658 | ||
659 | // write journal header now so that we have less to replay on remount | |
660 | write_header_sync(); | |
661 | } | |
662 | ||
663 | #ifdef HAVE_LIBAIO | |
664 | // stop aio completeion thread *after* writer thread has stopped | |
665 | // and has submitted all of its io | |
666 | if (aio && !aio_stop) { | |
667 | aio_lock.Lock(); | |
668 | aio_stop = true; | |
669 | aio_cond.Signal(); | |
670 | write_finish_cond.Signal(); | |
671 | aio_lock.Unlock(); | |
672 | write_finish_thread.join(); | |
673 | } | |
674 | #endif | |
675 | } | |
676 | ||
677 | ||
678 | ||
679 | void FileJournal::print_header(const header_t &header) const | |
680 | { | |
681 | dout(10) << "header: block_size " << header.block_size | |
682 | << " alignment " << header.alignment | |
683 | << " max_size " << header.max_size | |
684 | << dendl; | |
685 | dout(10) << "header: start " << header.start << dendl; | |
686 | dout(10) << " write_pos " << write_pos << dendl; | |
687 | } | |
688 | ||
689 | int FileJournal::read_header(header_t *hdr) const | |
690 | { | |
691 | dout(10) << "read_header" << dendl; | |
692 | bufferlist bl; | |
693 | ||
694 | buffer::ptr bp = buffer::create_page_aligned(block_size); | |
695 | char* bpdata = bp.c_str(); | |
696 | int r = ::pread(fd, bpdata, bp.length(), 0); | |
697 | ||
698 | if (r < 0) { | |
699 | int err = errno; | |
700 | dout(0) << "read_header got " << cpp_strerror(err) << dendl; | |
701 | return -err; | |
702 | } | |
703 | ||
704 | // don't use bp.zero() here, because it also invalidates | |
705 | // crc cache (which is not yet populated anyway) | |
706 | if (bp.length() != (size_t)r) { | |
707 | // r will be always less or equal than bp.length | |
708 | bpdata += r; | |
709 | memset(bpdata, 0, bp.length() - r); | |
710 | } | |
711 | ||
712 | bl.push_back(std::move(bp)); | |
713 | ||
714 | try { | |
715 | bufferlist::iterator p = bl.begin(); | |
716 | ::decode(*hdr, p); | |
717 | } | |
718 | catch (buffer::error& e) { | |
719 | derr << "read_header error decoding journal header" << dendl; | |
720 | return -EINVAL; | |
721 | } | |
722 | ||
723 | ||
724 | /* | |
725 | * Unfortunately we weren't initializing the flags field for new | |
726 | * journals! Aie. This is safe(ish) now that we have only one | |
727 | * flag. Probably around when we add the next flag we need to | |
728 | * remove this or else this (eventually old) code will clobber newer | |
729 | * code's flags. | |
730 | */ | |
731 | if (hdr->flags > 3) { | |
732 | derr << "read_header appears to have gibberish flags; assuming 0" << dendl; | |
733 | hdr->flags = 0; | |
734 | } | |
735 | ||
736 | print_header(*hdr); | |
737 | ||
738 | return 0; | |
739 | } | |
740 | ||
741 | bufferptr FileJournal::prepare_header() | |
742 | { | |
743 | bufferlist bl; | |
744 | { | |
745 | Mutex::Locker l(finisher_lock); | |
746 | header.committed_up_to = journaled_seq; | |
747 | } | |
748 | ::encode(header, bl); | |
749 | bufferptr bp = buffer::create_page_aligned(get_top()); | |
750 | // don't use bp.zero() here, because it also invalidates | |
751 | // crc cache (which is not yet populated anyway) | |
752 | char* data = bp.c_str(); | |
753 | memcpy(data, bl.c_str(), bl.length()); | |
754 | data += bl.length(); | |
755 | memset(data, 0, bp.length()-bl.length()); | |
756 | return bp; | |
757 | } | |
758 | ||
759 | void FileJournal::write_header_sync() | |
760 | { | |
761 | Mutex::Locker locker(write_lock); | |
762 | must_write_header = true; | |
763 | bufferlist bl; | |
764 | do_write(bl); | |
765 | dout(20) << __func__ << " finish" << dendl; | |
766 | } | |
767 | ||
768 | int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size) | |
769 | { | |
770 | // already full? | |
771 | if (full_state != FULL_NOTFULL) | |
772 | return -ENOSPC; | |
773 | ||
774 | // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL. | |
775 | off64_t room; | |
776 | if (pos >= header.start) | |
777 | room = (header.max_size - pos) + (header.start - get_top()) - 1; | |
778 | else | |
779 | room = header.start - pos - 1; | |
780 | dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start | |
781 | << " top " << get_top() << dendl; | |
782 | ||
783 | if (do_sync_cond) { | |
784 | if (room >= (header.max_size >> 1) && | |
785 | room - size < (header.max_size >> 1)) { | |
786 | dout(10) << " passing half full mark, triggering commit" << dendl; | |
787 | do_sync_cond->SloppySignal(); // initiate a real commit so we can trim | |
788 | } | |
789 | } | |
790 | ||
791 | if (room >= size) { | |
792 | dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl; | |
793 | if (pos + size > header.max_size) | |
794 | must_write_header = true; | |
795 | return 0; | |
796 | } | |
797 | ||
798 | // full | |
799 | dout(1) << "check_for_full at " << pos << " : JOURNAL FULL " | |
800 | << pos << " >= " << room | |
801 | << " (max_size " << header.max_size << " start " << header.start << ")" | |
802 | << dendl; | |
803 | ||
804 | off64_t max = header.max_size - get_top(); | |
805 | if (size > max) | |
806 | dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl; | |
807 | ||
808 | return -ENOSPC; | |
809 | } | |
810 | ||
811 | int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes) | |
812 | { | |
813 | // gather queued writes | |
814 | off64_t queue_pos = write_pos; | |
815 | ||
816 | int eleft = cct->_conf->journal_max_write_entries; | |
817 | unsigned bmax = cct->_conf->journal_max_write_bytes; | |
818 | ||
819 | if (full_state != FULL_NOTFULL) | |
820 | return -ENOSPC; | |
821 | ||
822 | while (!writeq_empty()) { | |
823 | list<write_item> items; | |
824 | batch_pop_write(items); | |
825 | list<write_item>::iterator it = items.begin(); | |
826 | while (it != items.end()) { | |
827 | uint64_t bytes = it->bl.length(); | |
828 | int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes); | |
829 | if (r == 0) { // prepare ok, delete it | |
830 | items.erase(it++); | |
831 | #ifdef HAVE_LIBAIO | |
832 | { | |
833 | Mutex::Locker locker(aio_lock); | |
834 | assert(aio_write_queue_ops > 0); | |
835 | aio_write_queue_ops--; | |
836 | assert(aio_write_queue_bytes >= bytes); | |
837 | aio_write_queue_bytes -= bytes; | |
838 | } | |
839 | #else | |
840 | (void)bytes; | |
841 | #endif | |
842 | } | |
843 | if (r == -ENOSPC) { | |
844 | // the journal maybe full, insert the left item to writeq | |
845 | batch_unpop_write(items); | |
846 | if (orig_ops) | |
847 | goto out; // commit what we have | |
848 | ||
849 | if (logger) | |
850 | logger->inc(l_filestore_journal_full); | |
851 | ||
852 | if (wait_on_full) { | |
853 | dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl; | |
854 | } else { | |
855 | dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl; | |
856 | ||
857 | // throw out what we have so far | |
858 | full_state = FULL_FULL; | |
859 | while (!writeq_empty()) { | |
860 | complete_write(1, peek_write().orig_len); | |
861 | pop_write(); | |
862 | } | |
863 | print_header(header); | |
864 | } | |
224ce89b | 865 | |
7c673cae FG |
866 | return -ENOSPC; // hrm, full on first op |
867 | } | |
868 | if (eleft) { | |
869 | if (--eleft == 0) { | |
870 | dout(20) << "prepare_multi_write hit max events per write " | |
871 | << cct->_conf->journal_max_write_entries << dendl; | |
872 | batch_unpop_write(items); | |
873 | goto out; | |
874 | } | |
875 | } | |
876 | if (bmax) { | |
877 | if (bl.length() >= bmax) { | |
878 | dout(20) << "prepare_multi_write hit max write size " | |
879 | << cct->_conf->journal_max_write_bytes << dendl; | |
880 | batch_unpop_write(items); | |
881 | goto out; | |
882 | } | |
883 | } | |
884 | } | |
885 | } | |
886 | ||
887 | out: | |
888 | dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl; | |
889 | assert((write_pos + bl.length() == queue_pos) || | |
890 | (write_pos + bl.length() - header.max_size + get_top() == queue_pos)); | |
891 | return 0; | |
892 | } | |
893 | ||
894 | /* | |
895 | void FileJournal::queue_write_fin(uint64_t seq, Context *fin) | |
896 | { | |
897 | writing_seq.push_back(seq); | |
898 | if (!waiting_for_notfull.empty()) { | |
899 | // make sure previously unjournaled stuff waiting for UNFULL triggers | |
900 | // _before_ newly journaled stuff does | |
901 | dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin | |
902 | << " until after UNFULL" << dendl; | |
903 | C_Gather *g = new C_Gather(writeq.front().fin); | |
904 | writing_fin.push_back(g->new_sub()); | |
905 | waiting_for_notfull.push_back(g->new_sub()); | |
906 | } else { | |
907 | writing_fin.push_back(writeq.front().fin); | |
908 | dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl; | |
909 | } | |
910 | } | |
911 | */ | |
912 | ||
913 | void FileJournal::queue_completions_thru(uint64_t seq) | |
914 | { | |
915 | assert(finisher_lock.is_locked()); | |
916 | utime_t now = ceph_clock_now(); | |
917 | list<completion_item> items; | |
918 | batch_pop_completions(items); | |
919 | list<completion_item>::iterator it = items.begin(); | |
920 | while (it != items.end()) { | |
921 | completion_item& next = *it; | |
922 | if (next.seq > seq) | |
923 | break; | |
924 | utime_t lat = now; | |
925 | lat -= next.start; | |
926 | dout(10) << "queue_completions_thru seq " << seq | |
927 | << " queueing seq " << next.seq | |
928 | << " " << next.finish | |
929 | << " lat " << lat << dendl; | |
930 | if (logger) { | |
931 | logger->tinc(l_filestore_journal_latency, lat); | |
932 | } | |
933 | if (next.finish) | |
934 | finisher->queue(next.finish); | |
935 | if (next.tracked_op) { | |
936 | next.tracked_op->mark_event("journaled_completion_queued"); | |
937 | next.tracked_op->journal_trace.event("queued completion"); | |
938 | next.tracked_op->journal_trace.keyval("completed through", seq); | |
939 | } | |
940 | items.erase(it++); | |
941 | } | |
942 | batch_unpop_completions(items); | |
943 | finisher_cond.Signal(); | |
944 | } | |
945 | ||
946 | ||
947 | int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes) | |
948 | { | |
949 | uint64_t seq = next_write.seq; | |
950 | bufferlist &ebl = next_write.bl; | |
951 | off64_t size = ebl.length(); | |
952 | ||
953 | int r = check_for_full(seq, queue_pos, size); | |
954 | if (r < 0) | |
955 | return r; // ENOSPC or EAGAIN | |
956 | ||
957 | uint32_t orig_len = next_write.orig_len; | |
958 | orig_bytes += orig_len; | |
959 | orig_ops++; | |
960 | ||
961 | // add to write buffer | |
962 | dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq | |
963 | << " len " << orig_len << " -> " << size << dendl; | |
964 | ||
965 | unsigned seq_offset = offsetof(entry_header_t, seq); | |
966 | unsigned magic1_offset = offsetof(entry_header_t, magic1); | |
967 | unsigned magic2_offset = offsetof(entry_header_t, magic2); | |
968 | ||
969 | bufferptr headerptr = ebl.buffers().front(); | |
970 | uint64_t _seq = seq; | |
971 | uint64_t _queue_pos = queue_pos; | |
972 | uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64()); | |
973 | headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq); | |
974 | headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
975 | headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
976 | ||
977 | bufferptr footerptr = ebl.buffers().back(); | |
978 | unsigned post_offset = footerptr.length() - sizeof(entry_header_t); | |
979 | footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq); | |
980 | footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
981 | footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
982 | ||
983 | bl.claim_append(ebl); | |
984 | if (next_write.tracked_op) { | |
985 | next_write.tracked_op->mark_event("write_thread_in_journal_buffer"); | |
986 | next_write.tracked_op->journal_trace.event("prepare_single_write"); | |
987 | } | |
988 | ||
989 | journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos)); | |
990 | writing_seq = seq; | |
991 | ||
992 | queue_pos += size; | |
993 | if (queue_pos >= header.max_size) | |
994 | queue_pos = queue_pos + get_top() - header.max_size; | |
995 | ||
996 | return 0; | |
997 | } | |
998 | ||
999 | void FileJournal::check_align(off64_t pos, bufferlist& bl) | |
1000 | { | |
1001 | // make sure list segments are page aligned | |
1002 | if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) { | |
1003 | assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); | |
1004 | assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); | |
1005 | assert(0 == "bl was not aligned"); | |
1006 | } | |
1007 | } | |
1008 | ||
1009 | int FileJournal::write_bl(off64_t& pos, bufferlist& bl) | |
1010 | { | |
1011 | int ret; | |
1012 | ||
1013 | off64_t spos = ::lseek64(fd, pos, SEEK_SET); | |
1014 | if (spos < 0) { | |
1015 | ret = -errno; | |
1016 | derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl; | |
1017 | return ret; | |
1018 | } | |
1019 | ret = bl.write_fd(fd); | |
1020 | if (ret) { | |
1021 | derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl; | |
1022 | return ret; | |
1023 | } | |
1024 | pos += bl.length(); | |
1025 | if (pos == header.max_size) | |
1026 | pos = get_top(); | |
1027 | return 0; | |
1028 | } | |
1029 | ||
/**
 * Synchronously write the prepared buffer @p bl (and, when due, the journal
 * header) to the journal, then publish the new journaled_seq.
 *
 * write_lock is released for the duration of the I/O and re-taken before
 * returning; write_thread_entry() calls this with write_lock held.
 * write_pos is advanced *before* the lock is dropped, and the local `pos`
 * is expected to end up equal to it once the writes finish (asserted).
 *
 * Any I/O failure is fatal (ceph_abort()).
 */
void FileJournal::do_write(bufferlist& bl)
{
  // nothing to do?
  if (bl.length() == 0 && !must_write_header)
    return;

  buffer::ptr hbp;
  // force a header write every journal_write_header_frequency calls
  // (a value of 0 disables this)
  if (cct->_conf->journal_write_header_frequency &&
      (((++journaled_since_start) %
        cct->_conf->journal_write_header_frequency) == 0)) {
    must_write_header = true;
  }

  if (must_write_header) {
    must_write_header = false;
    hbp = prepare_header();
  }

  dout(15) << "do_write writing " << write_pos << "~" << bl.length()
           << (hbp.length() ? " + header":"")
           << dendl;

  utime_t from = ceph_clock_now();

  // entry
  off64_t pos = write_pos;

  // Adjust write_pos
  write_pos += bl.length();
  if (write_pos >= header.max_size)
    write_pos = write_pos - header.max_size + get_top();

  // drop the lock while doing the actual I/O
  write_lock.Unlock();

  // split?
  off64_t split = 0;
  if (pos + bl.length() > header.max_size) {
    // the buffer runs past the end of the journal: write it as two pieces
    bufferlist first, second;
    split = header.max_size - pos;
    first.substr_of(bl, 0, split);
    second.substr_of(bl, split, bl.length() - split);
    assert(first.length() + second.length() == bl.length());
    dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
             << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;

    //Save pos to write first piece second
    off64_t first_pos = pos;
    off64_t orig_pos;
    pos = get_top();
    // header too?
    if (hbp.length()) {
      // be sneaky: include the header in the second fragment
      second.push_front(hbp);
      pos = 0;          // we included the header
    }
    // Write the second portion first possible with the header, so
    // do_read_entry() won't even get a valid entry_header_t if there
    // is a crash between the two writes.
    orig_pos = pos;    // write_bl() advances pos; keep the start for logging
    if (write_bl(pos, second)) {
      derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
           << ") failed" << dendl;
      check_align(pos, second);
      ceph_abort();
    }
    orig_pos = first_pos;
    if (write_bl(first_pos, first)) {
      derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
           << ") failed" << dendl;
      check_align(first_pos, first);
      ceph_abort();
    }
    // the first piece ended exactly at max_size, so write_bl() wrapped it
    assert(first_pos == get_top());
  } else {
    // header too?
    if (hbp.length()) {
      // header lives at offset 0 of the journal
      if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
        int err = errno;
        derr << "FileJournal::do_write: pwrite(fd=" << fd
             << ", hbp.length=" << hbp.length() << ") failed :"
             << cpp_strerror(err) << dendl;
        ceph_abort();
      }
    }

    if (write_bl(pos, bl)) {
      derr << "FileJournal::do_write: write_bl(pos=" << pos
           << ") failed" << dendl;
      check_align(pos, bl);
      ceph_abort();
    }
  }

  if (!directio) {
    // without direct I/O the data must be flushed explicitly
    dout(20) << "do_write fsync" << dendl;

    /*
     * We'd really love to have a fsync_range or fdatasync_range and do a:
     *
     *  if (split) {
     *    ::fsync_range(fd, header.max_size - split, split);
     *    ::fsync_range(fd, get_top(), bl.length() - split);
     *  else
     *    ::fsync_range(fd, write_pos, bl.length())
     *
     * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
     * too hard given all the underlying infrastructure already exist.
     *
     * NOTE: using sync_file_range here would not be safe as it does not
     * flush disk caches or commits any sort of metadata.
     */
    int ret = 0;
#if defined(DARWIN) || defined(__FreeBSD__)
    ret = ::fsync(fd);
#else
    ret = ::fdatasync(fd);
#endif
    if (ret < 0) {
      derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }
#ifdef HAVE_POSIX_FADVISE
    // optionally tell the kernel the journal pages are not needed in cache
    if (cct->_conf->filestore_fadvise)
      posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
#endif
  }

  utime_t lat = ceph_clock_now() - from;
  dout(20) << "do_write latency " << lat << dendl;

  // re-take the lock the caller expects us to hold on return
  write_lock.Lock();

  // pos was advanced by write_bl(); it must agree with the write_pos that
  // was computed before the lock was dropped
  assert(write_pos == pos);
  assert(write_pos % header.alignment == 0);

  {
    Mutex::Locker locker(finisher_lock);
    journaled_seq = writing_seq;

    // kick finisher?
    //  only if we haven't filled up recently!
    if (full_state != FULL_NOTFULL) {
      dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
               << ", full_commit_seq|full_restart_seq" << dendl;
    } else {
      if (plug_journal_completions) {
        dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
                 << " due to completion plug" << dendl;
      } else {
        dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
        queue_completions_thru(journaled_seq);
      }
    }
  }
}
1185 | ||
1186 | void FileJournal::flush() | |
1187 | { | |
1188 | dout(10) << "waiting for completions to empty" << dendl; | |
1189 | { | |
1190 | Mutex::Locker l(finisher_lock); | |
1191 | while (!completions_empty()) | |
1192 | finisher_cond.Wait(finisher_lock); | |
1193 | } | |
1194 | dout(10) << "flush waiting for finisher" << dendl; | |
1195 | finisher->wait_for_empty(); | |
1196 | dout(10) << "flush done" << dendl; | |
1197 | } | |
1198 | ||
1199 | ||
/**
 * Main loop of the journal writer thread.
 *
 * Each iteration:
 *  1. sleeps on writeq_cond while there is nothing queued and no header
 *     write is pending (exits the loop instead if write_stop is set);
 *  2. with aio enabled, throttles on the number of aios in flight;
 *  3. takes write_lock, batches queued entries with prepare_multi_write()
 *     and writes them with do_write() / do_aio_write().
 *
 * When the journal is full (-ENOSPC) the thread waits on commit_cond for
 * space, unless it is stopping, in which case the queue is thrown away.
 */
void FileJournal::write_thread_entry()
{
  dout(10) << "write_thread_entry start" << dendl;
  while (1) {
    {
      Mutex::Locker locker(writeq_lock);
      if (writeq.empty() && !must_write_header) {
        if (write_stop)
          break;
        dout(20) << "write_thread_entry going to sleep" << dendl;
        writeq_cond.Wait(writeq_lock);
        dout(20) << "write_thread_entry woke up" << dendl;
        continue;
      }
    }

#ifdef HAVE_LIBAIO
    if (aio) {
      Mutex::Locker locker(aio_lock);
      // should we back off to limit aios in flight?  try to do this
      // adaptively so that we submit larger aios once we have lots of
      // them in flight.
      //
      // NOTE: our condition here is based on aio_num (protected by
      // aio_lock) and throttle_bytes (part of the write queue).  when
      // we sleep, we *only* wait for aio_num to change, and do not
      // wake when more data is queued.  this is not strictly correct,
      // but should be fine given that we will have plenty of aios in
      // flight if we hit this limit to ensure we keep the device
      // saturated.
      while (aio_num > 0) {
        // required pending bytes grow as 2^(2*aio_num), capped at 2^24
        int exp = MIN(aio_num * 2, 24);
        long unsigned min_new = 1ull << exp;
        uint64_t cur = aio_write_queue_bytes;
        dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
                 << " ... exp " << exp << " min_new " << min_new
                 << " ... pending " << cur << dendl;
        if (cur >= min_new)
          break;
        dout(20) << "write_thread_entry deferring until more aios complete: "
                 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
                 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
        aio_cond.Wait(aio_lock);
        dout(20) << "write_thread_entry woke up" << dendl;
      }
    }
#endif

    // held until the end of this loop iteration (including across `continue`)
    Mutex::Locker locker(write_lock);
    uint64_t orig_ops = 0;
    uint64_t orig_bytes = 0;

    bufferlist bl;
    int r = prepare_multi_write(bl, orig_ops, orig_bytes);
    // Don't care about journal full if stopping, so drop queue and
    // possibly let header get written and loop above to notice stop
    if (r == -ENOSPC) {
      if (write_stop) {
        dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
        while (!writeq_empty()) {
          complete_write(1, peek_write().orig_len);
          pop_write();
        }
        print_header(header);
        r = 0;
      } else {
        dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
        commit_cond.Wait(write_lock);
        dout(20) << "write_thread_entry woke up" << dendl;
        continue;
      }
    }
    // any other error from prepare_multi_write() is unexpected here
    assert(r == 0);

    if (logger) {
      logger->inc(l_filestore_journal_wr);
      logger->inc(l_filestore_journal_wr_bytes, bl.length());
    }

#ifdef HAVE_LIBAIO
    if (aio)
      do_aio_write(bl);
    else
      do_write(bl);
#else
    do_write(bl);
#endif
    complete_write(orig_ops, orig_bytes);
  }

  dout(10) << "write_thread_entry finish" << dendl;
}
1292 | ||
1293 | #ifdef HAVE_LIBAIO | |
1294 | void FileJournal::do_aio_write(bufferlist& bl) | |
1295 | { | |
1296 | ||
1297 | if (cct->_conf->journal_write_header_frequency && | |
1298 | (((++journaled_since_start) % | |
1299 | cct->_conf->journal_write_header_frequency) == 0)) { | |
1300 | must_write_header = true; | |
1301 | } | |
1302 | ||
1303 | // nothing to do? | |
1304 | if (bl.length() == 0 && !must_write_header) | |
1305 | return; | |
1306 | ||
1307 | buffer::ptr hbp; | |
1308 | if (must_write_header) { | |
1309 | must_write_header = false; | |
1310 | hbp = prepare_header(); | |
1311 | } | |
1312 | ||
1313 | // entry | |
1314 | off64_t pos = write_pos; | |
1315 | ||
1316 | dout(15) << "do_aio_write writing " << pos << "~" << bl.length() | |
1317 | << (hbp.length() ? " + header":"") | |
1318 | << dendl; | |
1319 | ||
1320 | // split? | |
1321 | off64_t split = 0; | |
1322 | if (pos + bl.length() > header.max_size) { | |
1323 | bufferlist first, second; | |
1324 | split = header.max_size - pos; | |
1325 | first.substr_of(bl, 0, split); | |
1326 | second.substr_of(bl, split, bl.length() - split); | |
1327 | assert(first.length() + second.length() == bl.length()); | |
1328 | dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl; | |
1329 | ||
1330 | if (write_aio_bl(pos, first, 0)) { | |
1331 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1332 | << ") failed" << dendl; | |
1333 | ceph_abort(); | |
1334 | } | |
1335 | assert(pos == header.max_size); | |
1336 | if (hbp.length()) { | |
1337 | // be sneaky: include the header in the second fragment | |
1338 | second.push_front(hbp); | |
1339 | pos = 0; // we included the header | |
1340 | } else | |
1341 | pos = get_top(); // no header, start after that | |
1342 | if (write_aio_bl(pos, second, writing_seq)) { | |
1343 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1344 | << ") failed" << dendl; | |
1345 | ceph_abort(); | |
1346 | } | |
1347 | } else { | |
1348 | // header too? | |
1349 | if (hbp.length()) { | |
1350 | bufferlist hbl; | |
1351 | hbl.push_back(hbp); | |
1352 | loff_t pos = 0; | |
1353 | if (write_aio_bl(pos, hbl, 0)) { | |
1354 | derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl; | |
1355 | ceph_abort(); | |
1356 | } | |
1357 | } | |
1358 | ||
1359 | if (write_aio_bl(pos, bl, writing_seq)) { | |
1360 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1361 | << ") failed" << dendl; | |
1362 | ceph_abort(); | |
1363 | } | |
1364 | } | |
1365 | ||
1366 | write_pos = pos; | |
1367 | if (write_pos == header.max_size) | |
1368 | write_pos = get_top(); | |
1369 | assert(write_pos % header.alignment == 0); | |
1370 | } | |
1371 | ||
/**
 * write a buffer using aio
 *
 * The buffer is consumed in chunks of at most IOV_MAX-1 segments; each
 * chunk becomes one aio_info appended to aio_queue and one io_submit()
 * call.  @p pos is advanced past every chunk that was submitted.
 *
 * @param pos in/out: journal offset to write at
 * @param bl  buffer to write; emptied as chunks are spliced off
 * @param seq seq to trigger when this aio completes.  if 0, do not update any state
 * on completion.
 * @return always 0; an unrecoverable io_submit() failure asserts instead
 */
int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
{
  dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;

  while (bl.length() > 0) {
    int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
    // NOTE(review): iov is stored in the aio_info below; presumably it is
    // released when that aio_info is destroyed -- confirm
    iovec *iov = new iovec[max];
    int n = 0;
    unsigned len = 0;
    for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
         n < max;
         ++p, ++n) {
      assert(p != bl.buffers().end());
      iov[n].iov_base = (void *)p->c_str();
      iov[n].iov_len = p->length();
      len += p->length();
    }

    bufferlist tbl;
    bl.splice(0, len, &tbl);  // move bytes from bl -> tbl

    // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
    // modified in check_aio_completion
    aio_lock.Lock();
    // only the last chunk (bl now empty) carries the caller's seq
    aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
    aio_info& aio = aio_queue.back();
    aio.iov = iov;

    io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);

    dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
             << " in " << n << dendl;

    aio_num++;
    aio_bytes += aio.len;

    // need to save current aio len to update write_pos later because current
    // aio could be erased from aio_queue once it is done
    uint64_t cur_len = aio.len;
    // unlock aio_lock because following io_submit might take time to return
    aio_lock.Unlock();

    iocb *piocb = &aio.iocb;

    // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
    int attempts = 16;
    int delay = 125;
    do {
      int r = io_submit(aio_ctx, 1, &piocb);
      dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
      if (r < 0) {
        derr << "io_submit to " << aio.off << "~" << cur_len
             << " got " << cpp_strerror(r) << dendl;
        // retry EAGAIN with exponential backoff; anything else is fatal
        if (r == -EAGAIN && attempts-- > 0) {
          usleep(delay);
          delay *= 2;
          continue;
        }
        check_align(pos, tbl);
        assert(0 == "io_submit got unexpected error");
      } else {
        break;
      }
    } while (true);
    pos += cur_len;
  }
  // wake write_finish_thread_entry(), which sleeps on this condition while
  // aio_queue is empty
  aio_lock.Lock();
  write_finish_cond.Signal();
  aio_lock.Unlock();
  return 0;
}
1449 | #endif | |
1450 | ||
1451 | void FileJournal::write_finish_thread_entry() | |
1452 | { | |
1453 | #ifdef HAVE_LIBAIO | |
b32b8144 | 1454 | dout(10) << __func__ << " enter" << dendl; |
7c673cae FG |
1455 | while (true) { |
1456 | { | |
1457 | Mutex::Locker locker(aio_lock); | |
1458 | if (aio_queue.empty()) { | |
1459 | if (aio_stop) | |
1460 | break; | |
b32b8144 | 1461 | dout(20) << __func__ << " sleeping" << dendl; |
7c673cae FG |
1462 | write_finish_cond.Wait(aio_lock); |
1463 | continue; | |
1464 | } | |
1465 | } | |
1466 | ||
b32b8144 | 1467 | dout(20) << __func__ << " waiting for aio(s)" << dendl; |
7c673cae FG |
1468 | io_event event[16]; |
1469 | int r = io_getevents(aio_ctx, 1, 16, event, NULL); | |
1470 | if (r < 0) { | |
1471 | if (r == -EINTR) { | |
1472 | dout(0) << "io_getevents got " << cpp_strerror(r) << dendl; | |
1473 | continue; | |
1474 | } | |
1475 | derr << "io_getevents got " << cpp_strerror(r) << dendl; | |
1476 | assert(0 == "got unexpected error from io_getevents"); | |
1477 | } | |
1478 | ||
1479 | { | |
1480 | Mutex::Locker locker(aio_lock); | |
1481 | for (int i=0; i<r; i++) { | |
1482 | aio_info *ai = (aio_info *)event[i].obj; | |
1483 | if (event[i].res != ai->len) { | |
1484 | derr << "aio to " << ai->off << "~" << ai->len | |
1485 | << " returned: " << (int)event[i].res << dendl; | |
1486 | assert(0 == "unexpected aio error"); | |
1487 | } | |
b32b8144 | 1488 | dout(10) << __func__ << " aio " << ai->off |
7c673cae FG |
1489 | << "~" << ai->len << " done" << dendl; |
1490 | ai->done = true; | |
1491 | } | |
1492 | check_aio_completion(); | |
1493 | } | |
1494 | } | |
b32b8144 | 1495 | dout(10) << __func__ << " exit" << dendl; |
7c673cae FG |
1496 | #endif |
1497 | } | |
1498 | ||
1499 | #ifdef HAVE_LIBAIO | |
1500 | /** | |
1501 | * check aio_wait for completed aio, and update state appropriately. | |
1502 | */ | |
1503 | void FileJournal::check_aio_completion() | |
1504 | { | |
1505 | assert(aio_lock.is_locked()); | |
1506 | dout(20) << "check_aio_completion" << dendl; | |
1507 | ||
1508 | bool completed_something = false, signal = false; | |
1509 | uint64_t new_journaled_seq = 0; | |
1510 | ||
1511 | list<aio_info>::iterator p = aio_queue.begin(); | |
1512 | while (p != aio_queue.end() && p->done) { | |
1513 | dout(20) << "check_aio_completion completed seq " << p->seq << " " | |
1514 | << p->off << "~" << p->len << dendl; | |
1515 | if (p->seq) { | |
1516 | new_journaled_seq = p->seq; | |
1517 | completed_something = true; | |
1518 | } | |
1519 | aio_num--; | |
1520 | aio_bytes -= p->len; | |
1521 | aio_queue.erase(p++); | |
1522 | signal = true; | |
1523 | } | |
1524 | ||
1525 | if (completed_something) { | |
1526 | // kick finisher? | |
1527 | // only if we haven't filled up recently! | |
1528 | Mutex::Locker locker(finisher_lock); | |
1529 | journaled_seq = new_journaled_seq; | |
1530 | if (full_state != FULL_NOTFULL) { | |
1531 | dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq | |
1532 | << ", full_commit_seq|full_restart_seq" << dendl; | |
1533 | } else { | |
1534 | if (plug_journal_completions) { | |
1535 | dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq | |
1536 | << " due to completion plug" << dendl; | |
1537 | } else { | |
1538 | dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl; | |
1539 | queue_completions_thru(journaled_seq); | |
1540 | } | |
1541 | } | |
1542 | } | |
1543 | if (signal) { | |
1544 | // maybe write queue was waiting for aio count to drop? | |
1545 | aio_cond.Signal(); | |
1546 | } | |
1547 | } | |
1548 | #endif | |
1549 | ||
1550 | int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) { | |
1551 | dout(10) << "prepare_entry " << tls << dendl; | |
1552 | int data_len = cct->_conf->journal_align_min_size - 1; | |
1553 | int data_align = -1; // -1 indicates that we don't care about the alignment | |
1554 | bufferlist bl; | |
1555 | for (vector<ObjectStore::Transaction>::iterator p = tls.begin(); | |
1556 | p != tls.end(); ++p) { | |
1557 | if ((int)(*p).get_data_length() > data_len) { | |
1558 | data_len = (*p).get_data_length(); | |
1559 | data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; | |
1560 | } | |
1561 | ::encode(*p, bl); | |
1562 | } | |
1563 | if (tbl->length()) { | |
1564 | bl.claim_append(*tbl); | |
1565 | } | |
1566 | // add it this entry | |
1567 | entry_header_t h; | |
1568 | unsigned head_size = sizeof(entry_header_t); | |
1569 | off64_t base_size = 2*head_size + bl.length(); | |
1570 | memset(&h, 0, sizeof(h)); | |
1571 | if (data_align >= 0) | |
1572 | h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; | |
1573 | off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment); | |
1574 | unsigned post_pad = size - base_size - h.pre_pad; | |
1575 | h.len = bl.length(); | |
1576 | h.post_pad = post_pad; | |
1577 | h.crc32c = bl.crc32c(0); | |
1578 | dout(10) << " len " << bl.length() << " -> " << size | |
1579 | << " (head " << head_size << " pre_pad " << h.pre_pad | |
1580 | << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")" | |
1581 | << " (bl alignment " << data_align << ")" | |
1582 | << dendl; | |
1583 | bufferlist ebl; | |
1584 | // header | |
1585 | ebl.append((const char*)&h, sizeof(h)); | |
1586 | if (h.pre_pad) { | |
1587 | ebl.push_back(buffer::create_static(h.pre_pad, zero_buf)); | |
1588 | } | |
1589 | // payload | |
1590 | ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy | |
1591 | if (h.post_pad) { | |
1592 | ebl.push_back(buffer::create_static(h.post_pad, zero_buf)); | |
1593 | } | |
1594 | // footer | |
1595 | ebl.append((const char*)&h, sizeof(h)); | |
1596 | if (directio) | |
1597 | ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT); | |
1598 | tbl->claim(ebl); | |
1599 | return h.len; | |
1600 | } | |
1601 | ||
1602 | void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, | |
1603 | Context *oncommit, TrackedOpRef osd_op) | |
1604 | { | |
1605 | // dump on queue | |
1606 | dout(5) << "submit_entry seq " << seq | |
1607 | << " len " << e.length() | |
1608 | << " (" << oncommit << ")" << dendl; | |
1609 | assert(e.length() > 0); | |
1610 | assert(e.length() < header.max_size); | |
1611 | ||
1612 | if (osd_op) | |
1613 | osd_op->mark_event("commit_queued_for_journal_write"); | |
1614 | if (logger) { | |
1615 | logger->inc(l_filestore_journal_queue_bytes, orig_len); | |
1616 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1617 | } | |
1618 | ||
1619 | throttle.register_throttle_seq(seq, e.length()); | |
1620 | if (logger) { | |
1621 | logger->inc(l_filestore_journal_ops, 1); | |
1622 | logger->inc(l_filestore_journal_bytes, e.length()); | |
1623 | } | |
1624 | ||
1625 | if (osd_op) { | |
1626 | osd_op->mark_event("commit_queued_for_journal_write"); | |
1627 | if (osd_op->store_trace) { | |
1628 | osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace); | |
1629 | osd_op->journal_trace.event("submit_entry"); | |
1630 | osd_op->journal_trace.keyval("seq", seq); | |
1631 | } | |
1632 | } | |
1633 | { | |
1634 | Mutex::Locker l1(writeq_lock); | |
1635 | #ifdef HAVE_LIBAIO | |
1636 | Mutex::Locker l2(aio_lock); | |
1637 | #endif | |
1638 | Mutex::Locker l3(completions_lock); | |
1639 | ||
1640 | #ifdef HAVE_LIBAIO | |
1641 | aio_write_queue_ops++; | |
1642 | aio_write_queue_bytes += e.length(); | |
1643 | aio_cond.Signal(); | |
1644 | #endif | |
1645 | ||
1646 | completions.push_back( | |
1647 | completion_item( | |
1648 | seq, oncommit, ceph_clock_now(), osd_op)); | |
1649 | if (writeq.empty()) | |
1650 | writeq_cond.Signal(); | |
1651 | writeq.push_back(write_item(seq, e, orig_len, osd_op)); | |
1652 | if (osd_op) | |
1653 | osd_op->journal_trace.keyval("queue depth", writeq.size()); | |
1654 | } | |
1655 | } | |
1656 | ||
1657 | bool FileJournal::writeq_empty() | |
1658 | { | |
1659 | Mutex::Locker locker(writeq_lock); | |
1660 | return writeq.empty(); | |
1661 | } | |
1662 | ||
1663 | FileJournal::write_item &FileJournal::peek_write() | |
1664 | { | |
1665 | assert(write_lock.is_locked()); | |
1666 | Mutex::Locker locker(writeq_lock); | |
1667 | return writeq.front(); | |
1668 | } | |
1669 | ||
1670 | void FileJournal::pop_write() | |
1671 | { | |
1672 | assert(write_lock.is_locked()); | |
1673 | Mutex::Locker locker(writeq_lock); | |
1674 | if (logger) { | |
1675 | logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len); | |
1676 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1677 | } | |
1678 | writeq.pop_front(); | |
1679 | } | |
1680 | ||
1681 | void FileJournal::batch_pop_write(list<write_item> &items) | |
1682 | { | |
1683 | assert(write_lock.is_locked()); | |
1684 | { | |
1685 | Mutex::Locker locker(writeq_lock); | |
1686 | writeq.swap(items); | |
1687 | } | |
1688 | for (auto &&i : items) { | |
1689 | if (logger) { | |
1690 | logger->dec(l_filestore_journal_queue_bytes, i.orig_len); | |
1691 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1692 | } | |
1693 | } | |
1694 | } | |
1695 | ||
1696 | void FileJournal::batch_unpop_write(list<write_item> &items) | |
1697 | { | |
1698 | assert(write_lock.is_locked()); | |
1699 | for (auto &&i : items) { | |
1700 | if (logger) { | |
1701 | logger->inc(l_filestore_journal_queue_bytes, i.orig_len); | |
1702 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1703 | } | |
1704 | } | |
1705 | Mutex::Locker locker(writeq_lock); | |
1706 | writeq.splice(writeq.begin(), items); | |
1707 | } | |
1708 | ||
1709 | void FileJournal::commit_start(uint64_t seq) | |
1710 | { | |
1711 | dout(10) << "commit_start" << dendl; | |
1712 | ||
1713 | // was full? | |
1714 | switch (full_state) { | |
1715 | case FULL_NOTFULL: | |
1716 | break; // all good | |
1717 | ||
1718 | case FULL_FULL: | |
1719 | if (seq >= journaled_seq) { | |
1720 | dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq " | |
1721 | << seq << " > journaled_seq " << journaled_seq | |
1722 | << ", moving to FULL_WAIT." | |
1723 | << dendl; | |
1724 | full_state = FULL_WAIT; | |
1725 | } else { | |
1726 | dout(1) << "FULL_FULL commit_start on seq " | |
1727 | << seq << " < journaled_seq " << journaled_seq | |
1728 | << ", remaining in FULL_FULL" | |
1729 | << dendl; | |
1730 | } | |
1731 | break; | |
1732 | ||
1733 | case FULL_WAIT: | |
1734 | dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl; | |
1735 | full_state = FULL_NOTFULL; | |
1736 | plug_journal_completions = true; | |
1737 | break; | |
1738 | } | |
1739 | } | |
1740 | ||
1741 | /* | |
1742 | *send discard command to joural block deivce | |
1743 | */ | |
1744 | void FileJournal::do_discard(int64_t offset, int64_t end) | |
1745 | { | |
1746 | dout(10) << __func__ << "trim(" << offset << ", " << end << dendl; | |
1747 | ||
1748 | offset = ROUND_UP_TO(offset, block_size); | |
1749 | if (offset >= end) | |
1750 | return; | |
1751 | end = ROUND_UP_TO(end - block_size, block_size); | |
1752 | assert(end >= offset); | |
1753 | if (offset < end) | |
1754 | if (block_device_discard(fd, offset, end - offset) < 0) | |
1755 | dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl; | |
1756 | } | |
1757 | ||
1758 | void FileJournal::committed_thru(uint64_t seq) | |
1759 | { | |
1760 | Mutex::Locker locker(write_lock); | |
1761 | ||
1762 | auto released = throttle.flush(seq); | |
1763 | if (logger) { | |
1764 | logger->dec(l_filestore_journal_ops, released.first); | |
1765 | logger->dec(l_filestore_journal_bytes, released.second); | |
1766 | } | |
1767 | ||
1768 | if (seq < last_committed_seq) { | |
1769 | dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl; | |
1770 | assert(seq >= last_committed_seq); | |
1771 | return; | |
1772 | } | |
1773 | if (seq == last_committed_seq) { | |
1774 | dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl; | |
1775 | return; | |
1776 | } | |
1777 | ||
1778 | dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl; | |
1779 | last_committed_seq = seq; | |
1780 | ||
1781 | // completions! | |
1782 | { | |
1783 | Mutex::Locker locker(finisher_lock); | |
1784 | queue_completions_thru(seq); | |
1785 | if (plug_journal_completions && seq >= header.start_seq) { | |
1786 | dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl; | |
1787 | plug_journal_completions = false; | |
1788 | queue_completions_thru(journaled_seq); | |
1789 | } | |
1790 | } | |
1791 | ||
1792 | // adjust start pointer | |
1793 | while (!journalq.empty() && journalq.front().first <= seq) { | |
1794 | journalq.pop_front(); | |
1795 | } | |
1796 | ||
1797 | int64_t old_start = header.start; | |
1798 | if (!journalq.empty()) { | |
1799 | header.start = journalq.front().second; | |
1800 | header.start_seq = journalq.front().first; | |
1801 | } else { | |
1802 | header.start = write_pos; | |
1803 | header.start_seq = seq + 1; | |
1804 | } | |
1805 | ||
1806 | if (discard) { | |
1807 | dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl; | |
1808 | if (old_start < header.start) | |
1809 | do_discard(old_start, header.start - 1); | |
1810 | else { | |
1811 | do_discard(old_start, header.max_size - 1); | |
1812 | do_discard(get_top(), header.start - 1); | |
1813 | } | |
1814 | } | |
1815 | ||
1816 | must_write_header = true; | |
1817 | print_header(header); | |
1818 | ||
1819 | // committed but unjournaled items | |
1820 | while (!writeq_empty() && peek_write().seq <= seq) { | |
1821 | dout(15) << " dropping committed but unwritten seq " << peek_write().seq | |
1822 | << " len " << peek_write().bl.length() | |
1823 | << dendl; | |
1824 | complete_write(1, peek_write().orig_len); | |
1825 | pop_write(); | |
1826 | } | |
1827 | ||
1828 | commit_cond.Signal(); | |
1829 | ||
1830 | dout(10) << "committed_thru done" << dendl; | |
1831 | } | |
1832 | ||
1833 | ||
1834 | void FileJournal::complete_write(uint64_t ops, uint64_t bytes) | |
1835 | { | |
1836 | dout(5) << __func__ << " finished " << ops << " ops and " | |
1837 | << bytes << " bytes" << dendl; | |
1838 | } | |
1839 | ||
1840 | int FileJournal::make_writeable() | |
1841 | { | |
1842 | dout(10) << __func__ << dendl; | |
1843 | int r = set_throttle_params(); | |
1844 | if (r < 0) | |
1845 | return r; | |
1846 | ||
1847 | r = _open(true); | |
1848 | if (r < 0) | |
1849 | return r; | |
1850 | ||
1851 | if (read_pos > 0) | |
1852 | write_pos = read_pos; | |
1853 | else | |
1854 | write_pos = get_top(); | |
1855 | read_pos = 0; | |
1856 | ||
1857 | must_write_header = true; | |
1858 | ||
1859 | start_writer(); | |
1860 | return 0; | |
1861 | } | |
1862 | ||
1863 | int FileJournal::set_throttle_params() | |
1864 | { | |
1865 | stringstream ss; | |
1866 | bool valid = throttle.set_params( | |
1867 | cct->_conf->journal_throttle_low_threshhold, | |
1868 | cct->_conf->journal_throttle_high_threshhold, | |
1869 | cct->_conf->filestore_expected_throughput_bytes, | |
1870 | cct->_conf->journal_throttle_high_multiple, | |
1871 | cct->_conf->journal_throttle_max_multiple, | |
1872 | header.max_size - get_top(), | |
1873 | &ss); | |
1874 | ||
1875 | if (!valid) { | |
1876 | derr << "tried to set invalid params: " | |
1877 | << ss.str() | |
1878 | << dendl; | |
1879 | } | |
1880 | return valid ? 0 : -EINVAL; | |
1881 | } | |
1882 | ||
1883 | const char** FileJournal::get_tracked_conf_keys() const | |
1884 | { | |
1885 | static const char *KEYS[] = { | |
1886 | "journal_throttle_low_threshhold", | |
1887 | "journal_throttle_high_threshhold", | |
1888 | "journal_throttle_high_multiple", | |
1889 | "journal_throttle_max_multiple", | |
1890 | "filestore_expected_throughput_bytes", | |
1891 | NULL}; | |
1892 | return KEYS; | |
1893 | } | |
1894 | ||
1895 | void FileJournal::wrap_read_bl( | |
1896 | off64_t pos, | |
1897 | int64_t olen, | |
1898 | bufferlist* bl, | |
1899 | off64_t *out_pos | |
1900 | ) const | |
1901 | { | |
1902 | while (olen > 0) { | |
1903 | while (pos >= header.max_size) | |
1904 | pos = pos + get_top() - header.max_size; | |
1905 | ||
1906 | int64_t len; | |
1907 | if (pos + olen > header.max_size) | |
1908 | len = header.max_size - pos; // partial | |
1909 | else | |
1910 | len = olen; // rest | |
1911 | ||
1912 | int64_t actual = ::lseek64(fd, pos, SEEK_SET); | |
1913 | assert(actual == pos); | |
1914 | ||
1915 | bufferptr bp = buffer::create(len); | |
1916 | int r = safe_read_exact(fd, bp.c_str(), len); | |
1917 | if (r) { | |
1918 | derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned " | |
1919 | << r << dendl; | |
1920 | ceph_abort(); | |
1921 | } | |
1922 | bl->push_back(std::move(bp)); | |
1923 | pos += len; | |
1924 | olen -= len; | |
1925 | } | |
1926 | if (pos >= header.max_size) | |
1927 | pos = pos + get_top() - header.max_size; | |
1928 | if (out_pos) | |
1929 | *out_pos = pos; | |
1930 | } | |
1931 | ||
1932 | bool FileJournal::read_entry( | |
1933 | bufferlist &bl, | |
1934 | uint64_t &next_seq, | |
1935 | bool *corrupt) | |
1936 | { | |
1937 | if (corrupt) | |
1938 | *corrupt = false; | |
1939 | uint64_t seq = next_seq; | |
1940 | ||
1941 | if (!read_pos) { | |
1942 | dout(2) << "read_entry -- not readable" << dendl; | |
1943 | return false; | |
1944 | } | |
1945 | ||
1946 | off64_t pos = read_pos; | |
1947 | off64_t next_pos = pos; | |
1948 | stringstream ss; | |
1949 | read_entry_result result = do_read_entry( | |
1950 | pos, | |
1951 | &next_pos, | |
1952 | &bl, | |
1953 | &seq, | |
1954 | &ss); | |
1955 | if (result == SUCCESS) { | |
1956 | journalq.push_back( pair<uint64_t,off64_t>(seq, pos)); | |
1957 | uint64_t amount_to_take = | |
1958 | next_pos > pos ? | |
1959 | next_pos - pos : | |
1960 | (header.max_size - pos) + (next_pos - get_top()); | |
1961 | throttle.take(amount_to_take); | |
1962 | throttle.register_throttle_seq(next_seq, amount_to_take); | |
1963 | if (logger) { | |
1964 | logger->inc(l_filestore_journal_ops, 1); | |
1965 | logger->inc(l_filestore_journal_bytes, amount_to_take); | |
1966 | } | |
1967 | if (next_seq > seq) { | |
1968 | return false; | |
1969 | } else { | |
1970 | read_pos = next_pos; | |
1971 | next_seq = seq; | |
1972 | if (seq > journaled_seq) | |
1973 | journaled_seq = seq; | |
1974 | return true; | |
1975 | } | |
3efd9988 FG |
1976 | } else { |
1977 | derr << "do_read_entry(" << pos << "): " << ss.str() << dendl; | |
7c673cae FG |
1978 | } |
1979 | ||
1980 | if (seq && seq < header.committed_up_to) { | |
1981 | derr << "Unable to read past sequence " << seq | |
1982 | << " but header indicates the journal has committed up through " | |
1983 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
1984 | if (cct->_conf->journal_ignore_corruption) { | |
1985 | if (corrupt) | |
1986 | *corrupt = true; | |
1987 | return false; | |
1988 | } else { | |
1989 | ceph_abort(); | |
1990 | } | |
1991 | } | |
1992 | ||
7c673cae FG |
1993 | dout(2) << "No further valid entries found, journal is most likely valid" |
1994 | << dendl; | |
1995 | return false; | |
1996 | } | |
1997 | ||
1998 | FileJournal::read_entry_result FileJournal::do_read_entry( | |
1999 | off64_t init_pos, | |
2000 | off64_t *next_pos, | |
2001 | bufferlist *bl, | |
2002 | uint64_t *seq, | |
2003 | ostream *ss, | |
2004 | entry_header_t *_h) const | |
2005 | { | |
2006 | off64_t cur_pos = init_pos; | |
2007 | bufferlist _bl; | |
2008 | if (!bl) | |
2009 | bl = &_bl; | |
2010 | ||
2011 | // header | |
2012 | entry_header_t *h; | |
2013 | bufferlist hbl; | |
2014 | off64_t _next_pos; | |
2015 | wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos); | |
2016 | h = reinterpret_cast<entry_header_t *>(hbl.c_str()); | |
2017 | ||
2018 | if (!h->check_magic(cur_pos, header.get_fsid64())) { | |
2019 | dout(25) << "read_entry " << init_pos | |
2020 | << " : bad header magic, end of journal" << dendl; | |
2021 | if (ss) | |
2022 | *ss << "bad header magic"; | |
2023 | if (next_pos) | |
2024 | *next_pos = init_pos + (4<<10); // check 4k ahead | |
2025 | return MAYBE_CORRUPT; | |
2026 | } | |
2027 | cur_pos = _next_pos; | |
2028 | ||
2029 | // pad + body + pad | |
2030 | if (h->pre_pad) | |
2031 | cur_pos += h->pre_pad; | |
2032 | ||
2033 | bl->clear(); | |
2034 | wrap_read_bl(cur_pos, h->len, bl, &cur_pos); | |
2035 | ||
2036 | if (h->post_pad) | |
2037 | cur_pos += h->post_pad; | |
2038 | ||
2039 | // footer | |
2040 | entry_header_t *f; | |
2041 | bufferlist fbl; | |
2042 | wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos); | |
2043 | f = reinterpret_cast<entry_header_t *>(fbl.c_str()); | |
2044 | if (memcmp(f, h, sizeof(*f))) { | |
2045 | if (ss) | |
2046 | *ss << "bad footer magic, partial entry"; | |
2047 | if (next_pos) | |
2048 | *next_pos = cur_pos; | |
2049 | return MAYBE_CORRUPT; | |
2050 | } | |
2051 | ||
2052 | if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal) | |
2053 | h->crc32c != 0) { // newer entry in old journal | |
2054 | uint32_t actual_crc = bl->crc32c(0); | |
2055 | if (actual_crc != h->crc32c) { | |
2056 | if (ss) | |
2057 | *ss << "header crc (" << h->crc32c | |
2058 | << ") doesn't match body crc (" << actual_crc << ")"; | |
2059 | if (next_pos) | |
2060 | *next_pos = cur_pos; | |
2061 | return MAYBE_CORRUPT; | |
2062 | } | |
2063 | } | |
2064 | ||
2065 | // yay! | |
2066 | dout(2) << "read_entry " << init_pos << " : seq " << h->seq | |
2067 | << " " << h->len << " bytes" | |
2068 | << dendl; | |
2069 | ||
2070 | // ok! | |
2071 | if (seq) | |
2072 | *seq = h->seq; | |
2073 | ||
2074 | ||
2075 | if (next_pos) | |
2076 | *next_pos = cur_pos; | |
2077 | ||
2078 | if (_h) | |
2079 | *_h = *h; | |
2080 | ||
2081 | assert(cur_pos % header.alignment == 0); | |
2082 | return SUCCESS; | |
2083 | } | |
2084 | ||
2085 | void FileJournal::reserve_throttle_and_backoff(uint64_t count) | |
2086 | { | |
2087 | throttle.get(count); | |
2088 | } | |
2089 | ||
2090 | void FileJournal::get_header( | |
2091 | uint64_t wanted_seq, | |
2092 | off64_t *_pos, | |
2093 | entry_header_t *h) | |
2094 | { | |
2095 | off64_t pos = header.start; | |
2096 | off64_t next_pos = pos; | |
2097 | bufferlist bl; | |
2098 | uint64_t seq = 0; | |
2099 | dout(2) << __func__ << dendl; | |
2100 | while (1) { | |
2101 | bl.clear(); | |
2102 | pos = next_pos; | |
2103 | read_entry_result result = do_read_entry( | |
2104 | pos, | |
2105 | &next_pos, | |
2106 | &bl, | |
2107 | &seq, | |
2108 | 0, | |
2109 | h); | |
2110 | if (result == FAILURE || result == MAYBE_CORRUPT) | |
2111 | ceph_abort(); | |
2112 | if (seq == wanted_seq) { | |
2113 | if (_pos) | |
2114 | *_pos = pos; | |
2115 | return; | |
2116 | } | |
2117 | } | |
2118 | ceph_abort(); // not reachable | |
2119 | } | |
2120 | ||
2121 | void FileJournal::corrupt( | |
2122 | int wfd, | |
2123 | off64_t corrupt_at) | |
2124 | { | |
2125 | dout(2) << __func__ << dendl; | |
2126 | if (corrupt_at >= header.max_size) | |
2127 | corrupt_at = corrupt_at + get_top() - header.max_size; | |
2128 | ||
2129 | int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET); | |
2130 | assert(actual == corrupt_at); | |
2131 | ||
2132 | char buf[10]; | |
2133 | int r = safe_read_exact(fd, buf, 1); | |
2134 | assert(r == 0); | |
2135 | ||
2136 | actual = ::lseek64(wfd, corrupt_at, SEEK_SET); | |
2137 | assert(actual == corrupt_at); | |
2138 | ||
2139 | buf[0]++; | |
2140 | r = safe_write(wfd, buf, 1); | |
2141 | assert(r == 0); | |
2142 | } | |
2143 | ||
2144 | void FileJournal::corrupt_payload( | |
2145 | int wfd, | |
2146 | uint64_t seq) | |
2147 | { | |
2148 | dout(2) << __func__ << dendl; | |
2149 | off64_t pos = 0; | |
2150 | entry_header_t h; | |
2151 | get_header(seq, &pos, &h); | |
2152 | off64_t corrupt_at = | |
2153 | pos + sizeof(entry_header_t) + h.pre_pad; | |
2154 | corrupt(wfd, corrupt_at); | |
2155 | } | |
2156 | ||
2157 | ||
2158 | void FileJournal::corrupt_footer_magic( | |
2159 | int wfd, | |
2160 | uint64_t seq) | |
2161 | { | |
2162 | dout(2) << __func__ << dendl; | |
2163 | off64_t pos = 0; | |
2164 | entry_header_t h; | |
2165 | get_header(seq, &pos, &h); | |
2166 | off64_t corrupt_at = | |
2167 | pos + sizeof(entry_header_t) + h.pre_pad + | |
2168 | h.len + h.post_pad + | |
2169 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2170 | corrupt(wfd, corrupt_at); | |
2171 | } | |
2172 | ||
2173 | ||
2174 | void FileJournal::corrupt_header_magic( | |
2175 | int wfd, | |
2176 | uint64_t seq) | |
2177 | { | |
2178 | dout(2) << __func__ << dendl; | |
2179 | off64_t pos = 0; | |
2180 | entry_header_t h; | |
2181 | get_header(seq, &pos, &h); | |
2182 | off64_t corrupt_at = | |
2183 | pos + | |
2184 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2185 | corrupt(wfd, corrupt_at); | |
2186 | } | |
2187 | ||
2188 | off64_t FileJournal::get_journal_size_estimate() | |
2189 | { | |
2190 | off64_t size, start = header.start; | |
2191 | if (write_pos < start) { | |
2192 | size = (max_size - start) + write_pos; | |
2193 | } else { | |
2194 | size = write_pos - start; | |
2195 | } | |
2196 | dout(20) << __func__ << " journal size=" << size << dendl; | |
2197 | return size; | |
2198 | } |