]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileJournal.h
update sources to v12.1.0
[ceph.git] / ceph / src / os / filestore / FileJournal.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #ifndef CEPH_FILEJOURNAL_H
17 #define CEPH_FILEJOURNAL_H
18
#include <cstring>
#include <deque>
using std::deque;

#include "Journal.h"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/Thread.h"
#include "common/Throttle.h"
#include "JournalThrottle.h"
#include "common/zipkin_trace.h"

#ifdef HAVE_LIBAIO
# include <libaio.h>
#endif

// re-include our assert to clobber the system one; fix dout:
#include "include/assert.h"
36
37 /**
38 * Implements journaling on top of block device or file.
39 *
40 * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
41 */
42 class FileJournal :
43 public Journal,
44 public md_config_obs_t {
45 public:
46 /// Protected by finisher_lock
47 struct completion_item {
48 uint64_t seq;
49 Context *finish;
50 utime_t start;
51 TrackedOpRef tracked_op;
52 completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
53 : seq(o), finish(c), start(s), tracked_op(opref) {}
54 completion_item() : seq(0), finish(0), start(0) {}
55 };
56 struct write_item {
57 uint64_t seq;
58 bufferlist bl;
59 uint32_t orig_len;
60 TrackedOpRef tracked_op;
61 ZTracer::Trace trace;
62 write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
63 seq(s), orig_len(ol), tracked_op(opref) {
64 bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
65 }
66 write_item() : seq(0), orig_len(0) {}
67 };
68
69 Mutex finisher_lock;
70 Cond finisher_cond;
71 uint64_t journaled_seq;
72 bool plug_journal_completions;
73
74 Mutex writeq_lock;
75 Cond writeq_cond;
76 list<write_item> writeq;
77 bool writeq_empty();
78 write_item &peek_write();
79 void pop_write();
80 void batch_pop_write(list<write_item> &items);
81 void batch_unpop_write(list<write_item> &items);
82
83 Mutex completions_lock;
84 list<completion_item> completions;
85 bool completions_empty() {
86 Mutex::Locker l(completions_lock);
87 return completions.empty();
88 }
89 void batch_pop_completions(list<completion_item> &items) {
90 Mutex::Locker l(completions_lock);
91 completions.swap(items);
92 }
93 void batch_unpop_completions(list<completion_item> &items) {
94 Mutex::Locker l(completions_lock);
95 completions.splice(completions.begin(), items);
96 }
97 completion_item completion_peek_front() {
98 Mutex::Locker l(completions_lock);
99 assert(!completions.empty());
100 return completions.front();
101 }
102 void completion_pop_front() {
103 Mutex::Locker l(completions_lock);
104 assert(!completions.empty());
105 completions.pop_front();
106 }
107
108 int prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) override;
109
110 void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len,
111 Context *oncommit,
112 TrackedOpRef osd_op = TrackedOpRef()) override;
113 /// End protected by finisher_lock
114
115 /*
116 * journal header
117 */
118 struct header_t {
119 enum {
120 FLAG_CRC = (1<<0),
121 // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
122 };
123
124 uint64_t flags;
125 uuid_d fsid;
126 __u32 block_size;
127 __u32 alignment;
128 int64_t max_size; // max size of journal ring buffer
129 int64_t start; // offset of first entry
130 uint64_t committed_up_to; // committed up to
131
132 /**
133 * start_seq
134 *
135 * entry at header.start has sequence >= start_seq
136 *
137 * Generally, the entry at header.start will have sequence
138 * start_seq if it exists. The only exception is immediately
139 * after journal creation since the first sequence number is
140 * not known.
141 *
142 * If the first read on open fails, we can assume corruption
143 * if start_seq > committed_up_to because the entry would have
144 * a sequence >= start_seq and therefore > committed_up_to.
145 */
146 uint64_t start_seq;
147
148 header_t() :
149 flags(0), block_size(0), alignment(0), max_size(0), start(0),
150 committed_up_to(0), start_seq(0) {}
151
152 void clear() {
153 start = block_size;
154 }
155
156 uint64_t get_fsid64() const {
157 return *(uint64_t*)fsid.bytes();
158 }
159
160 void encode(bufferlist& bl) const {
161 __u32 v = 4;
162 ::encode(v, bl);
163 bufferlist em;
164 {
165 ::encode(flags, em);
166 ::encode(fsid, em);
167 ::encode(block_size, em);
168 ::encode(alignment, em);
169 ::encode(max_size, em);
170 ::encode(start, em);
171 ::encode(committed_up_to, em);
172 ::encode(start_seq, em);
173 }
174 ::encode(em, bl);
175 }
176 void decode(bufferlist::iterator& bl) {
177 __u32 v;
178 ::decode(v, bl);
179 if (v < 2) { // normally 0, but concievably 1
180 // decode old header_t struct (pre v0.40).
181 bl.advance(4); // skip __u32 flags (it was unused by any old code)
182 flags = 0;
183 uint64_t tfsid;
184 ::decode(tfsid, bl);
185 *(uint64_t*)&fsid.bytes()[0] = tfsid;
186 *(uint64_t*)&fsid.bytes()[8] = tfsid;
187 ::decode(block_size, bl);
188 ::decode(alignment, bl);
189 ::decode(max_size, bl);
190 ::decode(start, bl);
191 committed_up_to = 0;
192 start_seq = 0;
193 return;
194 }
195 bufferlist em;
196 ::decode(em, bl);
197 bufferlist::iterator t = em.begin();
198 ::decode(flags, t);
199 ::decode(fsid, t);
200 ::decode(block_size, t);
201 ::decode(alignment, t);
202 ::decode(max_size, t);
203 ::decode(start, t);
204
205 if (v > 2)
206 ::decode(committed_up_to, t);
207 else
208 committed_up_to = 0;
209
210 if (v > 3)
211 ::decode(start_seq, t);
212 else
213 start_seq = 0;
214 }
215 } header;
216
217 struct entry_header_t {
218 uint64_t seq; // fs op seq #
219 uint32_t crc32c; // payload only. not header, pre_pad, post_pad, or footer.
220 uint32_t len;
221 uint32_t pre_pad, post_pad;
222 uint64_t magic1;
223 uint64_t magic2;
224
225 static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
226 return (fsid ^ seq ^ len);
227 }
228 bool check_magic(off64_t pos, uint64_t fsid) {
229 return
230 magic1 == (uint64_t)pos &&
231 magic2 == (fsid ^ seq ^ len);
232 }
233 } __attribute__((__packed__, aligned(4)));
234
235 bool journalq_empty() { return journalq.empty(); }
236
237 private:
238 string fn;
239
240 char *zero_buf;
241 off64_t max_size;
242 size_t block_size;
243 bool directio, aio, force_aio;
244 bool must_write_header;
245 off64_t write_pos; // byte where the next entry to be written will go
246 off64_t read_pos; //
247 bool discard; //for block journal whether support discard
248
249 #ifdef HAVE_LIBAIO
250 /// state associated with an in-flight aio request
251 /// Protected by aio_lock
252 struct aio_info {
253 struct iocb iocb;
254 bufferlist bl;
255 struct iovec *iov;
256 bool done;
257 uint64_t off, len; ///< these are for debug only
258 uint64_t seq; ///< seq number to complete on aio completion, if non-zero
259
260 aio_info(bufferlist& b, uint64_t o, uint64_t s)
261 : iov(NULL), done(false), off(o), len(b.length()), seq(s) {
262 bl.claim(b);
263 }
264 ~aio_info() {
265 delete[] iov;
266 }
267 };
268 Mutex aio_lock;
269 Cond aio_cond;
270 Cond write_finish_cond;
271 io_context_t aio_ctx;
272 list<aio_info> aio_queue;
273 int aio_num, aio_bytes;
274 uint64_t aio_write_queue_ops;
275 uint64_t aio_write_queue_bytes;
276 /// End protected by aio_lock
277 #endif
278
279 uint64_t last_committed_seq;
280 uint64_t journaled_since_start;
281
282 /*
283 * full states cycle at the beginnging of each commit epoch, when commit_start()
284 * is called.
285 * FULL - we just filled up during this epoch.
286 * WAIT - we filled up last epoch; now we have to wait until everything during
287 * that epoch commits to the fs before we can start writing over it.
288 * NOTFULL - all good, journal away.
289 */
290 enum {
291 FULL_NOTFULL = 0,
292 FULL_FULL = 1,
293 FULL_WAIT = 2,
294 } full_state;
295
296 int fd;
297
298 // in journal
299 deque<pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later.
300 uint64_t writing_seq;
301
302
303 // throttle
304 int set_throttle_params();
305 const char** get_tracked_conf_keys() const override;
306 void handle_conf_change(
307 const struct md_config_t *conf,
308 const std::set <std::string> &changed) override {
309 for (const char **i = get_tracked_conf_keys();
310 *i;
311 ++i) {
312 if (changed.count(string(*i))) {
313 set_throttle_params();
314 return;
315 }
316 }
317 }
318
319 void complete_write(uint64_t ops, uint64_t bytes);
320 JournalThrottle throttle;
321
322 // write thread
323 Mutex write_lock;
324 bool write_stop;
325 bool aio_stop;
326
327 Cond commit_cond;
328
329 int _open(bool wr, bool create=false);
330 int _open_block_device();
331 void _close(int fd) const;
332 int _open_file(int64_t oldsize, blksize_t blksize, bool create);
333 int _dump(ostream& out, bool simple);
334 void print_header(const header_t &hdr) const;
335 int read_header(header_t *hdr) const;
336 bufferptr prepare_header();
337 void start_writer();
338 void stop_writer();
339 void write_thread_entry();
340
341 void queue_completions_thru(uint64_t seq);
342
343 int check_for_full(uint64_t seq, off64_t pos, off64_t size);
344 int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
345 int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
346 uint64_t& orig_ops, uint64_t& orig_bytes);
347 void do_write(bufferlist& bl);
348
349 void write_finish_thread_entry();
350 void check_aio_completion();
351 void do_aio_write(bufferlist& bl);
352 int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq);
353
354
355 void check_align(off64_t pos, bufferlist& bl);
356 int write_bl(off64_t& pos, bufferlist& bl);
357
358 /// read len from journal starting at in_pos and wrapping up to len
359 void wrap_read_bl(
360 off64_t in_pos, ///< [in] start position
361 int64_t len, ///< [in] length to read
362 bufferlist* bl, ///< [out] result
363 off64_t *out_pos ///< [out] next position to read, will be wrapped
364 ) const;
365
366 void do_discard(int64_t offset, int64_t end);
367
368 class Writer : public Thread {
369 FileJournal *journal;
370 public:
371 explicit Writer(FileJournal *fj) : journal(fj) {}
372 void *entry() override {
373 journal->write_thread_entry();
374 return 0;
375 }
376 } write_thread;
377
378 class WriteFinisher : public Thread {
379 FileJournal *journal;
380 public:
381 explicit WriteFinisher(FileJournal *fj) : journal(fj) {}
382 void *entry() override {
383 journal->write_finish_thread_entry();
384 return 0;
385 }
386 } write_finish_thread;
387
388 off64_t get_top() const {
389 return ROUND_UP_TO(sizeof(header), block_size);
390 }
391
392 ZTracer::Endpoint trace_endpoint;
393
394 public:
395 FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
396 const char *f, bool dio=false, bool ai=true, bool faio=false) :
397 Journal(cct, fsid, fin, sync_cond),
398 finisher_lock("FileJournal::finisher_lock", false, true, false, cct),
399 journaled_seq(0),
400 plug_journal_completions(false),
401 writeq_lock("FileJournal::writeq_lock", false, true, false, cct),
402 completions_lock(
403 "FileJournal::completions_lock", false, true, false, cct),
404 fn(f),
405 zero_buf(NULL),
406 max_size(0), block_size(0),
407 directio(dio), aio(ai), force_aio(faio),
408 must_write_header(false),
409 write_pos(0), read_pos(0),
410 discard(false),
411 #ifdef HAVE_LIBAIO
412 aio_lock("FileJournal::aio_lock"),
413 aio_ctx(0),
414 aio_num(0), aio_bytes(0),
415 aio_write_queue_ops(0),
416 aio_write_queue_bytes(0),
417 #endif
418 last_committed_seq(0),
419 journaled_since_start(0),
420 full_state(FULL_NOTFULL),
421 fd(-1),
422 writing_seq(0),
423 throttle(cct->_conf->filestore_caller_concurrency),
424 write_lock("FileJournal::write_lock", false, true, false, cct),
425 write_stop(true),
426 aio_stop(true),
427 write_thread(this),
428 write_finish_thread(this),
429 trace_endpoint("0.0.0.0", 0, "FileJournal") {
430
431 if (aio && !directio) {
432 lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
433 aio = false;
434 }
435 #ifndef HAVE_LIBAIO
436 if (aio) {
437 lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
438 aio = false;
439 }
440 #endif
441
442 cct->_conf->add_observer(this);
443 }
444 ~FileJournal() override {
445 assert(fd == -1);
446 delete[] zero_buf;
447 cct->_conf->remove_observer(this);
448 }
449
450 int check() override;
451 int create() override;
452 int open(uint64_t fs_op_seq) override;
453 void close() override;
454 int peek_fsid(uuid_d& fsid);
455
456 int dump(ostream& out) override;
457 int simple_dump(ostream& out);
458 int _fdump(Formatter &f, bool simple);
459
460 void flush() override;
461
462 void reserve_throttle_and_backoff(uint64_t count) override;
463
464 bool is_writeable() override {
465 return read_pos == 0;
466 }
467 int make_writeable() override;
468
469 // writes
470 void commit_start(uint64_t seq) override;
471 void committed_thru(uint64_t seq) override;
472 bool should_commit_now() override {
473 return full_state != FULL_NOTFULL && !write_stop;
474 }
475
476 void write_header_sync();
477
478 void set_wait_on_full(bool b) { wait_on_full = b; }
479
480 off64_t get_journal_size_estimate();
481
482 // reads
483
484 /// Result code for read_entry
485 enum read_entry_result {
486 SUCCESS,
487 FAILURE,
488 MAYBE_CORRUPT
489 };
490
491 /**
492 * read_entry
493 *
494 * Reads next entry starting at pos. If the entry appears
495 * clean, *bl will contain the payload, *seq will contain
496 * the sequence number, and *out_pos will reflect the next
497 * read position. If the entry is invalid *ss will contain
498 * debug text, while *seq, *out_pos, and *bl will be unchanged.
499 *
500 * If the entry suggests a corrupt log, *ss will contain debug
501 * text, *out_pos will contain the next index to check. If
502 * we find an entry in this way that returns SUCCESS, the journal
503 * is most likely corrupt.
504 */
505 read_entry_result do_read_entry(
506 off64_t pos, ///< [in] position to read
507 off64_t *next_pos, ///< [out] next position to read
508 bufferlist* bl, ///< [out] payload for successful read
509 uint64_t *seq, ///< [out] seq of successful read
510 ostream *ss, ///< [out] error output
511 entry_header_t *h = 0 ///< [out] header
512 ) const; ///< @return result code
513
514 bool read_entry(
515 bufferlist &bl,
516 uint64_t &last_seq,
517 bool *corrupt
518 );
519
520 bool read_entry(
521 bufferlist &bl,
522 uint64_t &last_seq) override {
523 return read_entry(bl, last_seq, 0);
524 }
525
526 // Debug/Testing
527 void get_header(
528 uint64_t wanted_seq,
529 off64_t *_pos,
530 entry_header_t *h);
531 void corrupt(
532 int wfd,
533 off64_t corrupt_at);
534 void corrupt_payload(
535 int wfd,
536 uint64_t seq);
537 void corrupt_footer_magic(
538 int wfd,
539 uint64_t seq);
540 void corrupt_header_magic(
541 int wfd,
542 uint64_t seq);
543 };
544
// NOTE(review): WRITE_CLASS_ENCODER is defined outside this file. It
// presumably generates free ::encode()/::decode() overloads that forward to
// header_t::encode()/decode(), so a header_t can be passed to ::encode() and
// ::decode() directly. Confirm against the macro definition.
WRITE_CLASS_ENCODER(FileJournal::header_t)
546
547 #endif