#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
#define CEPH_JOURNAL_OBJECT_RECORDER_H
+#include "include/utime.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/Cond.h"
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, std::shared_ptr<Mutex> lock,
- ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
- Handler *handler, uint8_t order, uint32_t flush_interval,
- uint64_t flush_bytes, double flush_age,
- uint64_t max_in_flight_appends);
+ ContextWQ *work_queue, Handler *handler, uint8_t order,
+ int32_t max_in_flight_appends);
~ObjectRecorder() override;
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+
inline uint64_t get_object_number() const {
return m_object_number;
}
return m_oid;
}
- bool append_unlock(AppendBuffers &&append_buffers);
+ bool append(AppendBuffers &&append_buffers);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);
inline size_t get_pending_appends() const {
Mutex::Locker locker(*m_lock);
- return m_append_buffers.size();
+ return m_pending_buffers.size();
}
private:
object_recorder->put();
}
void flush(const FutureImplPtr &future) override {
- Mutex::Locker locker(*(object_recorder->m_lock));
object_recorder->flush(future);
}
};
ContextWQ *m_op_work_queue;
- SafeTimer &m_timer;
- Mutex &m_timer_lock;
-
Handler *m_handler;
uint8_t m_order;
uint64_t m_soft_max_size;
- uint32_t m_flush_interval;
- uint64_t m_flush_bytes;
- double m_flush_age;
- uint32_t m_max_in_flight_appends;
+ uint32_t m_flush_interval = 0;
+ uint64_t m_flush_bytes = 0;
+ double m_flush_age = 0;
+ int32_t m_max_in_flight_appends;
FlushHandler m_flush_handler;
- Context *m_append_task = nullptr;
-
mutable std::shared_ptr<Mutex> m_lock;
- AppendBuffers m_append_buffers;
+ AppendBuffers m_pending_buffers;
+ uint64_t m_pending_bytes = 0;
+ utime_t m_last_flush_time;
+
uint64_t m_append_tid;
- uint32_t m_pending_bytes;
InFlightTids m_in_flight_tids;
InFlightAppends m_in_flight_appends;
- uint64_t m_size;
+ uint64_t m_object_bytes = 0;
bool m_overflowed;
bool m_object_closed;
bool m_in_flight_flushes;
Cond m_in_flight_flushes_cond;
+ uint64_t m_in_flight_bytes = 0;
- AppendBuffers m_pending_buffers;
- uint64_t m_aio_sent_size = 0;
- bool m_aio_scheduled;
-
- void handle_append_task();
- void cancel_append_task();
- void schedule_append_task();
-
- bool append(const AppendBuffer &append_buffer, bool *schedule_append);
- bool flush_appends(bool force);
+ bool send_appends(bool force, FutureImplPtr flush_sentinal);
void handle_append_flushed(uint64_t tid, int r);
void append_overflowed();
- void send_appends(AppendBuffers *append_buffers);
- void send_appends_aio();
void notify_handler_unlock();
};