// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

/* Journaler
 *
 * This class stripes a serial log over objects on the store.  Four
 * logical pointers:
 *
 *  write_pos - where we're writing new entries
 *  unused_field - where we're reading old entries
 *  expire_pos - what is deemed "old" by the user
 *  trimmed_pos - where we're expiring old items
 *
 *  trimmed_pos <= expire_pos <= unused_field <= write_pos.
 *
 * Often, unused_field <= write_pos (as with the MDS log).  During
 * recovery, write_pos is undefined until the end of the log is
 * discovered.
 *
 * A "head" struct at the beginning of the log is used to store
 * metadata at regular intervals.  The basic invariants include:
 *
 *   head.unused_field <= unused_field -- the head may "lag", since
 *                                        it's updated lazily.
 *   head.write_pos   <= write_pos
 *   head.expire_pos  <= expire_pos
 *   head.trimmed_pos <= trimmed_pos
 *
 * More significantly,
 *
 *   head.expire_pos >= trimmed_pos -- this ensures we can find the
 *                                     "beginning" of the log as last
 *                                     recorded, before it is trimmed.
 *                                     Trimming will block until a
 *                                     sufficiently current expire_pos
 *                                     is committed.
 *
 * To recover log state, we simply start at the last write_pos in the
 * head, and probe the object sequence sizes until we read the end.
 *
 * The head struct is stored in the first object.  The actual journal
 * starts after layout.period() bytes.
 *
 */
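
/*
 * Typical write-side usage (a minimal sketch only, based on the public
 * API declared below; error handling, locking, and completion Contexts
 * are the caller's concern, and `events` is a hypothetical container of
 * serialized entries):
 *
 *   Journaler journaler(...);         // constructed against an Objecter/Finisher
 *   journaler.recover(on_recovered);  // or journaler.create(&layout, format) for a new log
 *   for (auto &bl : events)
 *     journaler.append_entry(bl);     // buffer an entry at write_pos
 *   journaler.flush(on_safe);         // push buffered entries out to RADOS
 *   journaler.write_head(on_head);    // periodically persist the head
 */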

#ifndef CEPH_JOURNALER_H
#define CEPH_JOURNALER_H

#include <list>
#include <map>

#include "Objecter.h"
#include "Filer.h"

#include "common/Timer.h"
#include "common/Throttle.h"

class CephContext;
class Context;
class PerfCounters;
class Finisher;
class C_OnFinisher;

typedef __u8 stream_format_t;

// Legacy envelope is leading uint32_t size
enum StreamFormat {
  JOURNAL_FORMAT_LEGACY = 0,
  JOURNAL_FORMAT_RESILIENT = 1,
  // Insert new formats here, before COUNT
  JOURNAL_FORMAT_COUNT
};

// Highest journal format version that we support
#define JOURNAL_FORMAT_MAX (JOURNAL_FORMAT_COUNT - 1)

// Legacy envelope is leading uint32_t size
#define JOURNAL_ENVELOPE_LEGACY (sizeof(uint32_t))

// Resilient envelope is leading uint64_t sentinel, uint32_t size,
// trailing uint64_t start_ptr
#define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + \
                                    sizeof(uint64_t))
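
// Entry framing implied by the envelope sizes above (illustrative sketch
// only; JournalStream::write() is the authoritative encoder):
//
//   legacy:    [u32 size][payload: size bytes]
//   resilient: [u64 sentinel][u32 size][payload: size bytes][u64 start_ptr]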

/**
 * Represents a collection of entries serialized in a byte stream.
 *
 * Each entry consists of:
 *  - a blob (used by the next level up as a serialized LogEvent)
 *  - a uint64_t (used by the next level up as a pointer to the start
 *    of the entry in the collection bytestream)
 */
class JournalStream
{
  stream_format_t format;

 public:
  JournalStream(stream_format_t format_) : format(format_) {}

  void set_format(stream_format_t format_) {format = format_;}

  bool readable(bufferlist &bl, uint64_t *need) const;
  size_t read(bufferlist &from, bufferlist *to, uint64_t *start_ptr);
  size_t write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr);
  size_t get_envelope_size() const {
    if (format >= JOURNAL_FORMAT_RESILIENT) {
      return JOURNAL_ENVELOPE_RESILIENT;
    } else {
      return JOURNAL_ENVELOPE_LEGACY;
    }
  }

  // A magic number for the start of journal entries, so that we can
  // identify them in damaged journals.
  static const uint64_t sentinel = 0x3141592653589793;
};
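
/*
 * Read-side use of JournalStream (a minimal sketch, assuming `bl` already
 * holds bytes fetched from the journal objects):
 *
 *   JournalStream js(JOURNAL_FORMAT_RESILIENT);
 *   uint64_t need = 0;
 *   if (js.readable(bl, &need)) {       // enough bytes for a whole entry?
 *     bufferlist entry;
 *     uint64_t start_ptr = 0;
 *     js.read(bl, &entry, &start_ptr);  // consume one envelope + payload
 *   } else {
 *     // fetch at least `need` more bytes and retry
 *   }
 */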

class Journaler {
public:
  // this goes at the head of the log "file".
  class Header {
  public:
    uint64_t trimmed_pos;
    uint64_t expire_pos;
    uint64_t unused_field;
    uint64_t write_pos;
    string magic;
    file_layout_t layout; //< The mapping from byte stream offsets
                          //  to RADOS objects
    stream_format_t stream_format; //< The encoding of LogEvents
                                   //  within the journal byte stream

    Header(const char *m="") :
      trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m),
      stream_format(-1) {
    }

    void encode(bufferlist &bl) const {
      ENCODE_START(2, 2, bl);
      ::encode(magic, bl);
      ::encode(trimmed_pos, bl);
      ::encode(expire_pos, bl);
      ::encode(unused_field, bl);
      ::encode(write_pos, bl);
      ::encode(layout, bl, 0);  // encode in legacy format
      ::encode(stream_format, bl);
      ENCODE_FINISH(bl);
    }
    void decode(bufferlist::iterator &bl) {
      DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
      ::decode(magic, bl);
      ::decode(trimmed_pos, bl);
      ::decode(expire_pos, bl);
      ::decode(unused_field, bl);
      ::decode(write_pos, bl);
      ::decode(layout, bl);
      if (struct_v > 1) {
        ::decode(stream_format, bl);
      } else {
        stream_format = JOURNAL_FORMAT_LEGACY;
      }
      DECODE_FINISH(bl);
    }

    void dump(Formatter *f) const {
      f->open_object_section("journal_header");
      {
        f->dump_string("magic", magic);
        f->dump_unsigned("write_pos", write_pos);
        f->dump_unsigned("expire_pos", expire_pos);
        f->dump_unsigned("trimmed_pos", trimmed_pos);
        f->dump_unsigned("stream_format", stream_format);
        f->dump_object("layout", layout);
      }
      f->close_section(); // journal_header
    }

    static void generate_test_instances(list<Header*> &ls)
    {
      ls.push_back(new Header());

      ls.push_back(new Header());
      ls.back()->trimmed_pos = 1;
      ls.back()->expire_pos = 2;
      ls.back()->unused_field = 3;
      ls.back()->write_pos = 4;
      ls.back()->magic = "magique";

      ls.push_back(new Header());
      ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT;
    }
  };
  WRITE_CLASS_ENCODER(Header)

  uint32_t get_stream_format() const {
    return stream_format;
  }

  Header last_committed;

private:
  // me
  CephContext *cct;
  std::mutex lock;
  const std::string name;
  typedef std::lock_guard<std::mutex> lock_guard;
  typedef std::unique_lock<std::mutex> unique_lock;
  Finisher *finisher;
  Header last_written;
  inodeno_t ino;
  int64_t pg_pool;
  bool readonly;
  file_layout_t layout;
  uint32_t stream_format;
  JournalStream journal_stream;

  const char *magic;
  Objecter *objecter;
  Filer filer;

  PerfCounters *logger;
  int logger_key_lat;

  class C_DelayFlush;
  C_DelayFlush *delay_flush_event;
  /*
   * Do a flush as a result of a C_DelayFlush context.
   */
  void _do_delayed_flush()
  {
    assert(delay_flush_event != NULL);
    lock_guard l(lock);
    delay_flush_event = NULL;
    _do_flush();
  }

  // my state
  static const int STATE_UNDEF = 0;
  static const int STATE_READHEAD = 1;
  static const int STATE_PROBING = 2;
  static const int STATE_ACTIVE = 3;
  static const int STATE_REREADHEAD = 4;
  static const int STATE_REPROBING = 5;
  static const int STATE_STOPPING = 6;

  int state;
  int error;

  void _write_head(Context *oncommit=NULL);
  void _wait_for_flush(Context *onsafe);
  void _trim();

  // header
  ceph::real_time last_wrote_head;
  void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit);
  class C_WriteHead;
  friend class C_WriteHead;

  void _reread_head(Context *onfinish);
  void _set_layout(file_layout_t const *l);
  list<Context*> waitfor_recover;
  void _read_head(Context *on_finish, bufferlist *bl);
  void _finish_read_head(int r, bufferlist& bl);
  void _finish_reread_head(int r, bufferlist& bl, Context *finish);
  void _probe(Context *finish, uint64_t *end);
  void _finish_probe_end(int r, uint64_t end);
  void _reprobe(C_OnFinisher *onfinish);
  void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish);
  void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish);
  class C_ReadHead;
  friend class C_ReadHead;
  class C_ProbeEnd;
  friend class C_ProbeEnd;
  class C_RereadHead;
  friend class C_RereadHead;
  class C_ReProbe;
  friend class C_ReProbe;
  class C_RereadHeadProbe;
  friend class C_RereadHeadProbe;

  // writer
  uint64_t prezeroing_pos;
  uint64_t prezero_pos;   ///< we zero journal space ahead of write_pos to
                          ///  avoid problems with tail probing
  uint64_t write_pos;     ///< logical write position, where next entry
                          ///  will go
  uint64_t flush_pos;     ///< where we will flush. if
                          ///  write_pos>flush_pos, we're buffering writes.
  uint64_t safe_pos;      ///< what has been committed safely to disk.

  uint64_t next_safe_pos; ///< start position of the first entry that isn't
                          ///  being fully flushed. If we don't flush any
                          ///  partial entry, it's equal to flush_pos.

  bufferlist write_buf;   ///< write buffer.  flush_pos +
                          ///  write_buf.length() == write_pos.

  // protect write_buf from bufferlist _len overflow
  Throttle write_buf_throttle;

  uint64_t waiting_for_zero_pos;
  interval_set<uint64_t> pending_zero;  // non-contig bits we've zeroed

  list<Context*> waitfor_prezero;

  std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos
                                             // when safe through given offset
  std::map<uint64_t, std::list<Context*> > waitfor_safe;

  void _flush(C_OnFinisher *onsafe);
  void _do_flush(unsigned amount=0);
  void _finish_flush(int r, uint64_t start, ceph::real_time stamp);
  class C_Flush;
  friend class C_Flush;

  // reader
  uint64_t read_pos;      // logical read position, where next entry starts.
  uint64_t requested_pos; // what we've requested from OSD.
  uint64_t received_pos;  // what we've received from OSD.
  // read buffer.  read_pos + read_buf.length() == received_pos.
  bufferlist read_buf;

  map<uint64_t,bufferlist> prefetch_buf;

  uint64_t fetch_len;     // how much to read at a time
  uint64_t temp_fetch_len;

  // for wait_for_readable()
  C_OnFinisher *on_readable;
  C_OnFinisher *on_write_error;
  bool called_write_error;

  // read completion callback
  void _finish_read(int r, uint64_t offset, uint64_t length, bufferlist &bl);
  void _finish_retry_read(int r);
  void _assimilate_prefetch();
  void _issue_read(uint64_t len);  // read some more
  void _prefetch();                // maybe read ahead
  class C_Read;
  friend class C_Read;
  class C_RetryRead;
  friend class C_RetryRead;

  // trimmer
  uint64_t expire_pos;    // what we're allowed to trim to
  uint64_t trimming_pos;  // what we've requested to trim through
  uint64_t trimmed_pos;   // what has been trimmed

  bool readable;

  void _finish_trim(int r, uint64_t to);
  class C_Trim;
  friend class C_Trim;

  void _issue_prezero();
  void _finish_prezero(int r, uint64_t from, uint64_t len);
  friend struct C_Journaler_Prezero;

  // only init_headers when following or first reading off-disk
  void init_headers(Header& h) {
    assert(readonly ||
           state == STATE_READHEAD ||
           state == STATE_REREADHEAD);
    last_written = last_committed = h;
  }

  /**
   * handle a write error
   *
   * called when we get an objecter error on a write.
   *
   * @param r error code
   */
  void handle_write_error(int r);

  bool _is_readable();

  void _finish_erase(int data_result, C_OnFinisher *completion);
  class C_EraseFinish;
  friend class C_EraseFinish;

  C_OnFinisher *wrap_finisher(Context *c);

  uint32_t write_iohint;  // the fadvise flags for write op, see
                          // CEPH_OSD_OP_FLAG_FADVISE_*

public:
  Journaler(const std::string &name_, inodeno_t ino_, int64_t pool,
            const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) :
    last_committed(mag),
    cct(obj->cct), name(name_), finisher(f), last_written(mag),
    ino(ino_), pg_pool(pool), readonly(true),
    stream_format(-1), journal_stream(-1),
    magic(mag),
    objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey),
    delay_flush_event(0),
    state(STATE_UNDEF), error(0),
    prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0),
    safe_pos(0), next_safe_pos(0),
    write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)),
    waiting_for_zero_pos(0),
    read_pos(0), requested_pos(0), received_pos(0),
    fetch_len(0), temp_fetch_len(0),
    on_readable(0), on_write_error(NULL), called_write_error(false),
    expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false),
    write_iohint(0)
  {
  }

  /* reset
   *
   * NOTE: we assume the caller knows/has ensured that any objects in
   * our sequence do not exist, e.g. after a mkfs.  This is _not_ an
   * "erase" method.
   */
  void reset() {
    lock_guard l(lock);
    assert(state == STATE_ACTIVE);

    readonly = true;
    delay_flush_event = NULL;
    state = STATE_UNDEF;
    error = 0;
    prezeroing_pos = 0;
    prezero_pos = 0;
    write_pos = 0;
    flush_pos = 0;
    safe_pos = 0;
    next_safe_pos = 0;
    read_pos = 0;
    requested_pos = 0;
    received_pos = 0;
    fetch_len = 0;
    assert(!on_readable);
    expire_pos = 0;
    trimming_pos = 0;
    trimmed_pos = 0;
    waiting_for_zero_pos = 0;
  }

  // Asynchronous operations
  // =======================
  void erase(Context *completion);
  void create(file_layout_t *layout, stream_format_t const sf);
  void recover(Context *onfinish);
  void reread_head(Context *onfinish);
  void reread_head_and_probe(Context *onfinish);
  void write_head(Context *onsave=0);
  void wait_for_flush(Context *onsafe = 0);
  void flush(Context *onsafe = 0);
  void wait_for_readable(Context *onfinish);
  bool have_waiter() const;
  void wait_for_prezero(Context *onfinish);

  // Synchronous setters
  // ===================
  void set_layout(file_layout_t const *l);
  void set_readonly();
  void set_writeable();
  void set_write_pos(uint64_t p) {
    lock_guard l(lock);
    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p;
  }
  void set_read_pos(uint64_t p) {
    lock_guard l(lock);
    // we can't cope w/ in-progress read right now.
    assert(requested_pos == received_pos);
    read_pos = requested_pos = received_pos = p;
    read_buf.clear();
  }
  uint64_t append_entry(bufferlist& bl);
  void set_expire_pos(uint64_t ep) {
    lock_guard l(lock);
    expire_pos = ep;
  }
  void set_trimmed_pos(uint64_t p) {
    lock_guard l(lock);
    trimming_pos = trimmed_pos = p;
  }

  bool _write_head_needed();
  bool write_head_needed() {
    lock_guard l(lock);
    return _write_head_needed();
  }


  void trim();
  void trim_tail() {
    lock_guard l(lock);

    assert(!readonly);
    _issue_prezero();
  }

  void set_write_error_handler(Context *c);

  void set_write_iohint(uint32_t iohint_flags) {
    write_iohint = iohint_flags;
  }

  /**
   * Cause any ongoing waits to error out with -EAGAIN, set error
   * to -EAGAIN.
   */
  void shutdown();

public:

  // Synchronous getters
  // ===================
  // TODO: need some locks on reads for true safety
  uint64_t get_layout_period() const {
    return layout.get_period();
  }
  file_layout_t& get_layout() { return layout; }
  bool is_active() { return state == STATE_ACTIVE; }
  bool is_stopping() { return state == STATE_STOPPING; }
  int get_error() { return error; }
  bool is_readonly() { return readonly; }
  bool is_readable();
  bool try_read_entry(bufferlist& bl);
  uint64_t get_write_pos() const { return write_pos; }
  uint64_t get_write_safe_pos() const { return safe_pos; }
  uint64_t get_read_pos() const { return read_pos; }
  uint64_t get_expire_pos() const { return expire_pos; }
  uint64_t get_trimmed_pos() const { return trimmed_pos; }
};
WRITE_CLASS_ENCODER(Journaler::Header)
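
/*
 * Round-tripping a header through its encoder (sketch only; uses the
 * encode()/decode() free functions that WRITE_CLASS_ENCODER generates
 * for Journaler::Header; "my_magic" is just a placeholder value):
 *
 *   Journaler::Header h("my_magic");
 *   bufferlist bl;
 *   ::encode(h, bl);
 *
 *   Journaler::Header h2;
 *   bufferlist::iterator p = bl.begin();
 *   ::decode(h2, p);
 */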

#endif