1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
7 #include "include/int_types.h"
8 #include "include/atomic.h"
9 #include "include/Context.h"
10 #include "include/rados/librados.hpp"
11 #include "common/Mutex.h"
12 #include "journal/Future.h"
13 #include "journal/FutureImpl.h"
14 #include "journal/JournalMetadata.h"
15 #include "journal/ObjectRecorder.h"
23 class JournalRecorder
{
25 JournalRecorder(librados::IoCtx
&ioctx
, const std::string
&object_oid_prefix
,
26 const JournalMetadataPtr
&journal_metadata
,
27 uint32_t flush_interval
, uint64_t flush_bytes
,
31 Future
append(uint64_t tag_tid
, const bufferlist
&bl
);
32 void flush(Context
*on_safe
);
34 ObjectRecorderPtr
get_object(uint8_t splay_offset
);
37 typedef std::map
<uint8_t, ObjectRecorderPtr
> ObjectRecorderPtrs
;
39 struct Listener
: public JournalMetadataListener
{
40 JournalRecorder
*journal_recorder
;
42 Listener(JournalRecorder
*_journal_recorder
)
43 : journal_recorder(_journal_recorder
) {}
45 void handle_update(JournalMetadata
*) override
{
46 journal_recorder
->handle_update();
50 struct ObjectHandler
: public ObjectRecorder::Handler
{
51 JournalRecorder
*journal_recorder
;
53 ObjectHandler(JournalRecorder
*_journal_recorder
)
54 : journal_recorder(_journal_recorder
) {
57 void closed(ObjectRecorder
*object_recorder
) override
{
58 journal_recorder
->handle_closed(object_recorder
);
60 void overflow(ObjectRecorder
*object_recorder
) override
{
61 journal_recorder
->handle_overflow(object_recorder
);
65 struct C_AdvanceObjectSet
: public Context
{
66 JournalRecorder
*journal_recorder
;
68 C_AdvanceObjectSet(JournalRecorder
*_journal_recorder
)
69 : journal_recorder(_journal_recorder
) {
71 void finish(int r
) override
{
72 journal_recorder
->handle_advance_object_set(r
);
76 librados::IoCtx m_ioctx
;
78 std::string m_object_oid_prefix
;
80 JournalMetadataPtr m_journal_metadata
;
82 uint32_t m_flush_interval
;
83 uint64_t m_flush_bytes
;
87 ObjectHandler m_object_handler
;
91 uint32_t m_in_flight_advance_sets
= 0;
92 uint32_t m_in_flight_object_closes
= 0;
93 uint64_t m_current_set
;
94 ObjectRecorderPtrs m_object_ptrs
;
95 std::vector
<std::shared_ptr
<Mutex
>> m_object_locks
;
97 FutureImplPtr m_prev_future
;
99 void open_object_set();
100 bool close_object_set(uint64_t active_set
);
102 void advance_object_set();
103 void handle_advance_object_set(int r
);
105 void close_and_advance_object_set(uint64_t object_set
);
107 ObjectRecorderPtr
create_object_recorder(uint64_t object_number
,
108 std::shared_ptr
<Mutex
> lock
);
109 void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder
);
111 void handle_update();
113 void handle_closed(ObjectRecorder
*object_recorder
);
114 void handle_overflow(ObjectRecorder
*object_recorder
);
116 void lock_object_recorders() {
117 for (auto& lock
: m_object_locks
) {
122 void unlock_object_recorders() {
123 for (auto& lock
: m_object_locks
) {
129 } // namespace journal
131 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H