]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #ifndef CEPH_FILEJOURNAL_H | |
17 | #define CEPH_FILEJOURNAL_H | |
18 | ||
19 | #include <deque> | |
20 | using std::deque; | |
21 | ||
22 | #include "Journal.h" | |
23 | #include "common/Cond.h" | |
24 | #include "common/Mutex.h" | |
25 | #include "common/Thread.h" | |
26 | #include "common/Throttle.h" | |
27 | #include "JournalThrottle.h" | |
28 | #include "common/zipkin_trace.h" | |
29 | ||
30 | #ifdef HAVE_LIBAIO | |
31 | # include <libaio.h> | |
32 | #endif | |
33 | ||
31f18b77 FG |
34 | // re-include our assert to clobber the system one; fix dout: |
35 | #include "include/assert.h" | |
36 | ||
7c673cae FG |
37 | /** |
38 | * Implements journaling on top of block device or file. | |
39 | * | |
40 | * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock) | |
41 | */ | |
42 | class FileJournal : | |
43 | public Journal, | |
44 | public md_config_obs_t { | |
45 | public: | |
46 | /// Protected by finisher_lock | |
47 | struct completion_item { | |
48 | uint64_t seq; | |
49 | Context *finish; | |
50 | utime_t start; | |
51 | TrackedOpRef tracked_op; | |
52 | completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref) | |
53 | : seq(o), finish(c), start(s), tracked_op(opref) {} | |
54 | completion_item() : seq(0), finish(0), start(0) {} | |
55 | }; | |
56 | struct write_item { | |
57 | uint64_t seq; | |
58 | bufferlist bl; | |
59 | uint32_t orig_len; | |
60 | TrackedOpRef tracked_op; | |
61 | ZTracer::Trace trace; | |
62 | write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) : | |
63 | seq(s), orig_len(ol), tracked_op(opref) { | |
64 | bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy | |
65 | } | |
66 | write_item() : seq(0), orig_len(0) {} | |
67 | }; | |
68 | ||
69 | Mutex finisher_lock; | |
70 | Cond finisher_cond; | |
71 | uint64_t journaled_seq; | |
72 | bool plug_journal_completions; | |
73 | ||
74 | Mutex writeq_lock; | |
75 | Cond writeq_cond; | |
76 | list<write_item> writeq; | |
77 | bool writeq_empty(); | |
78 | write_item &peek_write(); | |
79 | void pop_write(); | |
80 | void batch_pop_write(list<write_item> &items); | |
81 | void batch_unpop_write(list<write_item> &items); | |
82 | ||
83 | Mutex completions_lock; | |
84 | list<completion_item> completions; | |
85 | bool completions_empty() { | |
86 | Mutex::Locker l(completions_lock); | |
87 | return completions.empty(); | |
88 | } | |
89 | void batch_pop_completions(list<completion_item> &items) { | |
90 | Mutex::Locker l(completions_lock); | |
91 | completions.swap(items); | |
92 | } | |
93 | void batch_unpop_completions(list<completion_item> &items) { | |
94 | Mutex::Locker l(completions_lock); | |
95 | completions.splice(completions.begin(), items); | |
96 | } | |
97 | completion_item completion_peek_front() { | |
98 | Mutex::Locker l(completions_lock); | |
99 | assert(!completions.empty()); | |
100 | return completions.front(); | |
101 | } | |
102 | void completion_pop_front() { | |
103 | Mutex::Locker l(completions_lock); | |
104 | assert(!completions.empty()); | |
105 | completions.pop_front(); | |
106 | } | |
107 | ||
108 | int prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) override; | |
109 | ||
110 | void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len, | |
111 | Context *oncommit, | |
112 | TrackedOpRef osd_op = TrackedOpRef()) override; | |
113 | /// End protected by finisher_lock | |
114 | ||
115 | /* | |
116 | * journal header | |
117 | */ | |
118 | struct header_t { | |
119 | enum { | |
120 | FLAG_CRC = (1<<0), | |
121 | // NOTE: remove kludgey weirdness in read_header() next time a flag is added. | |
122 | }; | |
123 | ||
124 | uint64_t flags; | |
125 | uuid_d fsid; | |
126 | __u32 block_size; | |
127 | __u32 alignment; | |
128 | int64_t max_size; // max size of journal ring buffer | |
129 | int64_t start; // offset of first entry | |
130 | uint64_t committed_up_to; // committed up to | |
131 | ||
132 | /** | |
133 | * start_seq | |
134 | * | |
135 | * entry at header.start has sequence >= start_seq | |
136 | * | |
137 | * Generally, the entry at header.start will have sequence | |
138 | * start_seq if it exists. The only exception is immediately | |
139 | * after journal creation since the first sequence number is | |
140 | * not known. | |
141 | * | |
142 | * If the first read on open fails, we can assume corruption | |
143 | * if start_seq > committed_up_to because the entry would have | |
144 | * a sequence >= start_seq and therefore > committed_up_to. | |
145 | */ | |
146 | uint64_t start_seq; | |
147 | ||
148 | header_t() : | |
149 | flags(0), block_size(0), alignment(0), max_size(0), start(0), | |
150 | committed_up_to(0), start_seq(0) {} | |
151 | ||
152 | void clear() { | |
153 | start = block_size; | |
154 | } | |
155 | ||
156 | uint64_t get_fsid64() const { | |
157 | return *(uint64_t*)fsid.bytes(); | |
158 | } | |
159 | ||
160 | void encode(bufferlist& bl) const { | |
161 | __u32 v = 4; | |
162 | ::encode(v, bl); | |
163 | bufferlist em; | |
164 | { | |
165 | ::encode(flags, em); | |
166 | ::encode(fsid, em); | |
167 | ::encode(block_size, em); | |
168 | ::encode(alignment, em); | |
169 | ::encode(max_size, em); | |
170 | ::encode(start, em); | |
171 | ::encode(committed_up_to, em); | |
172 | ::encode(start_seq, em); | |
173 | } | |
174 | ::encode(em, bl); | |
175 | } | |
176 | void decode(bufferlist::iterator& bl) { | |
177 | __u32 v; | |
178 | ::decode(v, bl); | |
179 | if (v < 2) { // normally 0, but concievably 1 | |
180 | // decode old header_t struct (pre v0.40). | |
181 | bl.advance(4); // skip __u32 flags (it was unused by any old code) | |
182 | flags = 0; | |
183 | uint64_t tfsid; | |
184 | ::decode(tfsid, bl); | |
185 | *(uint64_t*)&fsid.bytes()[0] = tfsid; | |
186 | *(uint64_t*)&fsid.bytes()[8] = tfsid; | |
187 | ::decode(block_size, bl); | |
188 | ::decode(alignment, bl); | |
189 | ::decode(max_size, bl); | |
190 | ::decode(start, bl); | |
191 | committed_up_to = 0; | |
192 | start_seq = 0; | |
193 | return; | |
194 | } | |
195 | bufferlist em; | |
196 | ::decode(em, bl); | |
197 | bufferlist::iterator t = em.begin(); | |
198 | ::decode(flags, t); | |
199 | ::decode(fsid, t); | |
200 | ::decode(block_size, t); | |
201 | ::decode(alignment, t); | |
202 | ::decode(max_size, t); | |
203 | ::decode(start, t); | |
204 | ||
205 | if (v > 2) | |
206 | ::decode(committed_up_to, t); | |
207 | else | |
208 | committed_up_to = 0; | |
209 | ||
210 | if (v > 3) | |
211 | ::decode(start_seq, t); | |
212 | else | |
213 | start_seq = 0; | |
214 | } | |
215 | } header; | |
216 | ||
/**
 * Fixed-layout record placed around each journal entry.
 *
 * The struct is packed (no padding between members) and 4-byte aligned,
 * giving a 40-byte layout. magic1 is compared against the entry's byte
 * position and magic2 against fsid ^ seq ^ len, see check_magic().
 */
