// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14 | ||
/* Journaler
 *
 * This class stripes a serial log over objects on the store.  Four
 * logical pointers:
 *
 *  write_pos - where we're writing new entries
 *  unused_field - where we're reading old entries
 *  expire_pos - what is deemed "old" by the user
 *  trimmed_pos - where we're expiring old items
 *
 *  trimmed_pos <= expire_pos <= unused_field <= write_pos.
 *
 * Often, unused_field <= write_pos (as with the MDS log).  During
 * recovery, write_pos is undefined until the end of the log is
 * discovered.
 *
 * A "head" struct at the beginning of the log is used to store
 * metadata at regular intervals.  The basic invariants include:
 *
 *   head.unused_field <= unused_field -- the head may "lag", since
 *                                        it's updated lazily.
 *   head.write_pos   <= write_pos
 *   head.expire_pos  <= expire_pos
 *   head.trimmed_pos <= trimmed_pos
 *
 * More significantly,
 *
 *   head.expire_pos >= trimmed_pos -- this ensures we can find the
 *                                     "beginning" of the log as last
 *                                     recorded, before it is trimmed.
 *                                     trimming will block until a
 *                                     sufficiently current expire_pos
 *                                     is committed.
 *
 * To recover log state, we simply start at the last write_pos in the
 * head, and probe the object sequence sizes until we read the end.
 *
 * The head struct is stored in the first object.  The actual journal
 * starts after layout.period() bytes.
 *
 */
56 | ||
#ifndef CEPH_JOURNALER_H
#define CEPH_JOURNALER_H

#include <list>
#include <map>

#include "Objecter.h"
#include "Filer.h"

#include "common/Timer.h"
#include "common/Throttle.h"
#include "include/common_fwd.h"

class Context;
class Finisher;
class C_OnFinisher;

