1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Mutex.h"
11 #include "journal/Future.h"
12 #include "journal/FutureImpl.h"
13 #include "journal/JournalMetadata.h"
14 #include "journal/ObjectRecorder.h"
22 class JournalRecorder
{
24 JournalRecorder(librados::IoCtx
&ioctx
, const std::string
&object_oid_prefix
,
25 const JournalMetadataPtr
&journal_metadata
,
26 uint64_t max_in_flight_appends
);
29 void shut_down(Context
*on_safe
);
31 void set_append_batch_options(int flush_interval
, uint64_t flush_bytes
,
34 Future
append(uint64_t tag_tid
, const bufferlist
&bl
);
35 void flush(Context
*on_safe
);
37 ObjectRecorderPtr
get_object(uint8_t splay_offset
);
40 typedef std::map
<uint8_t, ObjectRecorderPtr
> ObjectRecorderPtrs
;
42 struct Listener
: public JournalMetadataListener
{
43 JournalRecorder
*journal_recorder
;
45 Listener(JournalRecorder
*_journal_recorder
)
46 : journal_recorder(_journal_recorder
) {}
48 void handle_update(JournalMetadata
*) override
{
49 journal_recorder
->handle_update();
53 struct ObjectHandler
: public ObjectRecorder::Handler
{
54 JournalRecorder
*journal_recorder
;
56 ObjectHandler(JournalRecorder
*_journal_recorder
)
57 : journal_recorder(_journal_recorder
) {
60 void closed(ObjectRecorder
*object_recorder
) override
{
61 journal_recorder
->handle_closed(object_recorder
);
63 void overflow(ObjectRecorder
*object_recorder
) override
{
64 journal_recorder
->handle_overflow(object_recorder
);
68 struct C_AdvanceObjectSet
: public Context
{
69 JournalRecorder
*journal_recorder
;
71 C_AdvanceObjectSet(JournalRecorder
*_journal_recorder
)
72 : journal_recorder(_journal_recorder
) {
74 void finish(int r
) override
{
75 journal_recorder
->handle_advance_object_set(r
);
79 librados::IoCtx m_ioctx
;
81 std::string m_object_oid_prefix
;
83 JournalMetadataPtr m_journal_metadata
;
85 uint32_t m_flush_interval
= 0;
86 uint64_t m_flush_bytes
= 0;
87 double m_flush_age
= 0;
88 uint64_t m_max_in_flight_appends
;
91 ObjectHandler m_object_handler
;
95 uint32_t m_in_flight_advance_sets
= 0;
96 uint32_t m_in_flight_object_closes
= 0;
97 uint64_t m_current_set
;
98 ObjectRecorderPtrs m_object_ptrs
;
99 std::vector
<std::shared_ptr
<Mutex
>> m_object_locks
;
101 FutureImplPtr m_prev_future
;
103 Context
*m_on_object_set_advanced
= nullptr;
105 void open_object_set();
106 bool close_object_set(uint64_t active_set
);
108 void advance_object_set();
109 void handle_advance_object_set(int r
);
111 void close_and_advance_object_set(uint64_t object_set
);
113 ObjectRecorderPtr
create_object_recorder(uint64_t object_number
,
114 std::shared_ptr
<Mutex
> lock
);
115 void create_next_object_recorder(ObjectRecorderPtr object_recorder
);
117 void handle_update();
119 void handle_closed(ObjectRecorder
*object_recorder
);
120 void handle_overflow(ObjectRecorder
*object_recorder
);
122 void lock_object_recorders() {
123 for (auto& lock
: m_object_locks
) {
128 void unlock_object_recorders() {
129 for (auto& lock
: m_object_locks
) {
135 } // namespace journal
137 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H