]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | ||
3 | #include "JournalingObjectStore.h" | |
4 | ||
5 | #include "common/errno.h" | |
6 | #include "common/debug.h" | |
7 | ||
8 | #define dout_context cct | |
9 | #define dout_subsys ceph_subsys_journal | |
10 | #undef dout_prefix | |
11 | #define dout_prefix *_dout << "journal " | |
12 | ||
13 | ||
14 | ||
15 | void JournalingObjectStore::journal_start() | |
16 | { | |
17 | dout(10) << "journal_start" << dendl; | |
18 | finisher.start(); | |
19 | } | |
20 | ||
21 | void JournalingObjectStore::journal_stop() | |
22 | { | |
23 | dout(10) << "journal_stop" << dendl; | |
24 | finisher.wait_for_empty(); | |
25 | finisher.stop(); | |
26 | } | |
27 | ||
28 | // A journal_replay() makes journal writeable, this closes that out. | |
29 | void JournalingObjectStore::journal_write_close() | |
30 | { | |
31 | if (journal) { | |
32 | journal->close(); | |
33 | delete journal; | |
34 | journal = 0; | |
35 | } | |
36 | apply_manager.reset(); | |
37 | } | |
38 | ||
39 | int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) | |
40 | { | |
41 | dout(10) << "journal_replay fs op_seq " << fs_op_seq << dendl; | |
42 | ||
43 | if (cct->_conf->journal_replay_from) { | |
44 | dout(0) << "journal_replay forcing replay from " | |
45 | << cct->_conf->journal_replay_from | |
46 | << " instead of " << fs_op_seq << dendl; | |
47 | // the previous op is the last one committed | |
48 | fs_op_seq = cct->_conf->journal_replay_from - 1; | |
49 | } | |
50 | ||
51 | uint64_t op_seq = fs_op_seq; | |
52 | apply_manager.init_seq(fs_op_seq); | |
53 | ||
54 | if (!journal) { | |
55 | submit_manager.set_op_seq(op_seq); | |
56 | return 0; | |
57 | } | |
58 | ||
59 | int err = journal->open(op_seq); | |
60 | if (err < 0) { | |
61 | dout(3) << "journal_replay open failed with " | |
62 | << cpp_strerror(err) << dendl; | |
63 | delete journal; | |
64 | journal = 0; | |
65 | return err; | |
66 | } | |
67 | ||
68 | replaying = true; | |
69 | ||
70 | int count = 0; | |
71 | while (1) { | |
72 | bufferlist bl; | |
73 | uint64_t seq = op_seq + 1; | |
74 | if (!journal->read_entry(bl, seq)) { | |
75 | dout(3) << "journal_replay: end of journal, done." << dendl; | |
76 | break; | |
77 | } | |
78 | ||
79 | if (seq <= op_seq) { | |
80 | dout(3) << "journal_replay: skipping old op seq " << seq << " <= " << op_seq << dendl; | |
81 | continue; | |
82 | } | |
83 | assert(op_seq == seq-1); | |
84 | ||
85 | dout(3) << "journal_replay: applying op seq " << seq << dendl; | |
86 | bufferlist::iterator p = bl.begin(); | |
87 | vector<ObjectStore::Transaction> tls; | |
88 | while (!p.end()) { | |
89 | tls.emplace_back(Transaction(p)); | |
90 | } | |
91 | ||
92 | apply_manager.op_apply_start(seq); | |
93 | int r = do_transactions(tls, seq); | |
94 | apply_manager.op_apply_finish(seq); | |
95 | ||
96 | op_seq = seq; | |
97 | count++; | |
98 | ||
99 | dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl; | |
100 | } | |
101 | ||
102 | if (count) | |
103 | dout(3) << "journal_replay: total = " << count << dendl; | |
104 | ||
105 | replaying = false; | |
106 | ||
107 | submit_manager.set_op_seq(op_seq); | |
108 | ||
109 | // done reading, make writeable. | |
110 | err = journal->make_writeable(); | |
111 | if (err < 0) | |
112 | return err; | |
113 | ||
224ce89b WB |
114 | if (!count) |
115 | journal->committed_thru(fs_op_seq); | |
116 | ||
7c673cae FG |
117 | return count; |
118 | } | |
119 | ||
120 | ||
121 | // ------------------------------------ | |
122 | ||
123 | uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) | |
124 | { | |
125 | Mutex::Locker l(apply_lock); | |
126 | while (blocked) { | |
127 | dout(10) << "op_apply_start blocked, waiting" << dendl; | |
128 | blocked_cond.Wait(apply_lock); | |
129 | } | |
130 | dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " | |
131 | << (open_ops+1) << dendl; | |
132 | assert(!blocked); | |
133 | assert(op > committed_seq); | |
134 | open_ops++; | |
135 | return op; | |
136 | } | |
137 | ||
138 | void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) | |
139 | { | |
140 | Mutex::Locker l(apply_lock); | |
141 | dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " | |
142 | << (open_ops-1) << ", max_applied_seq " << max_applied_seq << " -> " | |
143 | << MAX(op, max_applied_seq) << dendl; | |
144 | --open_ops; | |
145 | assert(open_ops >= 0); | |
146 | ||
147 | // signal a blocked commit_start | |
148 | if (blocked) { | |
149 | blocked_cond.Signal(); | |
150 | } | |
151 | ||
152 | // there can be multiple applies in flight; track the max value we | |
153 | // note. note that we can't _read_ this value and learn anything | |
154 | // meaningful unless/until we've quiesced all in-flight applies. | |
155 | if (op > max_applied_seq) | |
156 | max_applied_seq = op; | |
157 | } | |
158 | ||
159 | uint64_t JournalingObjectStore::SubmitManager::op_submit_start() | |
160 | { | |
161 | lock.Lock(); | |
162 | uint64_t op = ++op_seq; | |
163 | dout(10) << "op_submit_start " << op << dendl; | |
164 | return op; | |
165 | } | |
166 | ||
167 | void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op) | |
168 | { | |
169 | dout(10) << "op_submit_finish " << op << dendl; | |
170 | if (op != op_submitted + 1) { | |
171 | dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1) | |
172 | << ", OUT OF ORDER" << dendl; | |
173 | assert(0 == "out of order op_submit_finish"); | |
174 | } | |
175 | op_submitted = op; | |
176 | lock.Unlock(); | |
177 | } | |
178 | ||
179 | ||
180 | // ------------------------------------------ | |
181 | ||
182 | void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c) | |
183 | { | |
184 | Mutex::Locker l(com_lock); | |
185 | assert(c); | |
186 | commit_waiters[op].push_back(c); | |
187 | } | |
188 | ||
189 | bool JournalingObjectStore::ApplyManager::commit_start() | |
190 | { | |
191 | bool ret = false; | |
192 | ||
193 | { | |
194 | Mutex::Locker l(apply_lock); | |
195 | dout(10) << "commit_start max_applied_seq " << max_applied_seq | |
196 | << ", open_ops " << open_ops << dendl; | |
197 | blocked = true; | |
198 | while (open_ops > 0) { | |
199 | dout(10) << "commit_start waiting for " << open_ops | |
200 | << " open ops to drain" << dendl; | |
201 | blocked_cond.Wait(apply_lock); | |
202 | } | |
203 | assert(open_ops == 0); | |
204 | dout(10) << "commit_start blocked, all open_ops have completed" << dendl; | |
205 | { | |
206 | Mutex::Locker l(com_lock); | |
207 | if (max_applied_seq == committed_seq) { | |
208 | dout(10) << "commit_start nothing to do" << dendl; | |
209 | blocked = false; | |
210 | assert(commit_waiters.empty()); | |
211 | goto out; | |
212 | } | |
213 | ||
214 | committing_seq = max_applied_seq; | |
215 | ||
216 | dout(10) << "commit_start committing " << committing_seq | |
217 | << ", still blocked" << dendl; | |
218 | } | |
219 | } | |
220 | ret = true; | |
221 | ||
222 | if (journal) | |
223 | journal->commit_start(committing_seq); // tell the journal too | |
224 | out: | |
225 | return ret; | |
226 | } | |
227 | ||
228 | void JournalingObjectStore::ApplyManager::commit_started() | |
229 | { | |
230 | Mutex::Locker l(apply_lock); | |
231 | // allow new ops. (underlying fs should now be committing all prior ops) | |
232 | dout(10) << "commit_started committing " << committing_seq << ", unblocking" | |
233 | << dendl; | |
234 | blocked = false; | |
235 | blocked_cond.Signal(); | |
236 | } | |
237 | ||
238 | void JournalingObjectStore::ApplyManager::commit_finish() | |
239 | { | |
240 | Mutex::Locker l(com_lock); | |
241 | dout(10) << "commit_finish thru " << committing_seq << dendl; | |
242 | ||
243 | if (journal) | |
244 | journal->committed_thru(committing_seq); | |
245 | ||
246 | committed_seq = committing_seq; | |
247 | ||
248 | map<version_t, vector<Context*> >::iterator p = commit_waiters.begin(); | |
249 | while (p != commit_waiters.end() && | |
250 | p->first <= committing_seq) { | |
251 | finisher.queue(p->second); | |
252 | commit_waiters.erase(p++); | |
253 | } | |
254 | } | |
255 | ||
256 | void JournalingObjectStore::_op_journal_transactions( | |
257 | bufferlist& tbl, uint32_t orig_len, uint64_t op, | |
258 | Context *onjournal, TrackedOpRef osd_op) | |
259 | { | |
260 | if (osd_op.get()) | |
261 | dout(10) << "op_journal_transactions " << op << " reqid_t " | |
262 | << (static_cast<OpRequest *>(osd_op.get()))->get_reqid() << dendl; | |
263 | else | |
264 | dout(10) << "op_journal_transactions " << op << dendl; | |
265 | ||
266 | if (journal && journal->is_writeable()) { | |
267 | journal->submit_entry(op, tbl, orig_len, onjournal, osd_op); | |
268 | } else if (onjournal) { | |
269 | apply_manager.add_waiter(op, onjournal); | |
270 | } | |
271 | } |