74 | typedef __u8 stream_format_t; | |
75 | ||
76 | // Legacy envelope is leading uint32_t size | |
77 | enum StreamFormat { | |
78 | JOURNAL_FORMAT_LEGACY = 0, | |
79 | JOURNAL_FORMAT_RESILIENT = 1, | |
80 | // Insert new formats here, before COUNT | |
81 | JOURNAL_FORMAT_COUNT | |
82 | }; | |
83 | ||
84 | // Highest journal format version that we support | |
85 | #define JOURNAL_FORMAT_MAX (JOURNAL_FORMAT_COUNT - 1) | |
86 | ||
87 | // Legacy envelope is leading uint32_t size | |
88 | #define JOURNAL_ENVELOPE_LEGACY (sizeof(uint32_t)) | |
89 | ||
90 | // Resilient envelope is leading uint64_t sentinel, uint32_t size, | |
91 | // trailing uint64_t start_ptr | |
92 | #define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + \ | |
93 | sizeof(uint64_t)) | |
94 | ||
95 | /** | |
96 | * Represents a collection of entries serialized in a byte stream. | |
97 | * | |
98 | * Each entry consists of: | |
99 | * - a blob (used by the next level up as a serialized LogEvent) | |
100 | * - a uint64_t (used by the next level up as a pointer to the start | |
101 | * of the entry in the collection bytestream) | |
102 | */ | |
103 | class JournalStream | |
104 | { | |
105 | stream_format_t format; | |
106 | ||
107 | public: | |
108 | JournalStream(stream_format_t format_) : format(format_) {} | |
109 | ||
110 | void set_format(stream_format_t format_) {format = format_;} | |
111 | ||
112 | bool readable(bufferlist &bl, uint64_t *need) const; | |
113 | size_t read(bufferlist &from, bufferlist *to, uint64_t *start_ptr); | |
114 | size_t write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr); | |
115 | size_t get_envelope_size() const { | |
116 | if (format >= JOURNAL_FORMAT_RESILIENT) { | |
117 | return JOURNAL_ENVELOPE_RESILIENT; | |
118 | } else { | |
119 | return JOURNAL_ENVELOPE_LEGACY; | |
120 | } | |
121 | } | |
122 | ||
123 | // A magic number for the start of journal entries, so that we can | |
124 | // identify them in damaged journals. | |
125 | static const uint64_t sentinel = 0x3141592653589793; | |
126 | }; | |
127 | ||
128 | ||
129 | class Journaler { | |
130 | public: | |
131 | // this goes at the head of the log "file". | |
132 | class Header { | |
133 | public: | |
134 | uint64_t trimmed_pos; | |
135 | uint64_t expire_pos; | |
136 | uint64_t unused_field; | |
137 | uint64_t write_pos; | |
20effc67 | 138 | std::string magic; |
7c673cae FG |
139 | file_layout_t layout; //< The mapping from byte stream offsets |
140 | // to RADOS objects | |
141 | stream_format_t stream_format; //< The encoding of LogEvents | |
142 | // within the journal byte stream | |
143 | ||
144 | Header(const char *m="") : | |
145 | trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m), | |
146 | stream_format(-1) { | |
147 | } | |
148 | ||
149 | void encode(bufferlist &bl) const { | |
150 | ENCODE_START(2, 2, bl); | |
11fdf7f2 TL |
151 | encode(magic, bl); |
152 | encode(trimmed_pos, bl); | |
153 | encode(expire_pos, bl); | |
154 | encode(unused_field, bl); | |
155 | encode(write_pos, bl); | |
156 | encode(layout, bl, 0); // encode in legacy format | |
157 | encode(stream_format, bl); | |
7c673cae FG |
158 | ENCODE_FINISH(bl); |
159 | } | |
11fdf7f2 | 160 | void decode(bufferlist::const_iterator &bl) { |
7c673cae | 161 | DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); |
11fdf7f2 TL |
162 | decode(magic, bl); |
163 | decode(trimmed_pos, bl); | |
164 | decode(expire_pos, bl); | |
165 | decode(unused_field, bl); | |
166 | decode(write_pos, bl); | |
167 | decode(layout, bl); | |
7c673cae | 168 | if (struct_v > 1) { |
11fdf7f2 | 169 | decode(stream_format, bl); |
7c673cae FG |
170 | } else { |
171 | stream_format = JOURNAL_FORMAT_LEGACY; | |
172 | } | |
173 | DECODE_FINISH(bl); | |
174 | } | |
175 | ||
176 | void dump(Formatter *f) const { | |
177 | f->open_object_section("journal_header"); | |
178 | { | |
179 | f->dump_string("magic", magic); | |
180 | f->dump_unsigned("write_pos", write_pos); | |
181 | f->dump_unsigned("expire_pos", expire_pos); | |
182 | f->dump_unsigned("trimmed_pos", trimmed_pos); | |
183 | f->dump_unsigned("stream_format", stream_format); | |
184 | f->dump_object("layout", layout); | |
185 | } | |
186 | f->close_section(); // journal_header | |
187 | } | |
188 | ||
20effc67 | 189 | static void generate_test_instances(std::list<Header*> &ls) |
7c673cae FG |
190 | { |
191 | ls.push_back(new Header()); | |
192 | ||
193 | ls.push_back(new Header()); | |
194 | ls.back()->trimmed_pos = 1; | |
195 | ls.back()->expire_pos = 2; | |
196 | ls.back()->unused_field = 3; | |
197 | ls.back()->write_pos = 4; | |
198 | ls.back()->magic = "magique"; | |
199 | ||
200 | ls.push_back(new Header()); | |
201 | ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT; | |
202 | } | |
203 | }; | |
204 | WRITE_CLASS_ENCODER(Header) | |
205 | ||
206 | uint32_t get_stream_format() const { | |
207 | return stream_format; | |
208 | } | |
209 | ||
210 | Header last_committed; | |
211 | ||
212 | private: | |
213 | // me | |
214 | CephContext *cct; | |
215 | std::mutex lock; | |
216 | const std::string name; | |
217 | typedef std::lock_guard<std::mutex> lock_guard; | |
218 | typedef std::unique_lock<std::mutex> unique_lock; | |
219 | Finisher *finisher; | |
220 | Header last_written; | |
221 | inodeno_t ino; | |
222 | int64_t pg_pool; | |
223 | bool readonly; | |
224 | file_layout_t layout; | |
225 | uint32_t stream_format; | |
226 | JournalStream journal_stream; | |
227 | ||
228 | const char *magic; | |
229 | Objecter *objecter; | |
230 | Filer filer; | |
231 | ||
232 | PerfCounters *logger; | |
233 | int logger_key_lat; | |
234 | ||
235 | class C_DelayFlush; | |
236 | C_DelayFlush *delay_flush_event; | |
237 | /* | |
238 | * Do a flush as a result of a C_DelayFlush context. | |
239 | */ | |
240 | void _do_delayed_flush() | |
241 | { | |
11fdf7f2 | 242 | ceph_assert(delay_flush_event != NULL); |
7c673cae FG |
243 | lock_guard l(lock); |
244 | delay_flush_event = NULL; | |
245 | _do_flush(); | |
246 | } | |
247 | ||
248 | // my state | |
249 | static const int STATE_UNDEF = 0; | |
250 | static const int STATE_READHEAD = 1; | |
251 | static const int STATE_PROBING = 2; | |
252 | static const int STATE_ACTIVE = 3; | |
253 | static const int STATE_REREADHEAD = 4; | |
254 | static const int STATE_REPROBING = 5; | |
b32b8144 | 255 | static const int STATE_STOPPING = 6; |
7c673cae FG |
256 | |
257 | int state; | |
258 | int error; | |
259 | ||
260 | void _write_head(Context *oncommit=NULL); | |
261 | void _wait_for_flush(Context *onsafe); | |
262 | void _trim(); | |
263 | ||
264 | // header | |
265 | ceph::real_time last_wrote_head; | |
266 | void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit); | |
267 | class C_WriteHead; | |
268 | friend class C_WriteHead; | |
269 | ||
270 | void _reread_head(Context *onfinish); | |
271 | void _set_layout(file_layout_t const *l); | |
20effc67 | 272 | std::list<Context*> waitfor_recover; |
7c673cae FG |
273 | void _read_head(Context *on_finish, bufferlist *bl); |
274 | void _finish_read_head(int r, bufferlist& bl); | |
275 | void _finish_reread_head(int r, bufferlist& bl, Context *finish); | |
276 | void _probe(Context *finish, uint64_t *end); | |
277 | void _finish_probe_end(int r, uint64_t end); | |
278 | void _reprobe(C_OnFinisher *onfinish); | |
279 | void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish); | |
280 | void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish); | |
281 | class C_ReadHead; | |
282 | friend class C_ReadHead; | |
283 | class C_ProbeEnd; | |
284 | friend class C_ProbeEnd; | |
285 | class C_RereadHead; | |
286 | friend class C_RereadHead; | |
287 | class C_ReProbe; | |
288 | friend class C_ReProbe; | |
289 | class C_RereadHeadProbe; | |
290 | friend class C_RereadHeadProbe; | |
291 | ||
292 | // writer | |
293 | uint64_t prezeroing_pos; | |
294 | uint64_t prezero_pos; ///< we zero journal space ahead of write_pos to | |
295 | // avoid problems with tail probing | |
296 | uint64_t write_pos; ///< logical write position, where next entry | |
297 | // will go | |
298 | uint64_t flush_pos; ///< where we will flush. if | |
299 | /// write_pos>flush_pos, we're buffering writes. | |
300 | uint64_t safe_pos; ///< what has been committed safely to disk. | |
301 | ||
11fdf7f2 | 302 | uint64_t next_safe_pos; /// start position of the first entry that isn't |
7c673cae FG |
303 | /// being fully flushed. If we don't flush any |
304 | // partial entry, it's equal to flush_pos. | |
305 | ||
306 | bufferlist write_buf; ///< write buffer. flush_pos + | |
307 | /// write_buf.length() == write_pos. | |
308 | ||
309 | // protect write_buf from bufferlist _len overflow | |
310 | Throttle write_buf_throttle; | |
311 | ||
94b18763 | 312 | uint64_t waiting_for_zero_pos; |
7c673cae | 313 | interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed |
20effc67 | 314 | std::list<Context*> waitfor_prezero; |
28e407b8 | 315 | |
7c673cae FG |
316 | std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos |
317 | // when safe through given offset | |
318 | std::map<uint64_t, std::list<Context*> > waitfor_safe; | |
319 | ||
320 | void _flush(C_OnFinisher *onsafe); | |
321 | void _do_flush(unsigned amount=0); | |
322 | void _finish_flush(int r, uint64_t start, ceph::real_time stamp); | |
323 | class C_Flush; | |
324 | friend class C_Flush; | |
325 | ||
326 | // reader | |
327 | uint64_t read_pos; // logical read position, where next entry starts. | |
328 | uint64_t requested_pos; // what we've requested from OSD. | |
329 | uint64_t received_pos; // what we've received from OSD. | |
330 | // read buffer. unused_field + read_buf.length() == prefetch_pos. | |
331 | bufferlist read_buf; | |
332 | ||
20effc67 | 333 | std::map<uint64_t,bufferlist> prefetch_buf; |
7c673cae FG |
334 | |
335 | uint64_t fetch_len; // how much to read at a time | |
336 | uint64_t temp_fetch_len; | |
337 | ||
338 | // for wait_for_readable() | |
339 | C_OnFinisher *on_readable; | |
340 | C_OnFinisher *on_write_error; | |
341 | bool called_write_error; | |
342 | ||
343 | // read completion callback | |
344 | void _finish_read(int r, uint64_t offset, uint64_t length, bufferlist &bl); | |
345 | void _finish_retry_read(int r); | |
346 | void _assimilate_prefetch(); | |
347 | void _issue_read(uint64_t len); // read some more | |
348 | void _prefetch(); // maybe read ahead | |
349 | class C_Read; | |
350 | friend class C_Read; | |
351 | class C_RetryRead; | |
352 | friend class C_RetryRead; | |
353 | ||
354 | // trimmer | |
355 | uint64_t expire_pos; // what we're allowed to trim to | |
356 | uint64_t trimming_pos; // what we've requested to trim through | |
357 | uint64_t trimmed_pos; // what has been trimmed | |
358 | ||
359 | bool readable; | |
360 | ||
361 | void _finish_trim(int r, uint64_t to); | |
362 | class C_Trim; | |
363 | friend class C_Trim; | |
364 | ||
365 | void _issue_prezero(); | |
366 | void _finish_prezero(int r, uint64_t from, uint64_t len); | |
367 | friend struct C_Journaler_Prezero; | |
368 | ||
369 | // only init_headers when following or first reading off-disk | |
370 | void init_headers(Header& h) { | |
11fdf7f2 | 371 | ceph_assert(readonly || |
7c673cae FG |
372 | state == STATE_READHEAD || |
373 | state == STATE_REREADHEAD); | |
374 | last_written = last_committed = h; | |
375 | } | |
376 | ||
377 | /** | |
378 | * handle a write error | |
379 | * | |
380 | * called when we get an objecter error on a write. | |
381 | * | |
382 | * @param r error code | |
383 | */ | |
384 | void handle_write_error(int r); | |
385 | ||
386 | bool _is_readable(); | |
387 | ||
388 | void _finish_erase(int data_result, C_OnFinisher *completion); | |
389 | class C_EraseFinish; | |
390 | friend class C_EraseFinish; | |
391 | ||
392 | C_OnFinisher *wrap_finisher(Context *c); | |
393 | ||
394 | uint32_t write_iohint; // the fadvise flags for write op, see | |
395 | // CEPH_OSD_OP_FADIVSE_* | |
396 | ||
397 | public: | |
398 | Journaler(const std::string &name_, inodeno_t ino_, int64_t pool, | |
399 | const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) : | |
400 | last_committed(mag), | |
401 | cct(obj->cct), name(name_), finisher(f), last_written(mag), | |
402 | ino(ino_), pg_pool(pool), readonly(true), | |
403 | stream_format(-1), journal_stream(-1), | |
404 | magic(mag), | |
405 | objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey), | |
406 | delay_flush_event(0), | |
407 | state(STATE_UNDEF), error(0), | |
408 | prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), | |
409 | safe_pos(0), next_safe_pos(0), | |
410 | write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)), | |
94b18763 | 411 | waiting_for_zero_pos(0), |
7c673cae FG |
412 | read_pos(0), requested_pos(0), received_pos(0), |
413 | fetch_len(0), temp_fetch_len(0), | |
414 | on_readable(0), on_write_error(NULL), called_write_error(false), | |
415 | expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false), | |
b32b8144 | 416 | write_iohint(0) |
7c673cae FG |
417 | { |
418 | } | |
419 | ||
420 | /* reset | |
421 | * | |
422 | * NOTE: we assume the caller knows/has ensured that any objects in | |
423 | * our sequence do not exist.. e.g. after a MKFS. this is _not_ an | |
424 | * "erase" method. | |
425 | */ | |
426 | void reset() { | |
427 | lock_guard l(lock); | |
11fdf7f2 | 428 | ceph_assert(state == STATE_ACTIVE); |
7c673cae FG |
429 | |
430 | readonly = true; | |
431 | delay_flush_event = NULL; | |
432 | state = STATE_UNDEF; | |
433 | error = 0; | |
434 | prezeroing_pos = 0; | |
435 | prezero_pos = 0; | |
436 | write_pos = 0; | |
437 | flush_pos = 0; | |
438 | safe_pos = 0; | |
439 | next_safe_pos = 0; | |
440 | read_pos = 0; | |
441 | requested_pos = 0; | |
442 | received_pos = 0; | |
443 | fetch_len = 0; | |
11fdf7f2 | 444 | ceph_assert(!on_readable); |
7c673cae FG |
445 | expire_pos = 0; |
446 | trimming_pos = 0; | |
447 | trimmed_pos = 0; | |
94b18763 | 448 | waiting_for_zero_pos = 0; |
7c673cae FG |
449 | } |
450 | ||
451 | // Asynchronous operations | |
452 | // ======================= | |
453 | void erase(Context *completion); | |
454 | void create(file_layout_t *layout, stream_format_t const sf); | |
455 | void recover(Context *onfinish); | |
456 | void reread_head(Context *onfinish); | |
457 | void reread_head_and_probe(Context *onfinish); | |
458 | void write_head(Context *onsave=0); | |
459 | void wait_for_flush(Context *onsafe = 0); | |
460 | void flush(Context *onsafe = 0); | |
461 | void wait_for_readable(Context *onfinish); | |
462 | bool have_waiter() const; | |
28e407b8 | 463 | void wait_for_prezero(Context *onfinish); |
7c673cae FG |
464 | |
465 | // Synchronous setters | |
466 | // =================== | |
467 | void set_layout(file_layout_t const *l); | |
468 | void set_readonly(); | |
469 | void set_writeable(); | |
470 | void set_write_pos(uint64_t p) { | |
471 | lock_guard l(lock); | |
472 | prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p; | |
473 | } | |
474 | void set_read_pos(uint64_t p) { | |
475 | lock_guard l(lock); | |
476 | // we can't cope w/ in-progress read right now. | |
11fdf7f2 | 477 | ceph_assert(requested_pos == received_pos); |
7c673cae FG |
478 | read_pos = requested_pos = received_pos = p; |
479 | read_buf.clear(); | |
480 | } | |
481 | uint64_t append_entry(bufferlist& bl); | |
482 | void set_expire_pos(uint64_t ep) { | |
483 | lock_guard l(lock); | |
484 | expire_pos = ep; | |
485 | } | |
486 | void set_trimmed_pos(uint64_t p) { | |
487 | lock_guard l(lock); | |
488 | trimming_pos = trimmed_pos = p; | |
489 | } | |
490 | ||
491 | bool _write_head_needed(); | |
492 | bool write_head_needed() { | |
493 | lock_guard l(lock); | |
494 | return _write_head_needed(); | |
495 | } | |
496 | ||
497 | ||
498 | void trim(); | |
499 | void trim_tail() { | |
500 | lock_guard l(lock); | |
501 | ||
11fdf7f2 | 502 | ceph_assert(!readonly); |
7c673cae FG |
503 | _issue_prezero(); |
504 | } | |
505 | ||
506 | void set_write_error_handler(Context *c); | |
507 | ||
508 | void set_write_iohint(uint32_t iohint_flags) { | |
509 | write_iohint = iohint_flags; | |
510 | } | |
511 | /** | |
512 | * Cause any ongoing waits to error out with -EAGAIN, set error | |
513 | * to -EAGAIN. | |
514 | */ | |
515 | void shutdown(); | |
7c673cae FG |
516 | public: |
517 | ||
518 | // Synchronous getters | |
519 | // =================== | |
520 | // TODO: need some locks on reads for true safety | |
521 | uint64_t get_layout_period() const { | |
522 | return layout.get_period(); | |
523 | } | |
524 | file_layout_t& get_layout() { return layout; } | |
525 | bool is_active() { return state == STATE_ACTIVE; } | |
b32b8144 | 526 | bool is_stopping() { return state == STATE_STOPPING; } |
7c673cae FG |
527 | int get_error() { return error; } |
528 | bool is_readonly() { return readonly; } | |
529 | bool is_readable(); | |
530 | bool try_read_entry(bufferlist& bl); | |
531 | uint64_t get_write_pos() const { return write_pos; } | |
532 | uint64_t get_write_safe_pos() const { return safe_pos; } | |
533 | uint64_t get_read_pos() const { return read_pos; } | |
534 | uint64_t get_expire_pos() const { return expire_pos; } | |
535 | uint64_t get_trimmed_pos() const { return trimmed_pos; } | |
9f95a23c TL |
536 | size_t get_journal_envelope_size() const { |
537 | return journal_stream.get_envelope_size(); | |
538 | } | |
7c673cae FG |
539 | }; |
540 | WRITE_CLASS_ENCODER(Journaler::Header) | |
541 | ||
542 | #endif |