1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5 #define CEPH_JOURNAL_OBJECT_RECORDER_H
7 #include "include/utime.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/ceph_mutex.h"
11 #include "common/RefCountedObj.h"
12 #include "common/WorkQueue.h"
13 #include "journal/FutureImpl.h"
17 #include <boost/noncopyable.hpp>
18 #include "include/ceph_assert.h"
26 typedef std::pair
<ceph::ref_t
<FutureImpl
>, bufferlist
> AppendBuffer
;
27 typedef std::list
<AppendBuffer
> AppendBuffers
;
29 class ObjectRecorder
: public RefCountedObject
, boost::noncopyable
{
34 virtual void closed(ObjectRecorder
*object_recorder
) = 0;
35 virtual void overflow(ObjectRecorder
*object_recorder
) = 0;
38 void set_append_batch_options(int flush_interval
, uint64_t flush_bytes
,
41 inline uint64_t get_object_number() const {
42 return m_object_number
;
44 inline const std::string
&get_oid() const {
48 bool append(AppendBuffers
&&append_buffers
);
49 void flush(Context
*on_safe
);
50 void flush(const ceph::ref_t
<FutureImpl
> &future
);
52 void claim_append_buffers(AppendBuffers
*append_buffers
);
54 bool is_closed() const {
55 ceph_assert(ceph_mutex_is_locked(*m_lock
));
56 return (m_object_closed
&& m_in_flight_appends
.empty());
60 inline CephContext
*cct() const {
64 inline size_t get_pending_appends() const {
65 std::lock_guard locker
{*m_lock
};
66 return m_pending_buffers
.size();
70 FRIEND_MAKE_REF(ObjectRecorder
);
71 ObjectRecorder(librados::IoCtx
&ioctx
, std::string_view oid
,
72 uint64_t object_number
, ceph::mutex
* lock
,
73 ContextWQ
*work_queue
, Handler
*handler
, uint8_t order
,
74 int32_t max_in_flight_appends
);
75 ~ObjectRecorder() override
;
77 typedef std::set
<uint64_t> InFlightTids
;
78 typedef std::map
<uint64_t, AppendBuffers
> InFlightAppends
;
80 struct FlushHandler
: public FutureImpl::FlushHandler
{
81 ceph::ref_t
<ObjectRecorder
> object_recorder
;
82 virtual void flush(const ceph::ref_t
<FutureImpl
> &future
) override
{
83 object_recorder
->flush(future
);
85 FlushHandler(ceph::ref_t
<ObjectRecorder
> o
) : object_recorder(std::move(o
)) {}
87 struct C_AppendFlush
: public Context
{
88 ceph::ref_t
<ObjectRecorder
> object_recorder
;
90 C_AppendFlush(ceph::ref_t
<ObjectRecorder
> o
, uint64_t _tid
)
91 : object_recorder(std::move(o
)), tid(_tid
) {
93 void finish(int r
) override
{
94 object_recorder
->handle_append_flushed(tid
, r
);
98 librados::IoCtx m_ioctx
;
100 uint64_t m_object_number
;
101 CephContext
*m_cct
= nullptr;
103 ContextWQ
*m_op_work_queue
;
108 uint64_t m_soft_max_size
;
110 uint32_t m_flush_interval
= 0;
111 uint64_t m_flush_bytes
= 0;
112 double m_flush_age
= 0;
113 int32_t m_max_in_flight_appends
;
117 /* So that ObjectRecorder::FlushHandler doesn't create a circular reference: */
118 std::weak_ptr
<FlushHandler
> m_flush_handler
;
119 auto get_flush_handler() {
120 auto h
= m_flush_handler
.lock();
122 h
= std::make_shared
<FlushHandler
>(this);
128 mutable ceph::mutex
* m_lock
;
129 AppendBuffers m_pending_buffers
;
130 uint64_t m_pending_bytes
= 0;
131 utime_t m_last_flush_time
;
133 uint64_t m_append_tid
= 0;
135 InFlightTids m_in_flight_tids
;
136 InFlightAppends m_in_flight_appends
;
137 uint64_t m_object_bytes
= 0;
139 bool m_overflowed
= false;
141 bool m_object_closed
= false;
142 bool m_object_closed_notify
= false;
144 bufferlist m_prefetch_bl
;
146 bool m_in_flight_callbacks
= false;
147 ceph::condition_variable m_in_flight_callbacks_cond
;
148 uint64_t m_in_flight_bytes
= 0;
150 bool send_appends(bool force
, ceph::ref_t
<FutureImpl
> flush_sentinal
);
151 void handle_append_flushed(uint64_t tid
, int r
);
152 void append_overflowed();
154 void wake_up_flushes();
155 void notify_handler_unlock(std::unique_lock
<ceph::mutex
>& locker
,
156 bool notify_overflowed
);
159 } // namespace journal
161 #endif // CEPH_JOURNAL_OBJECT_RECORDER_H