]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H | |
5 | #define CEPH_JOURNAL_JOURNAL_RECORDER_H | |
6 | ||
7 | #include "include/int_types.h" | |
7c673cae FG |
8 | #include "include/Context.h" |
9 | #include "include/rados/librados.hpp" | |
9f95a23c TL |
10 | #include "common/ceph_mutex.h" |
11 | #include "common/containers.h" | |
7c673cae FG |
12 | #include "journal/Future.h" |
13 | #include "journal/FutureImpl.h" | |
14 | #include "journal/JournalMetadata.h" | |
15 | #include "journal/ObjectRecorder.h" | |
16 | #include <map> | |
17 | #include <string> | |
18 | ||
19 | class SafeTimer; | |
20 | ||
21 | namespace journal { | |
22 | ||
23 | class JournalRecorder { | |
24 | public: | |
9f95a23c TL |
25 | JournalRecorder(librados::IoCtx &ioctx, std::string_view object_oid_prefix, |
26 | ceph::ref_t<JournalMetadata> journal_metadata, | |
494da23a | 27 | uint64_t max_in_flight_appends); |
7c673cae FG |
28 | ~JournalRecorder(); |
29 | ||
494da23a TL |
30 | void shut_down(Context *on_safe); |
31 | ||
32 | void set_append_batch_options(int flush_interval, uint64_t flush_bytes, | |
33 | double flush_age); | |
34 | ||
7c673cae FG |
35 | Future append(uint64_t tag_tid, const bufferlist &bl); |
36 | void flush(Context *on_safe); | |
37 | ||
9f95a23c | 38 | ceph::ref_t<ObjectRecorder> get_object(uint8_t splay_offset); |
7c673cae FG |
39 | |
40 | private: | |
9f95a23c TL |
41 | typedef std::map<uint8_t, ceph::ref_t<ObjectRecorder>> ObjectRecorderPtrs; |
42 | typedef std::vector<std::unique_lock<ceph::mutex>> Lockers; | |
7c673cae FG |
43 | |
44 | struct Listener : public JournalMetadataListener { | |
45 | JournalRecorder *journal_recorder; | |
46 | ||
47 | Listener(JournalRecorder *_journal_recorder) | |
48 | : journal_recorder(_journal_recorder) {} | |
49 | ||
50 | void handle_update(JournalMetadata *) override { | |
51 | journal_recorder->handle_update(); | |
52 | } | |
53 | }; | |
54 | ||
55 | struct ObjectHandler : public ObjectRecorder::Handler { | |
56 | JournalRecorder *journal_recorder; | |
57 | ||
58 | ObjectHandler(JournalRecorder *_journal_recorder) | |
59 | : journal_recorder(_journal_recorder) { | |
60 | } | |
61 | ||
62 | void closed(ObjectRecorder *object_recorder) override { | |
63 | journal_recorder->handle_closed(object_recorder); | |
64 | } | |
65 | void overflow(ObjectRecorder *object_recorder) override { | |
66 | journal_recorder->handle_overflow(object_recorder); | |
67 | } | |
68 | }; | |
69 | ||
70 | struct C_AdvanceObjectSet : public Context { | |
71 | JournalRecorder *journal_recorder; | |
72 | ||
73 | C_AdvanceObjectSet(JournalRecorder *_journal_recorder) | |
74 | : journal_recorder(_journal_recorder) { | |
75 | } | |
76 | void finish(int r) override { | |
77 | journal_recorder->handle_advance_object_set(r); | |
78 | } | |
79 | }; | |
80 | ||
81 | librados::IoCtx m_ioctx; | |
9f95a23c | 82 | CephContext *m_cct = nullptr; |
7c673cae FG |
83 | std::string m_object_oid_prefix; |
84 | ||
9f95a23c | 85 | ceph::ref_t<JournalMetadata> m_journal_metadata; |
7c673cae | 86 | |
494da23a TL |
87 | uint32_t m_flush_interval = 0; |
88 | uint64_t m_flush_bytes = 0; | |
89 | double m_flush_age = 0; | |
11fdf7f2 | 90 | uint64_t m_max_in_flight_appends; |
7c673cae FG |
91 | |
92 | Listener m_listener; | |
93 | ObjectHandler m_object_handler; | |
94 | ||
9f95a23c | 95 | ceph::mutex m_lock = ceph::make_mutex("JournalerRecorder::m_lock"); |
7c673cae FG |
96 | |
97 | uint32_t m_in_flight_advance_sets = 0; | |
98 | uint32_t m_in_flight_object_closes = 0; | |
99 | uint64_t m_current_set; | |
100 | ObjectRecorderPtrs m_object_ptrs; | |
9f95a23c | 101 | ceph::containers::tiny_vector<ceph::mutex> m_object_locks; |
7c673cae | 102 | |
9f95a23c | 103 | ceph::ref_t<FutureImpl> m_prev_future; |
7c673cae | 104 | |
494da23a TL |
105 | Context *m_on_object_set_advanced = nullptr; |
106 | ||
7c673cae FG |
107 | void open_object_set(); |
108 | bool close_object_set(uint64_t active_set); | |
109 | ||
110 | void advance_object_set(); | |
111 | void handle_advance_object_set(int r); | |
112 | ||
113 | void close_and_advance_object_set(uint64_t object_set); | |
114 | ||
9f95a23c TL |
115 | ceph::ref_t<ObjectRecorder> create_object_recorder(uint64_t object_number, |
116 | ceph::mutex* lock); | |
117 | bool create_next_object_recorder(ceph::ref_t<ObjectRecorder> object_recorder); | |
7c673cae FG |
118 | |
119 | void handle_update(); | |
120 | ||
121 | void handle_closed(ObjectRecorder *object_recorder); | |
122 | void handle_overflow(ObjectRecorder *object_recorder); | |
123 | ||
9f95a23c | 124 | Lockers lock_object_recorders(); |
7c673cae FG |
125 | }; |
126 | ||
127 | } // namespace journal | |
128 | ||
129 | #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H |