]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H | |
5 | #define CEPH_JOURNAL_JOURNAL_PLAYER_H | |
6 | ||
7 | #include "include/int_types.h" | |
8 | #include "include/Context.h" | |
9 | #include "include/rados/librados.hpp" | |
10 | #include "common/AsyncOpTracker.h" | |
7c673cae FG |
11 | #include "journal/JournalMetadata.h" |
12 | #include "journal/ObjectPlayer.h" | |
9f95a23c | 13 | #include "journal/Types.h" |
7c673cae FG |
14 | #include "cls/journal/cls_journal_types.h" |
15 | #include <boost/none.hpp> | |
16 | #include <boost/optional.hpp> | |
17 | #include <map> | |
18 | ||
19 | class SafeTimer; | |
20 | ||
21 | namespace journal { | |
22 | ||
9f95a23c | 23 | class CacheManagerHandler; |
7c673cae FG |
24 | class Entry; |
25 | class ReplayHandler; | |
26 | ||
27 | class JournalPlayer { | |
28 | public: | |
29 | typedef cls::journal::ObjectPosition ObjectPosition; | |
30 | typedef cls::journal::ObjectPositions ObjectPositions; | |
31 | typedef cls::journal::ObjectSetPosition ObjectSetPosition; | |
32 | ||
9f95a23c TL |
33 | JournalPlayer(librados::IoCtx &ioctx, std::string_view object_oid_prefix, |
34 | ceph::ref_t<JournalMetadata> journal_metadata, | |
35 | ReplayHandler* replay_handler, | |
36 | CacheManagerHandler *cache_manager_handler); | |
7c673cae FG |
37 | ~JournalPlayer(); |
38 | ||
39 | void prefetch(); | |
40 | void prefetch_and_watch(double interval); | |
41 | void shut_down(Context *on_finish); | |
42 | ||
43 | bool try_pop_front(Entry *entry, uint64_t *commit_tid); | |
44 | ||
45 | private: | |
46 | typedef std::set<uint8_t> PrefetchSplayOffsets; | |
9f95a23c | 47 | typedef std::map<uint8_t, ceph::ref_t<ObjectPlayer>> SplayedObjectPlayers; |
7c673cae FG |
48 | typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions; |
49 | typedef std::set<uint64_t> ObjectNumbers; | |
50 | ||
51 | enum State { | |
52 | STATE_INIT, | |
9f95a23c | 53 | STATE_WAITCACHE, |
7c673cae FG |
54 | STATE_PREFETCH, |
55 | STATE_PLAYBACK, | |
56 | STATE_ERROR | |
57 | }; | |
58 | ||
59 | enum WatchStep { | |
60 | WATCH_STEP_FETCH_CURRENT, | |
61 | WATCH_STEP_FETCH_FIRST, | |
62 | WATCH_STEP_ASSERT_ACTIVE | |
63 | }; | |
64 | ||
65 | struct C_Fetch : public Context { | |
66 | JournalPlayer *player; | |
67 | uint64_t object_num; | |
68 | C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) { | |
69 | player->m_async_op_tracker.start_op(); | |
70 | } | |
71 | ~C_Fetch() override { | |
72 | player->m_async_op_tracker.finish_op(); | |
73 | } | |
74 | void finish(int r) override { | |
75 | player->handle_fetched(object_num, r); | |
76 | } | |
77 | }; | |
78 | ||
79 | struct C_Watch : public Context { | |
80 | JournalPlayer *player; | |
81 | uint64_t object_num; | |
82 | C_Watch(JournalPlayer *player, uint64_t object_num) | |
83 | : player(player), object_num(object_num) { | |
84 | player->m_async_op_tracker.start_op(); | |
85 | } | |
86 | ~C_Watch() override { | |
87 | player->m_async_op_tracker.finish_op(); | |
88 | } | |
89 | ||
90 | void finish(int r) override { | |
91 | player->handle_watch(object_num, r); | |
92 | } | |
93 | }; | |
94 | ||
9f95a23c TL |
95 | struct CacheRebalanceHandler : public journal::CacheRebalanceHandler { |
96 | JournalPlayer *player; | |
97 | ||
98 | CacheRebalanceHandler(JournalPlayer *player) : player(player) { | |
99 | } | |
100 | ||
101 | void handle_cache_rebalanced(uint64_t new_cache_bytes) override { | |
102 | player->handle_cache_rebalanced(new_cache_bytes); | |
103 | } | |
104 | }; | |
105 | ||
7c673cae | 106 | librados::IoCtx m_ioctx; |
9f95a23c | 107 | CephContext *m_cct = nullptr; |
7c673cae | 108 | std::string m_object_oid_prefix; |
9f95a23c TL |
109 | ceph::ref_t<JournalMetadata> m_journal_metadata; |
110 | ReplayHandler* m_replay_handler; | |
111 | CacheManagerHandler *m_cache_manager_handler; | |
7c673cae | 112 | |
9f95a23c TL |
113 | std::string m_cache_name; |
114 | CacheRebalanceHandler m_cache_rebalance_handler; | |
115 | uint64_t m_max_fetch_bytes; | |
7c673cae FG |
116 | |
117 | AsyncOpTracker m_async_op_tracker; | |
118 | ||
9f95a23c TL |
119 | mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock"); |
120 | State m_state = STATE_INIT; | |
121 | uint8_t m_splay_offset = 0; | |
7c673cae | 122 | |
9f95a23c TL |
123 | bool m_watch_enabled = false; |
124 | bool m_watch_scheduled = false; | |
125 | double m_watch_interval = 0; | |
7c673cae FG |
126 | WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT; |
127 | bool m_watch_prune_active_tag = false; | |
128 | ||
129 | bool m_shut_down = false; | |
130 | bool m_handler_notified = false; | |
131 | ||
132 | ObjectNumbers m_fetch_object_numbers; | |
133 | ||
134 | PrefetchSplayOffsets m_prefetch_splay_offsets; | |
135 | SplayedObjectPlayers m_object_players; | |
136 | ||
137 | bool m_commit_position_valid = false; | |
138 | ObjectPosition m_commit_position; | |
139 | SplayedObjectPositions m_commit_positions; | |
11fdf7f2 | 140 | uint64_t m_active_set = 0; |
7c673cae FG |
141 | |
142 | boost::optional<uint64_t> m_active_tag_tid = boost::none; | |
143 | boost::optional<uint64_t> m_prune_tag_tid = boost::none; | |
144 | ||
145 | void advance_splay_object(); | |
146 | ||
147 | bool is_object_set_ready() const; | |
148 | bool verify_playback_ready(); | |
149 | void prune_tag(uint64_t tag_tid); | |
150 | void prune_active_tag(const boost::optional<uint64_t>& tag_tid); | |
151 | ||
9f95a23c TL |
152 | ceph::ref_t<ObjectPlayer> get_object_player() const; |
153 | ceph::ref_t<ObjectPlayer> get_object_player(uint64_t object_number) const; | |
154 | bool remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &object_player); | |
7c673cae FG |
155 | |
156 | void process_state(uint64_t object_number, int r); | |
157 | int process_prefetch(uint64_t object_number); | |
158 | int process_playback(uint64_t object_number); | |
159 | ||
160 | void fetch(uint64_t object_num); | |
9f95a23c | 161 | void fetch(const ceph::ref_t<ObjectPlayer> &object_player); |
7c673cae FG |
162 | void handle_fetched(uint64_t object_num, int r); |
163 | void refetch(bool immediate); | |
164 | ||
165 | void schedule_watch(bool immediate); | |
166 | void handle_watch(uint64_t object_num, int r); | |
167 | void handle_watch_assert_active(int r); | |
168 | ||
169 | void notify_entries_available(); | |
170 | void notify_complete(int r); | |
9f95a23c TL |
171 | |
172 | void handle_cache_rebalanced(uint64_t new_cache_bytes); | |
7c673cae FG |
173 | }; |
174 | ||
175 | } // namespace journal | |
176 | ||
177 | #endif // CEPH_JOURNAL_JOURNAL_PLAYER_H |