]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/JournalRecorder.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / journal / JournalRecorder.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
6
7 #include "include/int_types.h"
8 #include "include/atomic.h"
9 #include "include/Context.h"
10 #include "include/rados/librados.hpp"
11 #include "common/Mutex.h"
12 #include "journal/Future.h"
13 #include "journal/FutureImpl.h"
14 #include "journal/JournalMetadata.h"
15 #include "journal/ObjectRecorder.h"
16 #include <map>
17 #include <string>
18
19 class SafeTimer;
20
21 namespace journal {
22
23 class JournalRecorder {
24 public:
25 JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
26 const JournalMetadataPtr &journal_metadata,
27 uint32_t flush_interval, uint64_t flush_bytes,
28 double flush_age);
29 ~JournalRecorder();
30
31 Future append(uint64_t tag_tid, const bufferlist &bl);
32 void flush(Context *on_safe);
33
34 ObjectRecorderPtr get_object(uint8_t splay_offset);
35
36 private:
37 typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
38
39 struct Listener : public JournalMetadataListener {
40 JournalRecorder *journal_recorder;
41
42 Listener(JournalRecorder *_journal_recorder)
43 : journal_recorder(_journal_recorder) {}
44
45 void handle_update(JournalMetadata *) override {
46 journal_recorder->handle_update();
47 }
48 };
49
50 struct ObjectHandler : public ObjectRecorder::Handler {
51 JournalRecorder *journal_recorder;
52
53 ObjectHandler(JournalRecorder *_journal_recorder)
54 : journal_recorder(_journal_recorder) {
55 }
56
57 void closed(ObjectRecorder *object_recorder) override {
58 journal_recorder->handle_closed(object_recorder);
59 }
60 void overflow(ObjectRecorder *object_recorder) override {
61 journal_recorder->handle_overflow(object_recorder);
62 }
63 };
64
65 struct C_AdvanceObjectSet : public Context {
66 JournalRecorder *journal_recorder;
67
68 C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
69 : journal_recorder(_journal_recorder) {
70 }
71 void finish(int r) override {
72 journal_recorder->handle_advance_object_set(r);
73 }
74 };
75
76 librados::IoCtx m_ioctx;
77 CephContext *m_cct;
78 std::string m_object_oid_prefix;
79
80 JournalMetadataPtr m_journal_metadata;
81
82 uint32_t m_flush_interval;
83 uint64_t m_flush_bytes;
84 double m_flush_age;
85
86 Listener m_listener;
87 ObjectHandler m_object_handler;
88
89 Mutex m_lock;
90
91 uint32_t m_in_flight_advance_sets = 0;
92 uint32_t m_in_flight_object_closes = 0;
93 uint64_t m_current_set;
94 ObjectRecorderPtrs m_object_ptrs;
95 std::vector<std::shared_ptr<Mutex>> m_object_locks;
96
97 FutureImplPtr m_prev_future;
98
99 void open_object_set();
100 bool close_object_set(uint64_t active_set);
101
102 void advance_object_set();
103 void handle_advance_object_set(int r);
104
105 void close_and_advance_object_set(uint64_t object_set);
106
107 ObjectRecorderPtr create_object_recorder(uint64_t object_number,
108 std::shared_ptr<Mutex> lock);
109 void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
110
111 void handle_update();
112
113 void handle_closed(ObjectRecorder *object_recorder);
114 void handle_overflow(ObjectRecorder *object_recorder);
115
116 void lock_object_recorders() {
117 for (auto& lock : m_object_locks) {
118 lock->Lock();
119 }
120 }
121
122 void unlock_object_recorders() {
123 for (auto& lock : m_object_locks) {
124 lock->Unlock();
125 }
126 }
127 };
128
129 } // namespace journal
130
131 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H