]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/JournalPlayer.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / journal / JournalPlayer.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H
5#define CEPH_JOURNAL_JOURNAL_PLAYER_H
6
7#include "include/int_types.h"
8#include "include/Context.h"
9#include "include/rados/librados.hpp"
10#include "common/AsyncOpTracker.h"
7c673cae
FG
11#include "journal/JournalMetadata.h"
12#include "journal/ObjectPlayer.h"
9f95a23c 13#include "journal/Types.h"
7c673cae
FG
14#include "cls/journal/cls_journal_types.h"
15#include <boost/none.hpp>
16#include <boost/optional.hpp>
17#include <map>
18
19class SafeTimer;
20
21namespace journal {
22
9f95a23c 23class CacheManagerHandler;
7c673cae
FG
24class Entry;
25class ReplayHandler;
26
27class JournalPlayer {
28public:
29 typedef cls::journal::ObjectPosition ObjectPosition;
30 typedef cls::journal::ObjectPositions ObjectPositions;
31 typedef cls::journal::ObjectSetPosition ObjectSetPosition;
32
9f95a23c
TL
33 JournalPlayer(librados::IoCtx &ioctx, std::string_view object_oid_prefix,
34 ceph::ref_t<JournalMetadata> journal_metadata,
35 ReplayHandler* replay_handler,
36 CacheManagerHandler *cache_manager_handler);
7c673cae
FG
37 ~JournalPlayer();
38
39 void prefetch();
40 void prefetch_and_watch(double interval);
41 void shut_down(Context *on_finish);
42
43 bool try_pop_front(Entry *entry, uint64_t *commit_tid);
44
45private:
46 typedef std::set<uint8_t> PrefetchSplayOffsets;
9f95a23c 47 typedef std::map<uint8_t, ceph::ref_t<ObjectPlayer>> SplayedObjectPlayers;
7c673cae
FG
48 typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions;
49 typedef std::set<uint64_t> ObjectNumbers;
50
51 enum State {
52 STATE_INIT,
9f95a23c 53 STATE_WAITCACHE,
7c673cae
FG
54 STATE_PREFETCH,
55 STATE_PLAYBACK,
56 STATE_ERROR
57 };
58
59 enum WatchStep {
60 WATCH_STEP_FETCH_CURRENT,
61 WATCH_STEP_FETCH_FIRST,
62 WATCH_STEP_ASSERT_ACTIVE
63 };
64
65 struct C_Fetch : public Context {
66 JournalPlayer *player;
67 uint64_t object_num;
68 C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
69 player->m_async_op_tracker.start_op();
70 }
71 ~C_Fetch() override {
72 player->m_async_op_tracker.finish_op();
73 }
74 void finish(int r) override {
75 player->handle_fetched(object_num, r);
76 }
77 };
78
79 struct C_Watch : public Context {
80 JournalPlayer *player;
81 uint64_t object_num;
82 C_Watch(JournalPlayer *player, uint64_t object_num)
83 : player(player), object_num(object_num) {
84 player->m_async_op_tracker.start_op();
85 }
86 ~C_Watch() override {
87 player->m_async_op_tracker.finish_op();
88 }
89
90 void finish(int r) override {
91 player->handle_watch(object_num, r);
92 }
93 };
94
9f95a23c
TL
95 struct CacheRebalanceHandler : public journal::CacheRebalanceHandler {
96 JournalPlayer *player;
97
98 CacheRebalanceHandler(JournalPlayer *player) : player(player) {
99 }
100
101 void handle_cache_rebalanced(uint64_t new_cache_bytes) override {
102 player->handle_cache_rebalanced(new_cache_bytes);
103 }
104 };
105
7c673cae 106 librados::IoCtx m_ioctx;
9f95a23c 107 CephContext *m_cct = nullptr;
7c673cae 108 std::string m_object_oid_prefix;
9f95a23c
TL
109 ceph::ref_t<JournalMetadata> m_journal_metadata;
110 ReplayHandler* m_replay_handler;
111 CacheManagerHandler *m_cache_manager_handler;
7c673cae 112
9f95a23c
TL
113 std::string m_cache_name;
114 CacheRebalanceHandler m_cache_rebalance_handler;
115 uint64_t m_max_fetch_bytes;
7c673cae
FG
116
117 AsyncOpTracker m_async_op_tracker;
118
9f95a23c
TL
119 mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock");
120 State m_state = STATE_INIT;
121 uint8_t m_splay_offset = 0;
7c673cae 122
9f95a23c
TL
123 bool m_watch_enabled = false;
124 bool m_watch_scheduled = false;
125 double m_watch_interval = 0;
7c673cae
FG
126 WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
127 bool m_watch_prune_active_tag = false;
128
129 bool m_shut_down = false;
130 bool m_handler_notified = false;
131
132 ObjectNumbers m_fetch_object_numbers;
133
134 PrefetchSplayOffsets m_prefetch_splay_offsets;
135 SplayedObjectPlayers m_object_players;
136
137 bool m_commit_position_valid = false;
138 ObjectPosition m_commit_position;
139 SplayedObjectPositions m_commit_positions;
11fdf7f2 140 uint64_t m_active_set = 0;
7c673cae
FG
141
142 boost::optional<uint64_t> m_active_tag_tid = boost::none;
143 boost::optional<uint64_t> m_prune_tag_tid = boost::none;
144
145 void advance_splay_object();
146
147 bool is_object_set_ready() const;
148 bool verify_playback_ready();
149 void prune_tag(uint64_t tag_tid);
150 void prune_active_tag(const boost::optional<uint64_t>& tag_tid);
151
9f95a23c
TL
152 ceph::ref_t<ObjectPlayer> get_object_player() const;
153 ceph::ref_t<ObjectPlayer> get_object_player(uint64_t object_number) const;
154 bool remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &object_player);
7c673cae
FG
155
156 void process_state(uint64_t object_number, int r);
157 int process_prefetch(uint64_t object_number);
158 int process_playback(uint64_t object_number);
159
160 void fetch(uint64_t object_num);
9f95a23c 161 void fetch(const ceph::ref_t<ObjectPlayer> &object_player);
7c673cae
FG
162 void handle_fetched(uint64_t object_num, int r);
163 void refetch(bool immediate);
164
165 void schedule_watch(bool immediate);
166 void handle_watch(uint64_t object_num, int r);
167 void handle_watch_assert_active(int r);
168
169 void notify_entries_available();
170 void notify_complete(int r);
9f95a23c
TL
171
172 void handle_cache_rebalanced(uint64_t new_cache_bytes);
7c673cae
FG
173};
174
175} // namespace journal
176
177#endif // CEPH_JOURNAL_JOURNAL_PLAYER_H