]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H | |
5 | #define CEPH_JOURNAL_OBJECT_RECORDER_H | |
6 | ||
494da23a | 7 | #include "include/utime.h" |
7c673cae FG |
8 | #include "include/Context.h" |
9 | #include "include/rados/librados.hpp" | |
10 | #include "common/Cond.h" | |
11 | #include "common/Mutex.h" | |
12 | #include "common/RefCountedObj.h" | |
13 | #include "common/WorkQueue.h" | |
14 | #include "journal/FutureImpl.h" | |
15 | #include <list> | |
16 | #include <map> | |
17 | #include <set> | |
18 | #include <boost/intrusive_ptr.hpp> | |
19 | #include <boost/noncopyable.hpp> | |
11fdf7f2 | 20 | #include "include/ceph_assert.h" |
7c673cae FG |
21 | |
22 | class SafeTimer; | |
23 | ||
24 | namespace journal { | |
25 | ||
26 | class ObjectRecorder; | |
27 | typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr; | |
28 | ||
29 | typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer; | |
30 | typedef std::list<AppendBuffer> AppendBuffers; | |
31 | ||
32 | class ObjectRecorder : public RefCountedObject, boost::noncopyable { | |
33 | public: | |
34 | struct Handler { | |
35 | virtual ~Handler() { | |
36 | } | |
37 | virtual void closed(ObjectRecorder *object_recorder) = 0; | |
38 | virtual void overflow(ObjectRecorder *object_recorder) = 0; | |
39 | }; | |
40 | ||
41 | ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, | |
42 | uint64_t object_number, std::shared_ptr<Mutex> lock, | |
494da23a TL |
43 | ContextWQ *work_queue, Handler *handler, uint8_t order, |
44 | int32_t max_in_flight_appends); | |
7c673cae FG |
45 | ~ObjectRecorder() override; |
46 | ||
494da23a TL |
47 | void set_append_batch_options(int flush_interval, uint64_t flush_bytes, |
48 | double flush_age); | |
49 | ||
7c673cae FG |
50 | inline uint64_t get_object_number() const { |
51 | return m_object_number; | |
52 | } | |
53 | inline const std::string &get_oid() const { | |
54 | return m_oid; | |
55 | } | |
56 | ||
494da23a | 57 | bool append(AppendBuffers &&append_buffers); |
7c673cae FG |
58 | void flush(Context *on_safe); |
59 | void flush(const FutureImplPtr &future); | |
60 | ||
61 | void claim_append_buffers(AppendBuffers *append_buffers); | |
62 | ||
63 | bool is_closed() const { | |
11fdf7f2 | 64 | ceph_assert(m_lock->is_locked()); |
7c673cae FG |
65 | return (m_object_closed && m_in_flight_appends.empty()); |
66 | } | |
67 | bool close(); | |
68 | ||
69 | inline CephContext *cct() const { | |
70 | return m_cct; | |
71 | } | |
72 | ||
73 | inline size_t get_pending_appends() const { | |
74 | Mutex::Locker locker(*m_lock); | |
494da23a | 75 | return m_pending_buffers.size(); |
7c673cae FG |
76 | } |
77 | ||
78 | private: | |
79 | typedef std::set<uint64_t> InFlightTids; | |
80 | typedef std::map<uint64_t, AppendBuffers> InFlightAppends; | |
81 | ||
82 | struct FlushHandler : public FutureImpl::FlushHandler { | |
83 | ObjectRecorder *object_recorder; | |
84 | FlushHandler(ObjectRecorder *o) : object_recorder(o) {} | |
85 | void get() override { | |
86 | object_recorder->get(); | |
87 | } | |
88 | void put() override { | |
89 | object_recorder->put(); | |
90 | } | |
91 | void flush(const FutureImplPtr &future) override { | |
7c673cae FG |
92 | object_recorder->flush(future); |
93 | } | |
94 | }; | |
7c673cae FG |
95 | struct C_AppendFlush : public Context { |
96 | ObjectRecorder *object_recorder; | |
97 | uint64_t tid; | |
98 | C_AppendFlush(ObjectRecorder *o, uint64_t _tid) | |
99 | : object_recorder(o), tid(_tid) { | |
100 | object_recorder->get(); | |
101 | } | |
102 | void finish(int r) override { | |
103 | object_recorder->handle_append_flushed(tid, r); | |
104 | object_recorder->put(); | |
105 | } | |
106 | }; | |
107 | ||
108 | librados::IoCtx m_ioctx; | |
109 | std::string m_oid; | |
110 | uint64_t m_object_number; | |
111 | CephContext *m_cct; | |
112 | ||
113 | ContextWQ *m_op_work_queue; | |
114 | ||
7c673cae FG |
115 | Handler *m_handler; |
116 | ||
117 | uint8_t m_order; | |
118 | uint64_t m_soft_max_size; | |
119 | ||
494da23a TL |
120 | uint32_t m_flush_interval = 0; |
121 | uint64_t m_flush_bytes = 0; | |
122 | double m_flush_age = 0; | |
123 | int32_t m_max_in_flight_appends; | |
7c673cae FG |
124 | |
125 | FlushHandler m_flush_handler; | |
126 | ||
7c673cae | 127 | mutable std::shared_ptr<Mutex> m_lock; |
494da23a TL |
128 | AppendBuffers m_pending_buffers; |
129 | uint64_t m_pending_bytes = 0; | |
130 | utime_t m_last_flush_time; | |
131 | ||
7c673cae | 132 | uint64_t m_append_tid; |
7c673cae FG |
133 | |
134 | InFlightTids m_in_flight_tids; | |
135 | InFlightAppends m_in_flight_appends; | |
494da23a | 136 | uint64_t m_object_bytes = 0; |
7c673cae FG |
137 | bool m_overflowed; |
138 | bool m_object_closed; | |
139 | ||
140 | bufferlist m_prefetch_bl; | |
141 | ||
142 | bool m_in_flight_flushes; | |
143 | Cond m_in_flight_flushes_cond; | |
494da23a | 144 | uint64_t m_in_flight_bytes = 0; |
7c673cae | 145 | |
494da23a | 146 | bool send_appends(bool force, FutureImplPtr flush_sentinal); |
7c673cae FG |
147 | void handle_append_flushed(uint64_t tid, int r); |
148 | void append_overflowed(); | |
7c673cae FG |
149 | |
150 | void notify_handler_unlock(); | |
151 | }; | |
152 | ||
153 | } // namespace journal | |
154 | ||
155 | #endif // CEPH_JOURNAL_OBJECT_RECORDER_H |