1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/ceph_mutex.h"
11 #include "common/containers.h"
12 #include "common/Timer.h"
13 #include "journal/Future.h"
14 #include "journal/FutureImpl.h"
15 #include "journal/JournalMetadata.h"
16 #include "journal/ObjectRecorder.h"
// JournalRecorder: declares the append/flush entry points of the journal
// together with the per-object recorder bookkeeping they rely on.
// NOTE(review): this listing looks like a lossy extraction -- the numeric
// prefixes skip values, and access specifiers, closing braces and a few
// parameters are not present. Comments below describe only what is visible.
22 class JournalRecorder
{
// Constructor: receives the RADOS I/O context (by reference), the object
// name prefix, a ref-counted JournalMetadata and the in-flight append limit.
// Presumably these seed m_ioctx, m_object_oid_prefix, m_journal_metadata and
// m_max_in_flight_appends -- confirm against the implementation file.
24 JournalRecorder(librados::IoCtx
&ioctx
, std::string_view object_oid_prefix
,
25 ceph::ref_t
<JournalMetadata
> journal_metadata
,
26 uint64_t max_in_flight_appends
);
// Shut-down entry point taking a completion Context.
// presumably on_safe is completed once shut down has finished -- TODO confirm
29 void shut_down(Context
*on_safe
);
// Configures append batching from a flush interval and a byte threshold.
// NOTE(review): the parameter list ends on a dangling comma here; a
// m_flush_age member of type double is declared further down, so a third
// parameter is likely missing from this listing -- confirm.
31 void set_append_batch_options(int flush_interval
, uint64_t flush_bytes
,
// Appends the payload `bl` under tag `tag_tid` and returns a Future for it.
34 Future
append(uint64_t tag_tid
, const bufferlist
&bl
);
// Flush entry point taking a completion Context.
// presumably on_safe fires once the flush is complete -- TODO confirm
35 void flush(Context
*on_safe
);
// Returns the ref-counted ObjectRecorder for the given splay offset.
37 ceph::ref_t
<ObjectRecorder
> get_object(uint8_t splay_offset
);
// Map from a uint8_t key to its ObjectRecorder reference.
// presumably keyed by splay offset (cf. get_object) -- TODO confirm
40 typedef std::map
<uint8_t, ceph::ref_t
<ObjectRecorder
>> ObjectRecorderPtrs
;
// A batch of held ceph::mutex locks; the return type of
// lock_object_recorders().
41 typedef std::vector
<std::unique_lock
<ceph::mutex
>> Lockers
;
// Adapter implementing JournalMetadataListener: forwards metadata update
// notifications to the enclosing JournalRecorder.
43 struct Listener
: public JournalMetadataListener
{
// Raw back-pointer to the recorder that receives the callbacks.
// presumably non-owning -- TODO confirm
44 JournalRecorder
*journal_recorder
;
46 Listener(JournalRecorder
*_journal_recorder
)
47 : journal_recorder(_journal_recorder
) {}
// The JournalMetadata argument is unnamed and unused; the call is simply
// relayed to JournalRecorder::handle_update().
49 void handle_update(JournalMetadata
*) override
{
50 journal_recorder
->handle_update();
// NOTE(review): the closing braces of handle_update() and of Listener are
// not present in this listing.
// Adapter implementing ObjectRecorder::Handler: relays closed/overflow
// notifications from an ObjectRecorder to the enclosing JournalRecorder.
54 struct ObjectHandler
: public ObjectRecorder::Handler
{
// Raw back-pointer to the recorder that receives the callbacks.
55 JournalRecorder
*journal_recorder
;
57 ObjectHandler(JournalRecorder
*_journal_recorder
)
58 : journal_recorder(_journal_recorder
) {
// Relays to JournalRecorder::handle_closed().
61 void closed(ObjectRecorder
*object_recorder
) override
{
62 journal_recorder
->handle_closed(object_recorder
);
// Relays to JournalRecorder::handle_overflow().
64 void overflow(ObjectRecorder
*object_recorder
) override
{
65 journal_recorder
->handle_overflow(object_recorder
);
// Completion Context whose finish(r) hands the result code to
// JournalRecorder::handle_advance_object_set().
69 struct C_AdvanceObjectSet
: public Context
{
// Raw back-pointer to the recorder to be notified.
70 JournalRecorder
*journal_recorder
;
72 C_AdvanceObjectSet(JournalRecorder
*_journal_recorder
)
73 : journal_recorder(_journal_recorder
) {
// r is passed through unchanged.
75 void finish(int r
) override
{
76 journal_recorder
->handle_advance_object_set(r
);
// ---- data members ----
// RADOS I/O context (held by value).
80 librados::IoCtx m_ioctx
;
// CephContext pointer; null until assigned (assignment not visible here).
81 CephContext
*m_cct
= nullptr;
// Object name prefix.
82 std::string m_object_oid_prefix
;
// Ref-counted journal metadata handle.
84 ceph::ref_t
<JournalMetadata
> m_journal_metadata
;
// Append batching settings, all defaulting to 0.
// presumably written by set_append_batch_options() -- TODO confirm
86 uint32_t m_flush_interval
= 0;
87 uint64_t m_flush_bytes
= 0;
88 double m_flush_age
= 0;
// No in-class initializer; presumably set by the constructor -- TODO confirm.
89 uint64_t m_max_in_flight_appends
;
// ObjectHandler instance (see struct ObjectHandler above).
92 ObjectHandler m_object_handler
;
// Mutex for this recorder.
// NOTE(review): the mutex name string spells "JournalerRecorder", which
// differs from the class name; left untouched because it is a runtime string.
94 ceph::mutex m_lock
= ceph::make_mutex("JournalerRecorder::m_lock");
// Counters, both starting at 0.
// presumably track outstanding advance-set and object-close operations
// -- TODO confirm
96 uint32_t m_in_flight_advance_sets
= 0;
97 uint32_t m_in_flight_object_closes
= 0;
// No in-class initializer; presumably set by the constructor -- TODO confirm.
98 uint64_t m_current_set
;
// ObjectRecorder references, see ObjectRecorderPtrs.
99 ObjectRecorderPtrs m_object_ptrs
;
// Fixed collection of ceph::mutex.
// presumably one per object recorder -- TODO confirm
100 ceph::containers::tiny_vector
<ceph::mutex
> m_object_locks
;
// Ref-counted handle to a FutureImpl.
// presumably the most recently issued future -- TODO confirm
102 ceph::ref_t
<FutureImpl
> m_prev_future
;
// Context pointer, null by default.
104 Context
*m_on_object_set_advanced
= nullptr;
// ---- internal helpers (declarations only; bodies live elsewhere) ----
106 void open_object_set();
// Returns bool; the meaning of the result is not visible from this header.
107 bool close_object_set(uint64_t active_set
);
109 void advance_object_set();
// Invoked by C_AdvanceObjectSet::finish() with its result code.
110 void handle_advance_object_set(int r
);
112 void close_and_advance_object_set(uint64_t object_set
);
// NOTE(review): the parameter list ends on a dangling comma; at least one
// further parameter is missing from this listing.
114 ceph::ref_t
<ObjectRecorder
> create_object_recorder(uint64_t object_number
,
// Takes the current ObjectRecorder reference by value; returns bool.
116 bool create_next_object_recorder(ceph::ref_t
<ObjectRecorder
> object_recorder
);
// Invoked by Listener::handle_update().
118 void handle_update();
// Invoked by ObjectHandler::closed() / ObjectHandler::overflow().
120 void handle_closed(ObjectRecorder
*object_recorder
);
121 void handle_overflow(ObjectRecorder
*object_recorder
);
// Returns a Lockers vector (std::unique_lock<ceph::mutex> elements).
// presumably one lock per entry of m_object_locks -- TODO confirm
123 Lockers
lock_object_recorders();
126 } // namespace journal
128 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H