]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H | |
5 | #define CEPH_JOURNAL_JOURNAL_RECORDER_H | |
6 | ||
7 | #include "include/int_types.h" | |
7c673cae FG |
8 | #include "include/Context.h" |
9 | #include "include/rados/librados.hpp" | |
10 | #include "common/Mutex.h" | |
11 | #include "journal/Future.h" | |
12 | #include "journal/FutureImpl.h" | |
13 | #include "journal/JournalMetadata.h" | |
14 | #include "journal/ObjectRecorder.h" | |
15 | #include <map> | |
16 | #include <string> | |
17 | ||
18 | class SafeTimer; | |
19 | ||
20 | namespace journal { | |
21 | ||
22 | class JournalRecorder { | |
23 | public: | |
24 | JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, | |
25 | const JournalMetadataPtr &journal_metadata, | |
494da23a | 26 | uint64_t max_in_flight_appends); |
7c673cae FG |
27 | ~JournalRecorder(); |
28 | ||
494da23a TL |
29 | void shut_down(Context *on_safe); |
30 | ||
31 | void set_append_batch_options(int flush_interval, uint64_t flush_bytes, | |
32 | double flush_age); | |
33 | ||
7c673cae FG |
34 | Future append(uint64_t tag_tid, const bufferlist &bl); |
35 | void flush(Context *on_safe); | |
36 | ||
37 | ObjectRecorderPtr get_object(uint8_t splay_offset); | |
38 | ||
39 | private: | |
40 | typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs; | |
41 | ||
42 | struct Listener : public JournalMetadataListener { | |
43 | JournalRecorder *journal_recorder; | |
44 | ||
45 | Listener(JournalRecorder *_journal_recorder) | |
46 | : journal_recorder(_journal_recorder) {} | |
47 | ||
48 | void handle_update(JournalMetadata *) override { | |
49 | journal_recorder->handle_update(); | |
50 | } | |
51 | }; | |
52 | ||
53 | struct ObjectHandler : public ObjectRecorder::Handler { | |
54 | JournalRecorder *journal_recorder; | |
55 | ||
56 | ObjectHandler(JournalRecorder *_journal_recorder) | |
57 | : journal_recorder(_journal_recorder) { | |
58 | } | |
59 | ||
60 | void closed(ObjectRecorder *object_recorder) override { | |
61 | journal_recorder->handle_closed(object_recorder); | |
62 | } | |
63 | void overflow(ObjectRecorder *object_recorder) override { | |
64 | journal_recorder->handle_overflow(object_recorder); | |
65 | } | |
66 | }; | |
67 | ||
68 | struct C_AdvanceObjectSet : public Context { | |
69 | JournalRecorder *journal_recorder; | |
70 | ||
71 | C_AdvanceObjectSet(JournalRecorder *_journal_recorder) | |
72 | : journal_recorder(_journal_recorder) { | |
73 | } | |
74 | void finish(int r) override { | |
75 | journal_recorder->handle_advance_object_set(r); | |
76 | } | |
77 | }; | |
78 | ||
79 | librados::IoCtx m_ioctx; | |
80 | CephContext *m_cct; | |
81 | std::string m_object_oid_prefix; | |
82 | ||
83 | JournalMetadataPtr m_journal_metadata; | |
84 | ||
494da23a TL |
85 | uint32_t m_flush_interval = 0; |
86 | uint64_t m_flush_bytes = 0; | |
87 | double m_flush_age = 0; | |
11fdf7f2 | 88 | uint64_t m_max_in_flight_appends; |
7c673cae FG |
89 | |
90 | Listener m_listener; | |
91 | ObjectHandler m_object_handler; | |
92 | ||
93 | Mutex m_lock; | |
94 | ||
95 | uint32_t m_in_flight_advance_sets = 0; | |
96 | uint32_t m_in_flight_object_closes = 0; | |
97 | uint64_t m_current_set; | |
98 | ObjectRecorderPtrs m_object_ptrs; | |
99 | std::vector<std::shared_ptr<Mutex>> m_object_locks; | |
100 | ||
101 | FutureImplPtr m_prev_future; | |
102 | ||
494da23a TL |
103 | Context *m_on_object_set_advanced = nullptr; |
104 | ||
7c673cae FG |
105 | void open_object_set(); |
106 | bool close_object_set(uint64_t active_set); | |
107 | ||
108 | void advance_object_set(); | |
109 | void handle_advance_object_set(int r); | |
110 | ||
111 | void close_and_advance_object_set(uint64_t object_set); | |
112 | ||
113 | ObjectRecorderPtr create_object_recorder(uint64_t object_number, | |
114 | std::shared_ptr<Mutex> lock); | |
494da23a | 115 | void create_next_object_recorder(ObjectRecorderPtr object_recorder); |
7c673cae FG |
116 | |
117 | void handle_update(); | |
118 | ||
119 | void handle_closed(ObjectRecorder *object_recorder); | |
120 | void handle_overflow(ObjectRecorder *object_recorder); | |
121 | ||
122 | void lock_object_recorders() { | |
123 | for (auto& lock : m_object_locks) { | |
124 | lock->Lock(); | |
125 | } | |
126 | } | |
127 | ||
128 | void unlock_object_recorders() { | |
129 | for (auto& lock : m_object_locks) { | |
130 | lock->Unlock(); | |
131 | } | |
132 | } | |
133 | }; | |
134 | ||
135 | } // namespace journal | |
136 | ||
137 | #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H |