1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5 #define CEPH_JOURNAL_OBJECT_RECORDER_H
7 #include "include/utime.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Cond.h"
11 #include "common/Mutex.h"
12 #include "common/RefCountedObj.h"
13 #include "common/WorkQueue.h"
14 #include "journal/FutureImpl.h"
18 #include <boost/intrusive_ptr.hpp>
19 #include <boost/noncopyable.hpp>
20 #include "include/ceph_assert.h"
// Alias for a boost::intrusive_ptr that points at an ObjectRecorder.
27 typedef boost::intrusive_ptr
<ObjectRecorder
> ObjectRecorderPtr
;
// A single append entry: a FutureImplPtr paired with a bufferlist.
// NOTE(review): presumably the bufferlist is the payload and the future
// tracks its completion -- confirm against the implementation.
29 typedef std::pair
<FutureImplPtr
, bufferlist
> AppendBuffer
;
// A std::list of AppendBuffer entries; this is the type taken by append()
// and claim_append_buffers() and used for the pending-buffer member.
30 typedef std::list
<AppendBuffer
> AppendBuffers
;
// ObjectRecorder: publicly derives from RefCountedObject and, with the
// default (private) access of a `class`, from boost::noncopyable, so
// instances cannot be copied.
// NOTE(review): presumably records journal appends to a single RADOS
// object (see the ioctx/oid constructor arguments) -- confirm.
32 class ObjectRecorder
: public RefCountedObject
, boost::noncopyable
{
// Pure virtual callback that receives the ObjectRecorder it concerns.
// NOTE(review): the enclosing type declaration is not visible here;
// presumably this belongs to the Handler interface whose pointer is passed
// to the ObjectRecorder constructor -- confirm.
37 virtual void closed(ObjectRecorder
*object_recorder
) = 0;
// Pure virtual callback that receives the ObjectRecorder it concerns.
// NOTE(review): presumably invoked when the recorder's object overflows
// (cf. append_overflowed()) and part of the Handler interface -- confirm.
38 virtual void overflow(ObjectRecorder
*object_recorder
) = 0;
// Constructor declaration (definition not visible in this header).
//   ioctx                 - librados::IoCtx reference
//   oid                   - object id string
//   object_number         - numeric id, cf. get_object_number()
//   lock                  - shared_ptr<Mutex>; same type as m_lock
//   work_queue            - ContextWQ pointer; same type as m_op_work_queue
//   handler               - Handler pointer
//   order                 - uint8_t
//   max_in_flight_appends - int32_t; same type as m_max_in_flight_appends
// NOTE(review): parameter roles are inferred from names and the member
// list only; presumably `order` determines m_soft_max_size -- confirm.
41 ObjectRecorder(librados::IoCtx
&ioctx
, const std::string
&oid
,
42 uint64_t object_number
, std::shared_ptr
<Mutex
> lock
,
43 ContextWQ
*work_queue
, Handler
*handler
, uint8_t order
,
44 int32_t max_in_flight_appends
);
// Destructor; declared `override`, so it overrides a virtual destructor
// in a base class.
45 ~ObjectRecorder() override
;
// Declares set_append_batch_options, taking at least flush_interval (int)
// and flush_bytes (uint64_t).
// NOTE(review): the rest of the parameter list and the closing ");" are
// not present here; presumably a flush-age argument follows, matching the
// m_flush_age member -- confirm and restore.
47 void set_append_batch_options(int flush_interval
, uint64_t flush_bytes
,
// Returns m_object_number. Takes no lock.
// NOTE(review): the closing brace of this inline body is not present here.
50 inline uint64_t get_object_number() const {
51 return m_object_number
;
// Const accessor returning a const std::string reference.
// NOTE(review): the body is not present here; presumably it returns the
// object id passed to the constructor -- confirm.
53 inline const std::string
&get_oid() const {
// Takes a list of append buffers by rvalue reference (callers must
// std::move or pass a temporary) and returns a bool.
// NOTE(review): the meaning of the return value is not visible in this
// header -- confirm against the definition.
57 bool append(AppendBuffers
&&append_buffers
);
// Flush overload taking a Context pointer named on_safe.
// NOTE(review): presumably on_safe is completed once all outstanding
// appends are safe -- confirm against the definition.
58 void flush(Context
*on_safe
);
// Flush overload taking a specific future; this is the overload that
// FlushHandler::flush() forwards to.
59 void flush(const FutureImplPtr
&future
);
// Takes a pointer to an AppendBuffers list supplied by the caller.
// NOTE(review): presumably moves not-yet-committed buffers into
// *append_buffers so they can be re-submitted elsewhere -- confirm.
61 void claim_append_buffers(AppendBuffers
*append_buffers
);
// Returns true only when m_object_closed is set AND m_in_flight_appends is
// empty. Asserts that *m_lock is currently locked, so it does not lock
// itself; the lock is expected to be taken before calling.
// NOTE(review): the closing brace of this inline body is not present here.
63 bool is_closed() const {
64 ceph_assert(m_lock
->is_locked());
65 return (m_object_closed
&& m_in_flight_appends
.empty());
// Const accessor returning a CephContext pointer.
// NOTE(review): the body is not present here; the source of the returned
// CephContext cannot be determined from this header -- confirm.
69 inline CephContext
*cct() const {
// Returns the number of entries in m_pending_buffers.
// Unlike is_closed(), which only asserts the lock is held, this acquires
// *m_lock itself through a scoped Mutex::Locker.
// NOTE(review): callers presumably must not already hold m_lock unless the
// Mutex is recursive -- confirm. The closing brace is not present here.
73 inline size_t get_pending_appends() const {
74 Mutex::Locker
locker(*m_lock
);
75 return m_pending_buffers
.size();
// Ordered set of 64-bit ids; the type of m_in_flight_tids.
79 typedef std::set
<uint64_t> InFlightTids
;
// Ordered map from a 64-bit id to a list of append buffers; the type of
// m_in_flight_appends.
// NOTE(review): presumably keyed by append tid (cf. m_append_tid) -- confirm.
80 typedef std::map
<uint64_t, AppendBuffers
> InFlightAppends
;
// FutureImpl::FlushHandler implementation that forwards its calls to an
// ObjectRecorder.
82 struct FlushHandler
: public FutureImpl::FlushHandler
{
// Raw pointer to the target recorder; the constructor only stores it and
// does not take a reference (no get() call in the constructor).
83 ObjectRecorder
*object_recorder
;
84 FlushHandler(ObjectRecorder
*o
) : object_recorder(o
) {}
// NOTE(review): the function signatures enclosing the next two statements
// are not present here; the statements forward get() and put() to the
// recorder, presumably so references on the handler pin the recorder.
86 object_recorder
->get();
89 object_recorder
->put();
// Forwards the flush request for `future` to ObjectRecorder::flush(future).
// NOTE(review): closing braces for this function and the struct are not
// present here.
91 void flush(const FutureImplPtr
&future
) override
{
92 object_recorder
->flush(future
);
// Completion Context for one append: it takes a reference on the recorder
// when constructed and drops it after reporting the result in finish().
95 struct C_AppendFlush
: public Context
{
96 ObjectRecorder
*object_recorder
;
// NOTE(review): the declaration of the `tid` member initialized below is
// not present here.
// Stores the recorder and tid, then get()s the recorder so it stays
// referenced until finish() calls put().
98 C_AppendFlush(ObjectRecorder
*o
, uint64_t _tid
)
99 : object_recorder(o
), tid(_tid
) {
100 object_recorder
->get();
// Reports (tid, r) to ObjectRecorder::handle_append_flushed() and only then
// releases the reference taken in the constructor; the put() must come
// last because it may drop the final reference to the recorder.
// NOTE(review): closing braces are not present here.
102 void finish(int r
) override
{
103 object_recorder
->handle_append_flushed(tid
, r
);
104 object_recorder
->put();
// ---- Data members ----
// NOTE(review): members below without a default member initializer
// (m_object_number, m_soft_max_size, m_max_in_flight_appends, m_append_tid,
// m_object_closed, m_in_flight_flushes) are presumably set by the
// constructor, whose definition is not visible here -- confirm.

// librados I/O context (held by value).
108 librados::IoCtx m_ioctx
;
// Value returned by get_object_number().
110 uint64_t m_object_number
;
// Work queue pointer (raw pointer).
113 ContextWQ
*m_op_work_queue
;
118 uint64_t m_soft_max_size
;
// Batch/flush settings, all defaulting to 0.
// NOTE(review): presumably configured via set_append_batch_options() -- confirm.
120 uint32_t m_flush_interval
= 0;
121 uint64_t m_flush_bytes
= 0;
122 double m_flush_age
= 0;
123 int32_t m_max_in_flight_appends
;
// FlushHandler instance embedded in the recorder.
125 FlushHandler m_flush_handler
;
// Shared mutex; `mutable` so const methods such as get_pending_appends()
// can lock it.
127 mutable std::shared_ptr
<Mutex
> m_lock
;
// Pending buffers; their count is reported by get_pending_appends().
128 AppendBuffers m_pending_buffers
;
129 uint64_t m_pending_bytes
= 0;
130 utime_t m_last_flush_time
;
132 uint64_t m_append_tid
;
134 InFlightTids m_in_flight_tids
;
// Consulted by is_closed(): the recorder is not closed while this is
// non-empty.
135 InFlightAppends m_in_flight_appends
;
136 uint64_t m_object_bytes
= 0;
// Consulted by is_closed() together with m_in_flight_appends.
138 bool m_object_closed
;
140 bufferlist m_prefetch_bl
;
// NOTE(review): presumably m_in_flight_flushes_cond is signalled when
// m_in_flight_flushes clears -- confirm against the implementation.
142 bool m_in_flight_flushes
;
143 Cond m_in_flight_flushes_cond
;
144 uint64_t m_in_flight_bytes
= 0;
// Internal helper taking a `force` flag and a future (by value) named
// flush_sentinal; returns a bool.
// NOTE(review): the meaning of the return value and of the sentinel is
// not visible in this header -- confirm against the definition.
146 bool send_appends(bool force
, FutureImplPtr flush_sentinal
);
// Completion handler called from C_AppendFlush::finish() with the tid the
// context was created with and the result code r.
147 void handle_append_flushed(uint64_t tid
, int r
);
// Internal helper with no arguments and no return value.
// NOTE(review): presumably handles an append that overflowed the object
// (cf. the overflow() callback) -- confirm against the definition.
148 void append_overflowed();
// Internal helper with no arguments and no return value.
// NOTE(review): the name suggests it notifies the Handler and releases
// m_lock, i.e. is entered with the lock held -- confirm against the
// definition before relying on it.
150 void notify_handler_unlock();
153 } // namespace journal
155 #endif // CEPH_JOURNAL_OBJECT_RECORDER_H