]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/ObjectRecorder.h
import ceph 16.2.7
[ceph.git] / ceph / src / journal / ObjectRecorder.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5#define CEPH_JOURNAL_OBJECT_RECORDER_H
6
494da23a 7#include "include/utime.h"
7c673cae
FG
8#include "include/Context.h"
9#include "include/rados/librados.hpp"
9f95a23c 10#include "common/ceph_mutex.h"
7c673cae
FG
11#include "common/RefCountedObj.h"
12#include "common/WorkQueue.h"
a4b75251 13#include "common/Timer.h"
7c673cae
FG
14#include "journal/FutureImpl.h"
15#include <list>
16#include <map>
17#include <set>
7c673cae 18#include <boost/noncopyable.hpp>
11fdf7f2 19#include "include/ceph_assert.h"
7c673cae 20
7c673cae
FG
21namespace journal {
22
23class ObjectRecorder;
7c673cae 24
9f95a23c 25typedef std::pair<ceph::ref_t<FutureImpl>, bufferlist> AppendBuffer;
7c673cae
FG
26typedef std::list<AppendBuffer> AppendBuffers;
27
28class ObjectRecorder : public RefCountedObject, boost::noncopyable {
29public:
30 struct Handler {
31 virtual ~Handler() {
32 }
33 virtual void closed(ObjectRecorder *object_recorder) = 0;
34 virtual void overflow(ObjectRecorder *object_recorder) = 0;
35 };
36
494da23a
TL
37 void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
38 double flush_age);
39
7c673cae
FG
40 inline uint64_t get_object_number() const {
41 return m_object_number;
42 }
43 inline const std::string &get_oid() const {
44 return m_oid;
45 }
46
494da23a 47 bool append(AppendBuffers &&append_buffers);
7c673cae 48 void flush(Context *on_safe);
9f95a23c 49 void flush(const ceph::ref_t<FutureImpl> &future);
7c673cae
FG
50
51 void claim_append_buffers(AppendBuffers *append_buffers);
52
53 bool is_closed() const {
9f95a23c 54 ceph_assert(ceph_mutex_is_locked(*m_lock));
7c673cae
FG
55 return (m_object_closed && m_in_flight_appends.empty());
56 }
57 bool close();
58
59 inline CephContext *cct() const {
60 return m_cct;
61 }
62
63 inline size_t get_pending_appends() const {
9f95a23c 64 std::lock_guard locker{*m_lock};
494da23a 65 return m_pending_buffers.size();
7c673cae
FG
66 }
67
68private:
9f95a23c
TL
69 FRIEND_MAKE_REF(ObjectRecorder);
70 ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid,
71 uint64_t object_number, ceph::mutex* lock,
72 ContextWQ *work_queue, Handler *handler, uint8_t order,
73 int32_t max_in_flight_appends);
74 ~ObjectRecorder() override;
75
7c673cae
FG
76 typedef std::set<uint64_t> InFlightTids;
77 typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
78
79 struct FlushHandler : public FutureImpl::FlushHandler {
9f95a23c
TL
80 ceph::ref_t<ObjectRecorder> object_recorder;
81 virtual void flush(const ceph::ref_t<FutureImpl> &future) override {
7c673cae
FG
82 object_recorder->flush(future);
83 }
9f95a23c 84 FlushHandler(ceph::ref_t<ObjectRecorder> o) : object_recorder(std::move(o)) {}
7c673cae 85 };
7c673cae 86 struct C_AppendFlush : public Context {
9f95a23c 87 ceph::ref_t<ObjectRecorder> object_recorder;
7c673cae 88 uint64_t tid;
9f95a23c
TL
89 C_AppendFlush(ceph::ref_t<ObjectRecorder> o, uint64_t _tid)
90 : object_recorder(std::move(o)), tid(_tid) {
7c673cae
FG
91 }
92 void finish(int r) override {
93 object_recorder->handle_append_flushed(tid, r);
7c673cae
FG
94 }
95 };
96
97 librados::IoCtx m_ioctx;
98 std::string m_oid;
99 uint64_t m_object_number;
9f95a23c 100 CephContext *m_cct = nullptr;
7c673cae
FG
101
102 ContextWQ *m_op_work_queue;
103
7c673cae
FG
104 Handler *m_handler;
105
106 uint8_t m_order;
107 uint64_t m_soft_max_size;
108
494da23a
TL
109 uint32_t m_flush_interval = 0;
110 uint64_t m_flush_bytes = 0;
111 double m_flush_age = 0;
112 int32_t m_max_in_flight_appends;
7c673cae 113
9f95a23c 114 bool m_compat_mode;
7c673cae 115
9f95a23c
TL
116 /* So that ObjectRecorder::FlushHandler doesn't create a circular reference: */
117 std::weak_ptr<FlushHandler> m_flush_handler;
118 auto get_flush_handler() {
119 auto h = m_flush_handler.lock();
120 if (!h) {
121 h = std::make_shared<FlushHandler>(this);
122 m_flush_handler = h;
123 }
124 return h;
125 }
126
127 mutable ceph::mutex* m_lock;
494da23a
TL
128 AppendBuffers m_pending_buffers;
129 uint64_t m_pending_bytes = 0;
130 utime_t m_last_flush_time;
131
9f95a23c 132 uint64_t m_append_tid = 0;
7c673cae
FG
133
134 InFlightTids m_in_flight_tids;
135 InFlightAppends m_in_flight_appends;
494da23a 136 uint64_t m_object_bytes = 0;
9f95a23c
TL
137
138 bool m_overflowed = false;
139
140 bool m_object_closed = false;
141 bool m_object_closed_notify = false;
7c673cae
FG
142
143 bufferlist m_prefetch_bl;
144
f91f0fd5 145 uint32_t m_in_flight_callbacks = 0;
9f95a23c 146 ceph::condition_variable m_in_flight_callbacks_cond;
494da23a 147 uint64_t m_in_flight_bytes = 0;
7c673cae 148
9f95a23c 149 bool send_appends(bool force, ceph::ref_t<FutureImpl> flush_sentinal);
7c673cae
FG
150 void handle_append_flushed(uint64_t tid, int r);
151 void append_overflowed();
7c673cae 152
9f95a23c
TL
153 void wake_up_flushes();
154 void notify_handler_unlock(std::unique_lock<ceph::mutex>& locker,
155 bool notify_overflowed);
7c673cae
FG
156};
157
158} // namespace journal
159
160#endif // CEPH_JOURNAL_OBJECT_RECORDER_H