]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | #include "acconfig.h" | |
15 | ||
16 | #include "common/debug.h" | |
17 | #include "common/errno.h" | |
18 | #include "common/safe_io.h" | |
19 | #include "FileJournal.h" | |
20 | #include "include/color.h" | |
21 | #include "common/perf_counters.h" | |
22 | #include "FileStore.h" | |
23 | ||
24 | #include "include/compat.h" | |
25 | ||
26 | #include <fcntl.h> | |
27 | #include <limits.h> | |
28 | #include <sstream> | |
29 | #include <stdio.h> | |
30 | #include <stdlib.h> | |
31 | #include <sys/types.h> | |
32 | #include <sys/stat.h> | |
33 | #include <sys/mount.h> | |
34 | ||
35 | #include "common/blkdev.h" | |
36 | #if defined(__linux__) | |
37 | #include "common/linux_version.h" | |
38 | #endif | |
39 | ||
40 | #if defined(__FreeBSD__) | |
41 | #define O_DSYNC O_SYNC | |
42 | #endif | |
43 | ||
#define dout_context cct
#define dout_subsys ceph_subsys_journal
#undef dout_prefix
#define dout_prefix *_dout << "journal "

// Minimum size (1 MiB) accepted for a block-device journal, and the
// threshold below which an existing journal file is considered unsized.
static constexpr int64_t ONE_MEG = int64_t(1) << 20;
// Alignment, in bytes, that journal offsets/lengths are checked against
// when running in directio mode.
static constexpr int CEPH_DIRECTIO_ALIGNMENT = 4096;
51 | ||
52 | ||
53 | int FileJournal::_open(bool forwrite, bool create) | |
54 | { | |
55 | int flags, ret; | |
56 | ||
57 | if (forwrite) { | |
58 | flags = O_RDWR; | |
59 | if (directio) | |
60 | flags |= O_DIRECT | O_DSYNC; | |
61 | } else { | |
62 | flags = O_RDONLY; | |
63 | } | |
64 | if (create) | |
65 | flags |= O_CREAT; | |
66 | ||
67 | if (fd >= 0) { | |
68 | if (TEMP_FAILURE_RETRY(::close(fd))) { | |
69 | int err = errno; | |
70 | derr << "FileJournal::_open: error closing old fd: " | |
71 | << cpp_strerror(err) << dendl; | |
72 | } | |
73 | } | |
74 | fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644)); | |
75 | if (fd < 0) { | |
76 | int err = errno; | |
77 | dout(2) << "FileJournal::_open unable to open journal " | |
78 | << fn << ": " << cpp_strerror(err) << dendl; | |
79 | return -err; | |
80 | } | |
81 | ||
82 | struct stat st; | |
83 | ret = ::fstat(fd, &st); | |
84 | if (ret) { | |
85 | ret = errno; | |
86 | derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl; | |
87 | ret = -ret; | |
88 | goto out_fd; | |
89 | } | |
90 | ||
91 | if (S_ISBLK(st.st_mode)) { | |
92 | ret = _open_block_device(); | |
93 | } else if (S_ISREG(st.st_mode)) { | |
94 | if (aio && !force_aio) { | |
95 | derr << "FileJournal::_open: disabling aio for non-block journal. Use " | |
96 | << "journal_force_aio to force use of aio anyway" << dendl; | |
97 | aio = false; | |
98 | } | |
99 | ret = _open_file(st.st_size, st.st_blksize, create); | |
100 | } else { | |
101 | derr << "FileJournal::_open: wrong journal file type: " << st.st_mode | |
102 | << dendl; | |
103 | ret = -EINVAL; | |
104 | } | |
105 | ||
106 | if (ret) | |
107 | goto out_fd; | |
108 | ||
109 | #ifdef HAVE_LIBAIO | |
110 | if (aio) { | |
111 | aio_ctx = 0; | |
112 | ret = io_setup(128, &aio_ctx); | |
113 | if (ret < 0) { | |
114 | switch (ret) { | |
115 | // Contrary to naive expectations -EAGIAN means ... | |
116 | case -EAGAIN: | |
117 | derr << "FileJournal::_open: user's limit of aio events exceeded. " | |
118 | << "Try increasing /proc/sys/fs/aio-max-nr" << dendl; | |
119 | break; | |
120 | default: | |
121 | derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl; | |
122 | break; | |
123 | } | |
124 | goto out_fd; | |
125 | } | |
126 | } | |
127 | #endif | |
128 | ||
129 | /* We really want max_size to be a multiple of block_size. */ | |
130 | max_size -= max_size % block_size; | |
131 | ||
132 | dout(1) << "_open " << fn << " fd " << fd | |
133 | << ": " << max_size | |
134 | << " bytes, block size " << block_size | |
135 | << " bytes, directio = " << directio | |
136 | << ", aio = " << aio | |
137 | << dendl; | |
138 | return 0; | |
139 | ||
140 | out_fd: | |
141 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
142 | fd = -1; | |
143 | return ret; | |
144 | } | |
145 | ||
146 | int FileJournal::_open_block_device() | |
147 | { | |
148 | int64_t bdev_sz = 0; | |
149 | int ret = get_block_device_size(fd, &bdev_sz); | |
150 | if (ret) { | |
151 | dout(0) << __func__ << ": failed to read block device size." << dendl; | |
152 | return -EIO; | |
153 | } | |
154 | ||
155 | /* Check for bdev_sz too small */ | |
156 | if (bdev_sz < ONE_MEG) { | |
157 | dout(0) << __func__ << ": your block device must be at least " | |
158 | << ONE_MEG << " bytes to be used for a Ceph journal." << dendl; | |
159 | return -EINVAL; | |
160 | } | |
161 | ||
162 | dout(10) << __func__ << ": ignoring osd journal size. " | |
163 | << "We'll use the entire block device (size: " << bdev_sz << ")" | |
164 | << dendl; | |
165 | max_size = bdev_sz; | |
166 | ||
167 | block_size = cct->_conf->journal_block_size; | |
168 | ||
169 | if (cct->_conf->journal_discard) { | |
170 | discard = block_device_support_discard(fn.c_str()); | |
171 | dout(10) << fn << " support discard: " << (int)discard << dendl; | |
172 | } | |
173 | ||
174 | return 0; | |
175 | } | |
176 | ||
177 | int FileJournal::_open_file(int64_t oldsize, blksize_t blksize, | |
178 | bool create) | |
179 | { | |
180 | int ret; | |
181 | int64_t conf_journal_sz(cct->_conf->osd_journal_size); | |
182 | conf_journal_sz <<= 20; | |
183 | ||
184 | if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) { | |
185 | derr << "I'm sorry, I don't know how large of a journal to create." | |
186 | << "Please specify a block device to use as the journal OR " | |
187 | << "set osd_journal_size in your ceph.conf" << dendl; | |
188 | return -EINVAL; | |
189 | } | |
190 | ||
191 | if (create && (oldsize < conf_journal_sz)) { | |
192 | uint64_t newsize(conf_journal_sz); | |
193 | dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl; | |
194 | ret = ::ftruncate(fd, newsize); | |
195 | if (ret < 0) { | |
196 | int err = errno; | |
197 | derr << "FileJournal::_open_file : unable to extend journal to " | |
198 | << newsize << " bytes: " << cpp_strerror(err) << dendl; | |
199 | return -err; | |
200 | } | |
28e407b8 | 201 | ret = ceph_posix_fallocate(fd, 0, newsize); |
7c673cae FG |
202 | if (ret) { |
203 | derr << "FileJournal::_open_file : unable to preallocation journal to " | |
204 | << newsize << " bytes: " << cpp_strerror(ret) << dendl; | |
205 | return -ret; | |
206 | } | |
207 | max_size = newsize; | |
7c673cae FG |
208 | } |
209 | else { | |
210 | max_size = oldsize; | |
211 | } | |
212 | block_size = cct->_conf->journal_block_size; | |
213 | ||
214 | if (create && cct->_conf->journal_zero_on_create) { | |
215 | derr << "FileJournal::_open_file : zeroing journal" << dendl; | |
216 | uint64_t write_size = 1 << 20; | |
217 | char *buf; | |
218 | ret = ::posix_memalign((void **)&buf, block_size, write_size); | |
219 | if (ret != 0) { | |
220 | return -ret; | |
221 | } | |
222 | memset(static_cast<void*>(buf), 0, write_size); | |
223 | uint64_t i = 0; | |
224 | for (; (i + write_size) <= (uint64_t)max_size; i += write_size) { | |
225 | ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i); | |
226 | if (ret < 0) { | |
227 | free(buf); | |
228 | return -errno; | |
229 | } | |
230 | } | |
231 | if (i < (uint64_t)max_size) { | |
232 | ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i); | |
233 | if (ret < 0) { | |
234 | free(buf); | |
235 | return -errno; | |
236 | } | |
237 | } | |
238 | free(buf); | |
239 | } | |
240 | ||
241 | ||
242 | dout(10) << "_open journal is not a block device, NOT checking disk " | |
243 | << "write cache on '" << fn << "'" << dendl; | |
244 | ||
245 | return 0; | |
246 | } | |
247 | ||
248 | // This can not be used on an active journal | |
249 | int FileJournal::check() | |
250 | { | |
251 | int ret; | |
252 | ||
253 | assert(fd == -1); | |
254 | ret = _open(false, false); | |
255 | if (ret) | |
256 | return ret; | |
257 | ||
258 | ret = read_header(&header); | |
259 | if (ret < 0) | |
260 | goto done; | |
261 | ||
262 | if (header.fsid != fsid) { | |
263 | derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
264 | << ", invalid (someone else's?) journal" << dendl; | |
265 | ret = -EINVAL; | |
266 | goto done; | |
267 | } | |
268 | ||
269 | dout(1) << "check: header looks ok" << dendl; | |
270 | ret = 0; | |
271 | ||
272 | done: | |
273 | close(); | |
274 | return ret; | |
275 | } | |
276 | ||
277 | ||
278 | int FileJournal::create() | |
279 | { | |
280 | void *buf = 0; | |
281 | int64_t needed_space; | |
282 | int ret; | |
283 | buffer::ptr bp; | |
284 | dout(2) << "create " << fn << " fsid " << fsid << dendl; | |
285 | ||
286 | ret = _open(true, true); | |
287 | if (ret) | |
288 | goto done; | |
289 | ||
290 | // write empty header | |
291 | header = header_t(); | |
292 | header.flags = header_t::FLAG_CRC; // enable crcs on any new journal. | |
293 | header.fsid = fsid; | |
294 | header.max_size = max_size; | |
295 | header.block_size = block_size; | |
296 | if (cct->_conf->journal_block_align || directio) | |
297 | header.alignment = block_size; | |
298 | else | |
299 | header.alignment = 16; // at least stay word aligned on 64bit machines... | |
300 | ||
301 | header.start = get_top(); | |
302 | header.start_seq = 0; | |
303 | ||
304 | print_header(header); | |
305 | ||
306 | // static zeroed buffer for alignment padding | |
307 | delete [] zero_buf; | |
308 | zero_buf = new char[header.alignment]; | |
309 | memset(zero_buf, 0, header.alignment); | |
310 | ||
311 | bp = prepare_header(); | |
312 | if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) { | |
313 | ret = -errno; | |
314 | derr << "FileJournal::create : create write header error " | |
315 | << cpp_strerror(ret) << dendl; | |
316 | goto close_fd; | |
317 | } | |
318 | ||
319 | // zero first little bit, too. | |
320 | ret = posix_memalign(&buf, block_size, block_size); | |
321 | if (ret) { | |
322 | ret = -ret; | |
323 | derr << "FileJournal::create: failed to allocate " << block_size | |
324 | << " bytes of memory: " << cpp_strerror(ret) << dendl; | |
325 | goto close_fd; | |
326 | } | |
327 | memset(buf, 0, block_size); | |
328 | if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) { | |
329 | ret = -errno; | |
330 | derr << "FileJournal::create: error zeroing first " << block_size | |
331 | << " bytes " << cpp_strerror(ret) << dendl; | |
332 | goto free_buf; | |
333 | } | |
334 | ||
335 | needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20; | |
336 | needed_space += (2 * sizeof(entry_header_t)) + get_top(); | |
337 | if (header.max_size - header.start < needed_space) { | |
338 | derr << "FileJournal::create: OSD journal is not large enough to hold " | |
339 | << "osd_max_write_size bytes!" << dendl; | |
340 | ret = -ENOSPC; | |
341 | goto free_buf; | |
342 | } | |
343 | ||
344 | dout(2) << "create done" << dendl; | |
345 | ret = 0; | |
346 | ||
347 | free_buf: | |
348 | free(buf); | |
349 | buf = 0; | |
350 | close_fd: | |
351 | if (TEMP_FAILURE_RETRY(::close(fd)) < 0) { | |
352 | ret = -errno; | |
353 | derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret) | |
354 | << dendl; | |
355 | } | |
356 | done: | |
357 | fd = -1; | |
358 | return ret; | |
359 | } | |
360 | ||
361 | // This can not be used on an active journal | |
362 | int FileJournal::peek_fsid(uuid_d& fsid) | |
363 | { | |
364 | assert(fd == -1); | |
365 | int r = _open(false, false); | |
366 | if (r) | |
367 | return r; | |
368 | r = read_header(&header); | |
369 | if (r < 0) | |
370 | goto out; | |
371 | fsid = header.fsid; | |
372 | out: | |
373 | close(); | |
374 | return r; | |
375 | } | |
376 | ||
377 | int FileJournal::open(uint64_t fs_op_seq) | |
378 | { | |
379 | dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl; | |
380 | ||
381 | uint64_t next_seq = fs_op_seq + 1; | |
224ce89b | 382 | uint64_t seq = -1; |
7c673cae FG |
383 | |
384 | int err = _open(false); | |
385 | if (err) | |
386 | return err; | |
387 | ||
388 | // assume writeable, unless... | |
389 | read_pos = 0; | |
390 | write_pos = get_top(); | |
391 | ||
392 | // read header? | |
393 | err = read_header(&header); | |
394 | if (err < 0) | |
224ce89b | 395 | goto out; |
7c673cae FG |
396 | |
397 | // static zeroed buffer for alignment padding | |
398 | delete [] zero_buf; | |
399 | zero_buf = new char[header.alignment]; | |
400 | memset(zero_buf, 0, header.alignment); | |
401 | ||
402 | dout(10) << "open header.fsid = " << header.fsid | |
403 | //<< " vs expected fsid = " << fsid | |
404 | << dendl; | |
405 | if (header.fsid != fsid) { | |
406 | derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
407 | << ", invalid (someone else's?) journal" << dendl; | |
224ce89b WB |
408 | err = -EINVAL; |
409 | goto out; | |
7c673cae FG |
410 | } |
411 | if (header.max_size > max_size) { | |
412 | dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl; | |
224ce89b WB |
413 | err = -EINVAL; |
414 | goto out; | |
7c673cae FG |
415 | } |
416 | if (header.block_size != block_size) { | |
417 | dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl; | |
224ce89b WB |
418 | err = -EINVAL; |
419 | goto out; | |
7c673cae FG |
420 | } |
421 | if (header.max_size % header.block_size) { | |
422 | dout(2) << "open journal max size " << header.max_size | |
423 | << " not a multiple of block size " << header.block_size << dendl; | |
224ce89b WB |
424 | err = -EINVAL; |
425 | goto out; | |
7c673cae FG |
426 | } |
427 | if (header.alignment != block_size && directio) { | |
428 | dout(0) << "open journal alignment " << header.alignment << " does not match block size " | |
429 | << block_size << " (required for direct_io journal mode)" << dendl; | |
224ce89b WB |
430 | err = -EINVAL; |
431 | goto out; | |
7c673cae FG |
432 | } |
433 | if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) { | |
434 | dout(0) << "open journal alignment " << header.alignment | |
435 | << " is not multiple of minimum directio alignment " | |
436 | << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)" | |
437 | << dendl; | |
224ce89b WB |
438 | err = -EINVAL; |
439 | goto out; | |
7c673cae FG |
440 | } |
441 | ||
442 | // looks like a valid header. | |
443 | write_pos = 0; // not writeable yet | |
444 | ||
445 | journaled_seq = header.committed_up_to; | |
446 | ||
447 | // find next entry | |
448 | read_pos = header.start; | |
224ce89b | 449 | seq = header.start_seq; |
7c673cae FG |
450 | |
451 | while (1) { | |
452 | bufferlist bl; | |
453 | off64_t old_pos = read_pos; | |
454 | if (!read_entry(bl, seq)) { | |
455 | dout(10) << "open reached end of journal." << dendl; | |
456 | break; | |
457 | } | |
458 | if (seq > next_seq) { | |
459 | dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq | |
460 | << ", ignoring journal contents" | |
461 | << dendl; | |
462 | read_pos = -1; | |
463 | last_committed_seq = 0; | |
464 | return 0; | |
465 | } | |
466 | if (seq == next_seq) { | |
467 | dout(10) << "open reached seq " << seq << dendl; | |
468 | read_pos = old_pos; | |
469 | break; | |
470 | } | |
471 | seq++; // next event should follow. | |
472 | } | |
473 | ||
474 | return 0; | |
224ce89b WB |
475 | out: |
476 | close(); | |
477 | return err; | |
7c673cae FG |
478 | } |
479 | ||
480 | void FileJournal::_close(int fd) const | |
481 | { | |
482 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
483 | } | |
484 | ||
485 | void FileJournal::close() | |
486 | { | |
487 | dout(1) << "close " << fn << dendl; | |
488 | ||
489 | // stop writer thread | |
490 | stop_writer(); | |
491 | ||
492 | // close | |
493 | assert(writeq_empty()); | |
494 | assert(!must_write_header); | |
495 | assert(fd >= 0); | |
496 | _close(fd); | |
497 | fd = -1; | |
498 | } | |
499 | ||
500 | ||
501 | int FileJournal::dump(ostream& out) | |
502 | { | |
503 | return _dump(out, false); | |
504 | } | |
505 | ||
506 | int FileJournal::simple_dump(ostream& out) | |
507 | { | |
508 | return _dump(out, true); | |
509 | } | |
510 | ||
511 | int FileJournal::_dump(ostream& out, bool simple) | |
512 | { | |
513 | JSONFormatter f(true); | |
514 | int ret = _fdump(f, simple); | |
515 | f.flush(out); | |
516 | return ret; | |
517 | } | |
518 | ||
519 | int FileJournal::_fdump(Formatter &f, bool simple) | |
520 | { | |
521 | dout(10) << "_fdump" << dendl; | |
522 | ||
523 | assert(fd == -1); | |
524 | int err = _open(false, false); | |
525 | if (err) | |
526 | return err; | |
527 | ||
528 | err = read_header(&header); | |
529 | if (err < 0) { | |
530 | close(); | |
531 | return err; | |
532 | } | |
533 | ||
534 | off64_t next_pos = header.start; | |
535 | ||
536 | f.open_object_section("journal"); | |
537 | ||
538 | f.open_object_section("header"); | |
539 | f.dump_unsigned("flags", header.flags); | |
540 | ostringstream os; | |
541 | os << header.fsid; | |
542 | f.dump_string("fsid", os.str()); | |
543 | f.dump_unsigned("block_size", header.block_size); | |
544 | f.dump_unsigned("alignment", header.alignment); | |
545 | f.dump_int("max_size", header.max_size); | |
546 | f.dump_int("start", header.start); | |
547 | f.dump_unsigned("committed_up_to", header.committed_up_to); | |
548 | f.dump_unsigned("start_seq", header.start_seq); | |
549 | f.close_section(); | |
550 | ||
551 | f.open_array_section("entries"); | |
552 | uint64_t seq = header.start_seq; | |
553 | while (1) { | |
554 | bufferlist bl; | |
555 | off64_t pos = next_pos; | |
556 | ||
557 | if (!pos) { | |
558 | dout(2) << "_dump -- not readable" << dendl; | |
559 | err = -EINVAL; | |
560 | break; | |
561 | } | |
562 | stringstream ss; | |
563 | read_entry_result result = do_read_entry( | |
564 | pos, | |
565 | &next_pos, | |
566 | &bl, | |
567 | &seq, | |
568 | &ss); | |
569 | if (result != SUCCESS) { | |
570 | if (seq < header.committed_up_to) { | |
571 | dout(2) << "Unable to read past sequence " << seq | |
572 | << " but header indicates the journal has committed up through " | |
573 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
574 | err = -EINVAL; | |
575 | } | |
576 | dout(25) << ss.str() << dendl; | |
577 | dout(25) << "No further valid entries found, journal is most likely valid" | |
578 | << dendl; | |
579 | break; | |
580 | } | |
581 | ||
582 | f.open_object_section("entry"); | |
583 | f.dump_unsigned("offset", pos); | |
584 | f.dump_unsigned("seq", seq); | |
585 | if (simple) { | |
586 | f.dump_unsigned("bl.length", bl.length()); | |
587 | } else { | |
588 | f.open_array_section("transactions"); | |
589 | bufferlist::iterator p = bl.begin(); | |
590 | int trans_num = 0; | |
591 | while (!p.end()) { | |
592 | ObjectStore::Transaction t(p); | |
593 | f.open_object_section("transaction"); | |
594 | f.dump_unsigned("trans_num", trans_num); | |
595 | t.dump(&f); | |
596 | f.close_section(); | |
597 | trans_num++; | |
598 | } | |
599 | f.close_section(); | |
600 | } | |
601 | f.close_section(); | |
602 | } | |
603 | ||
604 | f.close_section(); | |
605 | f.close_section(); | |
606 | dout(10) << "dump finish" << dendl; | |
607 | ||
608 | close(); | |
609 | return err; | |
610 | } | |
611 | ||
612 | ||
613 | void FileJournal::start_writer() | |
614 | { | |
615 | write_stop = false; | |
616 | aio_stop = false; | |
617 | write_thread.create("journal_write"); | |
618 | #ifdef HAVE_LIBAIO | |
619 | if (aio) | |
620 | write_finish_thread.create("journal_wrt_fin"); | |
621 | #endif | |
622 | } | |
623 | ||
624 | void FileJournal::stop_writer() | |
625 | { | |
626 | // Do nothing if writer already stopped or never started | |
627 | if (!write_stop) | |
628 | { | |
629 | { | |
630 | Mutex::Locker l(write_lock); | |
631 | Mutex::Locker p(writeq_lock); | |
632 | write_stop = true; | |
633 | writeq_cond.Signal(); | |
634 | // Doesn't hurt to signal commit_cond in case thread is waiting there | |
635 | // and caller didn't use committed_thru() first. | |
636 | commit_cond.Signal(); | |
637 | } | |
638 | write_thread.join(); | |
639 | ||
640 | // write journal header now so that we have less to replay on remount | |
641 | write_header_sync(); | |
642 | } | |
643 | ||
644 | #ifdef HAVE_LIBAIO | |
645 | // stop aio completeion thread *after* writer thread has stopped | |
646 | // and has submitted all of its io | |
647 | if (aio && !aio_stop) { | |
648 | aio_lock.Lock(); | |
649 | aio_stop = true; | |
650 | aio_cond.Signal(); | |
651 | write_finish_cond.Signal(); | |
652 | aio_lock.Unlock(); | |
653 | write_finish_thread.join(); | |
654 | } | |
655 | #endif | |
656 | } | |
657 | ||
658 | ||
659 | ||
660 | void FileJournal::print_header(const header_t &header) const | |
661 | { | |
662 | dout(10) << "header: block_size " << header.block_size | |
663 | << " alignment " << header.alignment | |
664 | << " max_size " << header.max_size | |
665 | << dendl; | |
666 | dout(10) << "header: start " << header.start << dendl; | |
667 | dout(10) << " write_pos " << write_pos << dendl; | |
668 | } | |
669 | ||
670 | int FileJournal::read_header(header_t *hdr) const | |
671 | { | |
672 | dout(10) << "read_header" << dendl; | |
673 | bufferlist bl; | |
674 | ||
675 | buffer::ptr bp = buffer::create_page_aligned(block_size); | |
676 | char* bpdata = bp.c_str(); | |
677 | int r = ::pread(fd, bpdata, bp.length(), 0); | |
678 | ||
679 | if (r < 0) { | |
680 | int err = errno; | |
681 | dout(0) << "read_header got " << cpp_strerror(err) << dendl; | |
682 | return -err; | |
683 | } | |
684 | ||
685 | // don't use bp.zero() here, because it also invalidates | |
686 | // crc cache (which is not yet populated anyway) | |
687 | if (bp.length() != (size_t)r) { | |
688 | // r will be always less or equal than bp.length | |
689 | bpdata += r; | |
690 | memset(bpdata, 0, bp.length() - r); | |
691 | } | |
692 | ||
693 | bl.push_back(std::move(bp)); | |
694 | ||
695 | try { | |
696 | bufferlist::iterator p = bl.begin(); | |
697 | ::decode(*hdr, p); | |
698 | } | |
699 | catch (buffer::error& e) { | |
700 | derr << "read_header error decoding journal header" << dendl; | |
701 | return -EINVAL; | |
702 | } | |
703 | ||
704 | ||
705 | /* | |
706 | * Unfortunately we weren't initializing the flags field for new | |
707 | * journals! Aie. This is safe(ish) now that we have only one | |
708 | * flag. Probably around when we add the next flag we need to | |
709 | * remove this or else this (eventually old) code will clobber newer | |
710 | * code's flags. | |
711 | */ | |
712 | if (hdr->flags > 3) { | |
713 | derr << "read_header appears to have gibberish flags; assuming 0" << dendl; | |
714 | hdr->flags = 0; | |
715 | } | |
716 | ||
717 | print_header(*hdr); | |
718 | ||
719 | return 0; | |
720 | } | |
721 | ||
722 | bufferptr FileJournal::prepare_header() | |
723 | { | |
724 | bufferlist bl; | |
725 | { | |
726 | Mutex::Locker l(finisher_lock); | |
727 | header.committed_up_to = journaled_seq; | |
728 | } | |
729 | ::encode(header, bl); | |
730 | bufferptr bp = buffer::create_page_aligned(get_top()); | |
731 | // don't use bp.zero() here, because it also invalidates | |
732 | // crc cache (which is not yet populated anyway) | |
733 | char* data = bp.c_str(); | |
734 | memcpy(data, bl.c_str(), bl.length()); | |
735 | data += bl.length(); | |
736 | memset(data, 0, bp.length()-bl.length()); | |
737 | return bp; | |
738 | } | |
739 | ||
740 | void FileJournal::write_header_sync() | |
741 | { | |
742 | Mutex::Locker locker(write_lock); | |
743 | must_write_header = true; | |
744 | bufferlist bl; | |
745 | do_write(bl); | |
746 | dout(20) << __func__ << " finish" << dendl; | |
747 | } | |
748 | ||
749 | int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size) | |
750 | { | |
751 | // already full? | |
752 | if (full_state != FULL_NOTFULL) | |
753 | return -ENOSPC; | |
754 | ||
755 | // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL. | |
756 | off64_t room; | |
757 | if (pos >= header.start) | |
758 | room = (header.max_size - pos) + (header.start - get_top()) - 1; | |
759 | else | |
760 | room = header.start - pos - 1; | |
761 | dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start | |
762 | << " top " << get_top() << dendl; | |
763 | ||
764 | if (do_sync_cond) { | |
765 | if (room >= (header.max_size >> 1) && | |
766 | room - size < (header.max_size >> 1)) { | |
767 | dout(10) << " passing half full mark, triggering commit" << dendl; | |
768 | do_sync_cond->SloppySignal(); // initiate a real commit so we can trim | |
769 | } | |
770 | } | |
771 | ||
772 | if (room >= size) { | |
773 | dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl; | |
774 | if (pos + size > header.max_size) | |
775 | must_write_header = true; | |
776 | return 0; | |
777 | } | |
778 | ||
779 | // full | |
780 | dout(1) << "check_for_full at " << pos << " : JOURNAL FULL " | |
781 | << pos << " >= " << room | |
782 | << " (max_size " << header.max_size << " start " << header.start << ")" | |
783 | << dendl; | |
784 | ||
785 | off64_t max = header.max_size - get_top(); | |
786 | if (size > max) | |
787 | dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl; | |
788 | ||
789 | return -ENOSPC; | |
790 | } | |
791 | ||
792 | int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes) | |
793 | { | |
794 | // gather queued writes | |
795 | off64_t queue_pos = write_pos; | |
796 | ||
797 | int eleft = cct->_conf->journal_max_write_entries; | |
798 | unsigned bmax = cct->_conf->journal_max_write_bytes; | |
799 | ||
800 | if (full_state != FULL_NOTFULL) | |
801 | return -ENOSPC; | |
802 | ||
803 | while (!writeq_empty()) { | |
804 | list<write_item> items; | |
805 | batch_pop_write(items); | |
806 | list<write_item>::iterator it = items.begin(); | |
807 | while (it != items.end()) { | |
808 | uint64_t bytes = it->bl.length(); | |
809 | int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes); | |
810 | if (r == 0) { // prepare ok, delete it | |
811 | items.erase(it++); | |
812 | #ifdef HAVE_LIBAIO | |
813 | { | |
814 | Mutex::Locker locker(aio_lock); | |
815 | assert(aio_write_queue_ops > 0); | |
816 | aio_write_queue_ops--; | |
817 | assert(aio_write_queue_bytes >= bytes); | |
818 | aio_write_queue_bytes -= bytes; | |
819 | } | |
820 | #else | |
821 | (void)bytes; | |
822 | #endif | |
823 | } | |
824 | if (r == -ENOSPC) { | |
825 | // the journal maybe full, insert the left item to writeq | |
826 | batch_unpop_write(items); | |
827 | if (orig_ops) | |
828 | goto out; // commit what we have | |
829 | ||
830 | if (logger) | |
831 | logger->inc(l_filestore_journal_full); | |
832 | ||
833 | if (wait_on_full) { | |
834 | dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl; | |
835 | } else { | |
836 | dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl; | |
837 | ||
838 | // throw out what we have so far | |
839 | full_state = FULL_FULL; | |
840 | while (!writeq_empty()) { | |
841 | complete_write(1, peek_write().orig_len); | |
842 | pop_write(); | |
843 | } | |
844 | print_header(header); | |
845 | } | |
224ce89b | 846 | |
7c673cae FG |
847 | return -ENOSPC; // hrm, full on first op |
848 | } | |
849 | if (eleft) { | |
850 | if (--eleft == 0) { | |
851 | dout(20) << "prepare_multi_write hit max events per write " | |
852 | << cct->_conf->journal_max_write_entries << dendl; | |
853 | batch_unpop_write(items); | |
854 | goto out; | |
855 | } | |
856 | } | |
857 | if (bmax) { | |
858 | if (bl.length() >= bmax) { | |
859 | dout(20) << "prepare_multi_write hit max write size " | |
860 | << cct->_conf->journal_max_write_bytes << dendl; | |
861 | batch_unpop_write(items); | |
862 | goto out; | |
863 | } | |
864 | } | |
865 | } | |
866 | } | |
867 | ||
868 | out: | |
869 | dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl; | |
870 | assert((write_pos + bl.length() == queue_pos) || | |
871 | (write_pos + bl.length() - header.max_size + get_top() == queue_pos)); | |
872 | return 0; | |
873 | } | |
874 | ||
875 | /* | |
876 | void FileJournal::queue_write_fin(uint64_t seq, Context *fin) | |
877 | { | |
878 | writing_seq.push_back(seq); | |
879 | if (!waiting_for_notfull.empty()) { | |
880 | // make sure previously unjournaled stuff waiting for UNFULL triggers | |
881 | // _before_ newly journaled stuff does | |
882 | dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin | |
883 | << " until after UNFULL" << dendl; | |
884 | C_Gather *g = new C_Gather(writeq.front().fin); | |
885 | writing_fin.push_back(g->new_sub()); | |
886 | waiting_for_notfull.push_back(g->new_sub()); | |
887 | } else { | |
888 | writing_fin.push_back(writeq.front().fin); | |
889 | dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl; | |
890 | } | |
891 | } | |
892 | */ | |
893 | ||
894 | void FileJournal::queue_completions_thru(uint64_t seq) | |
895 | { | |
896 | assert(finisher_lock.is_locked()); | |
897 | utime_t now = ceph_clock_now(); | |
898 | list<completion_item> items; | |
899 | batch_pop_completions(items); | |
900 | list<completion_item>::iterator it = items.begin(); | |
901 | while (it != items.end()) { | |
902 | completion_item& next = *it; | |
903 | if (next.seq > seq) | |
904 | break; | |
905 | utime_t lat = now; | |
906 | lat -= next.start; | |
907 | dout(10) << "queue_completions_thru seq " << seq | |
908 | << " queueing seq " << next.seq | |
909 | << " " << next.finish | |
910 | << " lat " << lat << dendl; | |
911 | if (logger) { | |
912 | logger->tinc(l_filestore_journal_latency, lat); | |
913 | } | |
914 | if (next.finish) | |
915 | finisher->queue(next.finish); | |
916 | if (next.tracked_op) { | |
917 | next.tracked_op->mark_event("journaled_completion_queued"); | |
918 | next.tracked_op->journal_trace.event("queued completion"); | |
919 | next.tracked_op->journal_trace.keyval("completed through", seq); | |
920 | } | |
921 | items.erase(it++); | |
922 | } | |
923 | batch_unpop_completions(items); | |
924 | finisher_cond.Signal(); | |
925 | } | |
926 | ||
927 | ||
928 | int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes) | |
929 | { | |
930 | uint64_t seq = next_write.seq; | |
931 | bufferlist &ebl = next_write.bl; | |
932 | off64_t size = ebl.length(); | |
933 | ||
934 | int r = check_for_full(seq, queue_pos, size); | |
935 | if (r < 0) | |
936 | return r; // ENOSPC or EAGAIN | |
937 | ||
938 | uint32_t orig_len = next_write.orig_len; | |
939 | orig_bytes += orig_len; | |
940 | orig_ops++; | |
941 | ||
942 | // add to write buffer | |
943 | dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq | |
944 | << " len " << orig_len << " -> " << size << dendl; | |
945 | ||
946 | unsigned seq_offset = offsetof(entry_header_t, seq); | |
947 | unsigned magic1_offset = offsetof(entry_header_t, magic1); | |
948 | unsigned magic2_offset = offsetof(entry_header_t, magic2); | |
949 | ||
950 | bufferptr headerptr = ebl.buffers().front(); | |
951 | uint64_t _seq = seq; | |
952 | uint64_t _queue_pos = queue_pos; | |
953 | uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64()); | |
954 | headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq); | |
955 | headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
956 | headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
957 | ||
958 | bufferptr footerptr = ebl.buffers().back(); | |
959 | unsigned post_offset = footerptr.length() - sizeof(entry_header_t); | |
960 | footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq); | |
961 | footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
962 | footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
963 | ||
964 | bl.claim_append(ebl); | |
965 | if (next_write.tracked_op) { | |
966 | next_write.tracked_op->mark_event("write_thread_in_journal_buffer"); | |
967 | next_write.tracked_op->journal_trace.event("prepare_single_write"); | |
968 | } | |
969 | ||
970 | journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos)); | |
971 | writing_seq = seq; | |
972 | ||
973 | queue_pos += size; | |
974 | if (queue_pos >= header.max_size) | |
975 | queue_pos = queue_pos + get_top() - header.max_size; | |
976 | ||
977 | return 0; | |
978 | } | |
979 | ||
980 | void FileJournal::check_align(off64_t pos, bufferlist& bl) | |
981 | { | |
982 | // make sure list segments are page aligned | |
983 | if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) { | |
984 | assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); | |
985 | assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); | |
986 | assert(0 == "bl was not aligned"); | |
987 | } | |
988 | } | |
989 | ||
990 | int FileJournal::write_bl(off64_t& pos, bufferlist& bl) | |
991 | { | |
992 | int ret; | |
993 | ||
994 | off64_t spos = ::lseek64(fd, pos, SEEK_SET); | |
995 | if (spos < 0) { | |
996 | ret = -errno; | |
997 | derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl; | |
998 | return ret; | |
999 | } | |
1000 | ret = bl.write_fd(fd); | |
1001 | if (ret) { | |
1002 | derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl; | |
1003 | return ret; | |
1004 | } | |
1005 | pos += bl.length(); | |
1006 | if (pos == header.max_size) | |
1007 | pos = get_top(); | |
1008 | return 0; | |
1009 | } | |
1010 | ||
1011 | void FileJournal::do_write(bufferlist& bl) | |
1012 | { | |
1013 | // nothing to do? | |
1014 | if (bl.length() == 0 && !must_write_header) | |
1015 | return; | |
1016 | ||
1017 | buffer::ptr hbp; | |
1018 | if (cct->_conf->journal_write_header_frequency && | |
1019 | (((++journaled_since_start) % | |
1020 | cct->_conf->journal_write_header_frequency) == 0)) { | |
1021 | must_write_header = true; | |
1022 | } | |
1023 | ||
1024 | if (must_write_header) { | |
1025 | must_write_header = false; | |
1026 | hbp = prepare_header(); | |
1027 | } | |
1028 | ||
1029 | dout(15) << "do_write writing " << write_pos << "~" << bl.length() | |
1030 | << (hbp.length() ? " + header":"") | |
1031 | << dendl; | |
1032 | ||
1033 | utime_t from = ceph_clock_now(); | |
1034 | ||
1035 | // entry | |
1036 | off64_t pos = write_pos; | |
1037 | ||
1038 | // Adjust write_pos | |
1039 | write_pos += bl.length(); | |
1040 | if (write_pos >= header.max_size) | |
1041 | write_pos = write_pos - header.max_size + get_top(); | |
1042 | ||
1043 | write_lock.Unlock(); | |
1044 | ||
1045 | // split? | |
1046 | off64_t split = 0; | |
1047 | if (pos + bl.length() > header.max_size) { | |
1048 | bufferlist first, second; | |
1049 | split = header.max_size - pos; | |
1050 | first.substr_of(bl, 0, split); | |
1051 | second.substr_of(bl, split, bl.length() - split); | |
1052 | assert(first.length() + second.length() == bl.length()); | |
1053 | dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length() | |
1054 | << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl; | |
1055 | ||
1056 | //Save pos to write first piece second | |
1057 | off64_t first_pos = pos; | |
1058 | off64_t orig_pos; | |
1059 | pos = get_top(); | |
1060 | // header too? | |
1061 | if (hbp.length()) { | |
1062 | // be sneaky: include the header in the second fragment | |
1063 | second.push_front(hbp); | |
1064 | pos = 0; // we included the header | |
1065 | } | |
1066 | // Write the second portion first possible with the header, so | |
1067 | // do_read_entry() won't even get a valid entry_header_t if there | |
1068 | // is a crash between the two writes. | |
1069 | orig_pos = pos; | |
1070 | if (write_bl(pos, second)) { | |
1071 | derr << "FileJournal::do_write: write_bl(pos=" << orig_pos | |
1072 | << ") failed" << dendl; | |
1073 | check_align(pos, second); | |
1074 | ceph_abort(); | |
1075 | } | |
1076 | orig_pos = first_pos; | |
1077 | if (write_bl(first_pos, first)) { | |
1078 | derr << "FileJournal::do_write: write_bl(pos=" << orig_pos | |
1079 | << ") failed" << dendl; | |
1080 | check_align(first_pos, first); | |
1081 | ceph_abort(); | |
1082 | } | |
1083 | assert(first_pos == get_top()); | |
1084 | } else { | |
1085 | // header too? | |
1086 | if (hbp.length()) { | |
1087 | if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) { | |
1088 | int err = errno; | |
1089 | derr << "FileJournal::do_write: pwrite(fd=" << fd | |
1090 | << ", hbp.length=" << hbp.length() << ") failed :" | |
1091 | << cpp_strerror(err) << dendl; | |
1092 | ceph_abort(); | |
1093 | } | |
1094 | } | |
1095 | ||
1096 | if (write_bl(pos, bl)) { | |
1097 | derr << "FileJournal::do_write: write_bl(pos=" << pos | |
1098 | << ") failed" << dendl; | |
1099 | check_align(pos, bl); | |
1100 | ceph_abort(); | |
1101 | } | |
1102 | } | |
1103 | ||
1104 | if (!directio) { | |
1105 | dout(20) << "do_write fsync" << dendl; | |
1106 | ||
1107 | /* | |
1108 | * We'd really love to have a fsync_range or fdatasync_range and do a: | |
1109 | * | |
1110 | * if (split) { | |
1111 | * ::fsync_range(fd, header.max_size - split, split)l | |
1112 | * ::fsync_range(fd, get_top(), bl.length() - split); | |
1113 | * else | |
1114 | * ::fsync_range(fd, write_pos, bl.length()) | |
1115 | * | |
1116 | * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be | |
1117 | * too hard given all the underlying infrastructure already exist. | |
1118 | * | |
1119 | * NOTE: using sync_file_range here would not be safe as it does not | |
1120 | * flush disk caches or commits any sort of metadata. | |
1121 | */ | |
1122 | int ret = 0; | |
1123 | #if defined(DARWIN) || defined(__FreeBSD__) | |
1124 | ret = ::fsync(fd); | |
1125 | #else | |
1126 | ret = ::fdatasync(fd); | |
1127 | #endif | |
1128 | if (ret < 0) { | |
1129 | derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl; | |
1130 | ceph_abort(); | |
1131 | } | |
1132 | #ifdef HAVE_POSIX_FADVISE | |
1133 | if (cct->_conf->filestore_fadvise) | |
1134 | posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); | |
1135 | #endif | |
1136 | } | |
1137 | ||
1138 | utime_t lat = ceph_clock_now() - from; | |
1139 | dout(20) << "do_write latency " << lat << dendl; | |
1140 | ||
1141 | write_lock.Lock(); | |
1142 | ||
1143 | assert(write_pos == pos); | |
1144 | assert(write_pos % header.alignment == 0); | |
1145 | ||
1146 | { | |
1147 | Mutex::Locker locker(finisher_lock); | |
1148 | journaled_seq = writing_seq; | |
1149 | ||
1150 | // kick finisher? | |
1151 | // only if we haven't filled up recently! | |
1152 | if (full_state != FULL_NOTFULL) { | |
1153 | dout(10) << "do_write NOT queueing finisher seq " << journaled_seq | |
1154 | << ", full_commit_seq|full_restart_seq" << dendl; | |
1155 | } else { | |
1156 | if (plug_journal_completions) { | |
1157 | dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq | |
1158 | << " due to completion plug" << dendl; | |
1159 | } else { | |
1160 | dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl; | |
1161 | queue_completions_thru(journaled_seq); | |
1162 | } | |
1163 | } | |
1164 | } | |
1165 | } | |
1166 | ||
1167 | void FileJournal::flush() | |
1168 | { | |
1169 | dout(10) << "waiting for completions to empty" << dendl; | |
1170 | { | |
1171 | Mutex::Locker l(finisher_lock); | |
1172 | while (!completions_empty()) | |
1173 | finisher_cond.Wait(finisher_lock); | |
1174 | } | |
1175 | dout(10) << "flush waiting for finisher" << dendl; | |
1176 | finisher->wait_for_empty(); | |
1177 | dout(10) << "flush done" << dendl; | |
1178 | } | |
1179 | ||
1180 | ||
1181 | void FileJournal::write_thread_entry() | |
1182 | { | |
1183 | dout(10) << "write_thread_entry start" << dendl; | |
1184 | while (1) { | |
1185 | { | |
1186 | Mutex::Locker locker(writeq_lock); | |
1187 | if (writeq.empty() && !must_write_header) { | |
1188 | if (write_stop) | |
1189 | break; | |
1190 | dout(20) << "write_thread_entry going to sleep" << dendl; | |
1191 | writeq_cond.Wait(writeq_lock); | |
1192 | dout(20) << "write_thread_entry woke up" << dendl; | |
1193 | continue; | |
1194 | } | |
1195 | } | |
1196 | ||
1197 | #ifdef HAVE_LIBAIO | |
1198 | if (aio) { | |
1199 | Mutex::Locker locker(aio_lock); | |
1200 | // should we back off to limit aios in flight? try to do this | |
1201 | // adaptively so that we submit larger aios once we have lots of | |
1202 | // them in flight. | |
1203 | // | |
1204 | // NOTE: our condition here is based on aio_num (protected by | |
1205 | // aio_lock) and throttle_bytes (part of the write queue). when | |
1206 | // we sleep, we *only* wait for aio_num to change, and do not | |
1207 | // wake when more data is queued. this is not strictly correct, | |
1208 | // but should be fine given that we will have plenty of aios in | |
1209 | // flight if we hit this limit to ensure we keep the device | |
1210 | // saturated. | |
1211 | while (aio_num > 0) { | |
1212 | int exp = MIN(aio_num * 2, 24); | |
1213 | long unsigned min_new = 1ull << exp; | |
1214 | uint64_t cur = aio_write_queue_bytes; | |
1215 | dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes | |
1216 | << " ... exp " << exp << " min_new " << min_new | |
1217 | << " ... pending " << cur << dendl; | |
1218 | if (cur >= min_new) | |
1219 | break; | |
1220 | dout(20) << "write_thread_entry deferring until more aios complete: " | |
1221 | << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new | |
1222 | << " bytes to start a new aio (currently " << cur << " pending)" << dendl; | |
1223 | aio_cond.Wait(aio_lock); | |
1224 | dout(20) << "write_thread_entry woke up" << dendl; | |
1225 | } | |
1226 | } | |
1227 | #endif | |
1228 | ||
1229 | Mutex::Locker locker(write_lock); | |
1230 | uint64_t orig_ops = 0; | |
1231 | uint64_t orig_bytes = 0; | |
1232 | ||
1233 | bufferlist bl; | |
1234 | int r = prepare_multi_write(bl, orig_ops, orig_bytes); | |
1235 | // Don't care about journal full if stoppping, so drop queue and | |
1236 | // possibly let header get written and loop above to notice stop | |
1237 | if (r == -ENOSPC) { | |
1238 | if (write_stop) { | |
1239 | dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl; | |
1240 | while (!writeq_empty()) { | |
1241 | complete_write(1, peek_write().orig_len); | |
1242 | pop_write(); | |
1243 | } | |
1244 | print_header(header); | |
1245 | r = 0; | |
1246 | } else { | |
1247 | dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl; | |
1248 | commit_cond.Wait(write_lock); | |
1249 | dout(20) << "write_thread_entry woke up" << dendl; | |
1250 | continue; | |
1251 | } | |
1252 | } | |
1253 | assert(r == 0); | |
1254 | ||
1255 | if (logger) { | |
1256 | logger->inc(l_filestore_journal_wr); | |
1257 | logger->inc(l_filestore_journal_wr_bytes, bl.length()); | |
1258 | } | |
1259 | ||
1260 | #ifdef HAVE_LIBAIO | |
1261 | if (aio) | |
1262 | do_aio_write(bl); | |
1263 | else | |
1264 | do_write(bl); | |
1265 | #else | |
1266 | do_write(bl); | |
1267 | #endif | |
1268 | complete_write(orig_ops, orig_bytes); | |
1269 | } | |
1270 | ||
1271 | dout(10) << "write_thread_entry finish" << dendl; | |
1272 | } | |
1273 | ||
1274 | #ifdef HAVE_LIBAIO | |
1275 | void FileJournal::do_aio_write(bufferlist& bl) | |
1276 | { | |
1277 | ||
1278 | if (cct->_conf->journal_write_header_frequency && | |
1279 | (((++journaled_since_start) % | |
1280 | cct->_conf->journal_write_header_frequency) == 0)) { | |
1281 | must_write_header = true; | |
1282 | } | |
1283 | ||
1284 | // nothing to do? | |
1285 | if (bl.length() == 0 && !must_write_header) | |
1286 | return; | |
1287 | ||
1288 | buffer::ptr hbp; | |
1289 | if (must_write_header) { | |
1290 | must_write_header = false; | |
1291 | hbp = prepare_header(); | |
1292 | } | |
1293 | ||
1294 | // entry | |
1295 | off64_t pos = write_pos; | |
1296 | ||
1297 | dout(15) << "do_aio_write writing " << pos << "~" << bl.length() | |
1298 | << (hbp.length() ? " + header":"") | |
1299 | << dendl; | |
1300 | ||
1301 | // split? | |
1302 | off64_t split = 0; | |
1303 | if (pos + bl.length() > header.max_size) { | |
1304 | bufferlist first, second; | |
1305 | split = header.max_size - pos; | |
1306 | first.substr_of(bl, 0, split); | |
1307 | second.substr_of(bl, split, bl.length() - split); | |
1308 | assert(first.length() + second.length() == bl.length()); | |
1309 | dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl; | |
1310 | ||
1311 | if (write_aio_bl(pos, first, 0)) { | |
1312 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1313 | << ") failed" << dendl; | |
1314 | ceph_abort(); | |
1315 | } | |
1316 | assert(pos == header.max_size); | |
1317 | if (hbp.length()) { | |
1318 | // be sneaky: include the header in the second fragment | |
1319 | second.push_front(hbp); | |
1320 | pos = 0; // we included the header | |
1321 | } else | |
1322 | pos = get_top(); // no header, start after that | |
1323 | if (write_aio_bl(pos, second, writing_seq)) { | |
1324 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1325 | << ") failed" << dendl; | |
1326 | ceph_abort(); | |
1327 | } | |
1328 | } else { | |
1329 | // header too? | |
1330 | if (hbp.length()) { | |
1331 | bufferlist hbl; | |
1332 | hbl.push_back(hbp); | |
1333 | loff_t pos = 0; | |
1334 | if (write_aio_bl(pos, hbl, 0)) { | |
1335 | derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl; | |
1336 | ceph_abort(); | |
1337 | } | |
1338 | } | |
1339 | ||
1340 | if (write_aio_bl(pos, bl, writing_seq)) { | |
1341 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1342 | << ") failed" << dendl; | |
1343 | ceph_abort(); | |
1344 | } | |
1345 | } | |
1346 | ||
1347 | write_pos = pos; | |
1348 | if (write_pos == header.max_size) | |
1349 | write_pos = get_top(); | |
1350 | assert(write_pos % header.alignment == 0); | |
1351 | } | |
1352 | ||
1353 | /** | |
1354 | * write a buffer using aio | |
1355 | * | |
1356 | * @param seq seq to trigger when this aio completes. if 0, do not update any state | |
1357 | * on completion. | |
1358 | */ | |
1359 | int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq) | |
1360 | { | |
1361 | dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl; | |
1362 | ||
1363 | while (bl.length() > 0) { | |
1364 | int max = MIN(bl.get_num_buffers(), IOV_MAX-1); | |
1365 | iovec *iov = new iovec[max]; | |
1366 | int n = 0; | |
1367 | unsigned len = 0; | |
1368 | for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin(); | |
1369 | n < max; | |
1370 | ++p, ++n) { | |
1371 | assert(p != bl.buffers().end()); | |
1372 | iov[n].iov_base = (void *)p->c_str(); | |
1373 | iov[n].iov_len = p->length(); | |
1374 | len += p->length(); | |
1375 | } | |
1376 | ||
1377 | bufferlist tbl; | |
1378 | bl.splice(0, len, &tbl); // move bytes from bl -> tbl | |
1379 | ||
1380 | // lock only aio_queue, current aio, aio_num, aio_bytes, which may be | |
1381 | // modified in check_aio_completion | |
1382 | aio_lock.Lock(); | |
1383 | aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq)); | |
1384 | aio_info& aio = aio_queue.back(); | |
1385 | aio.iov = iov; | |
1386 | ||
1387 | io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos); | |
1388 | ||
1389 | dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len | |
1390 | << " in " << n << dendl; | |
1391 | ||
1392 | aio_num++; | |
1393 | aio_bytes += aio.len; | |
1394 | ||
1395 | // need to save current aio len to update write_pos later because current | |
1396 | // aio could be ereased from aio_queue once it is done | |
1397 | uint64_t cur_len = aio.len; | |
1398 | // unlock aio_lock because following io_submit might take time to return | |
1399 | aio_lock.Unlock(); | |
1400 | ||
1401 | iocb *piocb = &aio.iocb; | |
224ce89b | 1402 | |
7c673cae FG |
1403 | // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds |
1404 | int attempts = 16; | |
1405 | int delay = 125; | |
1406 | do { | |
1407 | int r = io_submit(aio_ctx, 1, &piocb); | |
1408 | dout(20) << "write_aio_bl io_submit return value: " << r << dendl; | |
1409 | if (r < 0) { | |
1410 | derr << "io_submit to " << aio.off << "~" << cur_len | |
1411 | << " got " << cpp_strerror(r) << dendl; | |
1412 | if (r == -EAGAIN && attempts-- > 0) { | |
1413 | usleep(delay); | |
1414 | delay *= 2; | |
1415 | continue; | |
1416 | } | |
1417 | check_align(pos, tbl); | |
1418 | assert(0 == "io_submit got unexpected error"); | |
1419 | } else { | |
1420 | break; | |
1421 | } | |
1422 | } while (true); | |
1423 | pos += cur_len; | |
1424 | } | |
1425 | aio_lock.Lock(); | |
1426 | write_finish_cond.Signal(); | |
1427 | aio_lock.Unlock(); | |
1428 | return 0; | |
1429 | } | |
1430 | #endif | |
1431 | ||
1432 | void FileJournal::write_finish_thread_entry() | |
1433 | { | |
1434 | #ifdef HAVE_LIBAIO | |
b32b8144 | 1435 | dout(10) << __func__ << " enter" << dendl; |
7c673cae FG |
1436 | while (true) { |
1437 | { | |
1438 | Mutex::Locker locker(aio_lock); | |
1439 | if (aio_queue.empty()) { | |
1440 | if (aio_stop) | |
1441 | break; | |
b32b8144 | 1442 | dout(20) << __func__ << " sleeping" << dendl; |
7c673cae FG |
1443 | write_finish_cond.Wait(aio_lock); |
1444 | continue; | |
1445 | } | |
1446 | } | |
1447 | ||
b32b8144 | 1448 | dout(20) << __func__ << " waiting for aio(s)" << dendl; |
7c673cae FG |
1449 | io_event event[16]; |
1450 | int r = io_getevents(aio_ctx, 1, 16, event, NULL); | |
1451 | if (r < 0) { | |
1452 | if (r == -EINTR) { | |
1453 | dout(0) << "io_getevents got " << cpp_strerror(r) << dendl; | |
1454 | continue; | |
1455 | } | |
1456 | derr << "io_getevents got " << cpp_strerror(r) << dendl; | |
1457 | assert(0 == "got unexpected error from io_getevents"); | |
1458 | } | |
1459 | ||
1460 | { | |
1461 | Mutex::Locker locker(aio_lock); | |
1462 | for (int i=0; i<r; i++) { | |
1463 | aio_info *ai = (aio_info *)event[i].obj; | |
1464 | if (event[i].res != ai->len) { | |
1465 | derr << "aio to " << ai->off << "~" << ai->len | |
1466 | << " returned: " << (int)event[i].res << dendl; | |
1467 | assert(0 == "unexpected aio error"); | |
1468 | } | |
b32b8144 | 1469 | dout(10) << __func__ << " aio " << ai->off |
7c673cae FG |
1470 | << "~" << ai->len << " done" << dendl; |
1471 | ai->done = true; | |
1472 | } | |
1473 | check_aio_completion(); | |
1474 | } | |
1475 | } | |
b32b8144 | 1476 | dout(10) << __func__ << " exit" << dendl; |
7c673cae FG |
1477 | #endif |
1478 | } | |
1479 | ||
1480 | #ifdef HAVE_LIBAIO | |
1481 | /** | |
1482 | * check aio_wait for completed aio, and update state appropriately. | |
1483 | */ | |
1484 | void FileJournal::check_aio_completion() | |
1485 | { | |
1486 | assert(aio_lock.is_locked()); | |
1487 | dout(20) << "check_aio_completion" << dendl; | |
1488 | ||
1489 | bool completed_something = false, signal = false; | |
1490 | uint64_t new_journaled_seq = 0; | |
1491 | ||
1492 | list<aio_info>::iterator p = aio_queue.begin(); | |
1493 | while (p != aio_queue.end() && p->done) { | |
1494 | dout(20) << "check_aio_completion completed seq " << p->seq << " " | |
1495 | << p->off << "~" << p->len << dendl; | |
1496 | if (p->seq) { | |
1497 | new_journaled_seq = p->seq; | |
1498 | completed_something = true; | |
1499 | } | |
1500 | aio_num--; | |
1501 | aio_bytes -= p->len; | |
1502 | aio_queue.erase(p++); | |
1503 | signal = true; | |
1504 | } | |
1505 | ||
1506 | if (completed_something) { | |
1507 | // kick finisher? | |
1508 | // only if we haven't filled up recently! | |
1509 | Mutex::Locker locker(finisher_lock); | |
1510 | journaled_seq = new_journaled_seq; | |
1511 | if (full_state != FULL_NOTFULL) { | |
1512 | dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq | |
1513 | << ", full_commit_seq|full_restart_seq" << dendl; | |
1514 | } else { | |
1515 | if (plug_journal_completions) { | |
1516 | dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq | |
1517 | << " due to completion plug" << dendl; | |
1518 | } else { | |
1519 | dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl; | |
1520 | queue_completions_thru(journaled_seq); | |
1521 | } | |
1522 | } | |
1523 | } | |
1524 | if (signal) { | |
1525 | // maybe write queue was waiting for aio count to drop? | |
1526 | aio_cond.Signal(); | |
1527 | } | |
1528 | } | |
1529 | #endif | |
1530 | ||
1531 | int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) { | |
1532 | dout(10) << "prepare_entry " << tls << dendl; | |
1533 | int data_len = cct->_conf->journal_align_min_size - 1; | |
1534 | int data_align = -1; // -1 indicates that we don't care about the alignment | |
1535 | bufferlist bl; | |
1536 | for (vector<ObjectStore::Transaction>::iterator p = tls.begin(); | |
1537 | p != tls.end(); ++p) { | |
1538 | if ((int)(*p).get_data_length() > data_len) { | |
1539 | data_len = (*p).get_data_length(); | |
1540 | data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; | |
1541 | } | |
1542 | ::encode(*p, bl); | |
1543 | } | |
1544 | if (tbl->length()) { | |
1545 | bl.claim_append(*tbl); | |
1546 | } | |
1547 | // add it this entry | |
1548 | entry_header_t h; | |
1549 | unsigned head_size = sizeof(entry_header_t); | |
1550 | off64_t base_size = 2*head_size + bl.length(); | |
1551 | memset(&h, 0, sizeof(h)); | |
1552 | if (data_align >= 0) | |
1553 | h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; | |
1554 | off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment); | |
1555 | unsigned post_pad = size - base_size - h.pre_pad; | |
1556 | h.len = bl.length(); | |
1557 | h.post_pad = post_pad; | |
1558 | h.crc32c = bl.crc32c(0); | |
1559 | dout(10) << " len " << bl.length() << " -> " << size | |
1560 | << " (head " << head_size << " pre_pad " << h.pre_pad | |
1561 | << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")" | |
1562 | << " (bl alignment " << data_align << ")" | |
1563 | << dendl; | |
1564 | bufferlist ebl; | |
1565 | // header | |
1566 | ebl.append((const char*)&h, sizeof(h)); | |
1567 | if (h.pre_pad) { | |
1568 | ebl.push_back(buffer::create_static(h.pre_pad, zero_buf)); | |
1569 | } | |
1570 | // payload | |
1571 | ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy | |
1572 | if (h.post_pad) { | |
1573 | ebl.push_back(buffer::create_static(h.post_pad, zero_buf)); | |
1574 | } | |
1575 | // footer | |
1576 | ebl.append((const char*)&h, sizeof(h)); | |
1577 | if (directio) | |
1578 | ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT); | |
1579 | tbl->claim(ebl); | |
1580 | return h.len; | |
1581 | } | |
1582 | ||
1583 | void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, | |
1584 | Context *oncommit, TrackedOpRef osd_op) | |
1585 | { | |
1586 | // dump on queue | |
1587 | dout(5) << "submit_entry seq " << seq | |
1588 | << " len " << e.length() | |
1589 | << " (" << oncommit << ")" << dendl; | |
1590 | assert(e.length() > 0); | |
1591 | assert(e.length() < header.max_size); | |
1592 | ||
1593 | if (osd_op) | |
1594 | osd_op->mark_event("commit_queued_for_journal_write"); | |
1595 | if (logger) { | |
1596 | logger->inc(l_filestore_journal_queue_bytes, orig_len); | |
1597 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1598 | } | |
1599 | ||
1600 | throttle.register_throttle_seq(seq, e.length()); | |
1601 | if (logger) { | |
1602 | logger->inc(l_filestore_journal_ops, 1); | |
1603 | logger->inc(l_filestore_journal_bytes, e.length()); | |
1604 | } | |
1605 | ||
1606 | if (osd_op) { | |
1607 | osd_op->mark_event("commit_queued_for_journal_write"); | |
1608 | if (osd_op->store_trace) { | |
1609 | osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace); | |
1610 | osd_op->journal_trace.event("submit_entry"); | |
1611 | osd_op->journal_trace.keyval("seq", seq); | |
1612 | } | |
1613 | } | |
1614 | { | |
1615 | Mutex::Locker l1(writeq_lock); | |
1616 | #ifdef HAVE_LIBAIO | |
1617 | Mutex::Locker l2(aio_lock); | |
1618 | #endif | |
1619 | Mutex::Locker l3(completions_lock); | |
1620 | ||
1621 | #ifdef HAVE_LIBAIO | |
1622 | aio_write_queue_ops++; | |
1623 | aio_write_queue_bytes += e.length(); | |
1624 | aio_cond.Signal(); | |
1625 | #endif | |
1626 | ||
1627 | completions.push_back( | |
1628 | completion_item( | |
1629 | seq, oncommit, ceph_clock_now(), osd_op)); | |
1630 | if (writeq.empty()) | |
1631 | writeq_cond.Signal(); | |
1632 | writeq.push_back(write_item(seq, e, orig_len, osd_op)); | |
1633 | if (osd_op) | |
1634 | osd_op->journal_trace.keyval("queue depth", writeq.size()); | |
1635 | } | |
1636 | } | |
1637 | ||
1638 | bool FileJournal::writeq_empty() | |
1639 | { | |
1640 | Mutex::Locker locker(writeq_lock); | |
1641 | return writeq.empty(); | |
1642 | } | |
1643 | ||
1644 | FileJournal::write_item &FileJournal::peek_write() | |
1645 | { | |
1646 | assert(write_lock.is_locked()); | |
1647 | Mutex::Locker locker(writeq_lock); | |
1648 | return writeq.front(); | |
1649 | } | |
1650 | ||
1651 | void FileJournal::pop_write() | |
1652 | { | |
1653 | assert(write_lock.is_locked()); | |
1654 | Mutex::Locker locker(writeq_lock); | |
1655 | if (logger) { | |
1656 | logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len); | |
1657 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1658 | } | |
1659 | writeq.pop_front(); | |
1660 | } | |
1661 | ||
1662 | void FileJournal::batch_pop_write(list<write_item> &items) | |
1663 | { | |
1664 | assert(write_lock.is_locked()); | |
1665 | { | |
1666 | Mutex::Locker locker(writeq_lock); | |
1667 | writeq.swap(items); | |
1668 | } | |
1669 | for (auto &&i : items) { | |
1670 | if (logger) { | |
1671 | logger->dec(l_filestore_journal_queue_bytes, i.orig_len); | |
1672 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1673 | } | |
1674 | } | |
1675 | } | |
1676 | ||
1677 | void FileJournal::batch_unpop_write(list<write_item> &items) | |
1678 | { | |
1679 | assert(write_lock.is_locked()); | |
1680 | for (auto &&i : items) { | |
1681 | if (logger) { | |
1682 | logger->inc(l_filestore_journal_queue_bytes, i.orig_len); | |
1683 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1684 | } | |
1685 | } | |
1686 | Mutex::Locker locker(writeq_lock); | |
1687 | writeq.splice(writeq.begin(), items); | |
1688 | } | |
1689 | ||
1690 | void FileJournal::commit_start(uint64_t seq) | |
1691 | { | |
1692 | dout(10) << "commit_start" << dendl; | |
1693 | ||
1694 | // was full? | |
1695 | switch (full_state) { | |
1696 | case FULL_NOTFULL: | |
1697 | break; // all good | |
1698 | ||
1699 | case FULL_FULL: | |
1700 | if (seq >= journaled_seq) { | |
1701 | dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq " | |
1702 | << seq << " > journaled_seq " << journaled_seq | |
1703 | << ", moving to FULL_WAIT." | |
1704 | << dendl; | |
1705 | full_state = FULL_WAIT; | |
1706 | } else { | |
1707 | dout(1) << "FULL_FULL commit_start on seq " | |
1708 | << seq << " < journaled_seq " << journaled_seq | |
1709 | << ", remaining in FULL_FULL" | |
1710 | << dendl; | |
1711 | } | |
1712 | break; | |
1713 | ||
1714 | case FULL_WAIT: | |
1715 | dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl; | |
1716 | full_state = FULL_NOTFULL; | |
1717 | plug_journal_completions = true; | |
1718 | break; | |
1719 | } | |
1720 | } | |
1721 | ||
1722 | /* | |
1723 | *send discard command to joural block deivce | |
1724 | */ | |
1725 | void FileJournal::do_discard(int64_t offset, int64_t end) | |
1726 | { | |
1727 | dout(10) << __func__ << "trim(" << offset << ", " << end << dendl; | |
1728 | ||
1729 | offset = ROUND_UP_TO(offset, block_size); | |
1730 | if (offset >= end) | |
1731 | return; | |
1732 | end = ROUND_UP_TO(end - block_size, block_size); | |
1733 | assert(end >= offset); | |
1734 | if (offset < end) | |
1735 | if (block_device_discard(fd, offset, end - offset) < 0) | |
1736 | dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl; | |
1737 | } | |
1738 | ||
1739 | void FileJournal::committed_thru(uint64_t seq) | |
1740 | { | |
1741 | Mutex::Locker locker(write_lock); | |
1742 | ||
1743 | auto released = throttle.flush(seq); | |
1744 | if (logger) { | |
1745 | logger->dec(l_filestore_journal_ops, released.first); | |
1746 | logger->dec(l_filestore_journal_bytes, released.second); | |
1747 | } | |
1748 | ||
1749 | if (seq < last_committed_seq) { | |
1750 | dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl; | |
1751 | assert(seq >= last_committed_seq); | |
1752 | return; | |
1753 | } | |
1754 | if (seq == last_committed_seq) { | |
1755 | dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl; | |
1756 | return; | |
1757 | } | |
1758 | ||
1759 | dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl; | |
1760 | last_committed_seq = seq; | |
1761 | ||
1762 | // completions! | |
1763 | { | |
1764 | Mutex::Locker locker(finisher_lock); | |
1765 | queue_completions_thru(seq); | |
1766 | if (plug_journal_completions && seq >= header.start_seq) { | |
1767 | dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl; | |
1768 | plug_journal_completions = false; | |
1769 | queue_completions_thru(journaled_seq); | |
1770 | } | |
1771 | } | |
1772 | ||
1773 | // adjust start pointer | |
1774 | while (!journalq.empty() && journalq.front().first <= seq) { | |
1775 | journalq.pop_front(); | |
1776 | } | |
1777 | ||
1778 | int64_t old_start = header.start; | |
1779 | if (!journalq.empty()) { | |
1780 | header.start = journalq.front().second; | |
1781 | header.start_seq = journalq.front().first; | |
1782 | } else { | |
1783 | header.start = write_pos; | |
1784 | header.start_seq = seq + 1; | |
1785 | } | |
1786 | ||
1787 | if (discard) { | |
1788 | dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl; | |
1789 | if (old_start < header.start) | |
1790 | do_discard(old_start, header.start - 1); | |
1791 | else { | |
1792 | do_discard(old_start, header.max_size - 1); | |
1793 | do_discard(get_top(), header.start - 1); | |
1794 | } | |
1795 | } | |
1796 | ||
1797 | must_write_header = true; | |
1798 | print_header(header); | |
1799 | ||
1800 | // committed but unjournaled items | |
1801 | while (!writeq_empty() && peek_write().seq <= seq) { | |
1802 | dout(15) << " dropping committed but unwritten seq " << peek_write().seq | |
1803 | << " len " << peek_write().bl.length() | |
1804 | << dendl; | |
1805 | complete_write(1, peek_write().orig_len); | |
1806 | pop_write(); | |
1807 | } | |
1808 | ||
1809 | commit_cond.Signal(); | |
1810 | ||
1811 | dout(10) << "committed_thru done" << dendl; | |
1812 | } | |
1813 | ||
1814 | ||
1815 | void FileJournal::complete_write(uint64_t ops, uint64_t bytes) | |
1816 | { | |
1817 | dout(5) << __func__ << " finished " << ops << " ops and " | |
1818 | << bytes << " bytes" << dendl; | |
1819 | } | |
1820 | ||
1821 | int FileJournal::make_writeable() | |
1822 | { | |
1823 | dout(10) << __func__ << dendl; | |
1824 | int r = set_throttle_params(); | |
1825 | if (r < 0) | |
1826 | return r; | |
1827 | ||
1828 | r = _open(true); | |
1829 | if (r < 0) | |
1830 | return r; | |
1831 | ||
1832 | if (read_pos > 0) | |
1833 | write_pos = read_pos; | |
1834 | else | |
1835 | write_pos = get_top(); | |
1836 | read_pos = 0; | |
1837 | ||
1838 | must_write_header = true; | |
1839 | ||
1840 | start_writer(); | |
1841 | return 0; | |
1842 | } | |
1843 | ||
1844 | int FileJournal::set_throttle_params() | |
1845 | { | |
1846 | stringstream ss; | |
1847 | bool valid = throttle.set_params( | |
1848 | cct->_conf->journal_throttle_low_threshhold, | |
1849 | cct->_conf->journal_throttle_high_threshhold, | |
1850 | cct->_conf->filestore_expected_throughput_bytes, | |
1851 | cct->_conf->journal_throttle_high_multiple, | |
1852 | cct->_conf->journal_throttle_max_multiple, | |
1853 | header.max_size - get_top(), | |
1854 | &ss); | |
1855 | ||
1856 | if (!valid) { | |
1857 | derr << "tried to set invalid params: " | |
1858 | << ss.str() | |
1859 | << dendl; | |
1860 | } | |
1861 | return valid ? 0 : -EINVAL; | |
1862 | } | |
1863 | ||
1864 | const char** FileJournal::get_tracked_conf_keys() const | |
1865 | { | |
1866 | static const char *KEYS[] = { | |
1867 | "journal_throttle_low_threshhold", | |
1868 | "journal_throttle_high_threshhold", | |
1869 | "journal_throttle_high_multiple", | |
1870 | "journal_throttle_max_multiple", | |
1871 | "filestore_expected_throughput_bytes", | |
1872 | NULL}; | |
1873 | return KEYS; | |
1874 | } | |
1875 | ||
1876 | void FileJournal::wrap_read_bl( | |
1877 | off64_t pos, | |
1878 | int64_t olen, | |
1879 | bufferlist* bl, | |
1880 | off64_t *out_pos | |
1881 | ) const | |
1882 | { | |
1883 | while (olen > 0) { | |
1884 | while (pos >= header.max_size) | |
1885 | pos = pos + get_top() - header.max_size; | |
1886 | ||
1887 | int64_t len; | |
1888 | if (pos + olen > header.max_size) | |
1889 | len = header.max_size - pos; // partial | |
1890 | else | |
1891 | len = olen; // rest | |
1892 | ||
1893 | int64_t actual = ::lseek64(fd, pos, SEEK_SET); | |
1894 | assert(actual == pos); | |
1895 | ||
1896 | bufferptr bp = buffer::create(len); | |
1897 | int r = safe_read_exact(fd, bp.c_str(), len); | |
1898 | if (r) { | |
1899 | derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned " | |
1900 | << r << dendl; | |
1901 | ceph_abort(); | |
1902 | } | |
1903 | bl->push_back(std::move(bp)); | |
1904 | pos += len; | |
1905 | olen -= len; | |
1906 | } | |
1907 | if (pos >= header.max_size) | |
1908 | pos = pos + get_top() - header.max_size; | |
1909 | if (out_pos) | |
1910 | *out_pos = pos; | |
1911 | } | |
1912 | ||
1913 | bool FileJournal::read_entry( | |
1914 | bufferlist &bl, | |
1915 | uint64_t &next_seq, | |
1916 | bool *corrupt) | |
1917 | { | |
1918 | if (corrupt) | |
1919 | *corrupt = false; | |
1920 | uint64_t seq = next_seq; | |
1921 | ||
1922 | if (!read_pos) { | |
1923 | dout(2) << "read_entry -- not readable" << dendl; | |
1924 | return false; | |
1925 | } | |
1926 | ||
1927 | off64_t pos = read_pos; | |
1928 | off64_t next_pos = pos; | |
1929 | stringstream ss; | |
1930 | read_entry_result result = do_read_entry( | |
1931 | pos, | |
1932 | &next_pos, | |
1933 | &bl, | |
1934 | &seq, | |
1935 | &ss); | |
1936 | if (result == SUCCESS) { | |
1937 | journalq.push_back( pair<uint64_t,off64_t>(seq, pos)); | |
1938 | uint64_t amount_to_take = | |
1939 | next_pos > pos ? | |
1940 | next_pos - pos : | |
1941 | (header.max_size - pos) + (next_pos - get_top()); | |
1942 | throttle.take(amount_to_take); | |
1943 | throttle.register_throttle_seq(next_seq, amount_to_take); | |
1944 | if (logger) { | |
1945 | logger->inc(l_filestore_journal_ops, 1); | |
1946 | logger->inc(l_filestore_journal_bytes, amount_to_take); | |
1947 | } | |
1948 | if (next_seq > seq) { | |
1949 | return false; | |
1950 | } else { | |
1951 | read_pos = next_pos; | |
1952 | next_seq = seq; | |
1953 | if (seq > journaled_seq) | |
1954 | journaled_seq = seq; | |
1955 | return true; | |
1956 | } | |
3efd9988 FG |
1957 | } else { |
1958 | derr << "do_read_entry(" << pos << "): " << ss.str() << dendl; | |
7c673cae FG |
1959 | } |
1960 | ||
1961 | if (seq && seq < header.committed_up_to) { | |
1962 | derr << "Unable to read past sequence " << seq | |
1963 | << " but header indicates the journal has committed up through " | |
1964 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
1965 | if (cct->_conf->journal_ignore_corruption) { | |
1966 | if (corrupt) | |
1967 | *corrupt = true; | |
1968 | return false; | |
1969 | } else { | |
1970 | ceph_abort(); | |
1971 | } | |
1972 | } | |
1973 | ||
7c673cae FG |
1974 | dout(2) << "No further valid entries found, journal is most likely valid" |
1975 | << dendl; | |
1976 | return false; | |
1977 | } | |
1978 | ||
1979 | FileJournal::read_entry_result FileJournal::do_read_entry( | |
1980 | off64_t init_pos, | |
1981 | off64_t *next_pos, | |
1982 | bufferlist *bl, | |
1983 | uint64_t *seq, | |
1984 | ostream *ss, | |
1985 | entry_header_t *_h) const | |
1986 | { | |
1987 | off64_t cur_pos = init_pos; | |
1988 | bufferlist _bl; | |
1989 | if (!bl) | |
1990 | bl = &_bl; | |
1991 | ||
1992 | // header | |
1993 | entry_header_t *h; | |
1994 | bufferlist hbl; | |
1995 | off64_t _next_pos; | |
1996 | wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos); | |
1997 | h = reinterpret_cast<entry_header_t *>(hbl.c_str()); | |
1998 | ||
1999 | if (!h->check_magic(cur_pos, header.get_fsid64())) { | |
2000 | dout(25) << "read_entry " << init_pos | |
2001 | << " : bad header magic, end of journal" << dendl; | |
2002 | if (ss) | |
2003 | *ss << "bad header magic"; | |
2004 | if (next_pos) | |
2005 | *next_pos = init_pos + (4<<10); // check 4k ahead | |
2006 | return MAYBE_CORRUPT; | |
2007 | } | |
2008 | cur_pos = _next_pos; | |
2009 | ||
2010 | // pad + body + pad | |
2011 | if (h->pre_pad) | |
2012 | cur_pos += h->pre_pad; | |
2013 | ||
2014 | bl->clear(); | |
2015 | wrap_read_bl(cur_pos, h->len, bl, &cur_pos); | |
2016 | ||
2017 | if (h->post_pad) | |
2018 | cur_pos += h->post_pad; | |
2019 | ||
2020 | // footer | |
2021 | entry_header_t *f; | |
2022 | bufferlist fbl; | |
2023 | wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos); | |
2024 | f = reinterpret_cast<entry_header_t *>(fbl.c_str()); | |
2025 | if (memcmp(f, h, sizeof(*f))) { | |
2026 | if (ss) | |
2027 | *ss << "bad footer magic, partial entry"; | |
2028 | if (next_pos) | |
2029 | *next_pos = cur_pos; | |
2030 | return MAYBE_CORRUPT; | |
2031 | } | |
2032 | ||
2033 | if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal) | |
2034 | h->crc32c != 0) { // newer entry in old journal | |
2035 | uint32_t actual_crc = bl->crc32c(0); | |
2036 | if (actual_crc != h->crc32c) { | |
2037 | if (ss) | |
2038 | *ss << "header crc (" << h->crc32c | |
2039 | << ") doesn't match body crc (" << actual_crc << ")"; | |
2040 | if (next_pos) | |
2041 | *next_pos = cur_pos; | |
2042 | return MAYBE_CORRUPT; | |
2043 | } | |
2044 | } | |
2045 | ||
2046 | // yay! | |
2047 | dout(2) << "read_entry " << init_pos << " : seq " << h->seq | |
2048 | << " " << h->len << " bytes" | |
2049 | << dendl; | |
2050 | ||
2051 | // ok! | |
2052 | if (seq) | |
2053 | *seq = h->seq; | |
2054 | ||
2055 | ||
2056 | if (next_pos) | |
2057 | *next_pos = cur_pos; | |
2058 | ||
2059 | if (_h) | |
2060 | *_h = *h; | |
2061 | ||
2062 | assert(cur_pos % header.alignment == 0); | |
2063 | return SUCCESS; | |
2064 | } | |
2065 | ||
2066 | void FileJournal::reserve_throttle_and_backoff(uint64_t count) | |
2067 | { | |
2068 | throttle.get(count); | |
2069 | } | |
2070 | ||
2071 | void FileJournal::get_header( | |
2072 | uint64_t wanted_seq, | |
2073 | off64_t *_pos, | |
2074 | entry_header_t *h) | |
2075 | { | |
2076 | off64_t pos = header.start; | |
2077 | off64_t next_pos = pos; | |
2078 | bufferlist bl; | |
2079 | uint64_t seq = 0; | |
2080 | dout(2) << __func__ << dendl; | |
2081 | while (1) { | |
2082 | bl.clear(); | |
2083 | pos = next_pos; | |
2084 | read_entry_result result = do_read_entry( | |
2085 | pos, | |
2086 | &next_pos, | |
2087 | &bl, | |
2088 | &seq, | |
2089 | 0, | |
2090 | h); | |
2091 | if (result == FAILURE || result == MAYBE_CORRUPT) | |
2092 | ceph_abort(); | |
2093 | if (seq == wanted_seq) { | |
2094 | if (_pos) | |
2095 | *_pos = pos; | |
2096 | return; | |
2097 | } | |
2098 | } | |
2099 | ceph_abort(); // not reachable | |
2100 | } | |
2101 | ||
2102 | void FileJournal::corrupt( | |
2103 | int wfd, | |
2104 | off64_t corrupt_at) | |
2105 | { | |
2106 | dout(2) << __func__ << dendl; | |
2107 | if (corrupt_at >= header.max_size) | |
2108 | corrupt_at = corrupt_at + get_top() - header.max_size; | |
2109 | ||
2110 | int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET); | |
2111 | assert(actual == corrupt_at); | |
2112 | ||
2113 | char buf[10]; | |
2114 | int r = safe_read_exact(fd, buf, 1); | |
2115 | assert(r == 0); | |
2116 | ||
2117 | actual = ::lseek64(wfd, corrupt_at, SEEK_SET); | |
2118 | assert(actual == corrupt_at); | |
2119 | ||
2120 | buf[0]++; | |
2121 | r = safe_write(wfd, buf, 1); | |
2122 | assert(r == 0); | |
2123 | } | |
2124 | ||
2125 | void FileJournal::corrupt_payload( | |
2126 | int wfd, | |
2127 | uint64_t seq) | |
2128 | { | |
2129 | dout(2) << __func__ << dendl; | |
2130 | off64_t pos = 0; | |
2131 | entry_header_t h; | |
2132 | get_header(seq, &pos, &h); | |
2133 | off64_t corrupt_at = | |
2134 | pos + sizeof(entry_header_t) + h.pre_pad; | |
2135 | corrupt(wfd, corrupt_at); | |
2136 | } | |
2137 | ||
2138 | ||
2139 | void FileJournal::corrupt_footer_magic( | |
2140 | int wfd, | |
2141 | uint64_t seq) | |
2142 | { | |
2143 | dout(2) << __func__ << dendl; | |
2144 | off64_t pos = 0; | |
2145 | entry_header_t h; | |
2146 | get_header(seq, &pos, &h); | |
2147 | off64_t corrupt_at = | |
2148 | pos + sizeof(entry_header_t) + h.pre_pad + | |
2149 | h.len + h.post_pad + | |
2150 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2151 | corrupt(wfd, corrupt_at); | |
2152 | } | |
2153 | ||
2154 | ||
2155 | void FileJournal::corrupt_header_magic( | |
2156 | int wfd, | |
2157 | uint64_t seq) | |
2158 | { | |
2159 | dout(2) << __func__ << dendl; | |
2160 | off64_t pos = 0; | |
2161 | entry_header_t h; | |
2162 | get_header(seq, &pos, &h); | |
2163 | off64_t corrupt_at = | |
2164 | pos + | |
2165 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2166 | corrupt(wfd, corrupt_at); | |
2167 | } | |
2168 | ||
2169 | off64_t FileJournal::get_journal_size_estimate() | |
2170 | { | |
2171 | off64_t size, start = header.start; | |
2172 | if (write_pos < start) { | |
2173 | size = (max_size - start) + write_pos; | |
2174 | } else { | |
2175 | size = write_pos - start; | |
2176 | } | |
2177 | dout(20) << __func__ << " journal size=" << size << dendl; | |
2178 | return size; | |
2179 | } |