1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_JOURNALER_H
5 #define CEPH_JOURNAL_JOURNALER_H
7 #include "include/int_types.h"
8 #include "include/buffer_fwd.h"
9 #include "include/Context.h"
10 #include "include/rados/librados.hpp"
11 #include "journal/Future.h"
12 #include "journal/JournalMetadataListener.h"
13 #include "cls/journal/cls_journal_types.h"
17 #include "include/ceph_assert.h"
25 class JournalMetadata
;
27 class JournalRecorder
;
36 Threads(CephContext
*cct
);
39 ThreadPool
*thread_pool
= nullptr;
40 ContextWQ
*work_queue
= nullptr;
42 SafeTimer
*timer
= nullptr;
46 typedef cls::journal::Tag Tag
;
47 typedef std::list
<cls::journal::Tag
> Tags
;
48 typedef std::set
<cls::journal::Client
> RegisteredClients
;
50 static std::string
header_oid(const std::string
&journal_id
);
51 static std::string
object_oid_prefix(int pool_id
,
52 const std::string
&journal_id
);
54 Journaler(librados::IoCtx
&header_ioctx
, const std::string
&journal_id
,
55 const std::string
&client_id
, const Settings
&settings
);
56 Journaler(ContextWQ
*work_queue
, SafeTimer
*timer
, Mutex
*timer_lock
,
57 librados::IoCtx
&header_ioctx
, const std::string
&journal_id
,
58 const std::string
&client_id
, const Settings
&settings
);
61 void exists(Context
*on_finish
) const;
62 void create(uint8_t order
, uint8_t splay_width
, int64_t pool_id
, Context
*ctx
);
63 void remove(bool force
, Context
*on_finish
);
65 void init(Context
*on_init
);
67 void shut_down(Context
*on_finish
);
69 bool is_initialized() const;
71 void get_immutable_metadata(uint8_t *order
, uint8_t *splay_width
,
72 int64_t *pool_id
, Context
*on_finish
);
73 void get_mutable_metadata(uint64_t *minimum_set
, uint64_t *active_set
,
74 RegisteredClients
*clients
, Context
*on_finish
);
76 void add_listener(JournalMetadataListener
*listener
);
77 void remove_listener(JournalMetadataListener
*listener
);
79 int register_client(const bufferlist
&data
);
80 void register_client(const bufferlist
&data
, Context
*on_finish
);
82 int unregister_client();
83 void unregister_client(Context
*on_finish
);
85 void update_client(const bufferlist
&data
, Context
*on_finish
);
86 void get_client(const std::string
&client_id
, cls::journal::Client
*client
,
88 int get_cached_client(const std::string
&client_id
,
89 cls::journal::Client
*client
);
91 void flush_commit_position(Context
*on_safe
);
93 void allocate_tag(const bufferlist
&data
, cls::journal::Tag
*tag
,
95 void allocate_tag(uint64_t tag_class
, const bufferlist
&data
,
96 cls::journal::Tag
*tag
, Context
*on_finish
);
97 void get_tag(uint64_t tag_tid
, Tag
*tag
, Context
*on_finish
);
98 void get_tags(uint64_t tag_class
, Tags
*tags
, Context
*on_finish
);
99 void get_tags(uint64_t start_after_tag_tid
, uint64_t tag_class
, Tags
*tags
,
102 void start_replay(ReplayHandler
*replay_handler
);
103 void start_live_replay(ReplayHandler
*replay_handler
, double interval
);
104 bool try_pop_front(ReplayEntry
*replay_entry
, uint64_t *tag_tid
= nullptr);
106 void stop_replay(Context
*on_finish
);
108 uint64_t get_max_append_size() const;
109 void start_append(uint64_t max_in_flight_appends
);
110 void set_append_batch_options(int flush_interval
, uint64_t flush_bytes
,
112 Future
append(uint64_t tag_tid
, const bufferlist
&bl
);
113 void flush_append(Context
*on_safe
);
114 void stop_append(Context
*on_safe
);
116 void committed(const ReplayEntry
&replay_entry
);
117 void committed(const Future
&future
);
119 void get_metadata(uint8_t *order
, uint8_t *splay_width
, int64_t *pool_id
);
122 struct C_InitJournaler
: public Context
{
123 Journaler
*journaler
;
125 C_InitJournaler(Journaler
*_journaler
, Context
*_on_safe
)
126 : journaler(_journaler
), on_safe(_on_safe
) {
128 void finish(int r
) override
{
130 r
= journaler
->init_complete();
132 on_safe
->complete(r
);
136 Threads
*m_threads
= nullptr;
138 mutable librados::IoCtx m_header_ioctx
;
139 librados::IoCtx m_data_ioctx
;
141 std::string m_client_id
;
143 std::string m_header_oid
;
144 std::string m_object_oid_prefix
;
146 bool m_initialized
= false;
147 JournalMetadata
*m_metadata
= nullptr;
148 JournalPlayer
*m_player
= nullptr;
149 JournalRecorder
*m_recorder
= nullptr;
150 JournalTrimmer
*m_trimmer
= nullptr;
152 void set_up(ContextWQ
*work_queue
, SafeTimer
*timer
, Mutex
*timer_lock
,
153 librados::IoCtx
&header_ioctx
, const std::string
&journal_id
,
154 const Settings
&settings
);
157 void create_player(ReplayHandler
*replay_handler
);
159 friend std::ostream
&operator<<(std::ostream
&os
,
160 const Journaler
&journaler
);
163 std::ostream
&operator<<(std::ostream
&os
,
164 const Journaler
&journaler
);
166 } // namespace journal
168 #endif // CEPH_JOURNAL_JOURNALER_H