]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | #include "acconfig.h" | |
15 | ||
16 | #include "common/debug.h" | |
17 | #include "common/errno.h" | |
18 | #include "common/safe_io.h" | |
19 | #include "FileJournal.h" | |
20 | #include "include/color.h" | |
21 | #include "common/perf_counters.h" | |
22 | #include "FileStore.h" | |
23 | ||
24 | #include "include/compat.h" | |
25 | ||
26 | #include <fcntl.h> | |
27 | #include <limits.h> | |
28 | #include <sstream> | |
29 | #include <stdio.h> | |
30 | #include <stdlib.h> | |
31 | #include <sys/types.h> | |
32 | #include <sys/stat.h> | |
33 | #include <sys/mount.h> | |
34 | ||
35 | #include "common/blkdev.h" | |
36 | #if defined(__linux__) | |
37 | #include "common/linux_version.h" | |
38 | #endif | |
39 | ||
40 | #if defined(__FreeBSD__) | |
41 | #define O_DSYNC O_SYNC | |
42 | #endif | |
43 | ||
44 | #define dout_context cct | |
45 | #define dout_subsys ceph_subsys_journal | |
46 | #undef dout_prefix | |
47 | #define dout_prefix *_dout << "journal " | |
48 | ||
f67539c2 TL |
49 | using std::list; |
50 | using std::map; | |
51 | using std::ostream; | |
52 | using std::ostringstream; | |
53 | using std::pair; | |
54 | using std::set; | |
55 | using std::string; | |
56 | using std::stringstream; | |
57 | using std::vector; | |
58 | ||
59 | using ceph::bufferlist; | |
60 | using ceph::bufferptr; | |
61 | using ceph::Formatter; | |
62 | using ceph::JSONFormatter; | |
63 | ||
7c673cae FG |
// One mebibyte in bytes; journals smaller than this are rejected below.
static constexpr int64_t ONE_MEG = int64_t{1} << 20;
// Granularity (bytes) that header.alignment must be a multiple of in direct-io mode.
static constexpr int CEPH_DIRECTIO_ALIGNMENT = 4096;
66 | ||
67 | ||
68 | int FileJournal::_open(bool forwrite, bool create) | |
69 | { | |
70 | int flags, ret; | |
71 | ||
72 | if (forwrite) { | |
73 | flags = O_RDWR; | |
74 | if (directio) | |
75 | flags |= O_DIRECT | O_DSYNC; | |
76 | } else { | |
77 | flags = O_RDONLY; | |
78 | } | |
79 | if (create) | |
80 | flags |= O_CREAT; | |
81 | ||
82 | if (fd >= 0) { | |
83 | if (TEMP_FAILURE_RETRY(::close(fd))) { | |
84 | int err = errno; | |
85 | derr << "FileJournal::_open: error closing old fd: " | |
86 | << cpp_strerror(err) << dendl; | |
87 | } | |
88 | } | |
91327a77 | 89 | fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags|O_CLOEXEC, 0644)); |
7c673cae FG |
90 | if (fd < 0) { |
91 | int err = errno; | |
92 | dout(2) << "FileJournal::_open unable to open journal " | |
93 | << fn << ": " << cpp_strerror(err) << dendl; | |
94 | return -err; | |
95 | } | |
96 | ||
97 | struct stat st; | |
98 | ret = ::fstat(fd, &st); | |
99 | if (ret) { | |
100 | ret = errno; | |
101 | derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl; | |
102 | ret = -ret; | |
103 | goto out_fd; | |
104 | } | |
105 | ||
106 | if (S_ISBLK(st.st_mode)) { | |
107 | ret = _open_block_device(); | |
108 | } else if (S_ISREG(st.st_mode)) { | |
109 | if (aio && !force_aio) { | |
110 | derr << "FileJournal::_open: disabling aio for non-block journal. Use " | |
111 | << "journal_force_aio to force use of aio anyway" << dendl; | |
112 | aio = false; | |
113 | } | |
114 | ret = _open_file(st.st_size, st.st_blksize, create); | |
115 | } else { | |
116 | derr << "FileJournal::_open: wrong journal file type: " << st.st_mode | |
117 | << dendl; | |
118 | ret = -EINVAL; | |
119 | } | |
120 | ||
121 | if (ret) | |
122 | goto out_fd; | |
123 | ||
124 | #ifdef HAVE_LIBAIO | |
125 | if (aio) { | |
126 | aio_ctx = 0; | |
127 | ret = io_setup(128, &aio_ctx); | |
128 | if (ret < 0) { | |
129 | switch (ret) { | |
130 | // Contrary to naive expectations -EAGIAN means ... | |
131 | case -EAGAIN: | |
132 | derr << "FileJournal::_open: user's limit of aio events exceeded. " | |
133 | << "Try increasing /proc/sys/fs/aio-max-nr" << dendl; | |
134 | break; | |
135 | default: | |
136 | derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl; | |
137 | break; | |
138 | } | |
139 | goto out_fd; | |
140 | } | |
141 | } | |
142 | #endif | |
143 | ||
144 | /* We really want max_size to be a multiple of block_size. */ | |
145 | max_size -= max_size % block_size; | |
146 | ||
147 | dout(1) << "_open " << fn << " fd " << fd | |
148 | << ": " << max_size | |
149 | << " bytes, block size " << block_size | |
150 | << " bytes, directio = " << directio | |
151 | << ", aio = " << aio | |
152 | << dendl; | |
153 | return 0; | |
154 | ||
155 | out_fd: | |
156 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
157 | fd = -1; | |
158 | return ret; | |
159 | } | |
160 | ||
161 | int FileJournal::_open_block_device() | |
162 | { | |
163 | int64_t bdev_sz = 0; | |
11fdf7f2 TL |
164 | BlkDev blkdev(fd); |
165 | int ret = blkdev.get_size(&bdev_sz); | |
7c673cae FG |
166 | if (ret) { |
167 | dout(0) << __func__ << ": failed to read block device size." << dendl; | |
168 | return -EIO; | |
169 | } | |
170 | ||
171 | /* Check for bdev_sz too small */ | |
172 | if (bdev_sz < ONE_MEG) { | |
173 | dout(0) << __func__ << ": your block device must be at least " | |
174 | << ONE_MEG << " bytes to be used for a Ceph journal." << dendl; | |
175 | return -EINVAL; | |
176 | } | |
177 | ||
178 | dout(10) << __func__ << ": ignoring osd journal size. " | |
179 | << "We'll use the entire block device (size: " << bdev_sz << ")" | |
180 | << dendl; | |
181 | max_size = bdev_sz; | |
182 | ||
183 | block_size = cct->_conf->journal_block_size; | |
184 | ||
185 | if (cct->_conf->journal_discard) { | |
11fdf7f2 | 186 | discard = blkdev.support_discard(); |
7c673cae FG |
187 | dout(10) << fn << " support discard: " << (int)discard << dendl; |
188 | } | |
189 | ||
190 | return 0; | |
191 | } | |
192 | ||
193 | int FileJournal::_open_file(int64_t oldsize, blksize_t blksize, | |
194 | bool create) | |
195 | { | |
196 | int ret; | |
197 | int64_t conf_journal_sz(cct->_conf->osd_journal_size); | |
198 | conf_journal_sz <<= 20; | |
199 | ||
200 | if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) { | |
201 | derr << "I'm sorry, I don't know how large of a journal to create." | |
202 | << "Please specify a block device to use as the journal OR " | |
203 | << "set osd_journal_size in your ceph.conf" << dendl; | |
204 | return -EINVAL; | |
205 | } | |
206 | ||
207 | if (create && (oldsize < conf_journal_sz)) { | |
208 | uint64_t newsize(conf_journal_sz); | |
209 | dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl; | |
210 | ret = ::ftruncate(fd, newsize); | |
211 | if (ret < 0) { | |
212 | int err = errno; | |
213 | derr << "FileJournal::_open_file : unable to extend journal to " | |
214 | << newsize << " bytes: " << cpp_strerror(err) << dendl; | |
215 | return -err; | |
216 | } | |
28e407b8 | 217 | ret = ceph_posix_fallocate(fd, 0, newsize); |
7c673cae FG |
218 | if (ret) { |
219 | derr << "FileJournal::_open_file : unable to preallocation journal to " | |
220 | << newsize << " bytes: " << cpp_strerror(ret) << dendl; | |
221 | return -ret; | |
222 | } | |
223 | max_size = newsize; | |
7c673cae FG |
224 | } |
225 | else { | |
226 | max_size = oldsize; | |
227 | } | |
228 | block_size = cct->_conf->journal_block_size; | |
229 | ||
230 | if (create && cct->_conf->journal_zero_on_create) { | |
231 | derr << "FileJournal::_open_file : zeroing journal" << dendl; | |
232 | uint64_t write_size = 1 << 20; | |
233 | char *buf; | |
234 | ret = ::posix_memalign((void **)&buf, block_size, write_size); | |
235 | if (ret != 0) { | |
236 | return -ret; | |
237 | } | |
238 | memset(static_cast<void*>(buf), 0, write_size); | |
239 | uint64_t i = 0; | |
240 | for (; (i + write_size) <= (uint64_t)max_size; i += write_size) { | |
241 | ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i); | |
242 | if (ret < 0) { | |
f67539c2 | 243 | aligned_free(buf); |
7c673cae FG |
244 | return -errno; |
245 | } | |
246 | } | |
247 | if (i < (uint64_t)max_size) { | |
248 | ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i); | |
249 | if (ret < 0) { | |
f67539c2 | 250 | aligned_free(buf); |
7c673cae FG |
251 | return -errno; |
252 | } | |
253 | } | |
f67539c2 | 254 | aligned_free(buf); |
7c673cae FG |
255 | } |
256 | ||
257 | ||
258 | dout(10) << "_open journal is not a block device, NOT checking disk " | |
259 | << "write cache on '" << fn << "'" << dendl; | |
260 | ||
261 | return 0; | |
262 | } | |
263 | ||
264 | // This can not be used on an active journal | |
265 | int FileJournal::check() | |
266 | { | |
267 | int ret; | |
268 | ||
11fdf7f2 | 269 | ceph_assert(fd == -1); |
7c673cae FG |
270 | ret = _open(false, false); |
271 | if (ret) | |
272 | return ret; | |
273 | ||
274 | ret = read_header(&header); | |
275 | if (ret < 0) | |
276 | goto done; | |
277 | ||
278 | if (header.fsid != fsid) { | |
279 | derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
280 | << ", invalid (someone else's?) journal" << dendl; | |
281 | ret = -EINVAL; | |
282 | goto done; | |
283 | } | |
284 | ||
285 | dout(1) << "check: header looks ok" << dendl; | |
286 | ret = 0; | |
287 | ||
288 | done: | |
289 | close(); | |
290 | return ret; | |
291 | } | |
292 | ||
293 | ||
294 | int FileJournal::create() | |
295 | { | |
296 | void *buf = 0; | |
297 | int64_t needed_space; | |
298 | int ret; | |
f67539c2 | 299 | ceph::buffer::ptr bp; |
7c673cae FG |
300 | dout(2) << "create " << fn << " fsid " << fsid << dendl; |
301 | ||
302 | ret = _open(true, true); | |
303 | if (ret) | |
304 | goto done; | |
305 | ||
306 | // write empty header | |
307 | header = header_t(); | |
308 | header.flags = header_t::FLAG_CRC; // enable crcs on any new journal. | |
309 | header.fsid = fsid; | |
310 | header.max_size = max_size; | |
311 | header.block_size = block_size; | |
312 | if (cct->_conf->journal_block_align || directio) | |
313 | header.alignment = block_size; | |
314 | else | |
315 | header.alignment = 16; // at least stay word aligned on 64bit machines... | |
316 | ||
317 | header.start = get_top(); | |
318 | header.start_seq = 0; | |
319 | ||
320 | print_header(header); | |
321 | ||
322 | // static zeroed buffer for alignment padding | |
323 | delete [] zero_buf; | |
324 | zero_buf = new char[header.alignment]; | |
325 | memset(zero_buf, 0, header.alignment); | |
326 | ||
327 | bp = prepare_header(); | |
328 | if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) { | |
329 | ret = -errno; | |
330 | derr << "FileJournal::create : create write header error " | |
331 | << cpp_strerror(ret) << dendl; | |
332 | goto close_fd; | |
333 | } | |
334 | ||
335 | // zero first little bit, too. | |
336 | ret = posix_memalign(&buf, block_size, block_size); | |
337 | if (ret) { | |
338 | ret = -ret; | |
339 | derr << "FileJournal::create: failed to allocate " << block_size | |
340 | << " bytes of memory: " << cpp_strerror(ret) << dendl; | |
341 | goto close_fd; | |
342 | } | |
343 | memset(buf, 0, block_size); | |
344 | if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) { | |
345 | ret = -errno; | |
346 | derr << "FileJournal::create: error zeroing first " << block_size | |
347 | << " bytes " << cpp_strerror(ret) << dendl; | |
348 | goto free_buf; | |
349 | } | |
350 | ||
11fdf7f2 | 351 | needed_space = cct->_conf->osd_max_write_size << 20; |
7c673cae FG |
352 | needed_space += (2 * sizeof(entry_header_t)) + get_top(); |
353 | if (header.max_size - header.start < needed_space) { | |
354 | derr << "FileJournal::create: OSD journal is not large enough to hold " | |
355 | << "osd_max_write_size bytes!" << dendl; | |
356 | ret = -ENOSPC; | |
357 | goto free_buf; | |
358 | } | |
359 | ||
360 | dout(2) << "create done" << dendl; | |
361 | ret = 0; | |
362 | ||
363 | free_buf: | |
364 | free(buf); | |
365 | buf = 0; | |
366 | close_fd: | |
367 | if (TEMP_FAILURE_RETRY(::close(fd)) < 0) { | |
368 | ret = -errno; | |
369 | derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret) | |
370 | << dendl; | |
371 | } | |
372 | done: | |
373 | fd = -1; | |
374 | return ret; | |
375 | } | |
376 | ||
377 | // This can not be used on an active journal | |
378 | int FileJournal::peek_fsid(uuid_d& fsid) | |
379 | { | |
11fdf7f2 | 380 | ceph_assert(fd == -1); |
7c673cae FG |
381 | int r = _open(false, false); |
382 | if (r) | |
383 | return r; | |
384 | r = read_header(&header); | |
385 | if (r < 0) | |
386 | goto out; | |
387 | fsid = header.fsid; | |
388 | out: | |
389 | close(); | |
390 | return r; | |
391 | } | |
392 | ||
393 | int FileJournal::open(uint64_t fs_op_seq) | |
394 | { | |
395 | dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl; | |
396 | ||
397 | uint64_t next_seq = fs_op_seq + 1; | |
224ce89b | 398 | uint64_t seq = -1; |
7c673cae FG |
399 | |
400 | int err = _open(false); | |
401 | if (err) | |
402 | return err; | |
403 | ||
404 | // assume writeable, unless... | |
405 | read_pos = 0; | |
406 | write_pos = get_top(); | |
407 | ||
408 | // read header? | |
409 | err = read_header(&header); | |
410 | if (err < 0) | |
224ce89b | 411 | goto out; |
7c673cae FG |
412 | |
413 | // static zeroed buffer for alignment padding | |
414 | delete [] zero_buf; | |
415 | zero_buf = new char[header.alignment]; | |
416 | memset(zero_buf, 0, header.alignment); | |
417 | ||
418 | dout(10) << "open header.fsid = " << header.fsid | |
419 | //<< " vs expected fsid = " << fsid | |
420 | << dendl; | |
421 | if (header.fsid != fsid) { | |
422 | derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid | |
423 | << ", invalid (someone else's?) journal" << dendl; | |
224ce89b WB |
424 | err = -EINVAL; |
425 | goto out; | |
7c673cae FG |
426 | } |
427 | if (header.max_size > max_size) { | |
428 | dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl; | |
224ce89b WB |
429 | err = -EINVAL; |
430 | goto out; | |
7c673cae FG |
431 | } |
432 | if (header.block_size != block_size) { | |
433 | dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl; | |
224ce89b WB |
434 | err = -EINVAL; |
435 | goto out; | |
7c673cae FG |
436 | } |
437 | if (header.max_size % header.block_size) { | |
438 | dout(2) << "open journal max size " << header.max_size | |
439 | << " not a multiple of block size " << header.block_size << dendl; | |
224ce89b WB |
440 | err = -EINVAL; |
441 | goto out; | |
7c673cae FG |
442 | } |
443 | if (header.alignment != block_size && directio) { | |
444 | dout(0) << "open journal alignment " << header.alignment << " does not match block size " | |
445 | << block_size << " (required for direct_io journal mode)" << dendl; | |
224ce89b WB |
446 | err = -EINVAL; |
447 | goto out; | |
7c673cae FG |
448 | } |
449 | if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) { | |
450 | dout(0) << "open journal alignment " << header.alignment | |
451 | << " is not multiple of minimum directio alignment " | |
452 | << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)" | |
453 | << dendl; | |
224ce89b WB |
454 | err = -EINVAL; |
455 | goto out; | |
7c673cae FG |
456 | } |
457 | ||
458 | // looks like a valid header. | |
459 | write_pos = 0; // not writeable yet | |
460 | ||
461 | journaled_seq = header.committed_up_to; | |
462 | ||
463 | // find next entry | |
464 | read_pos = header.start; | |
224ce89b | 465 | seq = header.start_seq; |
7c673cae FG |
466 | |
467 | while (1) { | |
468 | bufferlist bl; | |
469 | off64_t old_pos = read_pos; | |
470 | if (!read_entry(bl, seq)) { | |
471 | dout(10) << "open reached end of journal." << dendl; | |
472 | break; | |
473 | } | |
474 | if (seq > next_seq) { | |
475 | dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq | |
476 | << ", ignoring journal contents" | |
477 | << dendl; | |
478 | read_pos = -1; | |
479 | last_committed_seq = 0; | |
480 | return 0; | |
481 | } | |
482 | if (seq == next_seq) { | |
483 | dout(10) << "open reached seq " << seq << dendl; | |
484 | read_pos = old_pos; | |
485 | break; | |
486 | } | |
487 | seq++; // next event should follow. | |
488 | } | |
489 | ||
490 | return 0; | |
224ce89b WB |
491 | out: |
492 | close(); | |
493 | return err; | |
7c673cae FG |
494 | } |
495 | ||
496 | void FileJournal::_close(int fd) const | |
497 | { | |
498 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
499 | } | |
500 | ||
501 | void FileJournal::close() | |
502 | { | |
503 | dout(1) << "close " << fn << dendl; | |
504 | ||
505 | // stop writer thread | |
506 | stop_writer(); | |
507 | ||
508 | // close | |
11fdf7f2 TL |
509 | ceph_assert(writeq_empty()); |
510 | ceph_assert(!must_write_header); | |
511 | ceph_assert(fd >= 0); | |
7c673cae FG |
512 | _close(fd); |
513 | fd = -1; | |
514 | } | |
515 | ||
516 | ||
517 | int FileJournal::dump(ostream& out) | |
518 | { | |
519 | return _dump(out, false); | |
520 | } | |
521 | ||
522 | int FileJournal::simple_dump(ostream& out) | |
523 | { | |
524 | return _dump(out, true); | |
525 | } | |
526 | ||
527 | int FileJournal::_dump(ostream& out, bool simple) | |
528 | { | |
529 | JSONFormatter f(true); | |
530 | int ret = _fdump(f, simple); | |
531 | f.flush(out); | |
532 | return ret; | |
533 | } | |
534 | ||
535 | int FileJournal::_fdump(Formatter &f, bool simple) | |
536 | { | |
537 | dout(10) << "_fdump" << dendl; | |
538 | ||
11fdf7f2 | 539 | ceph_assert(fd == -1); |
7c673cae FG |
540 | int err = _open(false, false); |
541 | if (err) | |
542 | return err; | |
543 | ||
544 | err = read_header(&header); | |
545 | if (err < 0) { | |
546 | close(); | |
547 | return err; | |
548 | } | |
549 | ||
550 | off64_t next_pos = header.start; | |
551 | ||
552 | f.open_object_section("journal"); | |
553 | ||
554 | f.open_object_section("header"); | |
555 | f.dump_unsigned("flags", header.flags); | |
556 | ostringstream os; | |
557 | os << header.fsid; | |
558 | f.dump_string("fsid", os.str()); | |
559 | f.dump_unsigned("block_size", header.block_size); | |
560 | f.dump_unsigned("alignment", header.alignment); | |
561 | f.dump_int("max_size", header.max_size); | |
562 | f.dump_int("start", header.start); | |
563 | f.dump_unsigned("committed_up_to", header.committed_up_to); | |
564 | f.dump_unsigned("start_seq", header.start_seq); | |
565 | f.close_section(); | |
566 | ||
567 | f.open_array_section("entries"); | |
568 | uint64_t seq = header.start_seq; | |
569 | while (1) { | |
570 | bufferlist bl; | |
571 | off64_t pos = next_pos; | |
572 | ||
573 | if (!pos) { | |
574 | dout(2) << "_dump -- not readable" << dendl; | |
575 | err = -EINVAL; | |
576 | break; | |
577 | } | |
578 | stringstream ss; | |
579 | read_entry_result result = do_read_entry( | |
580 | pos, | |
581 | &next_pos, | |
582 | &bl, | |
583 | &seq, | |
584 | &ss); | |
585 | if (result != SUCCESS) { | |
586 | if (seq < header.committed_up_to) { | |
587 | dout(2) << "Unable to read past sequence " << seq | |
588 | << " but header indicates the journal has committed up through " | |
589 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
590 | err = -EINVAL; | |
591 | } | |
592 | dout(25) << ss.str() << dendl; | |
593 | dout(25) << "No further valid entries found, journal is most likely valid" | |
594 | << dendl; | |
595 | break; | |
596 | } | |
597 | ||
598 | f.open_object_section("entry"); | |
599 | f.dump_unsigned("offset", pos); | |
600 | f.dump_unsigned("seq", seq); | |
601 | if (simple) { | |
602 | f.dump_unsigned("bl.length", bl.length()); | |
603 | } else { | |
604 | f.open_array_section("transactions"); | |
11fdf7f2 | 605 | auto p = bl.cbegin(); |
7c673cae FG |
606 | int trans_num = 0; |
607 | while (!p.end()) { | |
608 | ObjectStore::Transaction t(p); | |
609 | f.open_object_section("transaction"); | |
610 | f.dump_unsigned("trans_num", trans_num); | |
611 | t.dump(&f); | |
612 | f.close_section(); | |
613 | trans_num++; | |
614 | } | |
615 | f.close_section(); | |
616 | } | |
617 | f.close_section(); | |
618 | } | |
619 | ||
620 | f.close_section(); | |
621 | f.close_section(); | |
622 | dout(10) << "dump finish" << dendl; | |
623 | ||
624 | close(); | |
625 | return err; | |
626 | } | |
627 | ||
628 | ||
629 | void FileJournal::start_writer() | |
630 | { | |
631 | write_stop = false; | |
632 | aio_stop = false; | |
633 | write_thread.create("journal_write"); | |
634 | #ifdef HAVE_LIBAIO | |
635 | if (aio) | |
636 | write_finish_thread.create("journal_wrt_fin"); | |
637 | #endif | |
638 | } | |
639 | ||
640 | void FileJournal::stop_writer() | |
641 | { | |
642 | // Do nothing if writer already stopped or never started | |
643 | if (!write_stop) | |
644 | { | |
645 | { | |
9f95a23c TL |
646 | std::lock_guard l{write_lock}; |
647 | std::lock_guard p{writeq_lock}; | |
7c673cae | 648 | write_stop = true; |
9f95a23c | 649 | writeq_cond.notify_all(); |
7c673cae FG |
650 | // Doesn't hurt to signal commit_cond in case thread is waiting there |
651 | // and caller didn't use committed_thru() first. | |
9f95a23c | 652 | commit_cond.notify_all(); |
7c673cae FG |
653 | } |
654 | write_thread.join(); | |
655 | ||
656 | // write journal header now so that we have less to replay on remount | |
657 | write_header_sync(); | |
658 | } | |
659 | ||
660 | #ifdef HAVE_LIBAIO | |
661 | // stop aio completeion thread *after* writer thread has stopped | |
662 | // and has submitted all of its io | |
663 | if (aio && !aio_stop) { | |
9f95a23c | 664 | aio_lock.lock(); |
7c673cae | 665 | aio_stop = true; |
9f95a23c TL |
666 | aio_cond.notify_all(); |
667 | write_finish_cond.notify_all(); | |
668 | aio_lock.unlock(); | |
7c673cae FG |
669 | write_finish_thread.join(); |
670 | } | |
671 | #endif | |
672 | } | |
673 | ||
674 | ||
675 | ||
676 | void FileJournal::print_header(const header_t &header) const | |
677 | { | |
678 | dout(10) << "header: block_size " << header.block_size | |
679 | << " alignment " << header.alignment | |
680 | << " max_size " << header.max_size | |
681 | << dendl; | |
682 | dout(10) << "header: start " << header.start << dendl; | |
683 | dout(10) << " write_pos " << write_pos << dendl; | |
684 | } | |
685 | ||
686 | int FileJournal::read_header(header_t *hdr) const | |
687 | { | |
688 | dout(10) << "read_header" << dendl; | |
689 | bufferlist bl; | |
690 | ||
f67539c2 | 691 | ceph::buffer::ptr bp = ceph::buffer::create_small_page_aligned(block_size); |
7c673cae FG |
692 | char* bpdata = bp.c_str(); |
693 | int r = ::pread(fd, bpdata, bp.length(), 0); | |
694 | ||
695 | if (r < 0) { | |
696 | int err = errno; | |
697 | dout(0) << "read_header got " << cpp_strerror(err) << dendl; | |
698 | return -err; | |
699 | } | |
700 | ||
701 | // don't use bp.zero() here, because it also invalidates | |
702 | // crc cache (which is not yet populated anyway) | |
703 | if (bp.length() != (size_t)r) { | |
704 | // r will be always less or equal than bp.length | |
705 | bpdata += r; | |
706 | memset(bpdata, 0, bp.length() - r); | |
707 | } | |
708 | ||
709 | bl.push_back(std::move(bp)); | |
710 | ||
711 | try { | |
11fdf7f2 TL |
712 | auto p = bl.cbegin(); |
713 | decode(*hdr, p); | |
7c673cae | 714 | } |
f67539c2 | 715 | catch (ceph::buffer::error& e) { |
7c673cae FG |
716 | derr << "read_header error decoding journal header" << dendl; |
717 | return -EINVAL; | |
718 | } | |
719 | ||
720 | ||
721 | /* | |
722 | * Unfortunately we weren't initializing the flags field for new | |
723 | * journals! Aie. This is safe(ish) now that we have only one | |
724 | * flag. Probably around when we add the next flag we need to | |
725 | * remove this or else this (eventually old) code will clobber newer | |
726 | * code's flags. | |
727 | */ | |
728 | if (hdr->flags > 3) { | |
729 | derr << "read_header appears to have gibberish flags; assuming 0" << dendl; | |
730 | hdr->flags = 0; | |
731 | } | |
732 | ||
733 | print_header(*hdr); | |
734 | ||
735 | return 0; | |
736 | } | |
737 | ||
738 | bufferptr FileJournal::prepare_header() | |
739 | { | |
740 | bufferlist bl; | |
741 | { | |
9f95a23c | 742 | std::lock_guard l{finisher_lock}; |
7c673cae FG |
743 | header.committed_up_to = journaled_seq; |
744 | } | |
11fdf7f2 | 745 | encode(header, bl); |
f67539c2 | 746 | bufferptr bp = ceph::buffer::create_small_page_aligned(get_top()); |
7c673cae FG |
747 | // don't use bp.zero() here, because it also invalidates |
748 | // crc cache (which is not yet populated anyway) | |
749 | char* data = bp.c_str(); | |
750 | memcpy(data, bl.c_str(), bl.length()); | |
751 | data += bl.length(); | |
752 | memset(data, 0, bp.length()-bl.length()); | |
753 | return bp; | |
754 | } | |
755 | ||
756 | void FileJournal::write_header_sync() | |
757 | { | |
9f95a23c | 758 | std::lock_guard locker{write_lock}; |
7c673cae FG |
759 | must_write_header = true; |
760 | bufferlist bl; | |
761 | do_write(bl); | |
762 | dout(20) << __func__ << " finish" << dendl; | |
763 | } | |
764 | ||
765 | int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size) | |
766 | { | |
767 | // already full? | |
768 | if (full_state != FULL_NOTFULL) | |
769 | return -ENOSPC; | |
770 | ||
771 | // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL. | |
772 | off64_t room; | |
773 | if (pos >= header.start) | |
774 | room = (header.max_size - pos) + (header.start - get_top()) - 1; | |
775 | else | |
776 | room = header.start - pos - 1; | |
777 | dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start | |
778 | << " top " << get_top() << dendl; | |
779 | ||
780 | if (do_sync_cond) { | |
781 | if (room >= (header.max_size >> 1) && | |
782 | room - size < (header.max_size >> 1)) { | |
783 | dout(10) << " passing half full mark, triggering commit" << dendl; | |
9f95a23c TL |
784 | #ifdef CEPH_DEBUG_MUTEX |
785 | do_sync_cond->notify_all(true); // initiate a real commit so we can trim | |
786 | #else | |
787 | do_sync_cond->notify_all(); | |
788 | #endif | |
7c673cae FG |
789 | } |
790 | } | |
791 | ||
792 | if (room >= size) { | |
793 | dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl; | |
794 | if (pos + size > header.max_size) | |
795 | must_write_header = true; | |
796 | return 0; | |
797 | } | |
798 | ||
799 | // full | |
800 | dout(1) << "check_for_full at " << pos << " : JOURNAL FULL " | |
801 | << pos << " >= " << room | |
802 | << " (max_size " << header.max_size << " start " << header.start << ")" | |
803 | << dendl; | |
804 | ||
805 | off64_t max = header.max_size - get_top(); | |
806 | if (size > max) | |
807 | dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl; | |
808 | ||
809 | return -ENOSPC; | |
810 | } | |
811 | ||
812 | int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes) | |
813 | { | |
814 | // gather queued writes | |
815 | off64_t queue_pos = write_pos; | |
816 | ||
817 | int eleft = cct->_conf->journal_max_write_entries; | |
818 | unsigned bmax = cct->_conf->journal_max_write_bytes; | |
819 | ||
820 | if (full_state != FULL_NOTFULL) | |
821 | return -ENOSPC; | |
822 | ||
823 | while (!writeq_empty()) { | |
824 | list<write_item> items; | |
825 | batch_pop_write(items); | |
826 | list<write_item>::iterator it = items.begin(); | |
827 | while (it != items.end()) { | |
828 | uint64_t bytes = it->bl.length(); | |
829 | int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes); | |
830 | if (r == 0) { // prepare ok, delete it | |
831 | items.erase(it++); | |
832 | #ifdef HAVE_LIBAIO | |
833 | { | |
9f95a23c | 834 | std::lock_guard locker{aio_lock}; |
11fdf7f2 | 835 | ceph_assert(aio_write_queue_ops > 0); |
7c673cae | 836 | aio_write_queue_ops--; |
11fdf7f2 | 837 | ceph_assert(aio_write_queue_bytes >= bytes); |
7c673cae FG |
838 | aio_write_queue_bytes -= bytes; |
839 | } | |
840 | #else | |
841 | (void)bytes; | |
842 | #endif | |
843 | } | |
844 | if (r == -ENOSPC) { | |
845 | // the journal maybe full, insert the left item to writeq | |
846 | batch_unpop_write(items); | |
847 | if (orig_ops) | |
848 | goto out; // commit what we have | |
849 | ||
850 | if (logger) | |
851 | logger->inc(l_filestore_journal_full); | |
852 | ||
853 | if (wait_on_full) { | |
854 | dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl; | |
855 | } else { | |
856 | dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl; | |
857 | ||
858 | // throw out what we have so far | |
859 | full_state = FULL_FULL; | |
860 | while (!writeq_empty()) { | |
861 | complete_write(1, peek_write().orig_len); | |
862 | pop_write(); | |
863 | } | |
864 | print_header(header); | |
865 | } | |
224ce89b | 866 | |
7c673cae FG |
867 | return -ENOSPC; // hrm, full on first op |
868 | } | |
869 | if (eleft) { | |
870 | if (--eleft == 0) { | |
871 | dout(20) << "prepare_multi_write hit max events per write " | |
872 | << cct->_conf->journal_max_write_entries << dendl; | |
873 | batch_unpop_write(items); | |
874 | goto out; | |
875 | } | |
876 | } | |
877 | if (bmax) { | |
878 | if (bl.length() >= bmax) { | |
879 | dout(20) << "prepare_multi_write hit max write size " | |
880 | << cct->_conf->journal_max_write_bytes << dendl; | |
881 | batch_unpop_write(items); | |
882 | goto out; | |
883 | } | |
884 | } | |
885 | } | |
886 | } | |
887 | ||
888 | out: | |
889 | dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl; | |
11fdf7f2 | 890 | ceph_assert((write_pos + bl.length() == queue_pos) || |
7c673cae FG |
891 | (write_pos + bl.length() - header.max_size + get_top() == queue_pos)); |
892 | return 0; | |
893 | } | |
894 | ||
895 | /* | |
896 | void FileJournal::queue_write_fin(uint64_t seq, Context *fin) | |
897 | { | |
898 | writing_seq.push_back(seq); | |
899 | if (!waiting_for_notfull.empty()) { | |
900 | // make sure previously unjournaled stuff waiting for UNFULL triggers | |
901 | // _before_ newly journaled stuff does | |
902 | dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin | |
903 | << " until after UNFULL" << dendl; | |
904 | C_Gather *g = new C_Gather(writeq.front().fin); | |
905 | writing_fin.push_back(g->new_sub()); | |
906 | waiting_for_notfull.push_back(g->new_sub()); | |
907 | } else { | |
908 | writing_fin.push_back(writeq.front().fin); | |
909 | dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl; | |
910 | } | |
911 | } | |
912 | */ | |
913 | ||
914 | void FileJournal::queue_completions_thru(uint64_t seq) | |
915 | { | |
9f95a23c | 916 | ceph_assert(ceph_mutex_is_locked(finisher_lock)); |
7c673cae FG |
917 | utime_t now = ceph_clock_now(); |
918 | list<completion_item> items; | |
919 | batch_pop_completions(items); | |
920 | list<completion_item>::iterator it = items.begin(); | |
921 | while (it != items.end()) { | |
922 | completion_item& next = *it; | |
923 | if (next.seq > seq) | |
924 | break; | |
925 | utime_t lat = now; | |
926 | lat -= next.start; | |
927 | dout(10) << "queue_completions_thru seq " << seq | |
928 | << " queueing seq " << next.seq | |
929 | << " " << next.finish | |
930 | << " lat " << lat << dendl; | |
931 | if (logger) { | |
932 | logger->tinc(l_filestore_journal_latency, lat); | |
933 | } | |
934 | if (next.finish) | |
935 | finisher->queue(next.finish); | |
936 | if (next.tracked_op) { | |
937 | next.tracked_op->mark_event("journaled_completion_queued"); | |
938 | next.tracked_op->journal_trace.event("queued completion"); | |
939 | next.tracked_op->journal_trace.keyval("completed through", seq); | |
940 | } | |
941 | items.erase(it++); | |
942 | } | |
943 | batch_unpop_completions(items); | |
9f95a23c | 944 | finisher_cond.notify_all(); |
7c673cae FG |
945 | } |
946 | ||
947 | ||
948 | int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes) | |
949 | { | |
950 | uint64_t seq = next_write.seq; | |
951 | bufferlist &ebl = next_write.bl; | |
952 | off64_t size = ebl.length(); | |
953 | ||
954 | int r = check_for_full(seq, queue_pos, size); | |
955 | if (r < 0) | |
956 | return r; // ENOSPC or EAGAIN | |
957 | ||
958 | uint32_t orig_len = next_write.orig_len; | |
959 | orig_bytes += orig_len; | |
960 | orig_ops++; | |
961 | ||
962 | // add to write buffer | |
963 | dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq | |
964 | << " len " << orig_len << " -> " << size << dendl; | |
965 | ||
966 | unsigned seq_offset = offsetof(entry_header_t, seq); | |
967 | unsigned magic1_offset = offsetof(entry_header_t, magic1); | |
968 | unsigned magic2_offset = offsetof(entry_header_t, magic2); | |
969 | ||
970 | bufferptr headerptr = ebl.buffers().front(); | |
971 | uint64_t _seq = seq; | |
972 | uint64_t _queue_pos = queue_pos; | |
973 | uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64()); | |
974 | headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq); | |
975 | headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
976 | headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
977 | ||
978 | bufferptr footerptr = ebl.buffers().back(); | |
979 | unsigned post_offset = footerptr.length() - sizeof(entry_header_t); | |
980 | footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq); | |
981 | footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); | |
982 | footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2); | |
983 | ||
984 | bl.claim_append(ebl); | |
985 | if (next_write.tracked_op) { | |
986 | next_write.tracked_op->mark_event("write_thread_in_journal_buffer"); | |
987 | next_write.tracked_op->journal_trace.event("prepare_single_write"); | |
988 | } | |
989 | ||
990 | journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos)); | |
991 | writing_seq = seq; | |
992 | ||
993 | queue_pos += size; | |
994 | if (queue_pos >= header.max_size) | |
995 | queue_pos = queue_pos + get_top() - header.max_size; | |
996 | ||
997 | return 0; | |
998 | } | |
999 | ||
1000 | void FileJournal::check_align(off64_t pos, bufferlist& bl) | |
1001 | { | |
1002 | // make sure list segments are page aligned | |
1003 | if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) { | |
11fdf7f2 TL |
1004 | ceph_assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); |
1005 | ceph_assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); | |
1006 | ceph_abort_msg("bl was not aligned"); | |
7c673cae FG |
1007 | } |
1008 | } | |
1009 | ||
1010 | int FileJournal::write_bl(off64_t& pos, bufferlist& bl) | |
1011 | { | |
1012 | int ret; | |
1013 | ||
1014 | off64_t spos = ::lseek64(fd, pos, SEEK_SET); | |
1015 | if (spos < 0) { | |
1016 | ret = -errno; | |
1017 | derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl; | |
1018 | return ret; | |
1019 | } | |
1020 | ret = bl.write_fd(fd); | |
1021 | if (ret) { | |
1022 | derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl; | |
1023 | return ret; | |
1024 | } | |
1025 | pos += bl.length(); | |
1026 | if (pos == header.max_size) | |
1027 | pos = get_top(); | |
1028 | return 0; | |
1029 | } | |
1030 | ||
1031 | void FileJournal::do_write(bufferlist& bl) | |
1032 | { | |
1033 | // nothing to do? | |
1034 | if (bl.length() == 0 && !must_write_header) | |
1035 | return; | |
1036 | ||
f67539c2 | 1037 | ceph::buffer::ptr hbp; |
7c673cae FG |
1038 | if (cct->_conf->journal_write_header_frequency && |
1039 | (((++journaled_since_start) % | |
1040 | cct->_conf->journal_write_header_frequency) == 0)) { | |
1041 | must_write_header = true; | |
1042 | } | |
1043 | ||
1044 | if (must_write_header) { | |
1045 | must_write_header = false; | |
1046 | hbp = prepare_header(); | |
1047 | } | |
1048 | ||
1049 | dout(15) << "do_write writing " << write_pos << "~" << bl.length() | |
1050 | << (hbp.length() ? " + header":"") | |
1051 | << dendl; | |
1052 | ||
1053 | utime_t from = ceph_clock_now(); | |
1054 | ||
1055 | // entry | |
1056 | off64_t pos = write_pos; | |
1057 | ||
1058 | // Adjust write_pos | |
1059 | write_pos += bl.length(); | |
1060 | if (write_pos >= header.max_size) | |
1061 | write_pos = write_pos - header.max_size + get_top(); | |
1062 | ||
9f95a23c | 1063 | write_lock.unlock(); |
7c673cae FG |
1064 | |
1065 | // split? | |
1066 | off64_t split = 0; | |
1067 | if (pos + bl.length() > header.max_size) { | |
1068 | bufferlist first, second; | |
1069 | split = header.max_size - pos; | |
1070 | first.substr_of(bl, 0, split); | |
1071 | second.substr_of(bl, split, bl.length() - split); | |
11fdf7f2 | 1072 | ceph_assert(first.length() + second.length() == bl.length()); |
7c673cae FG |
1073 | dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length() |
1074 | << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl; | |
1075 | ||
1076 | //Save pos to write first piece second | |
1077 | off64_t first_pos = pos; | |
1078 | off64_t orig_pos; | |
1079 | pos = get_top(); | |
1080 | // header too? | |
1081 | if (hbp.length()) { | |
1082 | // be sneaky: include the header in the second fragment | |
11fdf7f2 TL |
1083 | bufferlist tmp; |
1084 | tmp.push_back(hbp); | |
1085 | tmp.claim_append(second); | |
1086 | second.swap(tmp); | |
7c673cae FG |
1087 | pos = 0; // we included the header |
1088 | } | |
1089 | // Write the second portion first possible with the header, so | |
1090 | // do_read_entry() won't even get a valid entry_header_t if there | |
1091 | // is a crash between the two writes. | |
1092 | orig_pos = pos; | |
1093 | if (write_bl(pos, second)) { | |
1094 | derr << "FileJournal::do_write: write_bl(pos=" << orig_pos | |
1095 | << ") failed" << dendl; | |
1096 | check_align(pos, second); | |
1097 | ceph_abort(); | |
1098 | } | |
1099 | orig_pos = first_pos; | |
1100 | if (write_bl(first_pos, first)) { | |
1101 | derr << "FileJournal::do_write: write_bl(pos=" << orig_pos | |
1102 | << ") failed" << dendl; | |
1103 | check_align(first_pos, first); | |
1104 | ceph_abort(); | |
1105 | } | |
11fdf7f2 | 1106 | ceph_assert(first_pos == get_top()); |
7c673cae FG |
1107 | } else { |
1108 | // header too? | |
1109 | if (hbp.length()) { | |
1110 | if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) { | |
1111 | int err = errno; | |
1112 | derr << "FileJournal::do_write: pwrite(fd=" << fd | |
1113 | << ", hbp.length=" << hbp.length() << ") failed :" | |
1114 | << cpp_strerror(err) << dendl; | |
1115 | ceph_abort(); | |
1116 | } | |
1117 | } | |
1118 | ||
1119 | if (write_bl(pos, bl)) { | |
1120 | derr << "FileJournal::do_write: write_bl(pos=" << pos | |
1121 | << ") failed" << dendl; | |
1122 | check_align(pos, bl); | |
1123 | ceph_abort(); | |
1124 | } | |
1125 | } | |
1126 | ||
1127 | if (!directio) { | |
1128 | dout(20) << "do_write fsync" << dendl; | |
1129 | ||
1130 | /* | |
1131 | * We'd really love to have a fsync_range or fdatasync_range and do a: | |
1132 | * | |
1133 | * if (split) { | |
1134 | * ::fsync_range(fd, header.max_size - split, split)l | |
1135 | * ::fsync_range(fd, get_top(), bl.length() - split); | |
1136 | * else | |
1137 | * ::fsync_range(fd, write_pos, bl.length()) | |
1138 | * | |
1139 | * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be | |
1140 | * too hard given all the underlying infrastructure already exist. | |
1141 | * | |
1142 | * NOTE: using sync_file_range here would not be safe as it does not | |
1143 | * flush disk caches or commits any sort of metadata. | |
1144 | */ | |
1145 | int ret = 0; | |
11fdf7f2 | 1146 | #if defined(__APPLE__) || defined(__FreeBSD__) |
7c673cae FG |
1147 | ret = ::fsync(fd); |
1148 | #else | |
1149 | ret = ::fdatasync(fd); | |
1150 | #endif | |
1151 | if (ret < 0) { | |
1152 | derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl; | |
1153 | ceph_abort(); | |
1154 | } | |
1155 | #ifdef HAVE_POSIX_FADVISE | |
1156 | if (cct->_conf->filestore_fadvise) | |
1157 | posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); | |
1158 | #endif | |
1159 | } | |
1160 | ||
1161 | utime_t lat = ceph_clock_now() - from; | |
1162 | dout(20) << "do_write latency " << lat << dendl; | |
1163 | ||
9f95a23c | 1164 | write_lock.lock(); |
7c673cae | 1165 | |
11fdf7f2 TL |
1166 | ceph_assert(write_pos == pos); |
1167 | ceph_assert(write_pos % header.alignment == 0); | |
7c673cae FG |
1168 | |
1169 | { | |
9f95a23c | 1170 | std::lock_guard locker{finisher_lock}; |
7c673cae FG |
1171 | journaled_seq = writing_seq; |
1172 | ||
1173 | // kick finisher? | |
1174 | // only if we haven't filled up recently! | |
1175 | if (full_state != FULL_NOTFULL) { | |
1176 | dout(10) << "do_write NOT queueing finisher seq " << journaled_seq | |
1177 | << ", full_commit_seq|full_restart_seq" << dendl; | |
1178 | } else { | |
1179 | if (plug_journal_completions) { | |
1180 | dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq | |
1181 | << " due to completion plug" << dendl; | |
1182 | } else { | |
1183 | dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl; | |
1184 | queue_completions_thru(journaled_seq); | |
1185 | } | |
1186 | } | |
1187 | } | |
1188 | } | |
1189 | ||
1190 | void FileJournal::flush() | |
1191 | { | |
1192 | dout(10) << "waiting for completions to empty" << dendl; | |
1193 | { | |
9f95a23c TL |
1194 | std::unique_lock l{finisher_lock}; |
1195 | finisher_cond.wait(l, [this] { return completions_empty(); }); | |
7c673cae FG |
1196 | } |
1197 | dout(10) << "flush waiting for finisher" << dendl; | |
1198 | finisher->wait_for_empty(); | |
1199 | dout(10) << "flush done" << dendl; | |
1200 | } | |
1201 | ||
1202 | ||
1203 | void FileJournal::write_thread_entry() | |
1204 | { | |
1205 | dout(10) << "write_thread_entry start" << dendl; | |
1206 | while (1) { | |
1207 | { | |
9f95a23c | 1208 | std::unique_lock locker{writeq_lock}; |
7c673cae FG |
1209 | if (writeq.empty() && !must_write_header) { |
1210 | if (write_stop) | |
1211 | break; | |
1212 | dout(20) << "write_thread_entry going to sleep" << dendl; | |
9f95a23c | 1213 | writeq_cond.wait(locker); |
7c673cae FG |
1214 | dout(20) << "write_thread_entry woke up" << dendl; |
1215 | continue; | |
1216 | } | |
1217 | } | |
1218 | ||
1219 | #ifdef HAVE_LIBAIO | |
1220 | if (aio) { | |
9f95a23c | 1221 | std::unique_lock locker{aio_lock}; |
7c673cae FG |
1222 | // should we back off to limit aios in flight? try to do this |
1223 | // adaptively so that we submit larger aios once we have lots of | |
1224 | // them in flight. | |
1225 | // | |
1226 | // NOTE: our condition here is based on aio_num (protected by | |
1227 | // aio_lock) and throttle_bytes (part of the write queue). when | |
1228 | // we sleep, we *only* wait for aio_num to change, and do not | |
1229 | // wake when more data is queued. this is not strictly correct, | |
1230 | // but should be fine given that we will have plenty of aios in | |
1231 | // flight if we hit this limit to ensure we keep the device | |
1232 | // saturated. | |
1233 | while (aio_num > 0) { | |
11fdf7f2 | 1234 | int exp = std::min<int>(aio_num * 2, 24); |
7c673cae FG |
1235 | long unsigned min_new = 1ull << exp; |
1236 | uint64_t cur = aio_write_queue_bytes; | |
1237 | dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes | |
1238 | << " ... exp " << exp << " min_new " << min_new | |
1239 | << " ... pending " << cur << dendl; | |
1240 | if (cur >= min_new) | |
1241 | break; | |
1242 | dout(20) << "write_thread_entry deferring until more aios complete: " | |
1243 | << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new | |
1244 | << " bytes to start a new aio (currently " << cur << " pending)" << dendl; | |
9f95a23c | 1245 | aio_cond.wait(locker); |
7c673cae FG |
1246 | dout(20) << "write_thread_entry woke up" << dendl; |
1247 | } | |
1248 | } | |
1249 | #endif | |
1250 | ||
9f95a23c | 1251 | std::unique_lock locker{write_lock}; |
7c673cae FG |
1252 | uint64_t orig_ops = 0; |
1253 | uint64_t orig_bytes = 0; | |
1254 | ||
1255 | bufferlist bl; | |
1256 | int r = prepare_multi_write(bl, orig_ops, orig_bytes); | |
1257 | // Don't care about journal full if stoppping, so drop queue and | |
1258 | // possibly let header get written and loop above to notice stop | |
1259 | if (r == -ENOSPC) { | |
1260 | if (write_stop) { | |
1261 | dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl; | |
1262 | while (!writeq_empty()) { | |
1263 | complete_write(1, peek_write().orig_len); | |
1264 | pop_write(); | |
1265 | } | |
1266 | print_header(header); | |
1267 | r = 0; | |
1268 | } else { | |
1269 | dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl; | |
9f95a23c | 1270 | commit_cond.wait(locker); |
7c673cae FG |
1271 | dout(20) << "write_thread_entry woke up" << dendl; |
1272 | continue; | |
1273 | } | |
1274 | } | |
11fdf7f2 | 1275 | ceph_assert(r == 0); |
7c673cae FG |
1276 | |
1277 | if (logger) { | |
1278 | logger->inc(l_filestore_journal_wr); | |
1279 | logger->inc(l_filestore_journal_wr_bytes, bl.length()); | |
1280 | } | |
1281 | ||
1282 | #ifdef HAVE_LIBAIO | |
1283 | if (aio) | |
1284 | do_aio_write(bl); | |
1285 | else | |
1286 | do_write(bl); | |
1287 | #else | |
1288 | do_write(bl); | |
1289 | #endif | |
1290 | complete_write(orig_ops, orig_bytes); | |
1291 | } | |
1292 | ||
1293 | dout(10) << "write_thread_entry finish" << dendl; | |
1294 | } | |
1295 | ||
1296 | #ifdef HAVE_LIBAIO | |
1297 | void FileJournal::do_aio_write(bufferlist& bl) | |
1298 | { | |
1299 | ||
1300 | if (cct->_conf->journal_write_header_frequency && | |
1301 | (((++journaled_since_start) % | |
1302 | cct->_conf->journal_write_header_frequency) == 0)) { | |
1303 | must_write_header = true; | |
1304 | } | |
1305 | ||
1306 | // nothing to do? | |
1307 | if (bl.length() == 0 && !must_write_header) | |
1308 | return; | |
1309 | ||
f67539c2 | 1310 | ceph::buffer::ptr hbp; |
7c673cae FG |
1311 | if (must_write_header) { |
1312 | must_write_header = false; | |
1313 | hbp = prepare_header(); | |
1314 | } | |
1315 | ||
1316 | // entry | |
1317 | off64_t pos = write_pos; | |
1318 | ||
1319 | dout(15) << "do_aio_write writing " << pos << "~" << bl.length() | |
1320 | << (hbp.length() ? " + header":"") | |
1321 | << dendl; | |
1322 | ||
1323 | // split? | |
1324 | off64_t split = 0; | |
1325 | if (pos + bl.length() > header.max_size) { | |
1326 | bufferlist first, second; | |
1327 | split = header.max_size - pos; | |
1328 | first.substr_of(bl, 0, split); | |
1329 | second.substr_of(bl, split, bl.length() - split); | |
11fdf7f2 | 1330 | ceph_assert(first.length() + second.length() == bl.length()); |
7c673cae FG |
1331 | dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl; |
1332 | ||
1333 | if (write_aio_bl(pos, first, 0)) { | |
1334 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1335 | << ") failed" << dendl; | |
1336 | ceph_abort(); | |
1337 | } | |
11fdf7f2 | 1338 | ceph_assert(pos == header.max_size); |
7c673cae FG |
1339 | if (hbp.length()) { |
1340 | // be sneaky: include the header in the second fragment | |
11fdf7f2 TL |
1341 | bufferlist tmp; |
1342 | tmp.push_back(hbp); | |
1343 | tmp.claim_append(second); | |
1344 | second.swap(tmp); | |
7c673cae FG |
1345 | pos = 0; // we included the header |
1346 | } else | |
1347 | pos = get_top(); // no header, start after that | |
1348 | if (write_aio_bl(pos, second, writing_seq)) { | |
1349 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1350 | << ") failed" << dendl; | |
1351 | ceph_abort(); | |
1352 | } | |
1353 | } else { | |
1354 | // header too? | |
1355 | if (hbp.length()) { | |
1356 | bufferlist hbl; | |
1357 | hbl.push_back(hbp); | |
1358 | loff_t pos = 0; | |
1359 | if (write_aio_bl(pos, hbl, 0)) { | |
1360 | derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl; | |
1361 | ceph_abort(); | |
1362 | } | |
1363 | } | |
1364 | ||
1365 | if (write_aio_bl(pos, bl, writing_seq)) { | |
1366 | derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos | |
1367 | << ") failed" << dendl; | |
1368 | ceph_abort(); | |
1369 | } | |
1370 | } | |
1371 | ||
1372 | write_pos = pos; | |
1373 | if (write_pos == header.max_size) | |
1374 | write_pos = get_top(); | |
11fdf7f2 | 1375 | ceph_assert(write_pos % header.alignment == 0); |
7c673cae FG |
1376 | } |
1377 | ||
1378 | /** | |
1379 | * write a buffer using aio | |
1380 | * | |
1381 | * @param seq seq to trigger when this aio completes. if 0, do not update any state | |
1382 | * on completion. | |
1383 | */ | |
1384 | int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq) | |
1385 | { | |
1386 | dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl; | |
1387 | ||
1388 | while (bl.length() > 0) { | |
11fdf7f2 | 1389 | int max = std::min<int>(bl.get_num_buffers(), IOV_MAX-1); |
7c673cae FG |
1390 | iovec *iov = new iovec[max]; |
1391 | int n = 0; | |
1392 | unsigned len = 0; | |
11fdf7f2 TL |
1393 | for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) { |
1394 | ceph_assert(p != std::cend(bl.buffers())); | |
1395 | iov[n].iov_base = const_cast<void*>(static_cast<const void*>(p->c_str())); | |
7c673cae FG |
1396 | iov[n].iov_len = p->length(); |
1397 | len += p->length(); | |
1398 | } | |
1399 | ||
1400 | bufferlist tbl; | |
1401 | bl.splice(0, len, &tbl); // move bytes from bl -> tbl | |
1402 | ||
1403 | // lock only aio_queue, current aio, aio_num, aio_bytes, which may be | |
1404 | // modified in check_aio_completion | |
9f95a23c | 1405 | aio_lock.lock(); |
7c673cae FG |
1406 | aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq)); |
1407 | aio_info& aio = aio_queue.back(); | |
1408 | aio.iov = iov; | |
1409 | ||
1410 | io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos); | |
1411 | ||
1412 | dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len | |
1413 | << " in " << n << dendl; | |
1414 | ||
1415 | aio_num++; | |
1416 | aio_bytes += aio.len; | |
1417 | ||
1418 | // need to save current aio len to update write_pos later because current | |
1419 | // aio could be ereased from aio_queue once it is done | |
1420 | uint64_t cur_len = aio.len; | |
1421 | // unlock aio_lock because following io_submit might take time to return | |
9f95a23c | 1422 | aio_lock.unlock(); |
7c673cae FG |
1423 | |
1424 | iocb *piocb = &aio.iocb; | |
224ce89b | 1425 | |
7c673cae FG |
1426 | // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds |
1427 | int attempts = 16; | |
1428 | int delay = 125; | |
1429 | do { | |
1430 | int r = io_submit(aio_ctx, 1, &piocb); | |
1431 | dout(20) << "write_aio_bl io_submit return value: " << r << dendl; | |
1432 | if (r < 0) { | |
1433 | derr << "io_submit to " << aio.off << "~" << cur_len | |
1434 | << " got " << cpp_strerror(r) << dendl; | |
1435 | if (r == -EAGAIN && attempts-- > 0) { | |
1436 | usleep(delay); | |
1437 | delay *= 2; | |
1438 | continue; | |
1439 | } | |
1440 | check_align(pos, tbl); | |
11fdf7f2 | 1441 | ceph_abort_msg("io_submit got unexpected error"); |
7c673cae FG |
1442 | } else { |
1443 | break; | |
1444 | } | |
1445 | } while (true); | |
1446 | pos += cur_len; | |
1447 | } | |
9f95a23c TL |
1448 | aio_lock.lock(); |
1449 | write_finish_cond.notify_all(); | |
1450 | aio_lock.unlock(); | |
7c673cae FG |
1451 | return 0; |
1452 | } | |
1453 | #endif | |
1454 | ||
1455 | void FileJournal::write_finish_thread_entry() | |
1456 | { | |
1457 | #ifdef HAVE_LIBAIO | |
b32b8144 | 1458 | dout(10) << __func__ << " enter" << dendl; |
7c673cae FG |
1459 | while (true) { |
1460 | { | |
9f95a23c | 1461 | std::unique_lock locker{aio_lock}; |
7c673cae FG |
1462 | if (aio_queue.empty()) { |
1463 | if (aio_stop) | |
1464 | break; | |
b32b8144 | 1465 | dout(20) << __func__ << " sleeping" << dendl; |
9f95a23c | 1466 | write_finish_cond.wait(locker); |
7c673cae FG |
1467 | continue; |
1468 | } | |
1469 | } | |
1470 | ||
b32b8144 | 1471 | dout(20) << __func__ << " waiting for aio(s)" << dendl; |
7c673cae FG |
1472 | io_event event[16]; |
1473 | int r = io_getevents(aio_ctx, 1, 16, event, NULL); | |
1474 | if (r < 0) { | |
1475 | if (r == -EINTR) { | |
1476 | dout(0) << "io_getevents got " << cpp_strerror(r) << dendl; | |
1477 | continue; | |
1478 | } | |
1479 | derr << "io_getevents got " << cpp_strerror(r) << dendl; | |
11fdf7f2 TL |
1480 | if (r == -EIO) { |
1481 | note_io_error_event(devname.c_str(), fn.c_str(), -EIO, 0, 0, 0); | |
1482 | } | |
1483 | ceph_abort_msg("got unexpected error from io_getevents"); | |
7c673cae FG |
1484 | } |
1485 | ||
1486 | { | |
9f95a23c | 1487 | std::lock_guard locker{aio_lock}; |
7c673cae FG |
1488 | for (int i=0; i<r; i++) { |
1489 | aio_info *ai = (aio_info *)event[i].obj; | |
1490 | if (event[i].res != ai->len) { | |
1491 | derr << "aio to " << ai->off << "~" << ai->len | |
1492 | << " returned: " << (int)event[i].res << dendl; | |
11fdf7f2 | 1493 | ceph_abort_msg("unexpected aio error"); |
7c673cae | 1494 | } |
b32b8144 | 1495 | dout(10) << __func__ << " aio " << ai->off |
7c673cae FG |
1496 | << "~" << ai->len << " done" << dendl; |
1497 | ai->done = true; | |
1498 | } | |
1499 | check_aio_completion(); | |
1500 | } | |
1501 | } | |
b32b8144 | 1502 | dout(10) << __func__ << " exit" << dendl; |
7c673cae FG |
1503 | #endif |
1504 | } | |
1505 | ||
1506 | #ifdef HAVE_LIBAIO | |
1507 | /** | |
1508 | * check aio_wait for completed aio, and update state appropriately. | |
1509 | */ | |
1510 | void FileJournal::check_aio_completion() | |
1511 | { | |
9f95a23c | 1512 | ceph_assert(ceph_mutex_is_locked(aio_lock)); |
7c673cae FG |
1513 | dout(20) << "check_aio_completion" << dendl; |
1514 | ||
1515 | bool completed_something = false, signal = false; | |
1516 | uint64_t new_journaled_seq = 0; | |
1517 | ||
1518 | list<aio_info>::iterator p = aio_queue.begin(); | |
1519 | while (p != aio_queue.end() && p->done) { | |
1520 | dout(20) << "check_aio_completion completed seq " << p->seq << " " | |
1521 | << p->off << "~" << p->len << dendl; | |
1522 | if (p->seq) { | |
1523 | new_journaled_seq = p->seq; | |
1524 | completed_something = true; | |
1525 | } | |
1526 | aio_num--; | |
1527 | aio_bytes -= p->len; | |
1528 | aio_queue.erase(p++); | |
1529 | signal = true; | |
1530 | } | |
1531 | ||
1532 | if (completed_something) { | |
1533 | // kick finisher? | |
1534 | // only if we haven't filled up recently! | |
9f95a23c | 1535 | std::lock_guard locker{finisher_lock}; |
7c673cae FG |
1536 | journaled_seq = new_journaled_seq; |
1537 | if (full_state != FULL_NOTFULL) { | |
1538 | dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq | |
1539 | << ", full_commit_seq|full_restart_seq" << dendl; | |
1540 | } else { | |
1541 | if (plug_journal_completions) { | |
1542 | dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq | |
1543 | << " due to completion plug" << dendl; | |
1544 | } else { | |
1545 | dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl; | |
1546 | queue_completions_thru(journaled_seq); | |
1547 | } | |
1548 | } | |
1549 | } | |
1550 | if (signal) { | |
1551 | // maybe write queue was waiting for aio count to drop? | |
9f95a23c | 1552 | aio_cond.notify_all(); |
7c673cae FG |
1553 | } |
1554 | } | |
1555 | #endif | |
1556 | ||
1557 | int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) { | |
1558 | dout(10) << "prepare_entry " << tls << dendl; | |
1559 | int data_len = cct->_conf->journal_align_min_size - 1; | |
1560 | int data_align = -1; // -1 indicates that we don't care about the alignment | |
1561 | bufferlist bl; | |
1562 | for (vector<ObjectStore::Transaction>::iterator p = tls.begin(); | |
1563 | p != tls.end(); ++p) { | |
1564 | if ((int)(*p).get_data_length() > data_len) { | |
1565 | data_len = (*p).get_data_length(); | |
1566 | data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; | |
1567 | } | |
11fdf7f2 | 1568 | encode(*p, bl); |
7c673cae FG |
1569 | } |
1570 | if (tbl->length()) { | |
1571 | bl.claim_append(*tbl); | |
1572 | } | |
1573 | // add it this entry | |
1574 | entry_header_t h; | |
1575 | unsigned head_size = sizeof(entry_header_t); | |
1576 | off64_t base_size = 2*head_size + bl.length(); | |
1577 | memset(&h, 0, sizeof(h)); | |
1578 | if (data_align >= 0) | |
1579 | h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; | |
11fdf7f2 | 1580 | off64_t size = round_up_to(base_size + h.pre_pad, header.alignment); |
7c673cae FG |
1581 | unsigned post_pad = size - base_size - h.pre_pad; |
1582 | h.len = bl.length(); | |
1583 | h.post_pad = post_pad; | |
1584 | h.crc32c = bl.crc32c(0); | |
1585 | dout(10) << " len " << bl.length() << " -> " << size | |
1586 | << " (head " << head_size << " pre_pad " << h.pre_pad | |
1587 | << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")" | |
1588 | << " (bl alignment " << data_align << ")" | |
1589 | << dendl; | |
1590 | bufferlist ebl; | |
1591 | // header | |
1592 | ebl.append((const char*)&h, sizeof(h)); | |
1593 | if (h.pre_pad) { | |
f67539c2 | 1594 | ebl.push_back(ceph::buffer::create_static(h.pre_pad, zero_buf)); |
7c673cae FG |
1595 | } |
1596 | // payload | |
9f95a23c | 1597 | ebl.claim_append(bl); |
7c673cae | 1598 | if (h.post_pad) { |
f67539c2 | 1599 | ebl.push_back(ceph::buffer::create_static(h.post_pad, zero_buf)); |
7c673cae FG |
1600 | } |
1601 | // footer | |
1602 | ebl.append((const char*)&h, sizeof(h)); | |
1603 | if (directio) | |
1604 | ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT); | |
f67539c2 | 1605 | *tbl = std::move(ebl); |
7c673cae FG |
1606 | return h.len; |
1607 | } | |
1608 | ||
1609 | void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, | |
1610 | Context *oncommit, TrackedOpRef osd_op) | |
1611 | { | |
1612 | // dump on queue | |
1613 | dout(5) << "submit_entry seq " << seq | |
1614 | << " len " << e.length() | |
1615 | << " (" << oncommit << ")" << dendl; | |
11fdf7f2 TL |
1616 | ceph_assert(e.length() > 0); |
1617 | ceph_assert(e.length() < header.max_size); | |
7c673cae | 1618 | |
7c673cae FG |
1619 | if (logger) { |
1620 | logger->inc(l_filestore_journal_queue_bytes, orig_len); | |
1621 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1622 | } | |
1623 | ||
1624 | throttle.register_throttle_seq(seq, e.length()); | |
1625 | if (logger) { | |
1626 | logger->inc(l_filestore_journal_ops, 1); | |
1627 | logger->inc(l_filestore_journal_bytes, e.length()); | |
1628 | } | |
1629 | ||
1630 | if (osd_op) { | |
1631 | osd_op->mark_event("commit_queued_for_journal_write"); | |
1632 | if (osd_op->store_trace) { | |
1633 | osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace); | |
1634 | osd_op->journal_trace.event("submit_entry"); | |
1635 | osd_op->journal_trace.keyval("seq", seq); | |
1636 | } | |
1637 | } | |
1638 | { | |
9f95a23c | 1639 | std::lock_guard l1{writeq_lock}; |
7c673cae | 1640 | #ifdef HAVE_LIBAIO |
9f95a23c | 1641 | std::lock_guard l2{aio_lock}; |
7c673cae | 1642 | #endif |
9f95a23c | 1643 | std::lock_guard l3{completions_lock}; |
7c673cae FG |
1644 | |
1645 | #ifdef HAVE_LIBAIO | |
1646 | aio_write_queue_ops++; | |
1647 | aio_write_queue_bytes += e.length(); | |
9f95a23c | 1648 | aio_cond.notify_all(); |
7c673cae FG |
1649 | #endif |
1650 | ||
1651 | completions.push_back( | |
1652 | completion_item( | |
1653 | seq, oncommit, ceph_clock_now(), osd_op)); | |
1654 | if (writeq.empty()) | |
9f95a23c | 1655 | writeq_cond.notify_all(); |
7c673cae FG |
1656 | writeq.push_back(write_item(seq, e, orig_len, osd_op)); |
1657 | if (osd_op) | |
1658 | osd_op->journal_trace.keyval("queue depth", writeq.size()); | |
1659 | } | |
1660 | } | |
1661 | ||
1662 | bool FileJournal::writeq_empty() | |
1663 | { | |
9f95a23c | 1664 | std::lock_guard locker{writeq_lock}; |
7c673cae FG |
1665 | return writeq.empty(); |
1666 | } | |
1667 | ||
1668 | FileJournal::write_item &FileJournal::peek_write() | |
1669 | { | |
9f95a23c TL |
1670 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
1671 | std::lock_guard locker{writeq_lock}; | |
7c673cae FG |
1672 | return writeq.front(); |
1673 | } | |
1674 | ||
1675 | void FileJournal::pop_write() | |
1676 | { | |
9f95a23c TL |
1677 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
1678 | std::lock_guard locker{writeq_lock}; | |
7c673cae FG |
1679 | if (logger) { |
1680 | logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len); | |
1681 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1682 | } | |
1683 | writeq.pop_front(); | |
1684 | } | |
1685 | ||
1686 | void FileJournal::batch_pop_write(list<write_item> &items) | |
1687 | { | |
9f95a23c | 1688 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
7c673cae | 1689 | { |
9f95a23c | 1690 | std::lock_guard locker{writeq_lock}; |
7c673cae FG |
1691 | writeq.swap(items); |
1692 | } | |
1693 | for (auto &&i : items) { | |
1694 | if (logger) { | |
1695 | logger->dec(l_filestore_journal_queue_bytes, i.orig_len); | |
1696 | logger->dec(l_filestore_journal_queue_ops, 1); | |
1697 | } | |
1698 | } | |
1699 | } | |
1700 | ||
1701 | void FileJournal::batch_unpop_write(list<write_item> &items) | |
1702 | { | |
9f95a23c | 1703 | ceph_assert(ceph_mutex_is_locked(write_lock)); |
7c673cae FG |
1704 | for (auto &&i : items) { |
1705 | if (logger) { | |
1706 | logger->inc(l_filestore_journal_queue_bytes, i.orig_len); | |
1707 | logger->inc(l_filestore_journal_queue_ops, 1); | |
1708 | } | |
1709 | } | |
9f95a23c | 1710 | std::lock_guard locker{writeq_lock}; |
7c673cae FG |
1711 | writeq.splice(writeq.begin(), items); |
1712 | } | |
1713 | ||
1714 | void FileJournal::commit_start(uint64_t seq) | |
1715 | { | |
1716 | dout(10) << "commit_start" << dendl; | |
1717 | ||
1718 | // was full? | |
1719 | switch (full_state) { | |
1720 | case FULL_NOTFULL: | |
1721 | break; // all good | |
1722 | ||
1723 | case FULL_FULL: | |
1724 | if (seq >= journaled_seq) { | |
1725 | dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq " | |
1726 | << seq << " > journaled_seq " << journaled_seq | |
1727 | << ", moving to FULL_WAIT." | |
1728 | << dendl; | |
1729 | full_state = FULL_WAIT; | |
1730 | } else { | |
1731 | dout(1) << "FULL_FULL commit_start on seq " | |
1732 | << seq << " < journaled_seq " << journaled_seq | |
1733 | << ", remaining in FULL_FULL" | |
1734 | << dendl; | |
1735 | } | |
1736 | break; | |
1737 | ||
1738 | case FULL_WAIT: | |
1739 | dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl; | |
1740 | full_state = FULL_NOTFULL; | |
1741 | plug_journal_completions = true; | |
1742 | break; | |
1743 | } | |
1744 | } | |
1745 | ||
1746 | /* | |
1747 | *send discard command to joural block deivce | |
1748 | */ | |
1749 | void FileJournal::do_discard(int64_t offset, int64_t end) | |
1750 | { | |
11fdf7f2 | 1751 | dout(10) << __func__ << " trim(" << offset << ", " << end << dendl; |
7c673cae | 1752 | |
11fdf7f2 | 1753 | offset = round_up_to(offset, block_size); |
7c673cae FG |
1754 | if (offset >= end) |
1755 | return; | |
11fdf7f2 TL |
1756 | end = round_up_to(end - block_size, block_size); |
1757 | ceph_assert(end >= offset); | |
1758 | if (offset < end) { | |
1759 | BlkDev blkdev(fd); | |
1760 | if (blkdev.discard(offset, end - offset) < 0) { | |
7c673cae | 1761 | dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl; |
11fdf7f2 TL |
1762 | } |
1763 | } | |
7c673cae FG |
1764 | } |
1765 | ||
1766 | void FileJournal::committed_thru(uint64_t seq) | |
1767 | { | |
9f95a23c | 1768 | std::lock_guard locker{write_lock}; |
7c673cae FG |
1769 | |
1770 | auto released = throttle.flush(seq); | |
1771 | if (logger) { | |
1772 | logger->dec(l_filestore_journal_ops, released.first); | |
1773 | logger->dec(l_filestore_journal_bytes, released.second); | |
1774 | } | |
1775 | ||
1776 | if (seq < last_committed_seq) { | |
1777 | dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl; | |
11fdf7f2 | 1778 | ceph_assert(seq >= last_committed_seq); |
7c673cae FG |
1779 | return; |
1780 | } | |
1781 | if (seq == last_committed_seq) { | |
1782 | dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl; | |
1783 | return; | |
1784 | } | |
1785 | ||
1786 | dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl; | |
1787 | last_committed_seq = seq; | |
1788 | ||
1789 | // completions! | |
1790 | { | |
9f95a23c | 1791 | std::lock_guard locker{finisher_lock}; |
7c673cae FG |
1792 | queue_completions_thru(seq); |
1793 | if (plug_journal_completions && seq >= header.start_seq) { | |
1794 | dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl; | |
1795 | plug_journal_completions = false; | |
1796 | queue_completions_thru(journaled_seq); | |
1797 | } | |
1798 | } | |
1799 | ||
1800 | // adjust start pointer | |
1801 | while (!journalq.empty() && journalq.front().first <= seq) { | |
1802 | journalq.pop_front(); | |
1803 | } | |
1804 | ||
1805 | int64_t old_start = header.start; | |
1806 | if (!journalq.empty()) { | |
1807 | header.start = journalq.front().second; | |
1808 | header.start_seq = journalq.front().first; | |
1809 | } else { | |
1810 | header.start = write_pos; | |
1811 | header.start_seq = seq + 1; | |
1812 | } | |
1813 | ||
1814 | if (discard) { | |
1815 | dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl; | |
1816 | if (old_start < header.start) | |
1817 | do_discard(old_start, header.start - 1); | |
1818 | else { | |
1819 | do_discard(old_start, header.max_size - 1); | |
1820 | do_discard(get_top(), header.start - 1); | |
1821 | } | |
1822 | } | |
1823 | ||
1824 | must_write_header = true; | |
1825 | print_header(header); | |
1826 | ||
1827 | // committed but unjournaled items | |
1828 | while (!writeq_empty() && peek_write().seq <= seq) { | |
1829 | dout(15) << " dropping committed but unwritten seq " << peek_write().seq | |
1830 | << " len " << peek_write().bl.length() | |
1831 | << dendl; | |
1832 | complete_write(1, peek_write().orig_len); | |
1833 | pop_write(); | |
1834 | } | |
1835 | ||
9f95a23c | 1836 | commit_cond.notify_all(); |
7c673cae FG |
1837 | |
1838 | dout(10) << "committed_thru done" << dendl; | |
1839 | } | |
1840 | ||
1841 | ||
1842 | void FileJournal::complete_write(uint64_t ops, uint64_t bytes) | |
1843 | { | |
1844 | dout(5) << __func__ << " finished " << ops << " ops and " | |
1845 | << bytes << " bytes" << dendl; | |
1846 | } | |
1847 | ||
1848 | int FileJournal::make_writeable() | |
1849 | { | |
1850 | dout(10) << __func__ << dendl; | |
1851 | int r = set_throttle_params(); | |
1852 | if (r < 0) | |
1853 | return r; | |
1854 | ||
1855 | r = _open(true); | |
1856 | if (r < 0) | |
1857 | return r; | |
1858 | ||
1859 | if (read_pos > 0) | |
1860 | write_pos = read_pos; | |
1861 | else | |
1862 | write_pos = get_top(); | |
1863 | read_pos = 0; | |
1864 | ||
1865 | must_write_header = true; | |
1866 | ||
1867 | start_writer(); | |
1868 | return 0; | |
1869 | } | |
1870 | ||
1871 | int FileJournal::set_throttle_params() | |
1872 | { | |
1873 | stringstream ss; | |
1874 | bool valid = throttle.set_params( | |
1875 | cct->_conf->journal_throttle_low_threshhold, | |
1876 | cct->_conf->journal_throttle_high_threshhold, | |
1877 | cct->_conf->filestore_expected_throughput_bytes, | |
1878 | cct->_conf->journal_throttle_high_multiple, | |
1879 | cct->_conf->journal_throttle_max_multiple, | |
1880 | header.max_size - get_top(), | |
1881 | &ss); | |
1882 | ||
1883 | if (!valid) { | |
1884 | derr << "tried to set invalid params: " | |
1885 | << ss.str() | |
1886 | << dendl; | |
1887 | } | |
1888 | return valid ? 0 : -EINVAL; | |
1889 | } | |
1890 | ||
1891 | const char** FileJournal::get_tracked_conf_keys() const | |
1892 | { | |
1893 | static const char *KEYS[] = { | |
1894 | "journal_throttle_low_threshhold", | |
1895 | "journal_throttle_high_threshhold", | |
1896 | "journal_throttle_high_multiple", | |
1897 | "journal_throttle_max_multiple", | |
1898 | "filestore_expected_throughput_bytes", | |
1899 | NULL}; | |
1900 | return KEYS; | |
1901 | } | |
1902 | ||
1903 | void FileJournal::wrap_read_bl( | |
1904 | off64_t pos, | |
1905 | int64_t olen, | |
1906 | bufferlist* bl, | |
1907 | off64_t *out_pos | |
1908 | ) const | |
1909 | { | |
1910 | while (olen > 0) { | |
1911 | while (pos >= header.max_size) | |
1912 | pos = pos + get_top() - header.max_size; | |
1913 | ||
1914 | int64_t len; | |
1915 | if (pos + olen > header.max_size) | |
1916 | len = header.max_size - pos; // partial | |
1917 | else | |
1918 | len = olen; // rest | |
1919 | ||
1920 | int64_t actual = ::lseek64(fd, pos, SEEK_SET); | |
11fdf7f2 | 1921 | ceph_assert(actual == pos); |
7c673cae | 1922 | |
f67539c2 | 1923 | bufferptr bp = ceph::buffer::create(len); |
7c673cae FG |
1924 | int r = safe_read_exact(fd, bp.c_str(), len); |
1925 | if (r) { | |
1926 | derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned " | |
11fdf7f2 | 1927 | << cpp_strerror(r) << dendl; |
7c673cae FG |
1928 | ceph_abort(); |
1929 | } | |
1930 | bl->push_back(std::move(bp)); | |
1931 | pos += len; | |
1932 | olen -= len; | |
1933 | } | |
1934 | if (pos >= header.max_size) | |
1935 | pos = pos + get_top() - header.max_size; | |
1936 | if (out_pos) | |
1937 | *out_pos = pos; | |
1938 | } | |
1939 | ||
1940 | bool FileJournal::read_entry( | |
1941 | bufferlist &bl, | |
1942 | uint64_t &next_seq, | |
1943 | bool *corrupt) | |
1944 | { | |
1945 | if (corrupt) | |
1946 | *corrupt = false; | |
1947 | uint64_t seq = next_seq; | |
1948 | ||
1949 | if (!read_pos) { | |
1950 | dout(2) << "read_entry -- not readable" << dendl; | |
1951 | return false; | |
1952 | } | |
1953 | ||
1954 | off64_t pos = read_pos; | |
1955 | off64_t next_pos = pos; | |
1956 | stringstream ss; | |
1957 | read_entry_result result = do_read_entry( | |
1958 | pos, | |
1959 | &next_pos, | |
1960 | &bl, | |
1961 | &seq, | |
1962 | &ss); | |
1963 | if (result == SUCCESS) { | |
1964 | journalq.push_back( pair<uint64_t,off64_t>(seq, pos)); | |
1965 | uint64_t amount_to_take = | |
1966 | next_pos > pos ? | |
1967 | next_pos - pos : | |
1968 | (header.max_size - pos) + (next_pos - get_top()); | |
1969 | throttle.take(amount_to_take); | |
1970 | throttle.register_throttle_seq(next_seq, amount_to_take); | |
1971 | if (logger) { | |
1972 | logger->inc(l_filestore_journal_ops, 1); | |
1973 | logger->inc(l_filestore_journal_bytes, amount_to_take); | |
1974 | } | |
1975 | if (next_seq > seq) { | |
1976 | return false; | |
1977 | } else { | |
1978 | read_pos = next_pos; | |
1979 | next_seq = seq; | |
1980 | if (seq > journaled_seq) | |
1981 | journaled_seq = seq; | |
1982 | return true; | |
1983 | } | |
3efd9988 FG |
1984 | } else { |
1985 | derr << "do_read_entry(" << pos << "): " << ss.str() << dendl; | |
7c673cae FG |
1986 | } |
1987 | ||
1988 | if (seq && seq < header.committed_up_to) { | |
1989 | derr << "Unable to read past sequence " << seq | |
1990 | << " but header indicates the journal has committed up through " | |
1991 | << header.committed_up_to << ", journal is corrupt" << dendl; | |
1992 | if (cct->_conf->journal_ignore_corruption) { | |
1993 | if (corrupt) | |
1994 | *corrupt = true; | |
1995 | return false; | |
1996 | } else { | |
1997 | ceph_abort(); | |
1998 | } | |
1999 | } | |
2000 | ||
7c673cae FG |
2001 | dout(2) << "No further valid entries found, journal is most likely valid" |
2002 | << dendl; | |
2003 | return false; | |
2004 | } | |
2005 | ||
2006 | FileJournal::read_entry_result FileJournal::do_read_entry( | |
2007 | off64_t init_pos, | |
2008 | off64_t *next_pos, | |
2009 | bufferlist *bl, | |
2010 | uint64_t *seq, | |
2011 | ostream *ss, | |
2012 | entry_header_t *_h) const | |
2013 | { | |
2014 | off64_t cur_pos = init_pos; | |
2015 | bufferlist _bl; | |
2016 | if (!bl) | |
2017 | bl = &_bl; | |
2018 | ||
2019 | // header | |
2020 | entry_header_t *h; | |
2021 | bufferlist hbl; | |
2022 | off64_t _next_pos; | |
2023 | wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos); | |
2024 | h = reinterpret_cast<entry_header_t *>(hbl.c_str()); | |
2025 | ||
2026 | if (!h->check_magic(cur_pos, header.get_fsid64())) { | |
2027 | dout(25) << "read_entry " << init_pos | |
2028 | << " : bad header magic, end of journal" << dendl; | |
2029 | if (ss) | |
2030 | *ss << "bad header magic"; | |
2031 | if (next_pos) | |
2032 | *next_pos = init_pos + (4<<10); // check 4k ahead | |
2033 | return MAYBE_CORRUPT; | |
2034 | } | |
2035 | cur_pos = _next_pos; | |
2036 | ||
2037 | // pad + body + pad | |
2038 | if (h->pre_pad) | |
2039 | cur_pos += h->pre_pad; | |
2040 | ||
2041 | bl->clear(); | |
2042 | wrap_read_bl(cur_pos, h->len, bl, &cur_pos); | |
2043 | ||
2044 | if (h->post_pad) | |
2045 | cur_pos += h->post_pad; | |
2046 | ||
2047 | // footer | |
2048 | entry_header_t *f; | |
2049 | bufferlist fbl; | |
2050 | wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos); | |
2051 | f = reinterpret_cast<entry_header_t *>(fbl.c_str()); | |
2052 | if (memcmp(f, h, sizeof(*f))) { | |
2053 | if (ss) | |
2054 | *ss << "bad footer magic, partial entry"; | |
2055 | if (next_pos) | |
2056 | *next_pos = cur_pos; | |
2057 | return MAYBE_CORRUPT; | |
2058 | } | |
2059 | ||
2060 | if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal) | |
2061 | h->crc32c != 0) { // newer entry in old journal | |
2062 | uint32_t actual_crc = bl->crc32c(0); | |
2063 | if (actual_crc != h->crc32c) { | |
2064 | if (ss) | |
2065 | *ss << "header crc (" << h->crc32c | |
2066 | << ") doesn't match body crc (" << actual_crc << ")"; | |
2067 | if (next_pos) | |
2068 | *next_pos = cur_pos; | |
2069 | return MAYBE_CORRUPT; | |
2070 | } | |
2071 | } | |
2072 | ||
2073 | // yay! | |
2074 | dout(2) << "read_entry " << init_pos << " : seq " << h->seq | |
2075 | << " " << h->len << " bytes" | |
2076 | << dendl; | |
2077 | ||
2078 | // ok! | |
2079 | if (seq) | |
2080 | *seq = h->seq; | |
2081 | ||
2082 | ||
2083 | if (next_pos) | |
2084 | *next_pos = cur_pos; | |
2085 | ||
2086 | if (_h) | |
2087 | *_h = *h; | |
2088 | ||
11fdf7f2 | 2089 | ceph_assert(cur_pos % header.alignment == 0); |
7c673cae FG |
2090 | return SUCCESS; |
2091 | } | |
2092 | ||
2093 | void FileJournal::reserve_throttle_and_backoff(uint64_t count) | |
2094 | { | |
2095 | throttle.get(count); | |
2096 | } | |
2097 | ||
2098 | void FileJournal::get_header( | |
2099 | uint64_t wanted_seq, | |
2100 | off64_t *_pos, | |
2101 | entry_header_t *h) | |
2102 | { | |
2103 | off64_t pos = header.start; | |
2104 | off64_t next_pos = pos; | |
2105 | bufferlist bl; | |
2106 | uint64_t seq = 0; | |
2107 | dout(2) << __func__ << dendl; | |
2108 | while (1) { | |
2109 | bl.clear(); | |
2110 | pos = next_pos; | |
2111 | read_entry_result result = do_read_entry( | |
2112 | pos, | |
2113 | &next_pos, | |
2114 | &bl, | |
2115 | &seq, | |
2116 | 0, | |
2117 | h); | |
2118 | if (result == FAILURE || result == MAYBE_CORRUPT) | |
2119 | ceph_abort(); | |
2120 | if (seq == wanted_seq) { | |
2121 | if (_pos) | |
2122 | *_pos = pos; | |
2123 | return; | |
2124 | } | |
2125 | } | |
2126 | ceph_abort(); // not reachable | |
2127 | } | |
2128 | ||
2129 | void FileJournal::corrupt( | |
2130 | int wfd, | |
2131 | off64_t corrupt_at) | |
2132 | { | |
2133 | dout(2) << __func__ << dendl; | |
2134 | if (corrupt_at >= header.max_size) | |
2135 | corrupt_at = corrupt_at + get_top() - header.max_size; | |
2136 | ||
2137 | int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET); | |
11fdf7f2 | 2138 | ceph_assert(actual == corrupt_at); |
7c673cae FG |
2139 | |
2140 | char buf[10]; | |
2141 | int r = safe_read_exact(fd, buf, 1); | |
11fdf7f2 | 2142 | ceph_assert(r == 0); |
7c673cae FG |
2143 | |
2144 | actual = ::lseek64(wfd, corrupt_at, SEEK_SET); | |
11fdf7f2 | 2145 | ceph_assert(actual == corrupt_at); |
7c673cae FG |
2146 | |
2147 | buf[0]++; | |
2148 | r = safe_write(wfd, buf, 1); | |
11fdf7f2 | 2149 | ceph_assert(r == 0); |
7c673cae FG |
2150 | } |
2151 | ||
2152 | void FileJournal::corrupt_payload( | |
2153 | int wfd, | |
2154 | uint64_t seq) | |
2155 | { | |
2156 | dout(2) << __func__ << dendl; | |
2157 | off64_t pos = 0; | |
2158 | entry_header_t h; | |
2159 | get_header(seq, &pos, &h); | |
2160 | off64_t corrupt_at = | |
2161 | pos + sizeof(entry_header_t) + h.pre_pad; | |
2162 | corrupt(wfd, corrupt_at); | |
2163 | } | |
2164 | ||
2165 | ||
2166 | void FileJournal::corrupt_footer_magic( | |
2167 | int wfd, | |
2168 | uint64_t seq) | |
2169 | { | |
2170 | dout(2) << __func__ << dendl; | |
2171 | off64_t pos = 0; | |
2172 | entry_header_t h; | |
2173 | get_header(seq, &pos, &h); | |
2174 | off64_t corrupt_at = | |
2175 | pos + sizeof(entry_header_t) + h.pre_pad + | |
2176 | h.len + h.post_pad + | |
2177 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2178 | corrupt(wfd, corrupt_at); | |
2179 | } | |
2180 | ||
2181 | ||
2182 | void FileJournal::corrupt_header_magic( | |
2183 | int wfd, | |
2184 | uint64_t seq) | |
2185 | { | |
2186 | dout(2) << __func__ << dendl; | |
2187 | off64_t pos = 0; | |
2188 | entry_header_t h; | |
2189 | get_header(seq, &pos, &h); | |
2190 | off64_t corrupt_at = | |
2191 | pos + | |
2192 | (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h)); | |
2193 | corrupt(wfd, corrupt_at); | |
2194 | } | |
2195 | ||
2196 | off64_t FileJournal::get_journal_size_estimate() | |
2197 | { | |
2198 | off64_t size, start = header.start; | |
2199 | if (write_pos < start) { | |
2200 | size = (max_size - start) + write_pos; | |
2201 | } else { | |
2202 | size = write_pos - start; | |
2203 | } | |
2204 | dout(20) << __func__ << " journal size=" << size << dendl; | |
2205 | return size; | |
2206 | } | |
11fdf7f2 TL |
2207 | |
2208 | void FileJournal::get_devices(set<string> *ls) | |
2209 | { | |
2210 | string dev_node; | |
2211 | BlkDev blkdev(fd); | |
2212 | if (int rc = blkdev.wholedisk(&dev_node); rc) { | |
2213 | return; | |
2214 | } | |
2215 | get_raw_devices(dev_node, ls); | |
2216 | } | |
2217 | ||
2218 | void FileJournal::collect_metadata(map<string,string> *pm) | |
2219 | { | |
2220 | BlkDev blkdev(fd); | |
2221 | char partition_path[PATH_MAX]; | |
2222 | char dev_node[PATH_MAX]; | |
2223 | if (blkdev.partition(partition_path, PATH_MAX)) { | |
2224 | (*pm)["backend_filestore_journal_partition_path"] = "unknown"; | |
2225 | } else { | |
2226 | (*pm)["backend_filestore_journal_partition_path"] = string(partition_path); | |
2227 | } | |
2228 | if (blkdev.wholedisk(dev_node, PATH_MAX)) { | |
2229 | (*pm)["backend_filestore_journal_dev_node"] = "unknown"; | |
2230 | } else { | |
2231 | (*pm)["backend_filestore_journal_dev_node"] = string(dev_node); | |
2232 | devname = dev_node; | |
2233 | } | |
2234 | } |