1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_JOURNALINGOBJECTSTORE_H
16 #define CEPH_JOURNALINGOBJECTSTORE_H
18 #include "os/ObjectStore.h"
20 #include "FileJournal.h"
21 #include "osd/OpRequest.h"
23 class JournalingObjectStore
: public ObjectStore
{
// Mutex created with the debug name "JOS::SubmitManager::lock".
// set_op_seq() holds it while it overwrites the sequence counters.
31 ceph::mutex lock
= ceph::make_mutex("JOS::SubmitManager::lock");
// Counter that the constructor starts at 0 and that set_op_seq() overwrites
// together with op_seq.
// NOTE(review): presumably tracks the most recently submitted op number --
// confirm against op_submit_start()/op_submit_finish().
33 uint64_t op_submitted
;
// Constructor taking a CephContext pointer; the visible initializers start
// both op_seq and op_submitted at 0.
// NOTE(review): the remainder of the initializer list and the constructor
// body are missing from this listing.
35 SubmitManager(CephContext
* cct
) :
37 op_seq(0), op_submitted(0)
// Submission bracket; both functions are only declared here.
// op_submit_start() returns a uint64_t and op_submit_finish() consumes one.
// NOTE(review): presumably the value returned by op_submit_start() is the op
// number later handed to op_submit_finish() -- confirm in the implementation.
39 uint64_t op_submit_start();
40 void op_submit_finish(uint64_t op
);
41 void set_op_seq(uint64_t seq
) {
42 std::lock_guard l
{lock
};
43 op_submitted
= op_seq
= seq
;
// Accessor returning a uint64_t.
// NOTE(review): the body is missing from this listing; presumably it returns
// op_seq -- confirm, including whether `lock` is taken.
45 uint64_t get_op_seq() {
// Mutex created with the debug name "JOS::ApplyManager::apply_lock".
// init_seq() holds it while writing max_applied_seq.
55 ceph::mutex apply_lock
= ceph::make_mutex("JOS::ApplyManager::apply_lock");
// Condition variable; its waiters and notifiers are not visible here.
// NOTE(review): presumably paired with apply_lock and the `blocked` flag --
// confirm in the implementation.
57 ceph::condition_variable blocked_cond
;
// Set by init_seq() to its fs_op_seq argument.
// NOTE(review): presumably the highest op sequence applied so far -- confirm.
59 uint64_t max_applied_seq
;
// Mutex created with the debug name "JOS::ApplyManager::com_lock".
// is_committing(), get_committed_seq(), get_committing_seq() and init_seq()
// take it before touching the commit sequence numbers.
61 ceph::mutex com_lock
= ceph::make_mutex("JOS::ApplyManager::com_lock");
// Vectors of Context pointers keyed by version_t.
// NOTE(review): presumably callbacks registered through add_waiter() and run
// once the keyed sequence commits -- confirm.
62 std::map
<version_t
, std::vector
<Context
*> > commit_waiters
;
// Commit sequence pair: both start at 0, init_seq() sets them to the same
// value, and is_committing() reports true while they differ.
63 uint64_t committing_seq
, committed_seq
;
// Constructor: initializes cct, journal and finisher from the arguments and
// starts committing_seq and committed_seq at 0. `j` is a reference to a
// Journal pointer and `f` a reference to a Finisher.
// NOTE(review): the initializers between finisher(f) and committing_seq(0)
// are missing from this listing.
66 ApplyManager(CephContext
* cct
, Journal
*&j
, Finisher
&f
) :
67 cct(cct
), journal(j
), finisher(f
),
71 committing_seq(0), committed_seq(0) {}
// Sanity checks: asserts that open_ops is 0 and that blocked is false.
// NOTE(review): the enclosing function header is missing from this listing;
// presumably this is the ApplyManager destructor -- confirm.
73 ceph_assert(open_ops
== 0);
74 ceph_assert(blocked
== false);
// Takes a uint64_t and a Context pointer; declared only.
// NOTE(review): presumably stores the Context in commit_waiters under that
// sequence number -- confirm.
79 void add_waiter(uint64_t, Context
*);
// Apply bracket, declared only. op_apply_start() takes an op number and
// returns a uint64_t; op_apply_finish() takes an op number.
80 uint64_t op_apply_start(uint64_t op
);
81 void op_apply_finish(uint64_t op
);
// Declared only.
// NOTE(review): presumably called once a commit has begun -- confirm.
83 void commit_started();
85 bool is_committing() {
86 std::lock_guard l
{com_lock
};
87 return committing_seq
!= committed_seq
;
// Accessor returning a uint64_t; takes com_lock first.
// NOTE(review): the return statement is missing from this listing;
// presumably it returns committed_seq -- confirm.
89 uint64_t get_committed_seq() {
90 std::lock_guard l
{com_lock
};
93 uint64_t get_committing_seq() {
94 std::lock_guard l
{com_lock
};
95 return committing_seq
;
// Seeds the sequence state from fs_op_seq: committed_seq and committing_seq
// are written after taking com_lock, and max_applied_seq after taking
// apply_lock.
// NOTE(review): both guards are named `l`, so each must live in its own
// nested scope; the scoping braces are missing from this listing.
97 void init_seq(uint64_t fs_op_seq
) {
99 std::lock_guard l
{com_lock
};
100 committed_seq
= fs_op_seq
;
101 committing_seq
= fs_op_seq
;
104 std::lock_guard l
{apply_lock
};
105 max_applied_seq
= fs_op_seq
;
// Journal lifecycle entry points; declared only, no arguments, no result.
113 void journal_start();
115 void journal_write_close();
// Takes a sequence number and returns an int; declared only.
// NOTE(review): presumably replays journal entries newer than fs_op_seq and
// returns an error code -- confirm in the implementation.
116 int journal_replay(uint64_t fs_op_seq
);
// Takes a buffer list, its original length, an op number, a Context pointer
// and a TrackedOpRef; declared only.
// NOTE(review): presumably `tls` holds the encoded transactions and
// `onjournal` is completed once they are journaled -- confirm.
118 void _op_journal_transactions(ceph::buffer::list
& tls
, uint32_t orig_len
, uint64_t op
,
119 Context
*onjournal
, TrackedOpRef osd_op
);
// Pure virtual: derived stores must implement it. Receives a vector of
// transactions and an op sequence number and returns an int.
121 virtual int do_transactions(std::vector
<ObjectStore::Transaction
>& tls
, uint64_t op_seq
) = 0;
124 bool is_committing() {
125 return apply_manager
.is_committing();
127 uint64_t get_committed_seq() {
128 return apply_manager
.get_committed_seq();
// Constructor: forwards cct and path to the ObjectStore base, builds the
// finisher with the names "JournalObjectStore" / "fn_jrn_objstore", and
// wires apply_manager to cct, journal and finisher.
// NOTE(review): some initializers and the constructor body are missing from
// this listing.
132 JournalingObjectStore(CephContext
* cct
, const std::string
& path
)
133 : ObjectStore(cct
, path
),
135 finisher(cct
, "JournalObjectStore", "fn_jrn_objstore"),
137 apply_manager(cct
, journal
, finisher
),
140 ~JournalingObjectStore() override
{