// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */


#ifndef CEPH_FILEJOURNAL_H
#define CEPH_FILEJOURNAL_H

#include <condition_variable>
#include <deque>
#include <mutex>
#include <stdlib.h>
using std::deque;

#include "Journal.h"
#include "common/config_fwd.h"
#include "common/Cond.h"
#include "common/Thread.h"
#include "common/Throttle.h"
#include "JournalThrottle.h"
#include "common/zipkin_trace.h"

#ifdef HAVE_LIBAIO
# include <libaio.h>
#endif

// re-include our assert to clobber the system one; fix dout:
#include "include/ceph_assert.h"

/**
 * Implements journaling on top of block device or file.
 *
 * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
 */
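/*
 * Lock-ordering sketch (illustrative only, not a verbatim code path from
 * this class): code that needs more than one of these mutexes should take
 * them in the documented order, e.g.
 *
 *   std::scoped_lock wl{write_lock};        // outermost
 *   std::scoped_lock al{aio_lock};          // then the aio state
 *   std::scoped_lock cl{completions_lock};  // completions_lock and
 *                                           // finisher_lock are leaves
 */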
class FileJournal :
  public Journal,
  public md_config_obs_t {
public:
  /// Protected by finisher_lock
  struct completion_item {
    uint64_t seq;
    Context *finish;
    utime_t start;
    TrackedOpRef tracked_op;
    completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
      : seq(o), finish(c), start(s), tracked_op(opref) {}
    completion_item() : seq(0), finish(0), start(0) {}
  };
  struct write_item {
    uint64_t seq;
    ceph::buffer::list bl;
    uint32_t orig_len;
    TrackedOpRef tracked_op;
    ZTracer::Trace trace;
    write_item(uint64_t s, ceph::buffer::list& b, int ol, TrackedOpRef opref) :
      seq(s), orig_len(ol), tracked_op(opref) {
      bl = std::move(b);
    }
    write_item() : seq(0), orig_len(0) {}
  };

  ceph::mutex finisher_lock = ceph::make_mutex("FileJournal::finisher_lock");
  ceph::condition_variable finisher_cond;
  uint64_t journaled_seq;
  bool plug_journal_completions;

  ceph::mutex writeq_lock = ceph::make_mutex("FileJournal::writeq_lock");
  ceph::condition_variable writeq_cond;
  std::list<write_item> writeq;
  bool writeq_empty();
  write_item &peek_write();
  void pop_write();
  void batch_pop_write(std::list<write_item> &items);
  void batch_unpop_write(std::list<write_item> &items);

  ceph::mutex completions_lock =
    ceph::make_mutex("FileJournal::completions_lock");
  std::list<completion_item> completions;
  bool completions_empty() {
    std::lock_guard l{completions_lock};
    return completions.empty();
  }
  void batch_pop_completions(std::list<completion_item> &items) {
    std::lock_guard l{completions_lock};
    completions.swap(items);
  }
  void batch_unpop_completions(std::list<completion_item> &items) {
    std::lock_guard l{completions_lock};
    completions.splice(completions.begin(), items);
  }
  completion_item completion_peek_front() {
    std::lock_guard l{completions_lock};
    ceph_assert(!completions.empty());
    return completions.front();
  }
  void completion_pop_front() {
    std::lock_guard l{completions_lock};
    ceph_assert(!completions.empty());
    completions.pop_front();
  }

  int prepare_entry(std::vector<ObjectStore::Transaction>& tls, ceph::buffer::list* tbl) override;

  void submit_entry(uint64_t seq, ceph::buffer::list& bl, uint32_t orig_len,
                    Context *oncommit,
                    TrackedOpRef osd_op = TrackedOpRef()) override;
  /// End protected by finisher_lock
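  // Call-flow sketch (hypothetical caller; `journal`, `txns`, `seq` and
  // `on_commit` are illustrative names, and it is assumed that
  // prepare_entry()'s return value is the original entry length that
  // submit_entry() expects as orig_len):
  //
  //   ceph::buffer::list tbl;
  //   int orig_len = journal->prepare_entry(txns, &tbl);
  //   journal->submit_entry(seq, tbl, orig_len, on_commit);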

  /*
   * journal header
   */
  struct header_t {
    enum {
      FLAG_CRC = (1<<0),
      // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
    };

    uint64_t flags;
    uuid_d fsid;
    __u32 block_size;
    __u32 alignment;
    int64_t max_size;         // max size of journal ring buffer
    int64_t start;            // offset of first entry
    uint64_t committed_up_to; // committed up to

    /**
     * start_seq
     *
     * entry at header.start has sequence >= start_seq
     *
     * Generally, the entry at header.start will have sequence
     * start_seq if it exists.  The only exception is immediately
     * after journal creation since the first sequence number is
     * not known.
     *
     * If the first read on open fails, we can assume corruption
     * if start_seq > committed_up_to because the entry would have
     * a sequence >= start_seq and therefore > committed_up_to.
     */
    uint64_t start_seq;
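    // Corruption-check sketch based on the invariant above (illustrative
    // only; the real logic lives in read_header()/open()):
    //
    //   if (first_entry_read_failed && header.start_seq > header.committed_up_to)
    //     return -EINVAL;  // that entry would have had seq > committed_up_to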

    header_t() :
      flags(0), block_size(0), alignment(0), max_size(0), start(0),
      committed_up_to(0), start_seq(0) {}

    void clear() {
      start = block_size;
    }

    uint64_t get_fsid64() const {
      return *(uint64_t*)fsid.bytes();
    }

    void encode(ceph::buffer::list& bl) const {
      using ceph::encode;
      __u32 v = 4;
      encode(v, bl);
      ceph::buffer::list em;
      {
        encode(flags, em);
        encode(fsid, em);
        encode(block_size, em);
        encode(alignment, em);
        encode(max_size, em);
        encode(start, em);
        encode(committed_up_to, em);
        encode(start_seq, em);
      }
      encode(em, bl);
    }
    void decode(ceph::buffer::list::const_iterator& bl) {
      using ceph::decode;
      __u32 v;
      decode(v, bl);
      if (v < 2) {  // normally 0, but conceivably 1
        // decode old header_t struct (pre v0.40).
        bl += 4u; // skip __u32 flags (it was unused by any old code)
        flags = 0;
        uint64_t tfsid;
        decode(tfsid, bl);
        *(uint64_t*)&fsid.bytes()[0] = tfsid;
        *(uint64_t*)&fsid.bytes()[8] = tfsid;
        decode(block_size, bl);
        decode(alignment, bl);
        decode(max_size, bl);
        decode(start, bl);
        committed_up_to = 0;
        start_seq = 0;
        return;
      }
      ceph::buffer::list em;
      decode(em, bl);
      auto t = em.cbegin();
      decode(flags, t);
      decode(fsid, t);
      decode(block_size, t);
      decode(alignment, t);
      decode(max_size, t);
      decode(start, t);

      if (v > 2)
        decode(committed_up_to, t);
      else
        committed_up_to = 0;

      if (v > 3)
        decode(start_seq, t);
      else
        start_seq = 0;
    }
  } header;

  struct entry_header_t {
    uint64_t seq;     // fs op seq #
    uint32_t crc32c;  // payload only. not header, pre_pad, post_pad, or footer.
    uint32_t len;
    uint32_t pre_pad, post_pad;
    uint64_t magic1;
    uint64_t magic2;

    static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
      return (fsid ^ seq ^ len);
    }
    bool check_magic(off64_t pos, uint64_t fsid) {
      return
        magic1 == (uint64_t)pos &&
        magic2 == (fsid ^ seq ^ len);
    }
  } __attribute__((__packed__, aligned(4)));

  bool journalq_empty() { return journalq.empty(); }

private:
  std::string fn;

  char *zero_buf;
  off64_t max_size;
  size_t block_size;
  bool directio, aio, force_aio;
  bool must_write_header;
  off64_t write_pos;  // byte where the next entry to be written will go
  off64_t read_pos;
  bool discard;       // whether the block-device journal supports discard

#ifdef HAVE_LIBAIO
  /// state associated with an in-flight aio request
  /// Protected by aio_lock
  struct aio_info {
    struct iocb iocb {};
    ceph::buffer::list bl;
    struct iovec *iov;
    bool done;
    uint64_t off, len;  ///< these are for debug only
    uint64_t seq;       ///< seq number to complete on aio completion, if non-zero

    aio_info(ceph::buffer::list& b, uint64_t o, uint64_t s)
      : iov(NULL), done(false), off(o), len(b.length()), seq(s) {
      bl = std::move(b);
    }
    ~aio_info() {
      delete[] iov;
    }
  };
  ceph::mutex aio_lock = ceph::make_mutex("FileJournal::aio_lock");
  ceph::condition_variable aio_cond;
  ceph::condition_variable write_finish_cond;
  io_context_t aio_ctx = 0;
  std::list<aio_info> aio_queue;
  int aio_num = 0, aio_bytes = 0;
  uint64_t aio_write_queue_ops = 0;
  uint64_t aio_write_queue_bytes = 0;
  /// End protected by aio_lock
#endif

  uint64_t last_committed_seq;
  uint64_t journaled_since_start;

  std::string devname;

  /*
   * full states cycle at the beginning of each commit epoch, when commit_start()
   * is called.
   *   FULL - we just filled up during this epoch.
   *   WAIT - we filled up last epoch; now we have to wait until everything during
   *          that epoch commits to the fs before we can start writing over it.
   *   NOTFULL - all good, journal away.
   */
  enum {
    FULL_NOTFULL = 0,
    FULL_FULL = 1,
    FULL_WAIT = 2,
  } full_state;
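  // Transition sketch inferred from the comment above (illustrative, not
  // the literal commit_start()/committed_thru() code):
  //
  //   NOTFULL --(ring fills up during an epoch)------------> FULL
  //   FULL    --(commit_start() of the next epoch)----------> WAIT
  //   WAIT    --(that epoch commits; committed_thru())------> NOTFULL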

  int fd;

  // in journal
  std::deque<std::pair<uint64_t, off64_t> > journalq;  // track seq offsets, so we can trim later.
  uint64_t writing_seq;


  // throttle
  int set_throttle_params();
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(
    const ConfigProxy& conf,
    const std::set <std::string> &changed) override {
    for (const char **i = get_tracked_conf_keys();
         *i;
         ++i) {
      if (changed.count(std::string(*i))) {
        set_throttle_params();
        return;
      }
    }
  }

  void complete_write(uint64_t ops, uint64_t bytes);
  JournalThrottle throttle;

  // write thread
  ceph::mutex write_lock = ceph::make_mutex("FileJournal::write_lock");
  bool write_stop;
  bool aio_stop;

  ceph::condition_variable commit_cond;

  int _open(bool wr, bool create=false);
  int _open_block_device();
  void _close(int fd) const;
  int _open_file(int64_t oldsize, blksize_t blksize, bool create);
  int _dump(std::ostream& out, bool simple);
  void print_header(const header_t &hdr) const;
  int read_header(header_t *hdr) const;
  ceph::bufferptr prepare_header();
  void start_writer();
  void stop_writer();
  void write_thread_entry();

  void queue_completions_thru(uint64_t seq);

  int check_for_full(uint64_t seq, off64_t pos, off64_t size);
  int prepare_multi_write(ceph::buffer::list& bl, uint64_t& orig_ops, uint64_t& orig_bytes);
  int prepare_single_write(write_item &next_write, ceph::buffer::list& bl, off64_t& queue_pos,
                           uint64_t& orig_ops, uint64_t& orig_bytes);
  void do_write(ceph::buffer::list& bl);

  void write_finish_thread_entry();
  void check_aio_completion();
  void do_aio_write(ceph::buffer::list& bl);
  int write_aio_bl(off64_t& pos, ceph::buffer::list& bl, uint64_t seq);


  void check_align(off64_t pos, ceph::buffer::list& bl);
  int write_bl(off64_t& pos, ceph::buffer::list& bl);

  /// read len bytes from the journal starting at in_pos, wrapping around the ring as needed
  void wrap_read_bl(
    off64_t in_pos,          ///< [in] start position
    int64_t len,             ///< [in] length to read
    ceph::buffer::list* bl,  ///< [out] result
    off64_t *out_pos         ///< [out] next position to read, will be wrapped
    ) const;
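  // Wrap-around sketch (illustrative arithmetic only, assuming the entry
  // ring spans [get_top(), header.max_size)): a read that would run past
  // the end of the ring splits into two pieces,
  //
  //   int64_t first  = std::min<int64_t>(len, header.max_size - in_pos);
  //   int64_t second = len - first;  // if non-zero, continue from get_top()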

  void do_discard(int64_t offset, int64_t end);

  class Writer : public Thread {
    FileJournal *journal;
  public:
    explicit Writer(FileJournal *fj) : journal(fj) {}
    void *entry() override {
      journal->write_thread_entry();
      return 0;
    }
  } write_thread;

  class WriteFinisher : public Thread {
    FileJournal *journal;
  public:
    explicit WriteFinisher(FileJournal *fj) : journal(fj) {}
    void *entry() override {
      journal->write_finish_thread_entry();
      return 0;
    }
  } write_finish_thread;

  off64_t get_top() const {
    return round_up_to(sizeof(header), block_size);
  }

  ZTracer::Endpoint trace_endpoint;

public:
  FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, ceph::condition_variable *sync_cond,
              const char *f, bool dio=false, bool ai=true, bool faio=false) :
    Journal(cct, fsid, fin, sync_cond),
    journaled_seq(0),
    plug_journal_completions(false),
    fn(f),
    zero_buf(NULL),
    max_size(0), block_size(0),
    directio(dio), aio(ai), force_aio(faio),
    must_write_header(false),
    write_pos(0), read_pos(0),
    discard(false),
    last_committed_seq(0),
    journaled_since_start(0),
    full_state(FULL_NOTFULL),
    fd(-1),
    writing_seq(0),
    throttle(cct, cct->_conf->filestore_caller_concurrency),
    write_stop(true),
    aio_stop(true),
    write_thread(this),
    write_finish_thread(this),
    trace_endpoint("0.0.0.0", 0, "FileJournal") {

    if (aio && !directio) {
      lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
      aio = false;
    }
#ifndef HAVE_LIBAIO
    if (aio && ::getenv("CEPH_DEV") == NULL) {
      lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
      aio = false;
    }
#endif

    cct->_conf.add_observer(this);
  }
  ~FileJournal() override {
    ceph_assert(fd == -1);
    delete[] zero_buf;
    cct->_conf.remove_observer(this);
  }

  int check() override;
  int create() override;
  int open(uint64_t fs_op_seq) override;
  void close() override;
  int peek_fsid(uuid_d& fsid);

  int dump(std::ostream& out) override;
  int simple_dump(std::ostream& out);
  int _fdump(ceph::Formatter &f, bool simple);

  void flush() override;

  void get_devices(std::set<std::string> *ls) override;
  void collect_metadata(std::map<std::string,std::string> *pm) override;

  void reserve_throttle_and_backoff(uint64_t count) override;

  bool is_writeable() override {
    return read_pos == 0;
  }
  int make_writeable() override;

  // writes
  void commit_start(uint64_t seq) override;
  void committed_thru(uint64_t seq) override;
  bool should_commit_now() override {
    return full_state != FULL_NOTFULL && !write_stop;
  }

  void write_header_sync();

  void set_wait_on_full(bool b) { wait_on_full = b; }

  off64_t get_journal_size_estimate() override;

  // reads

  /// Result code for read_entry
  enum read_entry_result {
    SUCCESS,
    FAILURE,
    MAYBE_CORRUPT
  };

  /**
   * read_entry
   *
   * Reads the next entry starting at pos.  If the entry appears
   * clean, *bl will contain the payload, *seq will contain
   * the sequence number, and *next_pos will reflect the next
   * read position.  If the entry is invalid *ss will contain
   * debug text, while *seq, *next_pos, and *bl will be unchanged.
   *
   * If the entry suggests a corrupt log, *ss will contain debug
   * text and *next_pos will contain the next position to check.  If
   * an entry found by continuing from there returns SUCCESS, the
   * journal is most likely corrupt.
   */
  read_entry_result do_read_entry(
    off64_t pos,             ///< [in] position to read
    off64_t *next_pos,       ///< [out] next position to read
    ceph::buffer::list* bl,  ///< [out] payload for successful read
    uint64_t *seq,           ///< [out] seq of successful read
    std::ostream *ss,        ///< [out] error output
    entry_header_t *h = 0    ///< [out] header
    ) const; ///< @return result code
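  // Usage sketch (hypothetical caller; `replay_entry` is an illustrative
  // placeholder, not a member of this class): walk the journal until the
  // entries stop reading cleanly.
  //
  //   off64_t pos = read_pos, next = 0;
  //   ceph::buffer::list bl;
  //   uint64_t seq = 0;
  //   std::ostringstream ss;
  //   while (do_read_entry(pos, &next, &bl, &seq, &ss) == SUCCESS) {
  //     replay_entry(seq, bl);
  //     pos = next;
  //   }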

  bool read_entry(
    ceph::buffer::list &bl,
    uint64_t &last_seq,
    bool *corrupt
    );

  bool read_entry(
    ceph::buffer::list &bl,
    uint64_t &last_seq) override {
    return read_entry(bl, last_seq, 0);
  }

  // Debug/Testing
  void get_header(
    uint64_t wanted_seq,
    off64_t *_pos,
    entry_header_t *h);
  void corrupt(
    int wfd,
    off64_t corrupt_at);
  void corrupt_payload(
    int wfd,
    uint64_t seq);
  void corrupt_footer_magic(
    int wfd,
    uint64_t seq);
  void corrupt_header_magic(
    int wfd,
    uint64_t seq);
};

WRITE_CLASS_ENCODER(FileJournal::header_t)

#endif