struct entry_header_t {
  uint64_t seq;     // fs op seq #
  uint32_t crc32c;  // payload only.  not header, pre_pad, post_pad, or footer.
  uint32_t len;
  uint32_t pre_pad, post_pad;
  uint64_t magic1;
  uint64_t magic2;

  /// Computes the value expected in magic2 for the given entry.
  /// @param seq   entry sequence number
  /// @param len   entry length field
  /// @param fsid  64-bit fsid of the journal
  /// @return fsid ^ seq ^ len
  static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
    return (fsid ^ seq ^ len);
  }

  /// Validates both magic fields of this header.
  /// @param pos   byte position the header was read from
  /// @param fsid  64-bit fsid of the journal
  /// @return true when magic1 equals pos and magic2 equals
  ///         make_magic(seq, len, fsid)
  bool check_magic(off64_t pos, uint64_t fsid) const {
    // Go through make_magic() so writer and checker share one formula.
    return
      magic1 == (uint64_t)pos &&
      magic2 == make_magic(seq, len, fsid);
  }
} __attribute__((__packed__, aligned(4)));
234 | ||
235 | bool journalq_empty() { return journalq.empty(); } | |
236 | ||
237 | private: | |
238 | string fn; | |
239 | ||
240 | char *zero_buf; | |
241 | off64_t max_size; | |
242 | size_t block_size; | |
243 | bool directio, aio, force_aio; | |
244 | bool must_write_header; | |
245 | off64_t write_pos; // byte where the next entry to be written will go | |
246 | off64_t read_pos; // | |
247 | bool discard; //for block journal whether support discard | |
248 | ||
249 | #ifdef HAVE_LIBAIO | |
250 | /// state associated with an in-flight aio request | |
251 | /// Protected by aio_lock | |
252 | struct aio_info { | |
253 | struct iocb iocb; | |
254 | bufferlist bl; | |
255 | struct iovec *iov; | |
256 | bool done; | |
257 | uint64_t off, len; ///< these are for debug only | |
258 | uint64_t seq; ///< seq number to complete on aio completion, if non-zero | |
259 | ||
260 | aio_info(bufferlist& b, uint64_t o, uint64_t s) | |
261 | : iov(NULL), done(false), off(o), len(b.length()), seq(s) { | |
262 | bl.claim(b); | |
263 | } | |
264 | ~aio_info() { | |
265 | delete[] iov; | |
266 | } | |
267 | }; | |
268 | Mutex aio_lock; | |
269 | Cond aio_cond; | |
270 | Cond write_finish_cond; | |
271 | io_context_t aio_ctx; | |
272 | list<aio_info> aio_queue; | |
273 | int aio_num, aio_bytes; | |
274 | uint64_t aio_write_queue_ops; | |
275 | uint64_t aio_write_queue_bytes; | |
276 | /// End protected by aio_lock | |
277 | #endif | |
278 | ||
279 | uint64_t last_committed_seq; | |
280 | uint64_t journaled_since_start; | |
281 | ||
282 | /* | |
283 | * full states cycle at the beginning of each commit epoch, when commit_start() | |
284 | * is called. | |
285 | * FULL - we just filled up during this epoch. | |
286 | * WAIT - we filled up last epoch; now we have to wait until everything during | |
287 | * that epoch commits to the fs before we can start writing over it. | |
288 | * NOTFULL - all good, journal away. | |
289 | */ | |
290 | enum { | |
291 | FULL_NOTFULL = 0, | |
292 | FULL_FULL = 1, | |
293 | FULL_WAIT = 2, | |
294 | } full_state; | |
295 | ||
296 | int fd; | |
297 | ||
298 | // in journal | |
299 | deque<pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later. | |
300 | uint64_t writing_seq; | |
301 | ||
302 | ||
303 | // throttle | |
304 | int set_throttle_params(); | |
305 | const char** get_tracked_conf_keys() const override; | |
306 | void handle_conf_change( | |
307 | const struct md_config_t *conf, | |
308 | const std::set <std::string> &changed) override { | |
309 | for (const char **i = get_tracked_conf_keys(); | |
310 | *i; | |
311 | ++i) { | |
312 | if (changed.count(string(*i))) { | |
313 | set_throttle_params(); | |
314 | return; | |
315 | } | |
316 | } | |
317 | } | |
318 | ||
319 | void complete_write(uint64_t ops, uint64_t bytes); | |
320 | JournalThrottle throttle; | |
321 | ||
322 | // write thread | |
323 | Mutex write_lock; | |
324 | bool write_stop; | |
325 | bool aio_stop; | |
326 | ||
327 | Cond commit_cond; | |
328 | ||
329 | int _open(bool wr, bool create=false); | |
330 | int _open_block_device(); | |
331 | void _close(int fd) const; | |
332 | int _open_file(int64_t oldsize, blksize_t blksize, bool create); | |
333 | int _dump(ostream& out, bool simple); | |
334 | void print_header(const header_t &hdr) const; | |
335 | int read_header(header_t *hdr) const; | |
336 | bufferptr prepare_header(); | |
337 | void start_writer(); | |
338 | void stop_writer(); | |
339 | void write_thread_entry(); | |
340 | ||
341 | void queue_completions_thru(uint64_t seq); | |
342 | ||
343 | int check_for_full(uint64_t seq, off64_t pos, off64_t size); | |
344 | int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee); | |
345 | int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, | |
346 | uint64_t& orig_ops, uint64_t& orig_bytes); | |
347 | void do_write(bufferlist& bl); | |
348 | ||
349 | void write_finish_thread_entry(); | |
350 | void check_aio_completion(); | |
351 | void do_aio_write(bufferlist& bl); | |
352 | int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq); | |
353 | ||
354 | ||
355 | void check_align(off64_t pos, bufferlist& bl); | |
356 | int write_bl(off64_t& pos, bufferlist& bl); | |
357 | ||
358 | /// read len from journal starting at in_pos and wrapping up to len | |
359 | void wrap_read_bl( | |
360 | off64_t in_pos, ///< [in] start position | |
361 | int64_t len, ///< [in] length to read | |
362 | bufferlist* bl, ///< [out] result | |
363 | off64_t *out_pos ///< [out] next position to read, will be wrapped | |
364 | ) const; | |
365 | ||
366 | void do_discard(int64_t offset, int64_t end); | |
367 | ||
368 | class Writer : public Thread { | |
369 | FileJournal *journal; | |
370 | public: | |
371 | explicit Writer(FileJournal *fj) : journal(fj) {} | |
372 | void *entry() override { | |
373 | journal->write_thread_entry(); | |
374 | return 0; | |
375 | } | |
376 | } write_thread; | |
377 | ||
378 | class WriteFinisher : public Thread { | |
379 | FileJournal *journal; | |
380 | public: | |
381 | explicit WriteFinisher(FileJournal *fj) : journal(fj) {} | |
382 | void *entry() override { | |
383 | journal->write_finish_thread_entry(); | |
384 | return 0; | |
385 | } | |
386 | } write_finish_thread; | |
387 | ||
388 | off64_t get_top() const { | |
389 | return ROUND_UP_TO(sizeof(header), block_size); | |
390 | } | |
391 | ||
392 | ZTracer::Endpoint trace_endpoint; | |
393 | ||
394 | public: | |
395 | FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond, | |
396 | const char *f, bool dio=false, bool ai=true, bool faio=false) : | |
397 | Journal(cct, fsid, fin, sync_cond), | |
398 | finisher_lock("FileJournal::finisher_lock", false, true, false, cct), | |
399 | journaled_seq(0), | |
400 | plug_journal_completions(false), | |
401 | writeq_lock("FileJournal::writeq_lock", false, true, false, cct), | |
402 | completions_lock( | |
403 | "FileJournal::completions_lock", false, true, false, cct), | |
404 | fn(f), | |
405 | zero_buf(NULL), | |
406 | max_size(0), block_size(0), | |
407 | directio(dio), aio(ai), force_aio(faio), | |
408 | must_write_header(false), | |
409 | write_pos(0), read_pos(0), | |
410 | discard(false), | |
411 | #ifdef HAVE_LIBAIO | |
412 | aio_lock("FileJournal::aio_lock"), | |
413 | aio_ctx(0), | |
414 | aio_num(0), aio_bytes(0), | |
415 | aio_write_queue_ops(0), | |
416 | aio_write_queue_bytes(0), | |
417 | #endif | |
418 | last_committed_seq(0), | |
419 | journaled_since_start(0), | |
420 | full_state(FULL_NOTFULL), | |
421 | fd(-1), | |
422 | writing_seq(0), | |
423 | throttle(cct->_conf->filestore_caller_concurrency), | |
424 | write_lock("FileJournal::write_lock", false, true, false, cct), | |
425 | write_stop(true), | |
426 | aio_stop(true), | |
427 | write_thread(this), | |
428 | write_finish_thread(this), | |
429 | trace_endpoint("0.0.0.0", 0, "FileJournal") { | |
430 | ||
431 | if (aio && !directio) { | |
432 | lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl; | |
433 | aio = false; | |
434 | } | |
435 | #ifndef HAVE_LIBAIO | |
436 | if (aio) { | |
437 | lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl; | |
438 | aio = false; | |
439 | } | |
440 | #endif | |
441 | ||
442 | cct->_conf->add_observer(this); | |
443 | } | |
444 | ~FileJournal() override { | |
445 | assert(fd == -1); | |
446 | delete[] zero_buf; | |
447 | cct->_conf->remove_observer(this); | |
448 | } | |
449 | ||
450 | int check() override; | |
451 | int create() override; | |
452 | int open(uint64_t fs_op_seq) override; | |
453 | void close() override; | |
454 | int peek_fsid(uuid_d& fsid); | |
455 | ||
456 | int dump(ostream& out) override; | |
457 | int simple_dump(ostream& out); | |
458 | int _fdump(Formatter &f, bool simple); | |
459 | ||
460 | void flush() override; | |
461 | ||
462 | void reserve_throttle_and_backoff(uint64_t count) override; | |
463 | ||
464 | bool is_writeable() override { | |
465 | return read_pos == 0; | |
466 | } | |
467 | int make_writeable() override; | |
468 | ||
469 | // writes | |
470 | void commit_start(uint64_t seq) override; | |
471 | void committed_thru(uint64_t seq) override; | |
472 | bool should_commit_now() override { | |
473 | return full_state != FULL_NOTFULL && !write_stop; | |
474 | } | |
475 | ||
476 | void write_header_sync(); | |
477 | ||
478 | void set_wait_on_full(bool b) { wait_on_full = b; } | |
479 | ||
480 | off64_t get_journal_size_estimate(); | |
481 | ||
482 | // reads | |
483 | ||
484 | /// Result code for read_entry | |
485 | enum read_entry_result { | |
486 | SUCCESS, | |
487 | FAILURE, | |
488 | MAYBE_CORRUPT | |
489 | }; | |
490 | ||
491 | /** | |
492 | * read_entry | |
493 | * | |
494 | * Reads next entry starting at pos. If the entry appears | |
495 | * clean, *bl will contain the payload, *seq will contain | |
496 | * the sequence number, and *out_pos will reflect the next | |
497 | * read position. If the entry is invalid *ss will contain | |
498 | * debug text, while *seq, *out_pos, and *bl will be unchanged. | |
499 | * | |
500 | * If the entry suggests a corrupt log, *ss will contain debug | |
501 | * text, *out_pos will contain the next index to check. If | |
502 | * we find an entry in this way that returns SUCCESS, the journal | |
503 | * is most likely corrupt. | |
504 | */ | |
505 | read_entry_result do_read_entry( | |
506 | off64_t pos, ///< [in] position to read | |
507 | off64_t *next_pos, ///< [out] next position to read | |
508 | bufferlist* bl, ///< [out] payload for successful read | |
509 | uint64_t *seq, ///< [out] seq of successful read | |
510 | ostream *ss, ///< [out] error output | |
511 | entry_header_t *h = 0 ///< [out] header | |
512 | ) const; ///< @return result code | |
513 | ||
514 | bool read_entry( | |
515 | bufferlist &bl, | |
516 | uint64_t &last_seq, | |
517 | bool *corrupt | |
518 | ); | |
519 | ||
520 | bool read_entry( | |
521 | bufferlist &bl, | |
522 | uint64_t &last_seq) override { | |
523 | return read_entry(bl, last_seq, 0); | |
524 | } | |
525 | ||
526 | // Debug/Testing | |
527 | void get_header( | |
528 | uint64_t wanted_seq, | |
529 | off64_t *_pos, | |
530 | entry_header_t *h); | |
531 | void corrupt( | |
532 | int wfd, | |
533 | off64_t corrupt_at); | |
534 | void corrupt_payload( | |
535 | int wfd, | |
536 | uint64_t seq); | |
537 | void corrupt_footer_magic( | |
538 | int wfd, | |
539 | uint64_t seq); | |
540 | void corrupt_header_magic( | |
541 | int wfd, | |
542 | uint64_t seq); | |
543 | }; | |
544 | ||
545 | WRITE_CLASS_ENCODER(FileJournal::header_t) | |
546 | ||
547 | #endif |