]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/ObjectRecorder.h
1b36d246612459f18076461e1025834684695204
[ceph.git] / ceph / src / journal / ObjectRecorder.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5 #define CEPH_JOURNAL_OBJECT_RECORDER_H
6
7 #include "include/utime.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/ceph_mutex.h"
11 #include "common/RefCountedObj.h"
12 #include "common/WorkQueue.h"
13 #include "journal/FutureImpl.h"
14 #include <list>
15 #include <map>
16 #include <set>
17 #include <boost/noncopyable.hpp>
18 #include "include/ceph_assert.h"
19
20 class SafeTimer;
21
22 namespace journal {
23
24 class ObjectRecorder;
25
26 typedef std::pair<ceph::ref_t<FutureImpl>, bufferlist> AppendBuffer;
27 typedef std::list<AppendBuffer> AppendBuffers;
28
29 class ObjectRecorder : public RefCountedObject, boost::noncopyable {
30 public:
31 struct Handler {
32 virtual ~Handler() {
33 }
34 virtual void closed(ObjectRecorder *object_recorder) = 0;
35 virtual void overflow(ObjectRecorder *object_recorder) = 0;
36 };
37
38 void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
39 double flush_age);
40
41 inline uint64_t get_object_number() const {
42 return m_object_number;
43 }
44 inline const std::string &get_oid() const {
45 return m_oid;
46 }
47
48 bool append(AppendBuffers &&append_buffers);
49 void flush(Context *on_safe);
50 void flush(const ceph::ref_t<FutureImpl> &future);
51
52 void claim_append_buffers(AppendBuffers *append_buffers);
53
54 bool is_closed() const {
55 ceph_assert(ceph_mutex_is_locked(*m_lock));
56 return (m_object_closed && m_in_flight_appends.empty());
57 }
58 bool close();
59
60 inline CephContext *cct() const {
61 return m_cct;
62 }
63
64 inline size_t get_pending_appends() const {
65 std::lock_guard locker{*m_lock};
66 return m_pending_buffers.size();
67 }
68
69 private:
70 FRIEND_MAKE_REF(ObjectRecorder);
71 ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid,
72 uint64_t object_number, ceph::mutex* lock,
73 ContextWQ *work_queue, Handler *handler, uint8_t order,
74 int32_t max_in_flight_appends);
75 ~ObjectRecorder() override;
76
77 typedef std::set<uint64_t> InFlightTids;
78 typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
79
80 struct FlushHandler : public FutureImpl::FlushHandler {
81 ceph::ref_t<ObjectRecorder> object_recorder;
82 virtual void flush(const ceph::ref_t<FutureImpl> &future) override {
83 object_recorder->flush(future);
84 }
85 FlushHandler(ceph::ref_t<ObjectRecorder> o) : object_recorder(std::move(o)) {}
86 };
87 struct C_AppendFlush : public Context {
88 ceph::ref_t<ObjectRecorder> object_recorder;
89 uint64_t tid;
90 C_AppendFlush(ceph::ref_t<ObjectRecorder> o, uint64_t _tid)
91 : object_recorder(std::move(o)), tid(_tid) {
92 }
93 void finish(int r) override {
94 object_recorder->handle_append_flushed(tid, r);
95 }
96 };
97
98 librados::IoCtx m_ioctx;
99 std::string m_oid;
100 uint64_t m_object_number;
101 CephContext *m_cct = nullptr;
102
103 ContextWQ *m_op_work_queue;
104
105 Handler *m_handler;
106
107 uint8_t m_order;
108 uint64_t m_soft_max_size;
109
110 uint32_t m_flush_interval = 0;
111 uint64_t m_flush_bytes = 0;
112 double m_flush_age = 0;
113 int32_t m_max_in_flight_appends;
114
115 bool m_compat_mode;
116
117 /* So that ObjectRecorder::FlushHandler doesn't create a circular reference: */
118 std::weak_ptr<FlushHandler> m_flush_handler;
119 auto get_flush_handler() {
120 auto h = m_flush_handler.lock();
121 if (!h) {
122 h = std::make_shared<FlushHandler>(this);
123 m_flush_handler = h;
124 }
125 return h;
126 }
127
128 mutable ceph::mutex* m_lock;
129 AppendBuffers m_pending_buffers;
130 uint64_t m_pending_bytes = 0;
131 utime_t m_last_flush_time;
132
133 uint64_t m_append_tid = 0;
134
135 InFlightTids m_in_flight_tids;
136 InFlightAppends m_in_flight_appends;
137 uint64_t m_object_bytes = 0;
138
139 bool m_overflowed = false;
140
141 bool m_object_closed = false;
142 bool m_object_closed_notify = false;
143
144 bufferlist m_prefetch_bl;
145
146 bool m_in_flight_callbacks = false;
147 ceph::condition_variable m_in_flight_callbacks_cond;
148 uint64_t m_in_flight_bytes = 0;
149
150 bool send_appends(bool force, ceph::ref_t<FutureImpl> flush_sentinal);
151 void handle_append_flushed(uint64_t tid, int r);
152 void append_overflowed();
153
154 void wake_up_flushes();
155 void notify_handler_unlock(std::unique_lock<ceph::mutex>& locker,
156 bool notify_overflowed);
157 };
158
159 } // namespace journal
160
161 #endif // CEPH_JOURNAL_OBJECT_RECORDER_H