]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/JournalRecorder.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / journal / JournalRecorder.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
6
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Mutex.h"
11 #include "journal/Future.h"
12 #include "journal/FutureImpl.h"
13 #include "journal/JournalMetadata.h"
14 #include "journal/ObjectRecorder.h"
15 #include <map>
16 #include <string>
17
18 class SafeTimer;
19
20 namespace journal {
21
22 class JournalRecorder {
23 public:
24 JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
25 const JournalMetadataPtr &journal_metadata,
26 uint64_t max_in_flight_appends);
27 ~JournalRecorder();
28
29 void shut_down(Context *on_safe);
30
31 void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
32 double flush_age);
33
34 Future append(uint64_t tag_tid, const bufferlist &bl);
35 void flush(Context *on_safe);
36
37 ObjectRecorderPtr get_object(uint8_t splay_offset);
38
39 private:
40 typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
41
42 struct Listener : public JournalMetadataListener {
43 JournalRecorder *journal_recorder;
44
45 Listener(JournalRecorder *_journal_recorder)
46 : journal_recorder(_journal_recorder) {}
47
48 void handle_update(JournalMetadata *) override {
49 journal_recorder->handle_update();
50 }
51 };
52
53 struct ObjectHandler : public ObjectRecorder::Handler {
54 JournalRecorder *journal_recorder;
55
56 ObjectHandler(JournalRecorder *_journal_recorder)
57 : journal_recorder(_journal_recorder) {
58 }
59
60 void closed(ObjectRecorder *object_recorder) override {
61 journal_recorder->handle_closed(object_recorder);
62 }
63 void overflow(ObjectRecorder *object_recorder) override {
64 journal_recorder->handle_overflow(object_recorder);
65 }
66 };
67
68 struct C_AdvanceObjectSet : public Context {
69 JournalRecorder *journal_recorder;
70
71 C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
72 : journal_recorder(_journal_recorder) {
73 }
74 void finish(int r) override {
75 journal_recorder->handle_advance_object_set(r);
76 }
77 };
78
79 librados::IoCtx m_ioctx;
80 CephContext *m_cct;
81 std::string m_object_oid_prefix;
82
83 JournalMetadataPtr m_journal_metadata;
84
85 uint32_t m_flush_interval = 0;
86 uint64_t m_flush_bytes = 0;
87 double m_flush_age = 0;
88 uint64_t m_max_in_flight_appends;
89
90 Listener m_listener;
91 ObjectHandler m_object_handler;
92
93 Mutex m_lock;
94
95 uint32_t m_in_flight_advance_sets = 0;
96 uint32_t m_in_flight_object_closes = 0;
97 uint64_t m_current_set;
98 ObjectRecorderPtrs m_object_ptrs;
99 std::vector<std::shared_ptr<Mutex>> m_object_locks;
100
101 FutureImplPtr m_prev_future;
102
103 Context *m_on_object_set_advanced = nullptr;
104
105 void open_object_set();
106 bool close_object_set(uint64_t active_set);
107
108 void advance_object_set();
109 void handle_advance_object_set(int r);
110
111 void close_and_advance_object_set(uint64_t object_set);
112
113 ObjectRecorderPtr create_object_recorder(uint64_t object_number,
114 std::shared_ptr<Mutex> lock);
115 void create_next_object_recorder(ObjectRecorderPtr object_recorder);
116
117 void handle_update();
118
119 void handle_closed(ObjectRecorder *object_recorder);
120 void handle_overflow(ObjectRecorder *object_recorder);
121
122 void lock_object_recorders() {
123 for (auto& lock : m_object_locks) {
124 lock->Lock();
125 }
126 }
127
128 void unlock_object_recorders() {
129 for (auto& lock : m_object_locks) {
130 lock->Unlock();
131 }
132 }
133 };
134
135 } // namespace journal
136
137 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H