]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/JournalingObjectStore.h
buildsys: change download over to reef release
[ceph.git] / ceph / src / os / filestore / JournalingObjectStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_JOURNALINGOBJECTSTORE_H
16 #define CEPH_JOURNALINGOBJECTSTORE_H
17
18 #include "os/ObjectStore.h"
19 #include "Journal.h"
20 #include "FileJournal.h"
21 #include "osd/OpRequest.h"
22
23 class JournalingObjectStore : public ObjectStore {
24 protected:
25 Journal *journal;
26 Finisher finisher;
27
28
29 class SubmitManager {
30 CephContext* cct;
31 ceph::mutex lock = ceph::make_mutex("JOS::SubmitManager::lock");
32 uint64_t op_seq;
33 uint64_t op_submitted;
34 public:
35 SubmitManager(CephContext* cct) :
36 cct(cct),
37 op_seq(0), op_submitted(0)
38 {}
39 uint64_t op_submit_start();
40 void op_submit_finish(uint64_t op);
41 void set_op_seq(uint64_t seq) {
42 std::lock_guard l{lock};
43 op_submitted = op_seq = seq;
44 }
45 uint64_t get_op_seq() {
46 return op_seq;
47 }
48 } submit_manager;
49
50 class ApplyManager {
51 CephContext* cct;
52 Journal *&journal;
53 Finisher &finisher;
54
55 ceph::mutex apply_lock = ceph::make_mutex("JOS::ApplyManager::apply_lock");
56 bool blocked;
57 ceph::condition_variable blocked_cond;
58 int open_ops;
59 uint64_t max_applied_seq;
60
61 ceph::mutex com_lock = ceph::make_mutex("JOS::ApplyManager::com_lock");
62 std::map<version_t, std::vector<Context*> > commit_waiters;
63 uint64_t committing_seq, committed_seq;
64
65 public:
66 ApplyManager(CephContext* cct, Journal *&j, Finisher &f) :
67 cct(cct), journal(j), finisher(f),
68 blocked(false),
69 open_ops(0),
70 max_applied_seq(0),
71 committing_seq(0), committed_seq(0) {}
72 void reset() {
73 ceph_assert(open_ops == 0);
74 ceph_assert(blocked == false);
75 max_applied_seq = 0;
76 committing_seq = 0;
77 committed_seq = 0;
78 }
79 void add_waiter(uint64_t, Context*);
80 uint64_t op_apply_start(uint64_t op);
81 void op_apply_finish(uint64_t op);
82 bool commit_start();
83 void commit_started();
84 void commit_finish();
85 bool is_committing() {
86 std::lock_guard l{com_lock};
87 return committing_seq != committed_seq;
88 }
89 uint64_t get_committed_seq() {
90 std::lock_guard l{com_lock};
91 return committed_seq;
92 }
93 uint64_t get_committing_seq() {
94 std::lock_guard l{com_lock};
95 return committing_seq;
96 }
97 void init_seq(uint64_t fs_op_seq) {
98 {
99 std::lock_guard l{com_lock};
100 committed_seq = fs_op_seq;
101 committing_seq = fs_op_seq;
102 }
103 {
104 std::lock_guard l{apply_lock};
105 max_applied_seq = fs_op_seq;
106 }
107 }
108 } apply_manager;
109
110 bool replaying;
111
112 protected:
113 void journal_start();
114 void journal_stop();
115 void journal_write_close();
116 int journal_replay(uint64_t fs_op_seq);
117
118 void _op_journal_transactions(ceph::buffer::list& tls, uint32_t orig_len, uint64_t op,
119 Context *onjournal, TrackedOpRef osd_op);
120
121 virtual int do_transactions(std::vector<ObjectStore::Transaction>& tls, uint64_t op_seq) = 0;
122
123 public:
124 bool is_committing() {
125 return apply_manager.is_committing();
126 }
127 uint64_t get_committed_seq() {
128 return apply_manager.get_committed_seq();
129 }
130
131 public:
132 JournalingObjectStore(CephContext* cct, const std::string& path)
133 : ObjectStore(cct, path),
134 journal(NULL),
135 finisher(cct, "JournalObjectStore", "fn_jrn_objstore"),
136 submit_manager(cct),
137 apply_manager(cct, journal, finisher),
138 replaying(false) {}
139
140 ~JournalingObjectStore() override {
141 }
142 };
143
144 #endif