git.proxmox.com Git - ceph.git/blob - ceph/src/journal/ObjectRecorder.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / journal / ObjectRecorder.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5 #define CEPH_JOURNAL_OBJECT_RECORDER_H
6
7 #include "include/utime.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Cond.h"
11 #include "common/Mutex.h"
12 #include "common/RefCountedObj.h"
13 #include "common/WorkQueue.h"
14 #include "journal/FutureImpl.h"
15 #include <list>
16 #include <map>
17 #include <set>
18 #include <boost/intrusive_ptr.hpp>
19 #include <boost/noncopyable.hpp>
20 #include "include/ceph_assert.h"
21
22 class SafeTimer;
23
24 namespace journal {
25
26 class ObjectRecorder;
27 typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
28
29 typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
30 typedef std::list<AppendBuffer> AppendBuffers;
31
32 class ObjectRecorder : public RefCountedObject, boost::noncopyable {
33 public:
34 struct Handler {
35 virtual ~Handler() {
36 }
37 virtual void closed(ObjectRecorder *object_recorder) = 0;
38 virtual void overflow(ObjectRecorder *object_recorder) = 0;
39 };
40
41 ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
42 uint64_t object_number, std::shared_ptr<Mutex> lock,
43 ContextWQ *work_queue, Handler *handler, uint8_t order,
44 int32_t max_in_flight_appends);
45 ~ObjectRecorder() override;
46
47 void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
48 double flush_age);
49
50 inline uint64_t get_object_number() const {
51 return m_object_number;
52 }
53 inline const std::string &get_oid() const {
54 return m_oid;
55 }
56
57 bool append(AppendBuffers &&append_buffers);
58 void flush(Context *on_safe);
59 void flush(const FutureImplPtr &future);
60
61 void claim_append_buffers(AppendBuffers *append_buffers);
62
63 bool is_closed() const {
64 ceph_assert(m_lock->is_locked());
65 return (m_object_closed && m_in_flight_appends.empty());
66 }
67 bool close();
68
69 inline CephContext *cct() const {
70 return m_cct;
71 }
72
73 inline size_t get_pending_appends() const {
74 Mutex::Locker locker(*m_lock);
75 return m_pending_buffers.size();
76 }
77
78 private:
79 typedef std::set<uint64_t> InFlightTids;
80 typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
81
82 struct FlushHandler : public FutureImpl::FlushHandler {
83 ObjectRecorder *object_recorder;
84 FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
85 void get() override {
86 object_recorder->get();
87 }
88 void put() override {
89 object_recorder->put();
90 }
91 void flush(const FutureImplPtr &future) override {
92 object_recorder->flush(future);
93 }
94 };
95 struct C_AppendFlush : public Context {
96 ObjectRecorder *object_recorder;
97 uint64_t tid;
98 C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
99 : object_recorder(o), tid(_tid) {
100 object_recorder->get();
101 }
102 void finish(int r) override {
103 object_recorder->handle_append_flushed(tid, r);
104 object_recorder->put();
105 }
106 };
107
108 librados::IoCtx m_ioctx;
109 std::string m_oid;
110 uint64_t m_object_number;
111 CephContext *m_cct;
112
113 ContextWQ *m_op_work_queue;
114
115 Handler *m_handler;
116
117 uint8_t m_order;
118 uint64_t m_soft_max_size;
119
120 uint32_t m_flush_interval = 0;
121 uint64_t m_flush_bytes = 0;
122 double m_flush_age = 0;
123 int32_t m_max_in_flight_appends;
124
125 FlushHandler m_flush_handler;
126
127 mutable std::shared_ptr<Mutex> m_lock;
128 AppendBuffers m_pending_buffers;
129 uint64_t m_pending_bytes = 0;
130 utime_t m_last_flush_time;
131
132 uint64_t m_append_tid;
133
134 InFlightTids m_in_flight_tids;
135 InFlightAppends m_in_flight_appends;
136 uint64_t m_object_bytes = 0;
137 bool m_overflowed;
138 bool m_object_closed;
139
140 bufferlist m_prefetch_bl;
141
142 bool m_in_flight_flushes;
143 Cond m_in_flight_flushes_cond;
144 uint64_t m_in_flight_bytes = 0;
145
146 bool send_appends(bool force, FutureImplPtr flush_sentinal);
147 void handle_append_flushed(uint64_t tid, int r);
148 void append_overflowed();
149
150 void notify_handler_unlock();
151 };
152
153 } // namespace journal
154
155 #endif // CEPH_JOURNAL_OBJECT_RECORDER_H