1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
16 #ifndef CEPH_FILEJOURNAL_H
17 #define CEPH_FILEJOURNAL_H
19 #include <condition_variable>
26 #include "common/config_fwd.h"
27 #include "common/Cond.h"
28 #include "common/Thread.h"
29 #include "common/Throttle.h"
30 #include "JournalThrottle.h"
31 #include "common/zipkin_trace.h"
37 // re-include our assert to clobber the system one; fix dout:
38 #include "include/ceph_assert.h"
/**
 * Implements journaling on top of block device or file.
 *
 * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
 */
47 public md_config_obs_t
{
49 /// Protected by finisher_lock
50 struct completion_item
{
54 TrackedOpRef tracked_op
;
55 completion_item(uint64_t o
, Context
*c
, utime_t s
, TrackedOpRef opref
)
56 : seq(o
), finish(c
), start(s
), tracked_op(opref
) {}
57 completion_item() : seq(0), finish(0), start(0) {}
63 TrackedOpRef tracked_op
;
65 write_item(uint64_t s
, bufferlist
& b
, int ol
, TrackedOpRef opref
) :
66 seq(s
), orig_len(ol
), tracked_op(opref
) {
69 write_item() : seq(0), orig_len(0) {}
72 ceph::mutex finisher_lock
= ceph::make_mutex("FileJournal::finisher_lock");
73 ceph::condition_variable finisher_cond
;
74 uint64_t journaled_seq
;
75 bool plug_journal_completions
;
77 ceph::mutex writeq_lock
= ceph::make_mutex("FileJournal::writeq_lock");
78 ceph::condition_variable writeq_cond
;
79 list
<write_item
> writeq
;
81 write_item
&peek_write();
83 void batch_pop_write(list
<write_item
> &items
);
84 void batch_unpop_write(list
<write_item
> &items
);
86 ceph::mutex completions_lock
=
87 ceph::make_mutex("FileJournal::completions_lock");
88 list
<completion_item
> completions
;
89 bool completions_empty() {
90 std::lock_guard l
{completions_lock
};
91 return completions
.empty();
93 void batch_pop_completions(list
<completion_item
> &items
) {
94 std::lock_guard l
{completions_lock
};
95 completions
.swap(items
);
97 void batch_unpop_completions(list
<completion_item
> &items
) {
98 std::lock_guard l
{completions_lock
};
99 completions
.splice(completions
.begin(), items
);
101 completion_item
completion_peek_front() {
102 std::lock_guard l
{completions_lock
};
103 ceph_assert(!completions
.empty());
104 return completions
.front();
106 void completion_pop_front() {
107 std::lock_guard l
{completions_lock
};
108 ceph_assert(!completions
.empty());
109 completions
.pop_front();
112 int prepare_entry(vector
<ObjectStore::Transaction
>& tls
, bufferlist
* tbl
) override
;
114 void submit_entry(uint64_t seq
, bufferlist
& bl
, uint32_t orig_len
,
116 TrackedOpRef osd_op
= TrackedOpRef()) override
;
117 /// End protected by finisher_lock
125 // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
132 int64_t max_size
; // max size of journal ring buffer
133 int64_t start
; // offset of first entry
134 uint64_t committed_up_to
; // committed up to
/**
 * entry at header.start has sequence >= start_seq
 *
 * Generally, the entry at header.start will have sequence
 * start_seq if it exists.  The only exception is immediately
 * after journal creation since the first sequence number is
 * not known.
 *
 * If the first read on open fails, we can assume corruption
 * if start_seq > committed_up_to because the entry would have
 * a sequence >= start_seq and therefore > committed_up_to.
 */
153 flags(0), block_size(0), alignment(0), max_size(0), start(0),
154 committed_up_to(0), start_seq(0) {}
160 uint64_t get_fsid64() const {
161 return *(uint64_t*)fsid
.bytes();
164 void encode(bufferlist
& bl
) const {
172 encode(block_size
, em
);
173 encode(alignment
, em
);
174 encode(max_size
, em
);
176 encode(committed_up_to
, em
);
177 encode(start_seq
, em
);
181 void decode(bufferlist::const_iterator
& bl
) {
185 if (v
< 2) { // normally 0, but conceivably 1
186 // decode old header_t struct (pre v0.40).
187 bl
+= 4u; // skip __u32 flags (it was unused by any old code)
191 *(uint64_t*)&fsid
.bytes()[0] = tfsid
;
192 *(uint64_t*)&fsid
.bytes()[8] = tfsid
;
193 decode(block_size
, bl
);
194 decode(alignment
, bl
);
195 decode(max_size
, bl
);
203 auto t
= em
.cbegin();
206 decode(block_size
, t
);
207 decode(alignment
, t
);
212 decode(committed_up_to
, t
);
217 decode(start_seq
, t
);
223 struct entry_header_t
{
224 uint64_t seq
; // fs op seq #
225 uint32_t crc32c
; // payload only. not header, pre_pad, post_pad, or footer.
227 uint32_t pre_pad
, post_pad
;
231 static uint64_t make_magic(uint64_t seq
, uint32_t len
, uint64_t fsid
) {
232 return (fsid
^ seq
^ len
);
234 bool check_magic(off64_t pos
, uint64_t fsid
) {
236 magic1
== (uint64_t)pos
&&
237 magic2
== (fsid
^ seq
^ len
);
239 } __attribute__((__packed__
, aligned(4)));
241 bool journalq_empty() { return journalq
.empty(); }
249 bool directio
, aio
, force_aio
;
250 bool must_write_header
;
251 off64_t write_pos
; // byte where the next entry to be written will go
253 bool discard
; //for block journal whether support discard
256 /// state associated with an in-flight aio request
257 /// Protected by aio_lock
263 uint64_t off
, len
; ///< these are for debug only
264 uint64_t seq
; ///< seq number to complete on aio completion, if non-zero
266 aio_info(bufferlist
& b
, uint64_t o
, uint64_t s
)
267 : iov(NULL
), done(false), off(o
), len(b
.length()), seq(s
) {
274 ceph::mutex aio_lock
= ceph::make_mutex("FileJournal::aio_lock");
275 ceph::condition_variable aio_cond
;
276 ceph::condition_variable write_finish_cond
;
277 io_context_t aio_ctx
= 0;
278 list
<aio_info
> aio_queue
;
279 int aio_num
= 0, aio_bytes
= 0;
280 uint64_t aio_write_queue_ops
= 0;
281 uint64_t aio_write_queue_bytes
= 0;
282 /// End protected by aio_lock
285 uint64_t last_committed_seq
;
286 uint64_t journaled_since_start
;
/*
 * full states cycle at the beginning of each commit epoch, when commit_start()
 * is called.
 *   FULL - we just filled up during this epoch.
 *   WAIT - we filled up last epoch; now we have to wait until everything during
 *          that epoch commits to the fs before we can start writing over it.
 *   NOTFULL - all good, journal away.
 */
307 deque
<pair
<uint64_t, off64_t
> > journalq
; // track seq offsets, so we can trim later.
308 uint64_t writing_seq
;
312 int set_throttle_params();
313 const char** get_tracked_conf_keys() const override
;
314 void handle_conf_change(
315 const ConfigProxy
& conf
,
316 const std::set
<std::string
> &changed
) override
{
317 for (const char **i
= get_tracked_conf_keys();
320 if (changed
.count(string(*i
))) {
321 set_throttle_params();
327 void complete_write(uint64_t ops
, uint64_t bytes
);
328 JournalThrottle throttle
;
331 ceph::mutex write_lock
= ceph::make_mutex("FileJournal::write_lock");
335 ceph::condition_variable commit_cond
;
337 int _open(bool wr
, bool create
=false);
338 int _open_block_device();
339 void _close(int fd
) const;
340 int _open_file(int64_t oldsize
, blksize_t blksize
, bool create
);
341 int _dump(ostream
& out
, bool simple
);
342 void print_header(const header_t
&hdr
) const;
343 int read_header(header_t
*hdr
) const;
344 bufferptr
prepare_header();
347 void write_thread_entry();
349 void queue_completions_thru(uint64_t seq
);
351 int check_for_full(uint64_t seq
, off64_t pos
, off64_t size
);
352 int prepare_multi_write(bufferlist
& bl
, uint64_t& orig_ops
, uint64_t& orig_bytee
);
353 int prepare_single_write(write_item
&next_write
, bufferlist
& bl
, off64_t
& queue_pos
,
354 uint64_t& orig_ops
, uint64_t& orig_bytes
);
355 void do_write(bufferlist
& bl
);
357 void write_finish_thread_entry();
358 void check_aio_completion();
359 void do_aio_write(bufferlist
& bl
);
360 int write_aio_bl(off64_t
& pos
, bufferlist
& bl
, uint64_t seq
);
363 void check_align(off64_t pos
, bufferlist
& bl
);
364 int write_bl(off64_t
& pos
, bufferlist
& bl
);
366 /// read len from journal starting at in_pos and wrapping up to len
368 off64_t in_pos
, ///< [in] start position
369 int64_t len
, ///< [in] length to read
370 bufferlist
* bl
, ///< [out] result
371 off64_t
*out_pos
///< [out] next position to read, will be wrapped
374 void do_discard(int64_t offset
, int64_t end
);
376 class Writer
: public Thread
{
377 FileJournal
*journal
;
379 explicit Writer(FileJournal
*fj
) : journal(fj
) {}
380 void *entry() override
{
381 journal
->write_thread_entry();
386 class WriteFinisher
: public Thread
{
387 FileJournal
*journal
;
389 explicit WriteFinisher(FileJournal
*fj
) : journal(fj
) {}
390 void *entry() override
{
391 journal
->write_finish_thread_entry();
394 } write_finish_thread
;
396 off64_t
get_top() const {
397 return round_up_to(sizeof(header
), block_size
);
400 ZTracer::Endpoint trace_endpoint
;
403 FileJournal(CephContext
* cct
, uuid_d fsid
, Finisher
*fin
, ceph::condition_variable
*sync_cond
,
404 const char *f
, bool dio
=false, bool ai
=true, bool faio
=false) :
405 Journal(cct
, fsid
, fin
, sync_cond
),
407 plug_journal_completions(false),
410 max_size(0), block_size(0),
411 directio(dio
), aio(ai
), force_aio(faio
),
412 must_write_header(false),
413 write_pos(0), read_pos(0),
415 last_committed_seq(0),
416 journaled_since_start(0),
417 full_state(FULL_NOTFULL
),
420 throttle(cct
->_conf
->filestore_caller_concurrency
),
424 write_finish_thread(this),
425 trace_endpoint("0.0.0.0", 0, "FileJournal") {
427 if (aio
&& !directio
) {
428 lderr(cct
) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl
;
432 if (aio
&& ::getenv("CEPH_DEV") == NULL
) {
433 lderr(cct
) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl
;
438 cct
->_conf
.add_observer(this);
440 ~FileJournal() override
{
441 ceph_assert(fd
== -1);
443 cct
->_conf
.remove_observer(this);
446 int check() override
;
447 int create() override
;
448 int open(uint64_t fs_op_seq
) override
;
449 void close() override
;
450 int peek_fsid(uuid_d
& fsid
);
452 int dump(ostream
& out
) override
;
453 int simple_dump(ostream
& out
);
454 int _fdump(Formatter
&f
, bool simple
);
456 void flush() override
;
458 void get_devices(set
<string
> *ls
) override
;
459 void collect_metadata(map
<string
,string
> *pm
) override
;
461 void reserve_throttle_and_backoff(uint64_t count
) override
;
463 bool is_writeable() override
{
464 return read_pos
== 0;
466 int make_writeable() override
;
469 void commit_start(uint64_t seq
) override
;
470 void committed_thru(uint64_t seq
) override
;
471 bool should_commit_now() override
{
472 return full_state
!= FULL_NOTFULL
&& !write_stop
;
475 void write_header_sync();
477 void set_wait_on_full(bool b
) { wait_on_full
= b
; }
479 off64_t
get_journal_size_estimate();
483 /// Result code for read_entry
484 enum read_entry_result
{
/**
 * Reads next entry starting at pos.  If the entry appears
 * clean, *bl will contain the payload, *seq will contain
 * the sequence number, and *out_pos will reflect the next
 * read position.  If the entry is invalid *ss will contain
 * debug text, while *seq, *out_pos, and *bl will be unchanged.
 *
 * If the entry suggests a corrupt log, *ss will contain debug
 * text, *out_pos will contain the next index to check.  If
 * we find an entry in this way that returns SUCCESS, the journal
 * is most likely corrupt.
 */
504 read_entry_result
do_read_entry(
505 off64_t pos
, ///< [in] position to read
506 off64_t
*next_pos
, ///< [out] next position to read
507 bufferlist
* bl
, ///< [out] payload for successful read
508 uint64_t *seq
, ///< [out] seq of successful read
509 ostream
*ss
, ///< [out] error output
510 entry_header_t
*h
= 0 ///< [out] header
511 ) const; ///< @return result code
521 uint64_t &last_seq
) override
{
522 return read_entry(bl
, last_seq
, 0);
533 void corrupt_payload(
536 void corrupt_footer_magic(
539 void corrupt_header_magic(
544 WRITE_CLASS_ENCODER(FileJournal::header_t
)