1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 * This class stripes a serial log over objects on the store. Four
20 * write_pos - where we're writing new entries
21 * unused_field - where we're reading old entries
22 * expire_pos - what is deemed "old" by user
23 * trimmed_pos - where we're expiring old items
25 * trimmed_pos <= expire_pos <= unused_field <= write_pos.
27 * Often, unused_field <= write_pos (as with MDS log). During
28 * recovery, write_pos is undefined until the end of the log is
31 * A "head" struct at the beginning of the log is used to store
32 * metadata at regular intervals. The basic invariants include:
34 * head.unused_field <= unused_field -- the head may "lag", since
35 * it's updated lazily.
36 * head.write_pos <= write_pos
37 * head.expire_pos <= expire_pos
38 * head.trimmed_pos <= trimmed_pos
42 * head.expire_pos >= trimmed_pos -- this ensures we can find the
43 * "beginning" of the log as last
44 * recorded, before it is trimmed.
45 * trimming will block until a
46 * sufficiently current expire_pos
49 * To recover log state, we simply start at the last write_pos in the
50 * head, and probe the object sequence sizes until we read the end.
52 * Head struct is stored in the first object. Actual journal starts
53 * after layout.period() bytes.
57 #ifndef CEPH_JOURNALER_H
58 #define CEPH_JOURNALER_H
66 #include "common/Timer.h"
67 #include "common/Throttle.h"
68 #include "include/common_fwd.h"
// On-disk/on-wire identifier for the journal entry envelope format.
74 typedef __u8 stream_format_t
;
76 // Legacy envelope is leading uint32_t size
// Format id 0: entries framed only by a leading uint32_t length.
78 JOURNAL_FORMAT_LEGACY
= 0,
// Format id 1: adds a sentinel and trailing start_ptr (see the
// envelope-size macros below).
79 JOURNAL_FORMAT_RESILIENT
= 1,
80 // Insert new formats here, before COUNT
// NOTE(review): the enum's opening/closing lines (and
// JOURNAL_FORMAT_COUNT itself) are not visible in this chunk.
84 // Highest journal format version that we support
85 #define JOURNAL_FORMAT_MAX (JOURNAL_FORMAT_COUNT - 1)
87 // Legacy envelope is leading uint32_t size
88 #define JOURNAL_ENVELOPE_LEGACY (sizeof(uint32_t))
90 // Resilient envelope is leading uint64_t sentinel, uint32_t size,
91 // trailing uint64_t start_ptr
// NOTE(review): the continuation line of this macro (the trailing
// sizeof term and closing paren) is not visible in this chunk.
92 #define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + \
96 * Represents a collection of entries serialized in a byte stream.
98 * Each entry consists of:
99 * - a blob (used by the next level up as a serialized LogEvent)
100 * - a uint64_t (used by the next level up as a pointer to the start
101 * of the entry in the collection bytestream)
// Envelope format this stream reads and writes (legacy or resilient).
105 stream_format_t format
;
// Construct a stream that (de)serializes entries in the given format.
108 JournalStream(stream_format_t format_
) : format(format_
) {}
// Switch envelope format; affects subsequent read()/write() calls.
110 void set_format(stream_format_t format_
) {format
= format_
;}
// Whether 'bl' holds at least one complete entry; presumably *need
// receives the byte count still required when it does not -- TODO
// confirm, the implementation is not visible in this chunk.
112 bool readable(bufferlist
&bl
, uint64_t *need
) const;
// Consume one entry from 'from' into 'to'; start_ptr receives the
// entry's start offset (declaration only in this chunk).
113 size_t read(bufferlist
&from
, bufferlist
*to
, uint64_t *start_ptr
);
// Append 'entry' to 'to' wrapped in an envelope for this format;
// start_ptr is the entry's logical start offset in the stream.
114 size_t write(bufferlist
&entry
, bufferlist
*to
, uint64_t const &start_ptr
);
115 size_t get_envelope_size() const {
116 if (format
>= JOURNAL_FORMAT_RESILIENT
) {
117 return JOURNAL_ENVELOPE_RESILIENT
;
119 return JOURNAL_ENVELOPE_LEGACY
;
123 // A magic number for the start of journal entries, so that we can
124 // identify them in damaged journals.
// (The hex digits spell out pi: 3.141592653589793.)
125 static const uint64_t sentinel
= 0x3141592653589793;
131 // this goes at the head of the log "file".
// On-disk header stored in the first journal object; see the file
// header comment for the invariants between these positions.
// Position through which the journal has been trimmed (released).
134 uint64_t trimmed_pos
;
// Historical field; see the "unused_field" notes in the file header.
136 uint64_t unused_field
;
139 file_layout_t layout
; ///< The mapping from byte stream offsets
141 stream_format_t stream_format
; ///< The encoding of LogEvents
142 // within the journal byte stream
// NOTE(review): declarations of expire_pos, write_pos and magic are
// not visible in this chunk, although the constructor initializes
// them below.
// Construct a header with all positions zeroed and the given magic.
144 Header(const char *m
="") :
145 trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m
),
// Serialize this header.  Fields are written in a fixed order which
// decode() below must mirror.
149 void encode(bufferlist
&bl
) const {
150 ENCODE_START(2, 2, bl
);
// NOTE(review): the encode of 'magic' and the ENCODE_FINISH /
// closing brace are not visible in this chunk.
152 encode(trimmed_pos
, bl
);
153 encode(expire_pos
, bl
);
154 encode(unused_field
, bl
);
155 encode(write_pos
, bl
);
156 encode(layout
, bl
, 0); // encode in legacy format
157 encode(stream_format
, bl
);
// Deserialize a header; compat settings (2, 2, 2) tolerate older
// version-1 encodings.
160 void decode(bufferlist::const_iterator
&bl
) {
161 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl
);
// NOTE(review): the decodes of 'magic' and 'layout' (and the
// version check that guards the stream_format decode below) are
// not visible in this chunk.
163 decode(trimmed_pos
, bl
);
164 decode(expire_pos
, bl
);
165 decode(unused_field
, bl
);
166 decode(write_pos
, bl
);
169 decode(stream_format
, bl
);
// Old encodings carried no stream_format; assume legacy framing.
171 stream_format
= JOURNAL_FORMAT_LEGACY
;
// Emit the header fields into 'f' for admin-socket / debug output.
176 void dump(Formatter
*f
) const {
177 f
->open_object_section("journal_header");
179 f
->dump_string("magic", magic
);
180 f
->dump_unsigned("write_pos", write_pos
);
181 f
->dump_unsigned("expire_pos", expire_pos
);
182 f
->dump_unsigned("trimmed_pos", trimmed_pos
);
183 f
->dump_unsigned("stream_format", stream_format
);
184 f
->dump_object("layout", layout
);
186 f
->close_section(); // journal_header
// Produce sample instances for encode/decode round-trip tests:
// a default header, one with distinct position values and magic,
// and one using the resilient stream format.
189 static void generate_test_instances(std::list
<Header
*> &ls
)
191 ls
.push_back(new Header());
193 ls
.push_back(new Header());
194 ls
.back()->trimmed_pos
= 1;
195 ls
.back()->expire_pos
= 2;
196 ls
.back()->unused_field
= 3;
197 ls
.back()->write_pos
= 4;
198 ls
.back()->magic
= "magique";
200 ls
.push_back(new Header());
201 ls
.back()->stream_format
= JOURNAL_FORMAT_RESILIENT
;
204 WRITE_CLASS_ENCODER(Header
)
206 uint32_t get_stream_format() const {
207 return stream_format
;
210 Header last_committed
;
216 const std::string name
;
217 typedef std::lock_guard
<std::mutex
> lock_guard
;
218 typedef std::unique_lock
<std::mutex
> unique_lock
;
224 file_layout_t layout
;
225 uint32_t stream_format
;
226 JournalStream journal_stream
;
232 PerfCounters
*logger
;
236 C_DelayFlush
*delay_flush_event
;
238 * Do a flush as a result of a C_DelayFlush context.
// Clears the pending delayed-flush event before flushing; asserts
// that a C_DelayFlush was actually scheduled.
240 void _do_delayed_flush()
242 ceph_assert(delay_flush_event
!= NULL
);
// NOTE(review): the body braces and the flush call itself are not
// visible in this chunk.
244 delay_flush_event
= NULL
;
// Recovery / operation state machine.  Normal recovery runs
// READHEAD -> PROBING -> ACTIVE; the REREADHEAD/REPROBING states
// presumably re-run those steps after an error (see _reread_head()
// and _reprobe()), and STOPPING marks shutdown.
static const int STATE_UNDEF = 0;
static const int STATE_READHEAD = 1;
static const int STATE_PROBING = 2;
static const int STATE_ACTIVE = 3;
static const int STATE_REREADHEAD = 4;
static const int STATE_REPROBING = 5;
static const int STATE_STOPPING = 6;
260 void _write_head(Context
*oncommit
=NULL
);
261 void _wait_for_flush(Context
*onsafe
);
265 ceph::real_time last_wrote_head
;
266 void _finish_write_head(int r
, Header
&wrote
, C_OnFinisher
*oncommit
);
268 friend class C_WriteHead
;
270 void _reread_head(Context
*onfinish
);
271 void _set_layout(file_layout_t
const *l
);
272 std::list
<Context
*> waitfor_recover
;
273 void _read_head(Context
*on_finish
, bufferlist
*bl
);
274 void _finish_read_head(int r
, bufferlist
& bl
);
275 void _finish_reread_head(int r
, bufferlist
& bl
, Context
*finish
);
276 void _probe(Context
*finish
, uint64_t *end
);
277 void _finish_probe_end(int r
, uint64_t end
);
278 void _reprobe(C_OnFinisher
*onfinish
);
279 void _finish_reprobe(int r
, uint64_t end
, C_OnFinisher
*onfinish
);
280 void _finish_reread_head_and_probe(int r
, C_OnFinisher
*onfinish
);
282 friend class C_ReadHead
;
284 friend class C_ProbeEnd
;
286 friend class C_RereadHead
;
288 friend class C_ReProbe
;
289 class C_RereadHeadProbe
;
290 friend class C_RereadHeadProbe
;
293 uint64_t prezeroing_pos
;
294 uint64_t prezero_pos
; ///< we zero journal space ahead of write_pos to
295 // avoid problems with tail probing
296 uint64_t write_pos
; ///< logical write position, where next entry
298 uint64_t flush_pos
; ///< where we will flush. if
299 /// write_pos>flush_pos, we're buffering writes.
300 uint64_t safe_pos
; ///< what has been committed safely to disk.
302 uint64_t next_safe_pos
; /// start position of the first entry that isn't
303 /// being fully flushed. If we don't flush any
304 // partial entry, it's equal to flush_pos.
306 bufferlist write_buf
; ///< write buffer. flush_pos +
307 /// write_buf.length() == write_pos.
309 // protect write_buf from bufferlist _len overflow
310 Throttle write_buf_throttle
;
312 uint64_t waiting_for_zero_pos
;
313 interval_set
<uint64_t> pending_zero
; // non-contig bits we've zeroed
314 std::list
<Context
*> waitfor_prezero
;
316 std::map
<uint64_t, uint64_t> pending_safe
; // flush_pos -> safe_pos
317 // when safe through given offset
318 std::map
<uint64_t, std::list
<Context
*> > waitfor_safe
;
320 void _flush(C_OnFinisher
*onsafe
);
321 void _do_flush(unsigned amount
=0);
322 void _finish_flush(int r
, uint64_t start
, ceph::real_time stamp
);
324 friend class C_Flush
;
327 uint64_t read_pos
; // logical read position, where next entry starts.
328 uint64_t requested_pos
; // what we've requested from OSD.
329 uint64_t received_pos
; // what we've received from OSD.
330 // read buffer. unused_field + read_buf.length() == prefetch_pos.
333 std::map
<uint64_t,bufferlist
> prefetch_buf
;
335 uint64_t fetch_len
; // how much to read at a time
336 uint64_t temp_fetch_len
;
338 // for wait_for_readable()
339 C_OnFinisher
*on_readable
;
340 C_OnFinisher
*on_write_error
;
341 bool called_write_error
;
343 // read completion callback
344 void _finish_read(int r
, uint64_t offset
, uint64_t length
, bufferlist
&bl
);
345 void _finish_retry_read(int r
);
346 void _assimilate_prefetch();
347 void _issue_read(uint64_t len
); // read some more
348 void _prefetch(); // maybe read ahead
352 friend class C_RetryRead
;
355 uint64_t expire_pos
; // what we're allowed to trim to
356 uint64_t trimming_pos
; // what we've requested to trim through
357 uint64_t trimmed_pos
; // what has been trimmed
361 void _finish_trim(int r
, uint64_t to
);
365 void _issue_prezero();
366 void _finish_prezero(int r
, uint64_t from
, uint64_t len
);
367 friend struct C_Journaler_Prezero
;
369 // only init_headers when following or first reading off-disk
370 void init_headers(Header
& h
) {
371 ceph_assert(readonly
||
372 state
== STATE_READHEAD
||
373 state
== STATE_REREADHEAD
);
374 last_written
= last_committed
= h
;
378 * handle a write error
380 * called when we get an objecter error on a write.
382 * @param r error code
384 void handle_write_error(int r
);
388 void _finish_erase(int data_result
, C_OnFinisher
*completion
);
390 friend class C_EraseFinish
;
392 C_OnFinisher
*wrap_finisher(Context
*c
);
394 uint32_t write_iohint
; // the fadvise flags for write op, see
395 // CEPH_OSD_OP_FADIVSE_*
// Construct a Journaler for inode 'ino_' in pool 'pool', using magic
// string 'mag' for the header, writing objects via Objecter 'obj',
// logging to PerfCounters 'l' (latency key 'lkey'), and completing
// callbacks on Finisher 'f'.  Starts read-only and in STATE_UNDEF
// with all positions zeroed.
398 Journaler(const std::string
&name_
, inodeno_t ino_
, int64_t pool
,
399 const char *mag
, Objecter
*obj
, PerfCounters
*l
, int lkey
, Finisher
*f
) :
401 cct(obj
->cct
), name(name_
), finisher(f
), last_written(mag
),
402 ino(ino_
), pg_pool(pool
), readonly(true),
403 stream_format(-1), journal_stream(-1),
405 objecter(obj
), filer(objecter
, f
), logger(l
), logger_key_lat(lkey
),
406 delay_flush_event(0),
407 state(STATE_UNDEF
), error(0),
408 prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0),
409 safe_pos(0), next_safe_pos(0),
// Throttle buffered writes to just under UINT_MAX bytes so
// write_buf's bufferlist length cannot overflow.
410 write_buf_throttle(cct
, "write_buf_throttle", UINT_MAX
- (UINT_MAX
>> 3)),
411 waiting_for_zero_pos(0),
412 read_pos(0), requested_pos(0), received_pos(0),
413 fetch_len(0), temp_fetch_len(0),
414 on_readable(0), on_write_error(NULL
), called_write_error(false),
415 expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false),
422 * NOTE: we assume the caller knows/has ensured that any objects in
423 * our sequence do not exist.. e.g. after a MKFS. this is _not_ an
428 ceph_assert(state
== STATE_ACTIVE
);
431 delay_flush_event
= NULL
;
444 ceph_assert(!on_readable
);
448 waiting_for_zero_pos
= 0;
451 // Asynchronous operations
452 // =======================
453 void erase(Context
*completion
);
454 void create(file_layout_t
*layout
, stream_format_t
const sf
);
455 void recover(Context
*onfinish
);
456 void reread_head(Context
*onfinish
);
457 void reread_head_and_probe(Context
*onfinish
);
458 void write_head(Context
*onsave
=0);
459 void wait_for_flush(Context
*onsafe
= 0);
460 void flush(Context
*onsafe
= 0);
461 void wait_for_readable(Context
*onfinish
);
462 bool have_waiter() const;
463 void wait_for_prezero(Context
*onfinish
);
465 // Synchronous setters
466 // ===================
467 void set_layout(file_layout_t
const *l
);
470 void set_writeable();
// Reset every write-side position (prezeroing through next_safe_pos)
// to 'p'.  NOTE(review): the body's braces (and any locking lines)
// are not visible in this chunk.
470 void set_write_pos(uint64_t p
) {
472 prezeroing_pos
= prezero_pos
= write_pos
= flush_pos
= safe_pos
= next_safe_pos
= p
;
// Reset all read-side positions to 'p'; requires that no read is in
// flight (requested == received).
474 void set_read_pos(uint64_t p
) {
476 // we can't cope w/ in-progress read right now.
477 ceph_assert(requested_pos
== received_pos
);
478 read_pos
= requested_pos
= received_pos
= p
;
481 uint64_t append_entry(bufferlist
& bl
);
// NOTE(review): the body of set_expire_pos is not visible in this
// chunk.
482 void set_expire_pos(uint64_t ep
) {
// Advance both trimming and trimmed positions to 'p'.
486 void set_trimmed_pos(uint64_t p
) {
488 trimming_pos
= trimmed_pos
= p
;
491 bool _write_head_needed();
492 bool write_head_needed() {
494 return _write_head_needed();
// NOTE(review): this assert belongs to a method whose signature
// lines are not visible in this chunk; it requires writeability.
502 ceph_assert(!readonly
);
506 void set_write_error_handler(Context
*c
);
// Set the fadvise flags applied to subsequent write ops.
508 void set_write_iohint(uint32_t iohint_flags
) {
509 write_iohint
= iohint_flags
;
512 * Cause any ongoing waits to error out with -EAGAIN, set error
518 // Synchronous getters
519 // ===================
520 // TODO: need some locks on reads for true safety
521 uint64_t get_layout_period() const {
522 return layout
.get_period();
524 file_layout_t
& get_layout() { return layout
; }
525 bool is_active() { return state
== STATE_ACTIVE
; }
526 bool is_stopping() { return state
== STATE_STOPPING
; }
527 int get_error() { return error
; }
528 bool is_readonly() { return readonly
; }
530 bool try_read_entry(bufferlist
& bl
);
531 uint64_t get_write_pos() const { return write_pos
; }
532 uint64_t get_write_safe_pos() const { return safe_pos
; }
533 uint64_t get_read_pos() const { return read_pos
; }
534 uint64_t get_expire_pos() const { return expire_pos
; }
535 uint64_t get_trimmed_pos() const { return trimmed_pos
; }
536 size_t get_journal_envelope_size() const {
537 return journal_stream
.get_envelope_size();
540 WRITE_CLASS_ENCODER(Journaler::Header
)