]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/JournalingObjectStore.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / os / filestore / JournalingObjectStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_JOURNALINGOBJECTSTORE_H
16 #define CEPH_JOURNALINGOBJECTSTORE_H
17
18 #include "os/ObjectStore.h"
19 #include "Journal.h"
20 #include "FileJournal.h"
21 #include "common/RWLock.h"
22 #include "osd/OpRequest.h"
23
24 class JournalingObjectStore : public ObjectStore {
25 protected:
26 Journal *journal;
27 Finisher finisher;
28
29
30 class SubmitManager {
31 CephContext* cct;
32 Mutex lock;
33 uint64_t op_seq;
34 uint64_t op_submitted;
35 public:
36 SubmitManager(CephContext* cct) :
37 cct(cct), lock("JOS::SubmitManager::lock", false, true, false),
38 op_seq(0), op_submitted(0)
39 {}
40 uint64_t op_submit_start();
41 void op_submit_finish(uint64_t op);
42 void set_op_seq(uint64_t seq) {
43 Mutex::Locker l(lock);
44 op_submitted = op_seq = seq;
45 }
46 uint64_t get_op_seq() {
47 return op_seq;
48 }
49 } submit_manager;
50
51 class ApplyManager {
52 CephContext* cct;
53 Journal *&journal;
54 Finisher &finisher;
55
56 Mutex apply_lock;
57 bool blocked;
58 Cond blocked_cond;
59 int open_ops;
60 uint64_t max_applied_seq;
61
62 Mutex com_lock;
63 map<version_t, vector<Context*> > commit_waiters;
64 uint64_t committing_seq, committed_seq;
65
66 public:
67 ApplyManager(CephContext* cct, Journal *&j, Finisher &f) :
68 cct(cct), journal(j), finisher(f),
69 apply_lock("JOS::ApplyManager::apply_lock", false, true, false),
70 blocked(false),
71 open_ops(0),
72 max_applied_seq(0),
73 com_lock("JOS::ApplyManager::com_lock", false, true, false),
74 committing_seq(0), committed_seq(0) {}
75 void reset() {
76 ceph_assert(open_ops == 0);
77 ceph_assert(blocked == false);
78 max_applied_seq = 0;
79 committing_seq = 0;
80 committed_seq = 0;
81 }
82 void add_waiter(uint64_t, Context*);
83 uint64_t op_apply_start(uint64_t op);
84 void op_apply_finish(uint64_t op);
85 bool commit_start();
86 void commit_started();
87 void commit_finish();
88 bool is_committing() {
89 Mutex::Locker l(com_lock);
90 return committing_seq != committed_seq;
91 }
92 uint64_t get_committed_seq() {
93 Mutex::Locker l(com_lock);
94 return committed_seq;
95 }
96 uint64_t get_committing_seq() {
97 Mutex::Locker l(com_lock);
98 return committing_seq;
99 }
100 void init_seq(uint64_t fs_op_seq) {
101 {
102 Mutex::Locker l(com_lock);
103 committed_seq = fs_op_seq;
104 committing_seq = fs_op_seq;
105 }
106 {
107 Mutex::Locker l(apply_lock);
108 max_applied_seq = fs_op_seq;
109 }
110 }
111 } apply_manager;
112
113 bool replaying;
114
115 protected:
116 void journal_start();
117 void journal_stop();
118 void journal_write_close();
119 int journal_replay(uint64_t fs_op_seq);
120
121 void _op_journal_transactions(bufferlist& tls, uint32_t orig_len, uint64_t op,
122 Context *onjournal, TrackedOpRef osd_op);
123
124 virtual int do_transactions(vector<ObjectStore::Transaction>& tls, uint64_t op_seq) = 0;
125
126 public:
127 bool is_committing() {
128 return apply_manager.is_committing();
129 }
130 uint64_t get_committed_seq() {
131 return apply_manager.get_committed_seq();
132 }
133
134 public:
135 JournalingObjectStore(CephContext* cct, const std::string& path)
136 : ObjectStore(cct, path),
137 journal(NULL),
138 finisher(cct, "JournalObjectStore", "fn_jrn_objstore"),
139 submit_manager(cct),
140 apply_manager(cct, journal, finisher),
141 replaying(false) {}
142
143 ~JournalingObjectStore() override {
144 }
145 };
146
147 #endif