]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/JournalingObjectStore.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / os / filestore / JournalingObjectStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #include "JournalingObjectStore.h"
4
5 #include "common/errno.h"
6 #include "common/debug.h"
7
8 #define dout_context cct
9 #define dout_subsys ceph_subsys_journal
10 #undef dout_prefix
11 #define dout_prefix *_dout << "journal "
12
13
14
15 void JournalingObjectStore::journal_start()
16 {
17 dout(10) << "journal_start" << dendl;
18 finisher.start();
19 }
20
21 void JournalingObjectStore::journal_stop()
22 {
23 dout(10) << "journal_stop" << dendl;
24 finisher.wait_for_empty();
25 finisher.stop();
26 }
27
28 // A journal_replay() makes journal writeable, this closes that out.
29 void JournalingObjectStore::journal_write_close()
30 {
31 if (journal) {
32 journal->close();
33 delete journal;
34 journal = 0;
35 }
36 apply_manager.reset();
37 }
38
39 int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
40 {
41 dout(10) << "journal_replay fs op_seq " << fs_op_seq << dendl;
42
43 if (cct->_conf->journal_replay_from) {
44 dout(0) << "journal_replay forcing replay from "
45 << cct->_conf->journal_replay_from
46 << " instead of " << fs_op_seq << dendl;
47 // the previous op is the last one committed
48 fs_op_seq = cct->_conf->journal_replay_from - 1;
49 }
50
51 uint64_t op_seq = fs_op_seq;
52 apply_manager.init_seq(fs_op_seq);
53
54 if (!journal) {
55 submit_manager.set_op_seq(op_seq);
56 return 0;
57 }
58
59 int err = journal->open(op_seq);
60 if (err < 0) {
61 dout(3) << "journal_replay open failed with "
62 << cpp_strerror(err) << dendl;
63 delete journal;
64 journal = 0;
65 return err;
66 }
67
68 replaying = true;
69
70 int count = 0;
71 while (1) {
72 bufferlist bl;
73 uint64_t seq = op_seq + 1;
74 if (!journal->read_entry(bl, seq)) {
75 dout(3) << "journal_replay: end of journal, done." << dendl;
76 break;
77 }
78
79 if (seq <= op_seq) {
80 dout(3) << "journal_replay: skipping old op seq " << seq << " <= " << op_seq << dendl;
81 continue;
82 }
83 ceph_assert(op_seq == seq-1);
84
85 dout(3) << "journal_replay: applying op seq " << seq << dendl;
86 auto p = bl.cbegin();
87 vector<ObjectStore::Transaction> tls;
88 while (!p.end()) {
89 tls.emplace_back(Transaction(p));
90 }
91
92 apply_manager.op_apply_start(seq);
93 int r = do_transactions(tls, seq);
94 apply_manager.op_apply_finish(seq);
95
96 op_seq = seq;
97 count++;
98
99 dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl;
100 }
101
102 if (count)
103 dout(3) << "journal_replay: total = " << count << dendl;
104
105 replaying = false;
106
107 submit_manager.set_op_seq(op_seq);
108
109 // done reading, make writeable.
110 err = journal->make_writeable();
111 if (err < 0)
112 return err;
113
114 if (!count)
115 journal->committed_thru(fs_op_seq);
116
117 return count;
118 }
119
120
121 // ------------------------------------
122
123 uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
124 {
125 std::unique_lock l{apply_lock};
126 blocked_cond.wait(l, [this] {
127 if (blocked) {
128 dout(10) << "op_apply_start blocked, waiting" << dendl;
129 }
130 return !blocked;
131 });
132 dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> "
133 << (open_ops+1) << dendl;
134 ceph_assert(!blocked);
135 ceph_assert(op > committed_seq);
136 open_ops++;
137 return op;
138 }
139
140 void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
141 {
142 std::lock_guard l{apply_lock};
143 dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> "
144 << (open_ops-1) << ", max_applied_seq " << max_applied_seq << " -> "
145 << std::max(op, max_applied_seq) << dendl;
146 --open_ops;
147 ceph_assert(open_ops >= 0);
148
149 // signal a blocked commit_start
150 if (blocked) {
151 blocked_cond.notify_all();
152 }
153
154 // there can be multiple applies in flight; track the max value we
155 // note. note that we can't _read_ this value and learn anything
156 // meaningful unless/until we've quiesced all in-flight applies.
157 if (op > max_applied_seq)
158 max_applied_seq = op;
159 }
160
161 uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
162 {
163 lock.lock();
164 uint64_t op = ++op_seq;
165 dout(10) << "op_submit_start " << op << dendl;
166 return op;
167 }
168
169 void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
170 {
171 dout(10) << "op_submit_finish " << op << dendl;
172 if (op != op_submitted + 1) {
173 dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1)
174 << ", OUT OF ORDER" << dendl;
175 ceph_abort_msg("out of order op_submit_finish");
176 }
177 op_submitted = op;
178 lock.unlock();
179 }
180
181
182 // ------------------------------------------
183
184 void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
185 {
186 std::lock_guard l{com_lock};
187 ceph_assert(c);
188 commit_waiters[op].push_back(c);
189 }
190
191 bool JournalingObjectStore::ApplyManager::commit_start()
192 {
193 bool ret = false;
194
195 {
196 std::unique_lock l{apply_lock};
197 dout(10) << "commit_start max_applied_seq " << max_applied_seq
198 << ", open_ops " << open_ops << dendl;
199 blocked = true;
200 blocked_cond.wait(l, [this] {
201 if (open_ops > 0) {
202 dout(10) << "commit_start waiting for " << open_ops
203 << " open ops to drain" << dendl;
204 }
205 return open_ops == 0;
206 });
207 ceph_assert(open_ops == 0);
208 dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
209 {
210 std::lock_guard l{com_lock};
211 if (max_applied_seq == committed_seq) {
212 dout(10) << "commit_start nothing to do" << dendl;
213 blocked = false;
214 ceph_assert(commit_waiters.empty());
215 goto out;
216 }
217
218 committing_seq = max_applied_seq;
219
220 dout(10) << "commit_start committing " << committing_seq
221 << ", still blocked" << dendl;
222 }
223 }
224 ret = true;
225
226 if (journal)
227 journal->commit_start(committing_seq); // tell the journal too
228 out:
229 return ret;
230 }
231
232 void JournalingObjectStore::ApplyManager::commit_started()
233 {
234 std::lock_guard l{apply_lock};
235 // allow new ops. (underlying fs should now be committing all prior ops)
236 dout(10) << "commit_started committing " << committing_seq << ", unblocking"
237 << dendl;
238 blocked = false;
239 blocked_cond.notify_all();
240 }
241
242 void JournalingObjectStore::ApplyManager::commit_finish()
243 {
244 std::lock_guard l{com_lock};
245 dout(10) << "commit_finish thru " << committing_seq << dendl;
246
247 if (journal)
248 journal->committed_thru(committing_seq);
249
250 committed_seq = committing_seq;
251
252 map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
253 while (p != commit_waiters.end() &&
254 p->first <= committing_seq) {
255 finisher.queue(p->second);
256 commit_waiters.erase(p++);
257 }
258 }
259
260 void JournalingObjectStore::_op_journal_transactions(
261 bufferlist& tbl, uint32_t orig_len, uint64_t op,
262 Context *onjournal, TrackedOpRef osd_op)
263 {
264 if (osd_op.get())
265 dout(10) << "op_journal_transactions " << op << " reqid_t "
266 << (static_cast<OpRequest *>(osd_op.get()))->get_reqid() << dendl;
267 else
268 dout(10) << "op_journal_transactions " << op << dendl;
269
270 if (journal && journal->is_writeable()) {
271 journal->submit_entry(op, tbl, orig_len, onjournal, osd_op);
272 } else if (onjournal) {
273 apply_manager.add_waiter(op, onjournal);
274 }
275 }