1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_OBJECT_PLAYER_H
5 #define CEPH_JOURNAL_OBJECT_PLAYER_H
7 #include "include/Context.h"
8 #include "include/interval_set.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Cond.h"
11 #include "common/Mutex.h"
12 #include "common/RefCountedObj.h"
13 #include "journal/Entry.h"
16 #include <boost/intrusive_ptr.hpp>
17 #include <boost/noncopyable.hpp>
18 #include <boost/unordered_map.hpp>
19 #include "include/assert.h"
26 typedef boost::intrusive_ptr
<ObjectPlayer
> ObjectPlayerPtr
;
28 class ObjectPlayer
: public RefCountedObject
{
30 typedef std::list
<Entry
> Entries
;
31 typedef interval_set
<uint64_t> InvalidRanges
;
35 REFETCH_STATE_REQUIRED
,
36 REFETCH_STATE_IMMEDIATE
39 ObjectPlayer(librados::IoCtx
&ioctx
, const std::string
&object_oid_prefix
,
40 uint64_t object_num
, SafeTimer
&timer
, Mutex
&timer_lock
,
41 uint8_t order
, uint64_t max_fetch_bytes
);
42 ~ObjectPlayer() override
;
44 inline const std::string
&get_oid() const {
47 inline uint64_t get_object_number() const {
51 void fetch(Context
*on_finish
);
52 void watch(Context
*on_fetch
, double interval
);
55 void front(Entry
*entry
) const;
57 inline bool empty() const {
58 Mutex::Locker
locker(m_lock
);
59 return m_entries
.empty();
62 inline void get_entries(Entries
*entries
) {
63 Mutex::Locker
locker(m_lock
);
66 inline void get_invalid_ranges(InvalidRanges
*invalid_ranges
) {
67 Mutex::Locker
locker(m_lock
);
68 *invalid_ranges
= m_invalid_ranges
;
71 inline bool refetch_required() const {
72 return (get_refetch_state() != REFETCH_STATE_NONE
);
74 inline RefetchState
get_refetch_state() const {
75 return m_refetch_state
;
77 inline void set_refetch_state(RefetchState refetch_state
) {
78 m_refetch_state
= refetch_state
;
82 typedef std::pair
<uint64_t, uint64_t> EntryKey
;
83 typedef boost::unordered_map
<EntryKey
, Entries::iterator
> EntryKeys
;
85 struct C_Fetch
: public Context
{
86 ObjectPlayerPtr object_player
;
89 C_Fetch(ObjectPlayer
*o
, Context
*ctx
) : object_player(o
), on_finish(ctx
) {
91 void finish(int r
) override
;
93 struct C_WatchTask
: public Context
{
94 ObjectPlayerPtr object_player
;
95 C_WatchTask(ObjectPlayer
*o
) : object_player(o
) {
97 void finish(int r
) override
;
99 struct C_WatchFetch
: public Context
{
100 ObjectPlayerPtr object_player
;
101 C_WatchFetch(ObjectPlayer
*o
) : object_player(o
) {
103 void finish(int r
) override
;
106 librados::IoCtx m_ioctx
;
107 uint64_t m_object_num
;
115 uint64_t m_max_fetch_bytes
;
117 double m_watch_interval
;
118 Context
*m_watch_task
;
120 mutable Mutex m_lock
;
121 bool m_fetch_in_progress
;
122 bufferlist m_read_bl
;
123 uint32_t m_read_off
= 0;
124 uint32_t m_read_bl_off
= 0;
127 EntryKeys m_entry_keys
;
128 InvalidRanges m_invalid_ranges
;
130 Context
*m_watch_ctx
= nullptr;
132 bool m_unwatched
= false;
133 RefetchState m_refetch_state
= REFETCH_STATE_IMMEDIATE
;
135 int handle_fetch_complete(int r
, const bufferlist
&bl
, bool *refetch
);
137 void clear_invalid_range(uint32_t off
, uint32_t len
);
139 void schedule_watch();
141 void handle_watch_task();
142 void handle_watch_fetched(int r
);
145 } // namespace journal
147 #endif // CEPH_JOURNAL_OBJECT_PLAYER_H