// Source: git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/JournalingObjectStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 #include "JournalingObjectStore.h"
5 #include "common/errno.h"
6 #include "common/debug.h"
8 #define dout_context cct
9 #define dout_subsys ceph_subsys_journal
11 #define dout_prefix *_dout << "journal "
// journal_start(): logs "journal_start" at dout level 10 (journal subsystem).
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps); the braces and any further statements of this
// function are missing. Restore from upstream before building.
15 void JournalingObjectStore::journal_start()
17 dout(10) << "journal_start" << dendl
;
// journal_stop(): logs "journal_stop" at dout level 10, then calls
// finisher.wait_for_empty() (presumably blocking until every queued
// completion has run -- TODO confirm against Finisher).
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps); the braces and anything after wait_for_empty() are
// missing. Restore from upstream before building.
21 void JournalingObjectStore::journal_stop()
23 dout(10) << "journal_stop" << dendl
;
24 finisher
.wait_for_empty();
28 // A journal_replay() makes journal writeable, this closes that out.
// journal_write_close(): the only statement visible here resets
// apply_manager.
// NOTE(review): this extract dropped source lines. Everything between the
// signature and apply_manager.reset(), plus the braces, is missing, so any
// journal handling done there is not visible. Restore from upstream before
// building.
29 void JournalingObjectStore::journal_write_close()
36 apply_manager
.reset();
// journal_replay(): re-apply journal entries that are newer than fs_op_seq.
//
// fs_op_seq: sequence number the caller treats as already committed; replay
//   expects the next entry to be fs_op_seq + 1. If the journal_replay_from
//   config option is set, fs_op_seq is overridden to journal_replay_from - 1.
// Returns: int status. The return statements are not visible in this
//   extract (presumably 0 or a negative error such as `err` -- TODO confirm).
//
// Visible flow:
//   1. Seed apply_manager and submit_manager from fs_op_seq.
//   2. Open the journal and read entries via journal->read_entry().
//   3. Build Transactions and apply them with do_transactions(), bracketed
//      by apply_manager.op_apply_start/op_apply_finish.
//   4. Publish the final op_seq, make the journal writeable, and report it
//      committed through fs_op_seq.
//
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). The following are all missing:
//   - the declarations of `bl`, `p` and `count` used below;
//   - the error branch after open();
//   - the read loop and the skip/apply branching;
//   - the returns and the braces.
// Restore from upstream before building.
39 int JournalingObjectStore::journal_replay(uint64_t fs_op_seq
)
41 dout(10) << "journal_replay fs op_seq " << fs_op_seq
<< dendl
;
// Config override: replay from journal_replay_from instead of the caller's
// fs_op_seq.
43 if (cct
->_conf
->journal_replay_from
) {
44 dout(0) << "journal_replay forcing replay from "
45 << cct
->_conf
->journal_replay_from
46 << " instead of " << fs_op_seq
<< dendl
;
47 // the previous op is the last one committed
48 fs_op_seq
= cct
->_conf
->journal_replay_from
- 1;
// op_seq is the last sequence number applied so far during replay.
51 uint64_t op_seq
= fs_op_seq
;
52 apply_manager
.init_seq(fs_op_seq
);
55 submit_manager
.set_op_seq(op_seq
);
// Open the journal, passing op_seq. The failure message is logged at level
// 3; the branch that handles the failure is missing from this extract.
59 int err
= journal
->open(op_seq
);
61 dout(3) << "journal_replay open failed with "
62 << cpp_strerror(err
) << dendl
;
// The next expected entry is the one right after the last applied op.
73 uint64_t seq
= op_seq
+ 1;
// No further entry can be read: replay is finished.
74 if (!journal
->read_entry(bl
, seq
)) {
75 dout(3) << "journal_replay: end of journal, done." << dendl
;
// Entries at or below op_seq were already applied and are skipped.
80 dout(3) << "journal_replay: skipping old op seq " << seq
<< " <= " << op_seq
<< dendl
;
// Entries must be contiguous with the last applied op.
83 ceph_assert(op_seq
== seq
-1);
85 dout(3) << "journal_replay: applying op seq " << seq
<< dendl
;
// Rebuild the transaction(s) for this entry. `p` is not declared in this
// extract (presumably an iterator over `bl` -- TODO confirm).
87 vector
<ObjectStore::Transaction
> tls
;
89 tls
.emplace_back(Transaction(p
));
// Bracket the apply so that ApplyManager's open-op accounting and
// max_applied_seq both see this replayed op.
92 apply_manager
.op_apply_start(seq
);
93 int r
= do_transactions(tls
, seq
);
94 apply_manager
.op_apply_finish(seq
);
99 dout(3) << "journal_replay: r = " << r
<< ", op_seq now " << op_seq
<< dendl
;
103 dout(3) << "journal_replay: total = " << count
<< dendl
;
// Publish the final replayed sequence number to the submit path.
107 submit_manager
.set_op_seq(op_seq
);
109 // done reading, make writeable.
110 err
= journal
->make_writeable();
// Report to the journal that everything through fs_op_seq is committed.
115 journal
->committed_thru(fs_op_seq
);
121 // ------------------------------------
// ApplyManager::op_apply_start(): register the start of applying op `op`.
// Under apply_lock, it does the following:
//   - waits on blocked_cond, logging while it is blocked;
//   - logs the open_ops transition (open_ops -> open_ops+1);
//   - asserts that the manager is not blocked and that op > committed_seq.
// Returns: uint64_t. The return statement is not visible in this extract.
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). The following are missing:
//   - the wait predicate's condition and return value;
//   - the open_ops increment implied by the log;
//   - the closing braces.
// Restore from upstream before building.
123 uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op
)
125 std::unique_lock l
{apply_lock
};
126 blocked_cond
.wait(l
, [this] {
128 dout(10) << "op_apply_start blocked, waiting" << dendl
;
132 dout(10) << "op_apply_start " << op
<< " open_ops " << open_ops
<< " -> "
133 << (open_ops
+1) << dendl
;
// New applies must not start while blocked, and must be newer than the
// last committed seq.
134 ceph_assert(!blocked
);
135 ceph_assert(op
> committed_seq
);
// ApplyManager::op_apply_finish(): record that applying op `op` has finished.
// Under apply_lock, it does the following:
//   - logs the open_ops and max_applied_seq transitions;
//   - asserts that open_ops is non-negative;
//   - notifies blocked_cond so a waiting commit_start() can re-check its
//     wait condition;
//   - raises max_applied_seq to op if op is larger.
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). The following are missing:
//   - the open_ops decrement implied by the log;
//   - any statements surrounding the notify;
//   - the closing braces.
// Restore from upstream before building.
140 void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op
)
142 std::lock_guard l
{apply_lock
};
143 dout(10) << "op_apply_finish " << op
<< " open_ops " << open_ops
<< " -> "
144 << (open_ops
-1) << ", max_applied_seq " << max_applied_seq
<< " -> "
145 << std::max(op
, max_applied_seq
) << dendl
;
147 ceph_assert(open_ops
>= 0);
149 // signal a blocked commit_start
151 blocked_cond
.notify_all();
154 // there can be multiple applies in flight; track the max value we
155 // note. note that we can't _read_ this value and learn anything
156 // meaningful unless/until we've quiesced all in-flight applies.
157 if (op
> max_applied_seq
)
158 max_applied_seq
= op
;
// SubmitManager::op_submit_start(): allocate the next op sequence number by
// pre-incrementing op_seq, then log it at dout level 10.
// Returns: uint64_t. The return statement is not visible in this extract
// (presumably the new `op` -- TODO confirm).
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). Any locking before the increment, the return, and
// the braces are missing. Restore from upstream before building.
161 uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
164 uint64_t op
= ++op_seq
;
165 dout(10) << "op_submit_start " << op
<< dendl
;
// SubmitManager::op_submit_finish(): check that op `op` finishes in
// submission order. If op is not op_submitted + 1, it logs "OUT OF ORDER"
// at dout level 0 and aborts via ceph_abort_msg().
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). Whatever follows the check (presumably advancing
// op_submitted -- TODO confirm) and the braces are missing. Restore from
// upstream before building.
169 void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op
)
171 dout(10) << "op_submit_finish " << op
<< dendl
;
// Submissions must complete strictly in sequence; anything else is fatal.
172 if (op
!= op_submitted
+ 1) {
173 dout(0) << "op_submit_finish " << op
<< " expected " << (op_submitted
+ 1)
174 << ", OUT OF ORDER" << dendl
;
175 ceph_abort_msg("out of order op_submit_finish");
182 // ------------------------------------------
// ApplyManager::add_waiter(): register Context `c` to run once op `op` is
// committed. Under com_lock, it appends c to commit_waiters[op].
// ApplyManager::commit_finish() later hands waiters whose key is
// <= committing_seq to the finisher.
// NOTE(review): this extract dropped the function's braces (the embedded
// line numbering has gaps). Restore from upstream before building.
184 void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op
, Context
*c
)
186 std::lock_guard l
{com_lock
};
188 commit_waiters
[op
].push_back(c
);
// ApplyManager::commit_start(): begin committing everything applied so far.
// Steps:
//   1. Under apply_lock, wait on blocked_cond until open_ops drains to 0.
//   2. Take com_lock.
//   3. If max_applied_seq == committed_seq, there is nothing new to commit;
//      in that case no commit waiters may be pending.
//   4. Otherwise set committing_seq = max_applied_seq and call
//      journal->commit_start().
// Returns: bool. The return statement(s) are not visible in this extract.
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). The following are missing:
//   - the first lines of the body;
//   - part of the wait predicate;
//   - how the "nothing to do" branch exits;
//   - the lines before journal->commit_start();
//   - the returns and the braces.
// Restore from upstream before building.
191 bool JournalingObjectStore::ApplyManager::commit_start()
196 std::unique_lock l
{apply_lock
};
197 dout(10) << "commit_start max_applied_seq " << max_applied_seq
198 << ", open_ops " << open_ops
<< dendl
;
// Wait for all in-flight applies to finish; op_apply_finish() notifies
// blocked_cond.
200 blocked_cond
.wait(l
, [this] {
202 dout(10) << "commit_start waiting for " << open_ops
203 << " open ops to drain" << dendl
;
205 return open_ops
== 0;
207 ceph_assert(open_ops
== 0);
208 dout(10) << "commit_start blocked, all open_ops have completed" << dendl
;
210 std::lock_guard l
{com_lock
};
// Nothing was applied since the last commit.
211 if (max_applied_seq
== committed_seq
) {
212 dout(10) << "commit_start nothing to do" << dendl
;
214 ceph_assert(commit_waiters
.empty());
// Everything up to max_applied_seq becomes the commit in progress.
218 committing_seq
= max_applied_seq
;
220 dout(10) << "commit_start committing " << committing_seq
221 << ", still blocked" << dendl
;
227 journal
->commit_start(committing_seq
); // tell the journal too
// ApplyManager::commit_started(): called once the commit is underway.
// Under apply_lock, it logs and then notifies blocked_cond, which wakes
// ops waiting in op_apply_start() so they can proceed.
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). The dout statement below is never terminated (its
// `<< dendl;` is missing), and any state change between the log and the
// notify is missing, as is the closing brace. Restore from upstream before
// building.
232 void JournalingObjectStore::ApplyManager::commit_started()
234 std::lock_guard l
{apply_lock
};
235 // allow new ops. (underlying fs should now be committing all prior ops)
236 dout(10) << "commit_started committing " << committing_seq
<< ", unblocking"
239 blocked_cond
.notify_all();
// ApplyManager::commit_finish(): complete the commit of committing_seq.
// Under com_lock, it does the following:
//   1. Tells the journal it is committed through committing_seq.
//   2. Records committed_seq = committing_seq.
//   3. Hands every waiter registered for a seq <= committing_seq to the
//      finisher, removing it from commit_waiters.
// NOTE(review): this extract dropped source lines (the embedded line
// numbering has gaps). The closing braces and anything after the loop are
// missing. Restore from upstream before building.
242 void JournalingObjectStore::ApplyManager::commit_finish()
244 std::lock_guard l
{com_lock
};
245 dout(10) << "commit_finish thru " << committing_seq
<< dendl
;
248 journal
->committed_thru(committing_seq
);
250 committed_seq
= committing_seq
;
// commit_waiters is an ordered map, so walking from begin() while the key
// is <= committing_seq visits exactly the now-committed waiters.
252 map
<version_t
, vector
<Context
*> >::iterator p
= commit_waiters
.begin();
253 while (p
!= commit_waiters
.end() &&
254 p
->first
<= committing_seq
) {
255 finisher
.queue(p
->second
);
// Post-increment keeps p valid: it advances before the old node is erased.
256 commit_waiters
.erase(p
++);
260 void JournalingObjectStore::_op_journal_transactions(
261 bufferlist
& tbl
, uint32_t orig_len
, uint64_t op
,
262 Context
*onjournal
, TrackedOpRef osd_op
)
265 dout(10) << "op_journal_transactions " << op
<< " reqid_t "
266 << (static_cast<OpRequest
*>(osd_op
.get()))->get_reqid() << dendl
;
268 dout(10) << "op_journal_transactions " << op
<< dendl
;
270 if (journal
&& journal
->is_writeable()) {
271 journal
->submit_entry(op
, tbl
, orig_len
, onjournal
, osd_op
);
272 } else if (onjournal
) {
273 apply_manager
.add_waiter(op
, onjournal
);