// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
#define CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
#include "include/Context.h"
#include "common/ceph_mutex.h"
#include "common/Throttle.h"
#include "common/WorkQueue.h"
#include "librbd/io/Types.h"
#include "include/interval_set.h"

#include <atomic>
#include <cstdint>
#include <ctime>
#include <list>
#include <map>
#include <set>
#include <shared_mutex>
#include <utility>
#include <vector>
24 template <typename
> class ImageDispatchSpec
;
27 template <typename ImageCtxT
= librbd::ImageCtx
>
29 : public ThreadPool::PointerWQ
<ImageDispatchSpec
<ImageCtxT
> > {
31 ImageRequestWQ(ImageCtxT
*image_ctx
, const string
&name
, time_t ti
,
35 ssize_t
read(uint64_t off
, uint64_t len
, ReadResult
&&read_result
,
37 ssize_t
write(uint64_t off
, uint64_t len
, bufferlist
&&bl
, int op_flags
);
38 ssize_t
discard(uint64_t off
, uint64_t len
,
39 uint32_t discard_granularity_bytes
);
40 ssize_t
writesame(uint64_t off
, uint64_t len
, bufferlist
&&bl
, int op_flags
);
41 ssize_t
write_zeroes(uint64_t off
, uint64_t len
, int zero_flags
,
43 ssize_t
compare_and_write(uint64_t off
, uint64_t len
,
44 bufferlist
&&cmp_bl
, bufferlist
&&bl
,
45 uint64_t *mismatch_off
, int op_flags
);
48 void aio_read(AioCompletion
*c
, uint64_t off
, uint64_t len
,
49 ReadResult
&&read_result
, int op_flags
, bool native_async
=true);
50 void aio_write(AioCompletion
*c
, uint64_t off
, uint64_t len
,
51 bufferlist
&&bl
, int op_flags
, bool native_async
=true);
52 void aio_discard(AioCompletion
*c
, uint64_t off
, uint64_t len
,
53 uint32_t discard_granularity_bytes
, bool native_async
=true);
54 void aio_flush(AioCompletion
*c
, bool native_async
=true);
55 void aio_writesame(AioCompletion
*c
, uint64_t off
, uint64_t len
,
56 bufferlist
&&bl
, int op_flags
, bool native_async
=true);
57 void aio_write_zeroes(AioCompletion
*c
, uint64_t off
, uint64_t len
,
58 int zero_flags
, int op_flags
, bool native_async
);
59 void aio_compare_and_write(AioCompletion
*c
, uint64_t off
,
60 uint64_t len
, bufferlist
&&cmp_bl
,
61 bufferlist
&&bl
, uint64_t *mismatch_off
,
62 int op_flags
, bool native_async
=true);
64 using ThreadPool::PointerWQ
<ImageDispatchSpec
<ImageCtxT
> >::drain
;
65 using ThreadPool::PointerWQ
<ImageDispatchSpec
<ImageCtxT
> >::empty
;
67 void shut_down(Context
*on_shutdown
);
69 inline bool writes_blocked() const {
70 std::shared_lock locker
{m_lock
};
71 return (m_write_blockers
> 0);
75 void block_writes(Context
*on_blocked
);
76 void unblock_writes();
78 void wait_on_writes_unblocked(Context
*on_unblocked
);
80 void set_require_lock(Direction direction
, bool enabled
);
82 void apply_qos_schedule_tick_min(uint64_t tick
);
84 void apply_qos_limit(const uint64_t flag
, uint64_t limit
, uint64_t burst
);
87 void *_void_dequeue() override
;
88 void process(ImageDispatchSpec
<ImageCtxT
> *req
) override
;
89 bool _empty() override
{
90 return (ThreadPool::PointerWQ
<ImageDispatchSpec
<ImageCtxT
>>::_empty() &&
91 m_io_throttled
.load() == 0);
96 typedef std::list
<Context
*> Contexts
;
99 struct C_BlockedWrites
;
100 struct C_RefreshFinish
;
102 ImageCtxT
&m_image_ctx
;
103 mutable ceph::shared_mutex m_lock
;
104 Contexts m_write_blocker_contexts
;
105 uint32_t m_write_blockers
= 0;
106 Contexts m_unblocked_write_waiter_contexts
;
107 bool m_require_lock_on_read
= false;
108 bool m_require_lock_on_write
= false;
109 std::atomic
<unsigned> m_queued_reads
{ 0 };
110 std::atomic
<unsigned> m_queued_writes
{ 0 };
111 std::atomic
<unsigned> m_in_flight_ios
{ 0 };
112 std::atomic
<unsigned> m_in_flight_writes
{ 0 };
113 std::atomic
<unsigned> m_io_blockers
{ 0 };
114 std::atomic
<unsigned> m_io_throttled
{ 0 };
116 typedef interval_set
<uint64_t> ImageExtentIntervals
;
117 ImageExtentIntervals m_in_flight_extents
;
119 std::vector
<ImageDispatchSpec
<ImageCtxT
>*> m_blocked_ios
;
120 std::atomic
<unsigned> m_last_tid
{ 0 };
121 std::set
<uint64_t> m_queued_or_blocked_io_tids
;
122 std::map
<uint64_t, ImageDispatchSpec
<ImageCtxT
>*> m_queued_flushes
;
124 std::list
<std::pair
<uint64_t, TokenBucketThrottle
*> > m_throttles
;
125 uint64_t m_qos_enabled_flag
= 0;
127 bool m_shutdown
= false;
128 Context
*m_on_shutdown
= nullptr;
130 bool is_lock_required(bool write_op
) const;
132 inline bool require_lock_on_read() const {
133 std::shared_lock locker
{m_lock
};
134 return m_require_lock_on_read
;
136 inline bool writes_empty() const {
137 std::shared_lock locker
{m_lock
};
138 return (m_queued_writes
== 0);
141 bool needs_throttle(ImageDispatchSpec
<ImageCtxT
> *item
);
143 void finish_queued_io(bool write_op
);
144 void remove_in_flight_write_ios(uint64_t offset
, uint64_t length
,
145 bool write_op
, uint64_t tid
);
146 void finish_in_flight_write();
148 void unblock_flushes();
149 bool block_overlapping_io(ImageExtentIntervals
* in_flight_image_extents
,
150 uint64_t object_off
, uint64_t object_len
);
151 void unblock_overlapping_io(uint64_t offset
, uint64_t length
, uint64_t tid
);
152 int start_in_flight_io(AioCompletion
*c
);
153 void finish_in_flight_io();
154 void fail_in_flight_io(int r
, ImageDispatchSpec
<ImageCtxT
> *req
);
155 void process_io(ImageDispatchSpec
<ImageCtxT
> *req
, bool non_blocking_io
);
157 void queue(ImageDispatchSpec
<ImageCtxT
> *req
);
158 void queue_unblocked_io(AioCompletion
*comp
,
159 ImageDispatchSpec
<ImageCtxT
> *req
);
161 void handle_acquire_lock(int r
, ImageDispatchSpec
<ImageCtxT
> *req
);
162 void handle_refreshed(int r
, ImageDispatchSpec
<ImageCtxT
> *req
);
163 void handle_blocked_writes(int r
);
165 void handle_throttle_ready(int r
, ImageDispatchSpec
<ImageCtxT
> *item
, uint64_t flag
);
169 } // namespace librbd
171 extern template class librbd::io::ImageRequestWQ
<librbd::ImageCtx
>;
#endif // CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H