1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifndef CEPH_FILEJOURNAL_H
17 #define CEPH_FILEJOURNAL_H
23 #include "common/Cond.h"
24 #include "common/Mutex.h"
25 #include "common/Thread.h"
26 #include "common/Throttle.h"
27 #include "JournalThrottle.h"
28 #include "common/zipkin_trace.h"
34 // re-include our assert to clobber the system one; fix dout:
35 #include "include/assert.h"
38 * Implements journaling on top of block device or file.
40 * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
44 public md_config_obs_t
{
46 /// Protected by finisher_lock
47 struct completion_item
{
51 TrackedOpRef tracked_op
;
52 completion_item(uint64_t o
, Context
*c
, utime_t s
, TrackedOpRef opref
)
53 : seq(o
), finish(c
), start(s
), tracked_op(opref
) {}
54 completion_item() : seq(0), finish(0), start(0) {}
60 TrackedOpRef tracked_op
;
62 write_item(uint64_t s
, bufferlist
& b
, int ol
, TrackedOpRef opref
) :
63 seq(s
), orig_len(ol
), tracked_op(opref
) {
64 bl
.claim(b
, buffer::list::CLAIM_ALLOW_NONSHAREABLE
); // potential zero-copy
66 write_item() : seq(0), orig_len(0) {}
71 uint64_t journaled_seq
;
72 bool plug_journal_completions
;
76 list
<write_item
> writeq
;
78 write_item
&peek_write();
80 void batch_pop_write(list
<write_item
> &items
);
81 void batch_unpop_write(list
<write_item
> &items
);
83 Mutex completions_lock
;
84 list
<completion_item
> completions
;
85 bool completions_empty() {
86 Mutex::Locker
l(completions_lock
);
87 return completions
.empty();
89 void batch_pop_completions(list
<completion_item
> &items
) {
90 Mutex::Locker
l(completions_lock
);
91 completions
.swap(items
);
93 void batch_unpop_completions(list
<completion_item
> &items
) {
94 Mutex::Locker
l(completions_lock
);
95 completions
.splice(completions
.begin(), items
);
97 completion_item
completion_peek_front() {
98 Mutex::Locker
l(completions_lock
);
99 assert(!completions
.empty());
100 return completions
.front();
102 void completion_pop_front() {
103 Mutex::Locker
l(completions_lock
);
104 assert(!completions
.empty());
105 completions
.pop_front();
108 int prepare_entry(vector
<ObjectStore::Transaction
>& tls
, bufferlist
* tbl
) override
;
110 void submit_entry(uint64_t seq
, bufferlist
& bl
, uint32_t orig_len
,
112 TrackedOpRef osd_op
= TrackedOpRef()) override
;
113 /// End protected by finisher_lock
121 // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
128 int64_t max_size
; // max size of journal ring buffer
129 int64_t start
; // offset of first entry
130 uint64_t committed_up_to
; // committed up to
135 * entry at header.start has sequence >= start_seq
137 * Generally, the entry at header.start will have sequence
138 * start_seq if it exists. The only exception is immediately
139 * after journal creation since the first sequence number is
142 * If the first read on open fails, we can assume corruption
143 * if start_seq > committed_up_to because the entry would have
144 * a sequence >= start_seq and therefore > committed_up_to.
149 flags(0), block_size(0), alignment(0), max_size(0), start(0),
150 committed_up_to(0), start_seq(0) {}
156 uint64_t get_fsid64() const {
157 return *(uint64_t*)fsid
.bytes();
160 void encode(bufferlist
& bl
) const {
167 ::encode(block_size
, em
);
168 ::encode(alignment
, em
);
169 ::encode(max_size
, em
);
171 ::encode(committed_up_to
, em
);
172 ::encode(start_seq
, em
);
176 void decode(bufferlist::iterator
& bl
) {
179 if (v
< 2) { // normally 0, but concievably 1
180 // decode old header_t struct (pre v0.40).
181 bl
.advance(4); // skip __u32 flags (it was unused by any old code)
185 *(uint64_t*)&fsid
.bytes()[0] = tfsid
;
186 *(uint64_t*)&fsid
.bytes()[8] = tfsid
;
187 ::decode(block_size
, bl
);
188 ::decode(alignment
, bl
);
189 ::decode(max_size
, bl
);
197 bufferlist::iterator t
= em
.begin();
200 ::decode(block_size
, t
);
201 ::decode(alignment
, t
);
202 ::decode(max_size
, t
);
206 ::decode(committed_up_to
, t
);
211 ::decode(start_seq
, t
);
217 struct entry_header_t
{
218 uint64_t seq
; // fs op seq #
219 uint32_t crc32c
; // payload only. not header, pre_pad, post_pad, or footer.
221 uint32_t pre_pad
, post_pad
;
/// Mix fsid, sequence number and length into the expected magic2 value
/// for an entry header — a cheap integrity check, not cryptographic.
static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
  return (fsid ^ seq ^ len);
}
228 bool check_magic(off64_t pos
, uint64_t fsid
) {
230 magic1
== (uint64_t)pos
&&
231 magic2
== (fsid
^ seq
^ len
);
233 } __attribute__((__packed__
, aligned(4)));
235 bool journalq_empty() { return journalq
.empty(); }
243 bool directio
, aio
, force_aio
;
244 bool must_write_header
;
245 off64_t write_pos
; // byte where the next entry to be written will go
247 bool discard
; //for block journal whether support discard
250 /// state associated with an in-flight aio request
251 /// Protected by aio_lock
257 uint64_t off
, len
; ///< these are for debug only
258 uint64_t seq
; ///< seq number to complete on aio completion, if non-zero
260 aio_info(bufferlist
& b
, uint64_t o
, uint64_t s
)
261 : iov(NULL
), done(false), off(o
), len(b
.length()), seq(s
) {
270 Cond write_finish_cond
;
271 io_context_t aio_ctx
;
272 list
<aio_info
> aio_queue
;
273 int aio_num
, aio_bytes
;
274 uint64_t aio_write_queue_ops
;
275 uint64_t aio_write_queue_bytes
;
276 /// End protected by aio_lock
279 uint64_t last_committed_seq
;
280 uint64_t journaled_since_start
;
283 * full states cycle at the beginnging of each commit epoch, when commit_start()
285 * FULL - we just filled up during this epoch.
286 * WAIT - we filled up last epoch; now we have to wait until everything during
287 * that epoch commits to the fs before we can start writing over it.
288 * NOTFULL - all good, journal away.
299 deque
<pair
<uint64_t, off64_t
> > journalq
; // track seq offsets, so we can trim later.
300 uint64_t writing_seq
;
304 int set_throttle_params();
305 const char** get_tracked_conf_keys() const override
;
306 void handle_conf_change(
307 const struct md_config_t
*conf
,
308 const std::set
<std::string
> &changed
) override
{
309 for (const char **i
= get_tracked_conf_keys();
312 if (changed
.count(string(*i
))) {
313 set_throttle_params();
319 void complete_write(uint64_t ops
, uint64_t bytes
);
320 JournalThrottle throttle
;
329 int _open(bool wr
, bool create
=false);
330 int _open_block_device();
331 void _close(int fd
) const;
332 int _open_file(int64_t oldsize
, blksize_t blksize
, bool create
);
333 int _dump(ostream
& out
, bool simple
);
334 void print_header(const header_t
&hdr
) const;
335 int read_header(header_t
*hdr
) const;
336 bufferptr
prepare_header();
339 void write_thread_entry();
341 void queue_completions_thru(uint64_t seq
);
343 int check_for_full(uint64_t seq
, off64_t pos
, off64_t size
);
344 int prepare_multi_write(bufferlist
& bl
, uint64_t& orig_ops
, uint64_t& orig_bytee
);
345 int prepare_single_write(write_item
&next_write
, bufferlist
& bl
, off64_t
& queue_pos
,
346 uint64_t& orig_ops
, uint64_t& orig_bytes
);
347 void do_write(bufferlist
& bl
);
349 void write_finish_thread_entry();
350 void check_aio_completion();
351 void do_aio_write(bufferlist
& bl
);
352 int write_aio_bl(off64_t
& pos
, bufferlist
& bl
, uint64_t seq
);
355 void check_align(off64_t pos
, bufferlist
& bl
);
356 int write_bl(off64_t
& pos
, bufferlist
& bl
);
358 /// read len from journal starting at in_pos and wrapping up to len
360 off64_t in_pos
, ///< [in] start position
361 int64_t len
, ///< [in] length to read
362 bufferlist
* bl
, ///< [out] result
363 off64_t
*out_pos
///< [out] next position to read, will be wrapped
366 void do_discard(int64_t offset
, int64_t end
);
368 class Writer
: public Thread
{
369 FileJournal
*journal
;
371 explicit Writer(FileJournal
*fj
) : journal(fj
) {}
372 void *entry() override
{
373 journal
->write_thread_entry();
378 class WriteFinisher
: public Thread
{
379 FileJournal
*journal
;
381 explicit WriteFinisher(FileJournal
*fj
) : journal(fj
) {}
382 void *entry() override
{
383 journal
->write_finish_thread_entry();
386 } write_finish_thread
;
388 off64_t
get_top() const {
389 return ROUND_UP_TO(sizeof(header
), block_size
);
392 ZTracer::Endpoint trace_endpoint
;
395 FileJournal(CephContext
* cct
, uuid_d fsid
, Finisher
*fin
, Cond
*sync_cond
,
396 const char *f
, bool dio
=false, bool ai
=true, bool faio
=false) :
397 Journal(cct
, fsid
, fin
, sync_cond
),
398 finisher_lock("FileJournal::finisher_lock", false, true, false, cct
),
400 plug_journal_completions(false),
401 writeq_lock("FileJournal::writeq_lock", false, true, false, cct
),
403 "FileJournal::completions_lock", false, true, false, cct
),
406 max_size(0), block_size(0),
407 directio(dio
), aio(ai
), force_aio(faio
),
408 must_write_header(false),
409 write_pos(0), read_pos(0),
412 aio_lock("FileJournal::aio_lock"),
414 aio_num(0), aio_bytes(0),
415 aio_write_queue_ops(0),
416 aio_write_queue_bytes(0),
418 last_committed_seq(0),
419 journaled_since_start(0),
420 full_state(FULL_NOTFULL
),
423 throttle(cct
->_conf
->filestore_caller_concurrency
),
424 write_lock("FileJournal::write_lock", false, true, false, cct
),
428 write_finish_thread(this),
429 trace_endpoint("0.0.0.0", 0, "FileJournal") {
431 if (aio
&& !directio
) {
432 lderr(cct
) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl
;
437 lderr(cct
) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl
;
442 cct
->_conf
->add_observer(this);
444 ~FileJournal() override
{
447 cct
->_conf
->remove_observer(this);
450 int check() override
;
451 int create() override
;
452 int open(uint64_t fs_op_seq
) override
;
453 void close() override
;
454 int peek_fsid(uuid_d
& fsid
);
456 int dump(ostream
& out
) override
;
457 int simple_dump(ostream
& out
);
458 int _fdump(Formatter
&f
, bool simple
);
460 void flush() override
;
462 void reserve_throttle_and_backoff(uint64_t count
) override
;
464 bool is_writeable() override
{
465 return read_pos
== 0;
467 int make_writeable() override
;
470 void commit_start(uint64_t seq
) override
;
471 void committed_thru(uint64_t seq
) override
;
472 bool should_commit_now() override
{
473 return full_state
!= FULL_NOTFULL
&& !write_stop
;
476 void write_header_sync();
478 void set_wait_on_full(bool b
) { wait_on_full
= b
; }
480 off64_t
get_journal_size_estimate();
484 /// Result code for read_entry
485 enum read_entry_result
{
494 * Reads next entry starting at pos. If the entry appears
495 * clean, *bl will contain the payload, *seq will contain
496 * the sequence number, and *out_pos will reflect the next
497 * read position. If the entry is invalid *ss will contain
498 * debug text, while *seq, *out_pos, and *bl will be unchanged.
500 * If the entry suggests a corrupt log, *ss will contain debug
501 * text, *out_pos will contain the next index to check. If
502 * we find an entry in this way that returns SUCCESS, the journal
503 * is most likely corrupt.
505 read_entry_result
do_read_entry(
506 off64_t pos
, ///< [in] position to read
507 off64_t
*next_pos
, ///< [out] next position to read
508 bufferlist
* bl
, ///< [out] payload for successful read
509 uint64_t *seq
, ///< [out] seq of successful read
510 ostream
*ss
, ///< [out] error output
511 entry_header_t
*h
= 0 ///< [out] header
512 ) const; ///< @return result code
522 uint64_t &last_seq
) override
{
523 return read_entry(bl
, last_seq
, 0);
534 void corrupt_payload(
537 void corrupt_footer_magic(
540 void corrupt_header_magic(
545 WRITE_CLASS_ENCODER(FileJournal::header_t
)