]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #ifndef CEPH_FILEJOURNAL_H | |
17 | #define CEPH_FILEJOURNAL_H | |
18 | ||
#include <cstring>
#include <deque>
using std::deque;

#include "Journal.h"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/Thread.h"
#include "common/Throttle.h"
#include "JournalThrottle.h"
#include "common/zipkin_trace.h"

#ifdef HAVE_LIBAIO
# include <libaio.h>
#endif
33 | ||
34 | /** | |
35 | * Implements journaling on top of block device or file. | |
36 | * | |
37 | * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock) | |
38 | */ | |
39 | class FileJournal : | |
40 | public Journal, | |
41 | public md_config_obs_t { | |
42 | public: | |
43 | /// Protected by finisher_lock | |
44 | struct completion_item { | |
45 | uint64_t seq; | |
46 | Context *finish; | |
47 | utime_t start; | |
48 | TrackedOpRef tracked_op; | |
49 | completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref) | |
50 | : seq(o), finish(c), start(s), tracked_op(opref) {} | |
51 | completion_item() : seq(0), finish(0), start(0) {} | |
52 | }; | |
53 | struct write_item { | |
54 | uint64_t seq; | |
55 | bufferlist bl; | |
56 | uint32_t orig_len; | |
57 | TrackedOpRef tracked_op; | |
58 | ZTracer::Trace trace; | |
59 | write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) : | |
60 | seq(s), orig_len(ol), tracked_op(opref) { | |
61 | bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy | |
62 | } | |
63 | write_item() : seq(0), orig_len(0) {} | |
64 | }; | |
65 | ||
66 | Mutex finisher_lock; | |
67 | Cond finisher_cond; | |
68 | uint64_t journaled_seq; | |
69 | bool plug_journal_completions; | |
70 | ||
71 | Mutex writeq_lock; | |
72 | Cond writeq_cond; | |
73 | list<write_item> writeq; | |
74 | bool writeq_empty(); | |
75 | write_item &peek_write(); | |
76 | void pop_write(); | |
77 | void batch_pop_write(list<write_item> &items); | |
78 | void batch_unpop_write(list<write_item> &items); | |
79 | ||
80 | Mutex completions_lock; | |
81 | list<completion_item> completions; | |
82 | bool completions_empty() { | |
83 | Mutex::Locker l(completions_lock); | |
84 | return completions.empty(); | |
85 | } | |
86 | void batch_pop_completions(list<completion_item> &items) { | |
87 | Mutex::Locker l(completions_lock); | |
88 | completions.swap(items); | |
89 | } | |
90 | void batch_unpop_completions(list<completion_item> &items) { | |
91 | Mutex::Locker l(completions_lock); | |
92 | completions.splice(completions.begin(), items); | |
93 | } | |
94 | completion_item completion_peek_front() { | |
95 | Mutex::Locker l(completions_lock); | |
96 | assert(!completions.empty()); | |
97 | return completions.front(); | |
98 | } | |
99 | void completion_pop_front() { | |
100 | Mutex::Locker l(completions_lock); | |
101 | assert(!completions.empty()); | |
102 | completions.pop_front(); | |
103 | } | |
104 | ||
105 | int prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) override; | |
106 | ||
107 | void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len, | |
108 | Context *oncommit, | |
109 | TrackedOpRef osd_op = TrackedOpRef()) override; | |
110 | /// End protected by finisher_lock | |
111 | ||
112 | /* | |
113 | * journal header | |
114 | */ | |
115 | struct header_t { | |
116 | enum { | |
117 | FLAG_CRC = (1<<0), | |
118 | // NOTE: remove kludgey weirdness in read_header() next time a flag is added. | |
119 | }; | |
120 | ||
121 | uint64_t flags; | |
122 | uuid_d fsid; | |
123 | __u32 block_size; | |
124 | __u32 alignment; | |
125 | int64_t max_size; // max size of journal ring buffer | |
126 | int64_t start; // offset of first entry | |
127 | uint64_t committed_up_to; // committed up to | |
128 | ||
129 | /** | |
130 | * start_seq | |
131 | * | |
132 | * entry at header.start has sequence >= start_seq | |
133 | * | |
134 | * Generally, the entry at header.start will have sequence | |
135 | * start_seq if it exists. The only exception is immediately | |
136 | * after journal creation since the first sequence number is | |
137 | * not known. | |
138 | * | |
139 | * If the first read on open fails, we can assume corruption | |
140 | * if start_seq > committed_up_to because the entry would have | |
141 | * a sequence >= start_seq and therefore > committed_up_to. | |
142 | */ | |
143 | uint64_t start_seq; | |
144 | ||
145 | header_t() : | |
146 | flags(0), block_size(0), alignment(0), max_size(0), start(0), | |
147 | committed_up_to(0), start_seq(0) {} | |
148 | ||
149 | void clear() { | |
150 | start = block_size; | |
151 | } | |
152 | ||
153 | uint64_t get_fsid64() const { | |
154 | return *(uint64_t*)fsid.bytes(); | |
155 | } | |
156 | ||
157 | void encode(bufferlist& bl) const { | |
158 | __u32 v = 4; | |
159 | ::encode(v, bl); | |
160 | bufferlist em; | |
161 | { | |
162 | ::encode(flags, em); | |
163 | ::encode(fsid, em); | |
164 | ::encode(block_size, em); | |
165 | ::encode(alignment, em); | |
166 | ::encode(max_size, em); | |
167 | ::encode(start, em); | |
168 | ::encode(committed_up_to, em); | |
169 | ::encode(start_seq, em); | |
170 | } | |
171 | ::encode(em, bl); | |
172 | } | |
173 | void decode(bufferlist::iterator& bl) { | |
174 | __u32 v; | |
175 | ::decode(v, bl); | |
176 | if (v < 2) { // normally 0, but concievably 1 | |
177 | // decode old header_t struct (pre v0.40). | |
178 | bl.advance(4); // skip __u32 flags (it was unused by any old code) | |
179 | flags = 0; | |
180 | uint64_t tfsid; | |
181 | ::decode(tfsid, bl); | |
182 | *(uint64_t*)&fsid.bytes()[0] = tfsid; | |
183 | *(uint64_t*)&fsid.bytes()[8] = tfsid; | |
184 | ::decode(block_size, bl); | |
185 | ::decode(alignment, bl); | |
186 | ::decode(max_size, bl); | |
187 | ::decode(start, bl); | |
188 | committed_up_to = 0; | |
189 | start_seq = 0; | |
190 | return; | |
191 | } | |
192 | bufferlist em; | |
193 | ::decode(em, bl); | |
194 | bufferlist::iterator t = em.begin(); | |
195 | ::decode(flags, t); | |
196 | ::decode(fsid, t); | |
197 | ::decode(block_size, t); | |
198 | ::decode(alignment, t); | |
199 | ::decode(max_size, t); | |
200 | ::decode(start, t); | |
201 | ||
202 | if (v > 2) | |
203 | ::decode(committed_up_to, t); | |
204 | else | |
205 | committed_up_to = 0; | |
206 | ||
207 | if (v > 3) | |
208 | ::decode(start_seq, t); | |
209 | else | |
210 | start_seq = 0; | |
211 | } | |
212 | } header; | |
213 | ||
214 | struct entry_header_t { | |
215 | uint64_t seq; // fs op seq # | |
216 | uint32_t crc32c; // payload only. not header, pre_pad, post_pad, or footer. | |
217 | uint32_t len; | |
218 | uint32_t pre_pad, post_pad; | |
219 | uint64_t magic1; | |
220 | uint64_t magic2; | |
221 | ||
222 | static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) { | |
223 | return (fsid ^ seq ^ len); | |
224 | } | |
225 | bool check_magic(off64_t pos, uint64_t fsid) { | |
226 | return | |
227 | magic1 == (uint64_t)pos && | |
228 | magic2 == (fsid ^ seq ^ len); | |
229 | } | |
230 | } __attribute__((__packed__, aligned(4))); | |
231 | ||
232 | bool journalq_empty() { return journalq.empty(); } | |
233 | ||
234 | private: | |
235 | string fn; | |
236 | ||
237 | char *zero_buf; | |
238 | off64_t max_size; | |
239 | size_t block_size; | |
240 | bool directio, aio, force_aio; | |
241 | bool must_write_header; | |
242 | off64_t write_pos; // byte where the next entry to be written will go | |
243 | off64_t read_pos; // | |
244 | bool discard; //for block journal whether support discard | |
245 | ||
246 | #ifdef HAVE_LIBAIO | |
247 | /// state associated with an in-flight aio request | |
248 | /// Protected by aio_lock | |
249 | struct aio_info { | |
250 | struct iocb iocb; | |
251 | bufferlist bl; | |
252 | struct iovec *iov; | |
253 | bool done; | |
254 | uint64_t off, len; ///< these are for debug only | |
255 | uint64_t seq; ///< seq number to complete on aio completion, if non-zero | |
256 | ||
257 | aio_info(bufferlist& b, uint64_t o, uint64_t s) | |
258 | : iov(NULL), done(false), off(o), len(b.length()), seq(s) { | |
259 | bl.claim(b); | |
260 | } | |
261 | ~aio_info() { | |
262 | delete[] iov; | |
263 | } | |
264 | }; | |
265 | Mutex aio_lock; | |
266 | Cond aio_cond; | |
267 | Cond write_finish_cond; | |
268 | io_context_t aio_ctx; | |
269 | list<aio_info> aio_queue; | |
270 | int aio_num, aio_bytes; | |
271 | uint64_t aio_write_queue_ops; | |
272 | uint64_t aio_write_queue_bytes; | |
273 | /// End protected by aio_lock | |
274 | #endif | |
275 | ||
276 | uint64_t last_committed_seq; | |
277 | uint64_t journaled_since_start; | |
278 | ||
279 | /* | |
280 | * full states cycle at the beginnging of each commit epoch, when commit_start() | |
281 | * is called. | |
282 | * FULL - we just filled up during this epoch. | |
283 | * WAIT - we filled up last epoch; now we have to wait until everything during | |
284 | * that epoch commits to the fs before we can start writing over it. | |
285 | * NOTFULL - all good, journal away. | |
286 | */ | |
287 | enum { | |
288 | FULL_NOTFULL = 0, | |
289 | FULL_FULL = 1, | |
290 | FULL_WAIT = 2, | |
291 | } full_state; | |
292 | ||
293 | int fd; | |
294 | ||
295 | // in journal | |
296 | deque<pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later. | |
297 | uint64_t writing_seq; | |
298 | ||
299 | ||
300 | // throttle | |
301 | int set_throttle_params(); | |
302 | const char** get_tracked_conf_keys() const override; | |
303 | void handle_conf_change( | |
304 | const struct md_config_t *conf, | |
305 | const std::set <std::string> &changed) override { | |
306 | for (const char **i = get_tracked_conf_keys(); | |
307 | *i; | |
308 | ++i) { | |
309 | if (changed.count(string(*i))) { | |
310 | set_throttle_params(); | |
311 | return; | |
312 | } | |
313 | } | |
314 | } | |
315 | ||
316 | void complete_write(uint64_t ops, uint64_t bytes); | |
317 | JournalThrottle throttle; | |
318 | ||
319 | // write thread | |
320 | Mutex write_lock; | |
321 | bool write_stop; | |
322 | bool aio_stop; | |
323 | ||
324 | Cond commit_cond; | |
325 | ||
326 | int _open(bool wr, bool create=false); | |
327 | int _open_block_device(); | |
328 | void _close(int fd) const; | |
329 | int _open_file(int64_t oldsize, blksize_t blksize, bool create); | |
330 | int _dump(ostream& out, bool simple); | |
331 | void print_header(const header_t &hdr) const; | |
332 | int read_header(header_t *hdr) const; | |
333 | bufferptr prepare_header(); | |
334 | void start_writer(); | |
335 | void stop_writer(); | |
336 | void write_thread_entry(); | |
337 | ||
338 | void queue_completions_thru(uint64_t seq); | |
339 | ||
340 | int check_for_full(uint64_t seq, off64_t pos, off64_t size); | |
341 | int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee); | |
342 | int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, | |
343 | uint64_t& orig_ops, uint64_t& orig_bytes); | |
344 | void do_write(bufferlist& bl); | |
345 | ||
346 | void write_finish_thread_entry(); | |
347 | void check_aio_completion(); | |
348 | void do_aio_write(bufferlist& bl); | |
349 | int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq); | |
350 | ||
351 | ||
352 | void check_align(off64_t pos, bufferlist& bl); | |
353 | int write_bl(off64_t& pos, bufferlist& bl); | |
354 | ||
355 | /// read len from journal starting at in_pos and wrapping up to len | |
356 | void wrap_read_bl( | |
357 | off64_t in_pos, ///< [in] start position | |
358 | int64_t len, ///< [in] length to read | |
359 | bufferlist* bl, ///< [out] result | |
360 | off64_t *out_pos ///< [out] next position to read, will be wrapped | |
361 | ) const; | |
362 | ||
363 | void do_discard(int64_t offset, int64_t end); | |
364 | ||
365 | class Writer : public Thread { | |
366 | FileJournal *journal; | |
367 | public: | |
368 | explicit Writer(FileJournal *fj) : journal(fj) {} | |
369 | void *entry() override { | |
370 | journal->write_thread_entry(); | |
371 | return 0; | |
372 | } | |
373 | } write_thread; | |
374 | ||
375 | class WriteFinisher : public Thread { | |
376 | FileJournal *journal; | |
377 | public: | |
378 | explicit WriteFinisher(FileJournal *fj) : journal(fj) {} | |
379 | void *entry() override { | |
380 | journal->write_finish_thread_entry(); | |
381 | return 0; | |
382 | } | |
383 | } write_finish_thread; | |
384 | ||
385 | off64_t get_top() const { | |
386 | return ROUND_UP_TO(sizeof(header), block_size); | |
387 | } | |
388 | ||
389 | ZTracer::Endpoint trace_endpoint; | |
390 | ||
391 | public: | |
392 | FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond, | |
393 | const char *f, bool dio=false, bool ai=true, bool faio=false) : | |
394 | Journal(cct, fsid, fin, sync_cond), | |
395 | finisher_lock("FileJournal::finisher_lock", false, true, false, cct), | |
396 | journaled_seq(0), | |
397 | plug_journal_completions(false), | |
398 | writeq_lock("FileJournal::writeq_lock", false, true, false, cct), | |
399 | completions_lock( | |
400 | "FileJournal::completions_lock", false, true, false, cct), | |
401 | fn(f), | |
402 | zero_buf(NULL), | |
403 | max_size(0), block_size(0), | |
404 | directio(dio), aio(ai), force_aio(faio), | |
405 | must_write_header(false), | |
406 | write_pos(0), read_pos(0), | |
407 | discard(false), | |
408 | #ifdef HAVE_LIBAIO | |
409 | aio_lock("FileJournal::aio_lock"), | |
410 | aio_ctx(0), | |
411 | aio_num(0), aio_bytes(0), | |
412 | aio_write_queue_ops(0), | |
413 | aio_write_queue_bytes(0), | |
414 | #endif | |
415 | last_committed_seq(0), | |
416 | journaled_since_start(0), | |
417 | full_state(FULL_NOTFULL), | |
418 | fd(-1), | |
419 | writing_seq(0), | |
420 | throttle(cct->_conf->filestore_caller_concurrency), | |
421 | write_lock("FileJournal::write_lock", false, true, false, cct), | |
422 | write_stop(true), | |
423 | aio_stop(true), | |
424 | write_thread(this), | |
425 | write_finish_thread(this), | |
426 | trace_endpoint("0.0.0.0", 0, "FileJournal") { | |
427 | ||
428 | if (aio && !directio) { | |
429 | lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl; | |
430 | aio = false; | |
431 | } | |
432 | #ifndef HAVE_LIBAIO | |
433 | if (aio) { | |
434 | lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl; | |
435 | aio = false; | |
436 | } | |
437 | #endif | |
438 | ||
439 | cct->_conf->add_observer(this); | |
440 | } | |
441 | ~FileJournal() override { | |
442 | assert(fd == -1); | |
443 | delete[] zero_buf; | |
444 | cct->_conf->remove_observer(this); | |
445 | } | |
446 | ||
447 | int check() override; | |
448 | int create() override; | |
449 | int open(uint64_t fs_op_seq) override; | |
450 | void close() override; | |
451 | int peek_fsid(uuid_d& fsid); | |
452 | ||
453 | int dump(ostream& out) override; | |
454 | int simple_dump(ostream& out); | |
455 | int _fdump(Formatter &f, bool simple); | |
456 | ||
457 | void flush() override; | |
458 | ||
459 | void reserve_throttle_and_backoff(uint64_t count) override; | |
460 | ||
461 | bool is_writeable() override { | |
462 | return read_pos == 0; | |
463 | } | |
464 | int make_writeable() override; | |
465 | ||
466 | // writes | |
467 | void commit_start(uint64_t seq) override; | |
468 | void committed_thru(uint64_t seq) override; | |
469 | bool should_commit_now() override { | |
470 | return full_state != FULL_NOTFULL && !write_stop; | |
471 | } | |
472 | ||
473 | void write_header_sync(); | |
474 | ||
475 | void set_wait_on_full(bool b) { wait_on_full = b; } | |
476 | ||
477 | off64_t get_journal_size_estimate(); | |
478 | ||
479 | // reads | |
480 | ||
481 | /// Result code for read_entry | |
482 | enum read_entry_result { | |
483 | SUCCESS, | |
484 | FAILURE, | |
485 | MAYBE_CORRUPT | |
486 | }; | |
487 | ||
488 | /** | |
489 | * read_entry | |
490 | * | |
491 | * Reads next entry starting at pos. If the entry appears | |
492 | * clean, *bl will contain the payload, *seq will contain | |
493 | * the sequence number, and *out_pos will reflect the next | |
494 | * read position. If the entry is invalid *ss will contain | |
495 | * debug text, while *seq, *out_pos, and *bl will be unchanged. | |
496 | * | |
497 | * If the entry suggests a corrupt log, *ss will contain debug | |
498 | * text, *out_pos will contain the next index to check. If | |
499 | * we find an entry in this way that returns SUCCESS, the journal | |
500 | * is most likely corrupt. | |
501 | */ | |
502 | read_entry_result do_read_entry( | |
503 | off64_t pos, ///< [in] position to read | |
504 | off64_t *next_pos, ///< [out] next position to read | |
505 | bufferlist* bl, ///< [out] payload for successful read | |
506 | uint64_t *seq, ///< [out] seq of successful read | |
507 | ostream *ss, ///< [out] error output | |
508 | entry_header_t *h = 0 ///< [out] header | |
509 | ) const; ///< @return result code | |
510 | ||
511 | bool read_entry( | |
512 | bufferlist &bl, | |
513 | uint64_t &last_seq, | |
514 | bool *corrupt | |
515 | ); | |
516 | ||
517 | bool read_entry( | |
518 | bufferlist &bl, | |
519 | uint64_t &last_seq) override { | |
520 | return read_entry(bl, last_seq, 0); | |
521 | } | |
522 | ||
523 | // Debug/Testing | |
524 | void get_header( | |
525 | uint64_t wanted_seq, | |
526 | off64_t *_pos, | |
527 | entry_header_t *h); | |
528 | void corrupt( | |
529 | int wfd, | |
530 | off64_t corrupt_at); | |
531 | void corrupt_payload( | |
532 | int wfd, | |
533 | uint64_t seq); | |
534 | void corrupt_footer_magic( | |
535 | int wfd, | |
536 | uint64_t seq); | |
537 | void corrupt_header_magic( | |
538 | int wfd, | |
539 | uint64_t seq); | |
540 | }; | |
541 | ||
542 | WRITE_CLASS_ENCODER(FileJournal::header_t) | |
543 | ||
544 | #endif |