]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H | |
5 | #define CEPH_JOURNAL_OBJECT_RECORDER_H | |
6 | ||
494da23a | 7 | #include "include/utime.h" |
7c673cae FG |
8 | #include "include/Context.h" |
9 | #include "include/rados/librados.hpp" | |
9f95a23c | 10 | #include "common/ceph_mutex.h" |
7c673cae FG |
11 | #include "common/RefCountedObj.h" |
12 | #include "common/WorkQueue.h" | |
a4b75251 | 13 | #include "common/Timer.h" |
7c673cae FG |
14 | #include "journal/FutureImpl.h" |
15 | #include <list> | |
16 | #include <map> | |
17 | #include <set> | |
7c673cae | 18 | #include <boost/noncopyable.hpp> |
11fdf7f2 | 19 | #include "include/ceph_assert.h" |
7c673cae | 20 | |
7c673cae FG |
21 | namespace journal { |
22 | ||
23 | class ObjectRecorder; | |
7c673cae | 24 | |
9f95a23c | 25 | typedef std::pair<ceph::ref_t<FutureImpl>, bufferlist> AppendBuffer; |
7c673cae FG |
26 | typedef std::list<AppendBuffer> AppendBuffers; |
27 | ||
28 | class ObjectRecorder : public RefCountedObject, boost::noncopyable { | |
29 | public: | |
30 | struct Handler { | |
31 | virtual ~Handler() { | |
32 | } | |
33 | virtual void closed(ObjectRecorder *object_recorder) = 0; | |
34 | virtual void overflow(ObjectRecorder *object_recorder) = 0; | |
35 | }; | |
36 | ||
494da23a TL |
37 | void set_append_batch_options(int flush_interval, uint64_t flush_bytes, |
38 | double flush_age); | |
39 | ||
7c673cae FG |
40 | inline uint64_t get_object_number() const { |
41 | return m_object_number; | |
42 | } | |
43 | inline const std::string &get_oid() const { | |
44 | return m_oid; | |
45 | } | |
46 | ||
494da23a | 47 | bool append(AppendBuffers &&append_buffers); |
7c673cae | 48 | void flush(Context *on_safe); |
9f95a23c | 49 | void flush(const ceph::ref_t<FutureImpl> &future); |
7c673cae FG |
50 | |
51 | void claim_append_buffers(AppendBuffers *append_buffers); | |
52 | ||
53 | bool is_closed() const { | |
9f95a23c | 54 | ceph_assert(ceph_mutex_is_locked(*m_lock)); |
7c673cae FG |
55 | return (m_object_closed && m_in_flight_appends.empty()); |
56 | } | |
57 | bool close(); | |
58 | ||
59 | inline CephContext *cct() const { | |
60 | return m_cct; | |
61 | } | |
62 | ||
63 | inline size_t get_pending_appends() const { | |
9f95a23c | 64 | std::lock_guard locker{*m_lock}; |
494da23a | 65 | return m_pending_buffers.size(); |
7c673cae FG |
66 | } |
67 | ||
68 | private: | |
9f95a23c TL |
69 | FRIEND_MAKE_REF(ObjectRecorder); |
70 | ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid, | |
71 | uint64_t object_number, ceph::mutex* lock, | |
72 | ContextWQ *work_queue, Handler *handler, uint8_t order, | |
73 | int32_t max_in_flight_appends); | |
74 | ~ObjectRecorder() override; | |
75 | ||
7c673cae FG |
76 | typedef std::set<uint64_t> InFlightTids; |
77 | typedef std::map<uint64_t, AppendBuffers> InFlightAppends; | |
78 | ||
79 | struct FlushHandler : public FutureImpl::FlushHandler { | |
9f95a23c TL |
80 | ceph::ref_t<ObjectRecorder> object_recorder; |
81 | virtual void flush(const ceph::ref_t<FutureImpl> &future) override { | |
7c673cae FG |
82 | object_recorder->flush(future); |
83 | } | |
9f95a23c | 84 | FlushHandler(ceph::ref_t<ObjectRecorder> o) : object_recorder(std::move(o)) {} |
7c673cae | 85 | }; |
7c673cae | 86 | struct C_AppendFlush : public Context { |
9f95a23c | 87 | ceph::ref_t<ObjectRecorder> object_recorder; |
7c673cae | 88 | uint64_t tid; |
9f95a23c TL |
89 | C_AppendFlush(ceph::ref_t<ObjectRecorder> o, uint64_t _tid) |
90 | : object_recorder(std::move(o)), tid(_tid) { | |
7c673cae FG |
91 | } |
92 | void finish(int r) override { | |
93 | object_recorder->handle_append_flushed(tid, r); | |
7c673cae FG |
94 | } |
95 | }; | |
96 | ||
97 | librados::IoCtx m_ioctx; | |
98 | std::string m_oid; | |
99 | uint64_t m_object_number; | |
9f95a23c | 100 | CephContext *m_cct = nullptr; |
7c673cae FG |
101 | |
102 | ContextWQ *m_op_work_queue; | |
103 | ||
7c673cae FG |
104 | Handler *m_handler; |
105 | ||
106 | uint8_t m_order; | |
107 | uint64_t m_soft_max_size; | |
108 | ||
494da23a TL |
109 | uint32_t m_flush_interval = 0; |
110 | uint64_t m_flush_bytes = 0; | |
111 | double m_flush_age = 0; | |
112 | int32_t m_max_in_flight_appends; | |
7c673cae | 113 | |
9f95a23c | 114 | bool m_compat_mode; |
7c673cae | 115 | |
9f95a23c TL |
116 | /* So that ObjectRecorder::FlushHandler doesn't create a circular reference: */ |
117 | std::weak_ptr<FlushHandler> m_flush_handler; | |
118 | auto get_flush_handler() { | |
119 | auto h = m_flush_handler.lock(); | |
120 | if (!h) { | |
121 | h = std::make_shared<FlushHandler>(this); | |
122 | m_flush_handler = h; | |
123 | } | |
124 | return h; | |
125 | } | |
126 | ||
127 | mutable ceph::mutex* m_lock; | |
494da23a TL |
128 | AppendBuffers m_pending_buffers; |
129 | uint64_t m_pending_bytes = 0; | |
130 | utime_t m_last_flush_time; | |
131 | ||
9f95a23c | 132 | uint64_t m_append_tid = 0; |
7c673cae FG |
133 | |
134 | InFlightTids m_in_flight_tids; | |
135 | InFlightAppends m_in_flight_appends; | |
494da23a | 136 | uint64_t m_object_bytes = 0; |
9f95a23c TL |
137 | |
138 | bool m_overflowed = false; | |
139 | ||
140 | bool m_object_closed = false; | |
141 | bool m_object_closed_notify = false; | |
7c673cae FG |
142 | |
143 | bufferlist m_prefetch_bl; | |
144 | ||
f91f0fd5 | 145 | uint32_t m_in_flight_callbacks = 0; |
9f95a23c | 146 | ceph::condition_variable m_in_flight_callbacks_cond; |
494da23a | 147 | uint64_t m_in_flight_bytes = 0; |
7c673cae | 148 | |
9f95a23c | 149 | bool send_appends(bool force, ceph::ref_t<FutureImpl> flush_sentinal); |
7c673cae FG |
150 | void handle_append_flushed(uint64_t tid, int r); |
151 | void append_overflowed(); | |
7c673cae | 152 | |
9f95a23c TL |
153 | void wake_up_flushes(); |
154 | void notify_handler_unlock(std::unique_lock<ceph::mutex>& locker, | |
155 | bool notify_overflowed); | |
7c673cae FG |
156 | }; |
157 | ||
158 | } // namespace journal | |
159 | ||
160 | #endif // CEPH_JOURNAL_OBJECT_RECORDER_H |