]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/JournalRecorder.h
import ceph 16.2.7
[ceph.git] / ceph / src / journal / JournalRecorder.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
6
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/ceph_mutex.h"
11 #include "common/containers.h"
12 #include "common/Timer.h"
13 #include "journal/Future.h"
14 #include "journal/FutureImpl.h"
15 #include "journal/JournalMetadata.h"
16 #include "journal/ObjectRecorder.h"
17 #include <map>
18 #include <string>
19
20 namespace journal {
21
22 class JournalRecorder {
23 public:
24 JournalRecorder(librados::IoCtx &ioctx, std::string_view object_oid_prefix,
25 ceph::ref_t<JournalMetadata> journal_metadata,
26 uint64_t max_in_flight_appends);
27 ~JournalRecorder();
28
29 void shut_down(Context *on_safe);
30
31 void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
32 double flush_age);
33
34 Future append(uint64_t tag_tid, const bufferlist &bl);
35 void flush(Context *on_safe);
36
37 ceph::ref_t<ObjectRecorder> get_object(uint8_t splay_offset);
38
39 private:
40 typedef std::map<uint8_t, ceph::ref_t<ObjectRecorder>> ObjectRecorderPtrs;
41 typedef std::vector<std::unique_lock<ceph::mutex>> Lockers;
42
43 struct Listener : public JournalMetadataListener {
44 JournalRecorder *journal_recorder;
45
46 Listener(JournalRecorder *_journal_recorder)
47 : journal_recorder(_journal_recorder) {}
48
49 void handle_update(JournalMetadata *) override {
50 journal_recorder->handle_update();
51 }
52 };
53
54 struct ObjectHandler : public ObjectRecorder::Handler {
55 JournalRecorder *journal_recorder;
56
57 ObjectHandler(JournalRecorder *_journal_recorder)
58 : journal_recorder(_journal_recorder) {
59 }
60
61 void closed(ObjectRecorder *object_recorder) override {
62 journal_recorder->handle_closed(object_recorder);
63 }
64 void overflow(ObjectRecorder *object_recorder) override {
65 journal_recorder->handle_overflow(object_recorder);
66 }
67 };
68
69 struct C_AdvanceObjectSet : public Context {
70 JournalRecorder *journal_recorder;
71
72 C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
73 : journal_recorder(_journal_recorder) {
74 }
75 void finish(int r) override {
76 journal_recorder->handle_advance_object_set(r);
77 }
78 };
79
80 librados::IoCtx m_ioctx;
81 CephContext *m_cct = nullptr;
82 std::string m_object_oid_prefix;
83
84 ceph::ref_t<JournalMetadata> m_journal_metadata;
85
86 uint32_t m_flush_interval = 0;
87 uint64_t m_flush_bytes = 0;
88 double m_flush_age = 0;
89 uint64_t m_max_in_flight_appends;
90
91 Listener m_listener;
92 ObjectHandler m_object_handler;
93
94 ceph::mutex m_lock = ceph::make_mutex("JournalerRecorder::m_lock");
95
96 uint32_t m_in_flight_advance_sets = 0;
97 uint32_t m_in_flight_object_closes = 0;
98 uint64_t m_current_set;
99 ObjectRecorderPtrs m_object_ptrs;
100 ceph::containers::tiny_vector<ceph::mutex> m_object_locks;
101
102 ceph::ref_t<FutureImpl> m_prev_future;
103
104 Context *m_on_object_set_advanced = nullptr;
105
106 void open_object_set();
107 bool close_object_set(uint64_t active_set);
108
109 void advance_object_set();
110 void handle_advance_object_set(int r);
111
112 void close_and_advance_object_set(uint64_t object_set);
113
114 ceph::ref_t<ObjectRecorder> create_object_recorder(uint64_t object_number,
115 ceph::mutex* lock);
116 bool create_next_object_recorder(ceph::ref_t<ObjectRecorder> object_recorder);
117
118 void handle_update();
119
120 void handle_closed(ObjectRecorder *object_recorder);
121 void handle_overflow(ObjectRecorder *object_recorder);
122
123 Lockers lock_object_recorders();
124 };
125
126 } // namespace journal
127
128 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H