1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifndef CEPH_FILEJOURNAL_H
17 #define CEPH_FILEJOURNAL_H
23 #include "common/Cond.h"
24 #include "common/Mutex.h"
25 #include "common/Thread.h"
26 #include "common/Throttle.h"
27 #include "JournalThrottle.h"
28 #include "common/zipkin_trace.h"
35 * Implements journaling on top of block device or file.
37 * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
41 public md_config_obs_t
{
43 /// Protected by finisher_lock
44 struct completion_item
{
48 TrackedOpRef tracked_op
;
49 completion_item(uint64_t o
, Context
*c
, utime_t s
, TrackedOpRef opref
)
50 : seq(o
), finish(c
), start(s
), tracked_op(opref
) {}
51 completion_item() : seq(0), finish(0), start(0) {}
57 TrackedOpRef tracked_op
;
59 write_item(uint64_t s
, bufferlist
& b
, int ol
, TrackedOpRef opref
) :
60 seq(s
), orig_len(ol
), tracked_op(opref
) {
61 bl
.claim(b
, buffer::list::CLAIM_ALLOW_NONSHAREABLE
); // potential zero-copy
63 write_item() : seq(0), orig_len(0) {}
68 uint64_t journaled_seq
;
69 bool plug_journal_completions
;
73 list
<write_item
> writeq
;
75 write_item
&peek_write();
77 void batch_pop_write(list
<write_item
> &items
);
78 void batch_unpop_write(list
<write_item
> &items
);
80 Mutex completions_lock
;
81 list
<completion_item
> completions
;
82 bool completions_empty() {
83 Mutex::Locker
l(completions_lock
);
84 return completions
.empty();
86 void batch_pop_completions(list
<completion_item
> &items
) {
87 Mutex::Locker
l(completions_lock
);
88 completions
.swap(items
);
90 void batch_unpop_completions(list
<completion_item
> &items
) {
91 Mutex::Locker
l(completions_lock
);
92 completions
.splice(completions
.begin(), items
);
94 completion_item
completion_peek_front() {
95 Mutex::Locker
l(completions_lock
);
96 assert(!completions
.empty());
97 return completions
.front();
99 void completion_pop_front() {
100 Mutex::Locker
l(completions_lock
);
101 assert(!completions
.empty());
102 completions
.pop_front();
105 int prepare_entry(vector
<ObjectStore::Transaction
>& tls
, bufferlist
* tbl
) override
;
107 void submit_entry(uint64_t seq
, bufferlist
& bl
, uint32_t orig_len
,
109 TrackedOpRef osd_op
= TrackedOpRef()) override
;
110 /// End protected by finisher_lock
118 // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
125 int64_t max_size
; // max size of journal ring buffer
126 int64_t start
; // offset of first entry
127 uint64_t committed_up_to
; // committed up to
132 * entry at header.start has sequence >= start_seq
134 * Generally, the entry at header.start will have sequence
135 * start_seq if it exists. The only exception is immediately
136 * after journal creation since the first sequence number is
139 * If the first read on open fails, we can assume corruption
140 * if start_seq > committed_up_to because the entry would have
141 * a sequence >= start_seq and therefore > committed_up_to.
146 flags(0), block_size(0), alignment(0), max_size(0), start(0),
147 committed_up_to(0), start_seq(0) {}
153 uint64_t get_fsid64() const {
154 return *(uint64_t*)fsid
.bytes();
157 void encode(bufferlist
& bl
) const {
164 ::encode(block_size
, em
);
165 ::encode(alignment
, em
);
166 ::encode(max_size
, em
);
168 ::encode(committed_up_to
, em
);
169 ::encode(start_seq
, em
);
173 void decode(bufferlist::iterator
& bl
) {
176 if (v
< 2) { // normally 0, but concievably 1
177 // decode old header_t struct (pre v0.40).
178 bl
.advance(4); // skip __u32 flags (it was unused by any old code)
182 *(uint64_t*)&fsid
.bytes()[0] = tfsid
;
183 *(uint64_t*)&fsid
.bytes()[8] = tfsid
;
184 ::decode(block_size
, bl
);
185 ::decode(alignment
, bl
);
186 ::decode(max_size
, bl
);
194 bufferlist::iterator t
= em
.begin();
197 ::decode(block_size
, t
);
198 ::decode(alignment
, t
);
199 ::decode(max_size
, t
);
203 ::decode(committed_up_to
, t
);
208 ::decode(start_seq
, t
);
214 struct entry_header_t
{
215 uint64_t seq
; // fs op seq #
216 uint32_t crc32c
; // payload only. not header, pre_pad, post_pad, or footer.
218 uint32_t pre_pad
, post_pad
;
/// Build the second per-entry magic word by folding the entry's sequence
/// number and payload length into the journal fsid via XOR.
static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
  return fsid ^ seq ^ len;
}
225 bool check_magic(off64_t pos
, uint64_t fsid
) {
227 magic1
== (uint64_t)pos
&&
228 magic2
== (fsid
^ seq
^ len
);
230 } __attribute__((__packed__
, aligned(4)));
232 bool journalq_empty() { return journalq
.empty(); }
240 bool directio
, aio
, force_aio
;
241 bool must_write_header
;
242 off64_t write_pos
; // byte where the next entry to be written will go
244 bool discard
; //for block journal whether support discard
247 /// state associated with an in-flight aio request
248 /// Protected by aio_lock
254 uint64_t off
, len
; ///< these are for debug only
255 uint64_t seq
; ///< seq number to complete on aio completion, if non-zero
257 aio_info(bufferlist
& b
, uint64_t o
, uint64_t s
)
258 : iov(NULL
), done(false), off(o
), len(b
.length()), seq(s
) {
267 Cond write_finish_cond
;
268 io_context_t aio_ctx
;
269 list
<aio_info
> aio_queue
;
270 int aio_num
, aio_bytes
;
271 uint64_t aio_write_queue_ops
;
272 uint64_t aio_write_queue_bytes
;
273 /// End protected by aio_lock
276 uint64_t last_committed_seq
;
277 uint64_t journaled_since_start
;
280 * full states cycle at the beginning of each commit epoch, when commit_start()
282 * FULL - we just filled up during this epoch.
283 * WAIT - we filled up last epoch; now we have to wait until everything during
284 * that epoch commits to the fs before we can start writing over it.
285 * NOTFULL - all good, journal away.
296 deque
<pair
<uint64_t, off64_t
> > journalq
; // track seq offsets, so we can trim later.
297 uint64_t writing_seq
;
301 int set_throttle_params();
302 const char** get_tracked_conf_keys() const override
;
303 void handle_conf_change(
304 const struct md_config_t
*conf
,
305 const std::set
<std::string
> &changed
) override
{
306 for (const char **i
= get_tracked_conf_keys();
309 if (changed
.count(string(*i
))) {
310 set_throttle_params();
316 void complete_write(uint64_t ops
, uint64_t bytes
);
317 JournalThrottle throttle
;
326 int _open(bool wr
, bool create
=false);
327 int _open_block_device();
328 void _close(int fd
) const;
329 int _open_file(int64_t oldsize
, blksize_t blksize
, bool create
);
330 int _dump(ostream
& out
, bool simple
);
331 void print_header(const header_t
&hdr
) const;
332 int read_header(header_t
*hdr
) const;
333 bufferptr
prepare_header();
336 void write_thread_entry();
338 void queue_completions_thru(uint64_t seq
);
340 int check_for_full(uint64_t seq
, off64_t pos
, off64_t size
);
341 int prepare_multi_write(bufferlist
& bl
, uint64_t& orig_ops
, uint64_t& orig_bytee
);
342 int prepare_single_write(write_item
&next_write
, bufferlist
& bl
, off64_t
& queue_pos
,
343 uint64_t& orig_ops
, uint64_t& orig_bytes
);
344 void do_write(bufferlist
& bl
);
346 void write_finish_thread_entry();
347 void check_aio_completion();
348 void do_aio_write(bufferlist
& bl
);
349 int write_aio_bl(off64_t
& pos
, bufferlist
& bl
, uint64_t seq
);
352 void check_align(off64_t pos
, bufferlist
& bl
);
353 int write_bl(off64_t
& pos
, bufferlist
& bl
);
355 /// read len from journal starting at in_pos and wrapping up to len
357 off64_t in_pos
, ///< [in] start position
358 int64_t len
, ///< [in] length to read
359 bufferlist
* bl
, ///< [out] result
360 off64_t
*out_pos
///< [out] next position to read, will be wrapped
363 void do_discard(int64_t offset
, int64_t end
);
365 class Writer
: public Thread
{
366 FileJournal
*journal
;
368 explicit Writer(FileJournal
*fj
) : journal(fj
) {}
369 void *entry() override
{
370 journal
->write_thread_entry();
375 class WriteFinisher
: public Thread
{
376 FileJournal
*journal
;
378 explicit WriteFinisher(FileJournal
*fj
) : journal(fj
) {}
379 void *entry() override
{
380 journal
->write_finish_thread_entry();
383 } write_finish_thread
;
385 off64_t
get_top() const {
386 return ROUND_UP_TO(sizeof(header
), block_size
);
389 ZTracer::Endpoint trace_endpoint
;
392 FileJournal(CephContext
* cct
, uuid_d fsid
, Finisher
*fin
, Cond
*sync_cond
,
393 const char *f
, bool dio
=false, bool ai
=true, bool faio
=false) :
394 Journal(cct
, fsid
, fin
, sync_cond
),
395 finisher_lock("FileJournal::finisher_lock", false, true, false, cct
),
397 plug_journal_completions(false),
398 writeq_lock("FileJournal::writeq_lock", false, true, false, cct
),
400 "FileJournal::completions_lock", false, true, false, cct
),
403 max_size(0), block_size(0),
404 directio(dio
), aio(ai
), force_aio(faio
),
405 must_write_header(false),
406 write_pos(0), read_pos(0),
409 aio_lock("FileJournal::aio_lock"),
411 aio_num(0), aio_bytes(0),
412 aio_write_queue_ops(0),
413 aio_write_queue_bytes(0),
415 last_committed_seq(0),
416 journaled_since_start(0),
417 full_state(FULL_NOTFULL
),
420 throttle(cct
->_conf
->filestore_caller_concurrency
),
421 write_lock("FileJournal::write_lock", false, true, false, cct
),
425 write_finish_thread(this),
426 trace_endpoint("0.0.0.0", 0, "FileJournal") {
428 if (aio
&& !directio
) {
429 lderr(cct
) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl
;
434 lderr(cct
) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl
;
439 cct
->_conf
->add_observer(this);
441 ~FileJournal() override
{
444 cct
->_conf
->remove_observer(this);
447 int check() override
;
448 int create() override
;
449 int open(uint64_t fs_op_seq
) override
;
450 void close() override
;
451 int peek_fsid(uuid_d
& fsid
);
453 int dump(ostream
& out
) override
;
454 int simple_dump(ostream
& out
);
455 int _fdump(Formatter
&f
, bool simple
);
457 void flush() override
;
459 void reserve_throttle_and_backoff(uint64_t count
) override
;
461 bool is_writeable() override
{
462 return read_pos
== 0;
464 int make_writeable() override
;
467 void commit_start(uint64_t seq
) override
;
468 void committed_thru(uint64_t seq
) override
;
469 bool should_commit_now() override
{
470 return full_state
!= FULL_NOTFULL
&& !write_stop
;
473 void write_header_sync();
475 void set_wait_on_full(bool b
) { wait_on_full
= b
; }
477 off64_t
get_journal_size_estimate();
481 /// Result code for read_entry
482 enum read_entry_result
{
491 * Reads next entry starting at pos. If the entry appears
492 * clean, *bl will contain the payload, *seq will contain
493 * the sequence number, and *out_pos will reflect the next
494 * read position. If the entry is invalid *ss will contain
495 * debug text, while *seq, *out_pos, and *bl will be unchanged.
497 * If the entry suggests a corrupt log, *ss will contain debug
498 * text, *out_pos will contain the next index to check. If
499 * we find an entry in this way that returns SUCCESS, the journal
500 * is most likely corrupt.
502 read_entry_result
do_read_entry(
503 off64_t pos
, ///< [in] position to read
504 off64_t
*next_pos
, ///< [out] next position to read
505 bufferlist
* bl
, ///< [out] payload for successful read
506 uint64_t *seq
, ///< [out] seq of successful read
507 ostream
*ss
, ///< [out] error output
508 entry_header_t
*h
= 0 ///< [out] header
509 ) const; ///< @return result code
519 uint64_t &last_seq
) override
{
520 return read_entry(bl
, last_seq
, 0);
531 void corrupt_payload(
534 void corrupt_footer_magic(
537 void corrupt_header_magic(
542 WRITE_CLASS_ENCODER(FileJournal::header_t
)