]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/ObjectRecorder.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / journal / ObjectRecorder.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5#define CEPH_JOURNAL_OBJECT_RECORDER_H
6
494da23a 7#include "include/utime.h"
7c673cae
FG
8#include "include/Context.h"
9#include "include/rados/librados.hpp"
10#include "common/Cond.h"
11#include "common/Mutex.h"
12#include "common/RefCountedObj.h"
13#include "common/WorkQueue.h"
14#include "journal/FutureImpl.h"
15#include <list>
16#include <map>
17#include <set>
18#include <boost/intrusive_ptr.hpp>
19#include <boost/noncopyable.hpp>
11fdf7f2 20#include "include/ceph_assert.h"
7c673cae
FG
21
22class SafeTimer;
23
24namespace journal {
25
26class ObjectRecorder;
27typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
28
29typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
30typedef std::list<AppendBuffer> AppendBuffers;
31
32class ObjectRecorder : public RefCountedObject, boost::noncopyable {
33public:
34 struct Handler {
35 virtual ~Handler() {
36 }
37 virtual void closed(ObjectRecorder *object_recorder) = 0;
38 virtual void overflow(ObjectRecorder *object_recorder) = 0;
39 };
40
41 ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
42 uint64_t object_number, std::shared_ptr<Mutex> lock,
494da23a
TL
43 ContextWQ *work_queue, Handler *handler, uint8_t order,
44 int32_t max_in_flight_appends);
7c673cae
FG
45 ~ObjectRecorder() override;
46
494da23a
TL
47 void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
48 double flush_age);
49
7c673cae
FG
50 inline uint64_t get_object_number() const {
51 return m_object_number;
52 }
53 inline const std::string &get_oid() const {
54 return m_oid;
55 }
56
494da23a 57 bool append(AppendBuffers &&append_buffers);
7c673cae
FG
58 void flush(Context *on_safe);
59 void flush(const FutureImplPtr &future);
60
61 void claim_append_buffers(AppendBuffers *append_buffers);
62
63 bool is_closed() const {
11fdf7f2 64 ceph_assert(m_lock->is_locked());
7c673cae
FG
65 return (m_object_closed && m_in_flight_appends.empty());
66 }
67 bool close();
68
69 inline CephContext *cct() const {
70 return m_cct;
71 }
72
73 inline size_t get_pending_appends() const {
74 Mutex::Locker locker(*m_lock);
494da23a 75 return m_pending_buffers.size();
7c673cae
FG
76 }
77
78private:
79 typedef std::set<uint64_t> InFlightTids;
80 typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
81
82 struct FlushHandler : public FutureImpl::FlushHandler {
83 ObjectRecorder *object_recorder;
84 FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
85 void get() override {
86 object_recorder->get();
87 }
88 void put() override {
89 object_recorder->put();
90 }
91 void flush(const FutureImplPtr &future) override {
7c673cae
FG
92 object_recorder->flush(future);
93 }
94 };
7c673cae
FG
95 struct C_AppendFlush : public Context {
96 ObjectRecorder *object_recorder;
97 uint64_t tid;
98 C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
99 : object_recorder(o), tid(_tid) {
100 object_recorder->get();
101 }
102 void finish(int r) override {
103 object_recorder->handle_append_flushed(tid, r);
104 object_recorder->put();
105 }
106 };
107
108 librados::IoCtx m_ioctx;
109 std::string m_oid;
110 uint64_t m_object_number;
111 CephContext *m_cct;
112
113 ContextWQ *m_op_work_queue;
114
7c673cae
FG
115 Handler *m_handler;
116
117 uint8_t m_order;
118 uint64_t m_soft_max_size;
119
494da23a
TL
120 uint32_t m_flush_interval = 0;
121 uint64_t m_flush_bytes = 0;
122 double m_flush_age = 0;
123 int32_t m_max_in_flight_appends;
7c673cae
FG
124
125 FlushHandler m_flush_handler;
126
7c673cae 127 mutable std::shared_ptr<Mutex> m_lock;
494da23a
TL
128 AppendBuffers m_pending_buffers;
129 uint64_t m_pending_bytes = 0;
130 utime_t m_last_flush_time;
131
7c673cae 132 uint64_t m_append_tid;
7c673cae
FG
133
134 InFlightTids m_in_flight_tids;
135 InFlightAppends m_in_flight_appends;
494da23a 136 uint64_t m_object_bytes = 0;
7c673cae
FG
137 bool m_overflowed;
138 bool m_object_closed;
139
140 bufferlist m_prefetch_bl;
141
142 bool m_in_flight_flushes;
143 Cond m_in_flight_flushes_cond;
494da23a 144 uint64_t m_in_flight_bytes = 0;
7c673cae 145
494da23a 146 bool send_appends(bool force, FutureImplPtr flush_sentinal);
7c673cae
FG
147 void handle_append_flushed(uint64_t tid, int r);
148 void append_overflowed();
7c673cae
FG
149
150 void notify_handler_unlock();
151};
152
153} // namespace journal
154
155#endif // CEPH_JOURNAL_OBJECT_RECORDER_H