// ceph/src/os/filestore/FileJournal.h
// (from ceph.git, 12.0.3 subtree sources; mirrored at git.proxmox.com)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #ifndef CEPH_FILEJOURNAL_H
17 #define CEPH_FILEJOURNAL_H
18
19 #include <deque>
20 using std::deque;
21
22 #include "Journal.h"
23 #include "common/Cond.h"
24 #include "common/Mutex.h"
25 #include "common/Thread.h"
26 #include "common/Throttle.h"
27 #include "JournalThrottle.h"
28 #include "common/zipkin_trace.h"
29
30 #ifdef HAVE_LIBAIO
31 # include <libaio.h>
32 #endif
33
34 /**
35 * Implements journaling on top of block device or file.
36 *
37 * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
38 */
39 class FileJournal :
40 public Journal,
41 public md_config_obs_t {
42 public:
43 /// Protected by finisher_lock
44 struct completion_item {
45 uint64_t seq;
46 Context *finish;
47 utime_t start;
48 TrackedOpRef tracked_op;
49 completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
50 : seq(o), finish(c), start(s), tracked_op(opref) {}
51 completion_item() : seq(0), finish(0), start(0) {}
52 };
53 struct write_item {
54 uint64_t seq;
55 bufferlist bl;
56 uint32_t orig_len;
57 TrackedOpRef tracked_op;
58 ZTracer::Trace trace;
59 write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
60 seq(s), orig_len(ol), tracked_op(opref) {
61 bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
62 }
63 write_item() : seq(0), orig_len(0) {}
64 };
65
66 Mutex finisher_lock;
67 Cond finisher_cond;
68 uint64_t journaled_seq;
69 bool plug_journal_completions;
70
71 Mutex writeq_lock;
72 Cond writeq_cond;
73 list<write_item> writeq;
74 bool writeq_empty();
75 write_item &peek_write();
76 void pop_write();
77 void batch_pop_write(list<write_item> &items);
78 void batch_unpop_write(list<write_item> &items);
79
80 Mutex completions_lock;
81 list<completion_item> completions;
82 bool completions_empty() {
83 Mutex::Locker l(completions_lock);
84 return completions.empty();
85 }
86 void batch_pop_completions(list<completion_item> &items) {
87 Mutex::Locker l(completions_lock);
88 completions.swap(items);
89 }
90 void batch_unpop_completions(list<completion_item> &items) {
91 Mutex::Locker l(completions_lock);
92 completions.splice(completions.begin(), items);
93 }
94 completion_item completion_peek_front() {
95 Mutex::Locker l(completions_lock);
96 assert(!completions.empty());
97 return completions.front();
98 }
99 void completion_pop_front() {
100 Mutex::Locker l(completions_lock);
101 assert(!completions.empty());
102 completions.pop_front();
103 }
104
105 int prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) override;
106
107 void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len,
108 Context *oncommit,
109 TrackedOpRef osd_op = TrackedOpRef()) override;
110 /// End protected by finisher_lock
111
112 /*
113 * journal header
114 */
115 struct header_t {
116 enum {
117 FLAG_CRC = (1<<0),
118 // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
119 };
120
121 uint64_t flags;
122 uuid_d fsid;
123 __u32 block_size;
124 __u32 alignment;
125 int64_t max_size; // max size of journal ring buffer
126 int64_t start; // offset of first entry
127 uint64_t committed_up_to; // committed up to
128
129 /**
130 * start_seq
131 *
132 * entry at header.start has sequence >= start_seq
133 *
134 * Generally, the entry at header.start will have sequence
135 * start_seq if it exists. The only exception is immediately
136 * after journal creation since the first sequence number is
137 * not known.
138 *
139 * If the first read on open fails, we can assume corruption
140 * if start_seq > committed_up_to because the entry would have
141 * a sequence >= start_seq and therefore > committed_up_to.
142 */
143 uint64_t start_seq;
144
145 header_t() :
146 flags(0), block_size(0), alignment(0), max_size(0), start(0),
147 committed_up_to(0), start_seq(0) {}
148
149 void clear() {
150 start = block_size;
151 }
152
153 uint64_t get_fsid64() const {
154 return *(uint64_t*)fsid.bytes();
155 }
156
157 void encode(bufferlist& bl) const {
158 __u32 v = 4;
159 ::encode(v, bl);
160 bufferlist em;
161 {
162 ::encode(flags, em);
163 ::encode(fsid, em);
164 ::encode(block_size, em);
165 ::encode(alignment, em);
166 ::encode(max_size, em);
167 ::encode(start, em);
168 ::encode(committed_up_to, em);
169 ::encode(start_seq, em);
170 }
171 ::encode(em, bl);
172 }
173 void decode(bufferlist::iterator& bl) {
174 __u32 v;
175 ::decode(v, bl);
176 if (v < 2) { // normally 0, but concievably 1
177 // decode old header_t struct (pre v0.40).
178 bl.advance(4); // skip __u32 flags (it was unused by any old code)
179 flags = 0;
180 uint64_t tfsid;
181 ::decode(tfsid, bl);
182 *(uint64_t*)&fsid.bytes()[0] = tfsid;
183 *(uint64_t*)&fsid.bytes()[8] = tfsid;
184 ::decode(block_size, bl);
185 ::decode(alignment, bl);
186 ::decode(max_size, bl);
187 ::decode(start, bl);
188 committed_up_to = 0;
189 start_seq = 0;
190 return;
191 }
192 bufferlist em;
193 ::decode(em, bl);
194 bufferlist::iterator t = em.begin();
195 ::decode(flags, t);
196 ::decode(fsid, t);
197 ::decode(block_size, t);
198 ::decode(alignment, t);
199 ::decode(max_size, t);
200 ::decode(start, t);
201
202 if (v > 2)
203 ::decode(committed_up_to, t);
204 else
205 committed_up_to = 0;
206
207 if (v > 3)
208 ::decode(start_seq, t);
209 else
210 start_seq = 0;
211 }
212 } header;
213
214 struct entry_header_t {
215 uint64_t seq; // fs op seq #
216 uint32_t crc32c; // payload only. not header, pre_pad, post_pad, or footer.
217 uint32_t len;
218 uint32_t pre_pad, post_pad;
219 uint64_t magic1;
220 uint64_t magic2;
221
222 static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
223 return (fsid ^ seq ^ len);
224 }
225 bool check_magic(off64_t pos, uint64_t fsid) {
226 return
227 magic1 == (uint64_t)pos &&
228 magic2 == (fsid ^ seq ^ len);
229 }
230 } __attribute__((__packed__, aligned(4)));
231
232 bool journalq_empty() { return journalq.empty(); }
233
234 private:
235 string fn;
236
237 char *zero_buf;
238 off64_t max_size;
239 size_t block_size;
240 bool directio, aio, force_aio;
241 bool must_write_header;
242 off64_t write_pos; // byte where the next entry to be written will go
243 off64_t read_pos; //
244 bool discard; //for block journal whether support discard
245
246 #ifdef HAVE_LIBAIO
247 /// state associated with an in-flight aio request
248 /// Protected by aio_lock
249 struct aio_info {
250 struct iocb iocb;
251 bufferlist bl;
252 struct iovec *iov;
253 bool done;
254 uint64_t off, len; ///< these are for debug only
255 uint64_t seq; ///< seq number to complete on aio completion, if non-zero
256
257 aio_info(bufferlist& b, uint64_t o, uint64_t s)
258 : iov(NULL), done(false), off(o), len(b.length()), seq(s) {
259 bl.claim(b);
260 }
261 ~aio_info() {
262 delete[] iov;
263 }
264 };
265 Mutex aio_lock;
266 Cond aio_cond;
267 Cond write_finish_cond;
268 io_context_t aio_ctx;
269 list<aio_info> aio_queue;
270 int aio_num, aio_bytes;
271 uint64_t aio_write_queue_ops;
272 uint64_t aio_write_queue_bytes;
273 /// End protected by aio_lock
274 #endif
275
276 uint64_t last_committed_seq;
277 uint64_t journaled_since_start;
278
279 /*
280 * full states cycle at the beginnging of each commit epoch, when commit_start()
281 * is called.
282 * FULL - we just filled up during this epoch.
283 * WAIT - we filled up last epoch; now we have to wait until everything during
284 * that epoch commits to the fs before we can start writing over it.
285 * NOTFULL - all good, journal away.
286 */
287 enum {
288 FULL_NOTFULL = 0,
289 FULL_FULL = 1,
290 FULL_WAIT = 2,
291 } full_state;
292
293 int fd;
294
295 // in journal
296 deque<pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later.
297 uint64_t writing_seq;
298
299
300 // throttle
301 int set_throttle_params();
302 const char** get_tracked_conf_keys() const override;
303 void handle_conf_change(
304 const struct md_config_t *conf,
305 const std::set <std::string> &changed) override {
306 for (const char **i = get_tracked_conf_keys();
307 *i;
308 ++i) {
309 if (changed.count(string(*i))) {
310 set_throttle_params();
311 return;
312 }
313 }
314 }
315
316 void complete_write(uint64_t ops, uint64_t bytes);
317 JournalThrottle throttle;
318
319 // write thread
320 Mutex write_lock;
321 bool write_stop;
322 bool aio_stop;
323
324 Cond commit_cond;
325
326 int _open(bool wr, bool create=false);
327 int _open_block_device();
328 void _close(int fd) const;
329 int _open_file(int64_t oldsize, blksize_t blksize, bool create);
330 int _dump(ostream& out, bool simple);
331 void print_header(const header_t &hdr) const;
332 int read_header(header_t *hdr) const;
333 bufferptr prepare_header();
334 void start_writer();
335 void stop_writer();
336 void write_thread_entry();
337
338 void queue_completions_thru(uint64_t seq);
339
340 int check_for_full(uint64_t seq, off64_t pos, off64_t size);
341 int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
342 int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
343 uint64_t& orig_ops, uint64_t& orig_bytes);
344 void do_write(bufferlist& bl);
345
346 void write_finish_thread_entry();
347 void check_aio_completion();
348 void do_aio_write(bufferlist& bl);
349 int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq);
350
351
352 void check_align(off64_t pos, bufferlist& bl);
353 int write_bl(off64_t& pos, bufferlist& bl);
354
355 /// read len from journal starting at in_pos and wrapping up to len
356 void wrap_read_bl(
357 off64_t in_pos, ///< [in] start position
358 int64_t len, ///< [in] length to read
359 bufferlist* bl, ///< [out] result
360 off64_t *out_pos ///< [out] next position to read, will be wrapped
361 ) const;
362
363 void do_discard(int64_t offset, int64_t end);
364
365 class Writer : public Thread {
366 FileJournal *journal;
367 public:
368 explicit Writer(FileJournal *fj) : journal(fj) {}
369 void *entry() override {
370 journal->write_thread_entry();
371 return 0;
372 }
373 } write_thread;
374
375 class WriteFinisher : public Thread {
376 FileJournal *journal;
377 public:
378 explicit WriteFinisher(FileJournal *fj) : journal(fj) {}
379 void *entry() override {
380 journal->write_finish_thread_entry();
381 return 0;
382 }
383 } write_finish_thread;
384
385 off64_t get_top() const {
386 return ROUND_UP_TO(sizeof(header), block_size);
387 }
388
389 ZTracer::Endpoint trace_endpoint;
390
391 public:
392 FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
393 const char *f, bool dio=false, bool ai=true, bool faio=false) :
394 Journal(cct, fsid, fin, sync_cond),
395 finisher_lock("FileJournal::finisher_lock", false, true, false, cct),
396 journaled_seq(0),
397 plug_journal_completions(false),
398 writeq_lock("FileJournal::writeq_lock", false, true, false, cct),
399 completions_lock(
400 "FileJournal::completions_lock", false, true, false, cct),
401 fn(f),
402 zero_buf(NULL),
403 max_size(0), block_size(0),
404 directio(dio), aio(ai), force_aio(faio),
405 must_write_header(false),
406 write_pos(0), read_pos(0),
407 discard(false),
408 #ifdef HAVE_LIBAIO
409 aio_lock("FileJournal::aio_lock"),
410 aio_ctx(0),
411 aio_num(0), aio_bytes(0),
412 aio_write_queue_ops(0),
413 aio_write_queue_bytes(0),
414 #endif
415 last_committed_seq(0),
416 journaled_since_start(0),
417 full_state(FULL_NOTFULL),
418 fd(-1),
419 writing_seq(0),
420 throttle(cct->_conf->filestore_caller_concurrency),
421 write_lock("FileJournal::write_lock", false, true, false, cct),
422 write_stop(true),
423 aio_stop(true),
424 write_thread(this),
425 write_finish_thread(this),
426 trace_endpoint("0.0.0.0", 0, "FileJournal") {
427
428 if (aio && !directio) {
429 lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
430 aio = false;
431 }
432 #ifndef HAVE_LIBAIO
433 if (aio) {
434 lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
435 aio = false;
436 }
437 #endif
438
439 cct->_conf->add_observer(this);
440 }
441 ~FileJournal() override {
442 assert(fd == -1);
443 delete[] zero_buf;
444 cct->_conf->remove_observer(this);
445 }
446
447 int check() override;
448 int create() override;
449 int open(uint64_t fs_op_seq) override;
450 void close() override;
451 int peek_fsid(uuid_d& fsid);
452
453 int dump(ostream& out) override;
454 int simple_dump(ostream& out);
455 int _fdump(Formatter &f, bool simple);
456
457 void flush() override;
458
459 void reserve_throttle_and_backoff(uint64_t count) override;
460
461 bool is_writeable() override {
462 return read_pos == 0;
463 }
464 int make_writeable() override;
465
466 // writes
467 void commit_start(uint64_t seq) override;
468 void committed_thru(uint64_t seq) override;
469 bool should_commit_now() override {
470 return full_state != FULL_NOTFULL && !write_stop;
471 }
472
473 void write_header_sync();
474
475 void set_wait_on_full(bool b) { wait_on_full = b; }
476
477 off64_t get_journal_size_estimate();
478
479 // reads
480
481 /// Result code for read_entry
482 enum read_entry_result {
483 SUCCESS,
484 FAILURE,
485 MAYBE_CORRUPT
486 };
487
488 /**
489 * read_entry
490 *
491 * Reads next entry starting at pos. If the entry appears
492 * clean, *bl will contain the payload, *seq will contain
493 * the sequence number, and *out_pos will reflect the next
494 * read position. If the entry is invalid *ss will contain
495 * debug text, while *seq, *out_pos, and *bl will be unchanged.
496 *
497 * If the entry suggests a corrupt log, *ss will contain debug
498 * text, *out_pos will contain the next index to check. If
499 * we find an entry in this way that returns SUCCESS, the journal
500 * is most likely corrupt.
501 */
502 read_entry_result do_read_entry(
503 off64_t pos, ///< [in] position to read
504 off64_t *next_pos, ///< [out] next position to read
505 bufferlist* bl, ///< [out] payload for successful read
506 uint64_t *seq, ///< [out] seq of successful read
507 ostream *ss, ///< [out] error output
508 entry_header_t *h = 0 ///< [out] header
509 ) const; ///< @return result code
510
511 bool read_entry(
512 bufferlist &bl,
513 uint64_t &last_seq,
514 bool *corrupt
515 );
516
517 bool read_entry(
518 bufferlist &bl,
519 uint64_t &last_seq) override {
520 return read_entry(bl, last_seq, 0);
521 }
522
523 // Debug/Testing
524 void get_header(
525 uint64_t wanted_seq,
526 off64_t *_pos,
527 entry_header_t *h);
528 void corrupt(
529 int wfd,
530 off64_t corrupt_at);
531 void corrupt_payload(
532 int wfd,
533 uint64_t seq);
534 void corrupt_footer_magic(
535 int wfd,
536 uint64_t seq);
537 void corrupt_header_magic(
538 int wfd,
539 uint64_t seq);
540 };
541
// Generates the ::encode()/::decode() free-function wrappers for
// FileJournal::header_t so it can be serialized like other Ceph types.
WRITE_CLASS_ENCODER(FileJournal::header_t)
543
544 #endif