ceph/src/librbd/io/Dispatcher.h
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_LIBRBD_IO_DISPATCHER_H
#define CEPH_LIBRBD_IO_DISPATCHER_H

#include "include/int_types.h"
#include "include/Context.h"
#include "common/ceph_mutex.h"
#include "common/dout.h"
#include "common/AsyncOpTracker.h"
#include "librbd/Utils.h"
#include "librbd/io/DispatcherInterface.h"
#include "librbd/io/Types.h"
#include <map>

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::Dispatcher: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace io {

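// Generic dispatcher: maintains an ordered map of dispatch layers and
// forwards each DispatchSpec through those layers in order (see send()),
// tracking in-flight operations per layer so that an individual layer can
// be shut down and removed safely.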
template <typename ImageCtxT, typename DispatchInterfaceT>
class Dispatcher : public DispatchInterfaceT {
public:
  typedef typename DispatchInterfaceT::Dispatch Dispatch;
  typedef typename DispatchInterfaceT::DispatchLayer DispatchLayer;
  typedef typename DispatchInterfaceT::DispatchSpec DispatchSpec;

  Dispatcher(ImageCtxT* image_ctx)
    : m_image_ctx(image_ctx),
      m_lock(ceph::make_shared_mutex(
        librbd::util::unique_lock_name("librbd::io::Dispatcher::lock",
                                       this))) {
  }

  virtual ~Dispatcher() {
    ceph_assert(m_dispatches.empty());
  }

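  // Shut down every registered layer. Each call to shut_down_dispatch()
  // below prepends that layer's shutdown sequence onto on_finish;
  // completing the final context kicks off the whole chain.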
  void shut_down(Context* on_finish) override {
    auto cct = m_image_ctx->cct;
    ldout(cct, 5) << dendl;

    std::map<DispatchLayer, DispatchMeta> dispatches;
    {
      std::unique_lock locker{m_lock};
      std::swap(dispatches, m_dispatches);
    }

    for (auto it : dispatches) {
      shut_down_dispatch(it.second, &on_finish);
    }
    on_finish->complete(0);
  }

  void register_dispatch(Dispatch* dispatch) override {
    auto cct = m_image_ctx->cct;
    auto type = dispatch->get_dispatch_layer();
    ldout(cct, 5) << "dispatch_layer=" << type << dendl;

    std::unique_lock locker{m_lock};

    auto result = m_dispatches.insert(
      {type, {dispatch, new AsyncOpTracker()}});
    ceph_assert(result.second);
  }

  bool exists(DispatchLayer dispatch_layer) override {
    std::unique_lock locker{m_lock};
    return m_dispatches.find(dispatch_layer) != m_dispatches.end();
  }

  void shut_down_dispatch(DispatchLayer dispatch_layer,
                          Context* on_finish) override {
    auto cct = m_image_ctx->cct;
    ldout(cct, 5) << "dispatch_layer=" << dispatch_layer << dendl;

    DispatchMeta dispatch_meta;
    {
      std::unique_lock locker{m_lock};
      auto it = m_dispatches.find(dispatch_layer);
      if (it == m_dispatches.end()) {
        on_finish->complete(0);
        return;
      }

      dispatch_meta = it->second;
      m_dispatches.erase(it);
    }

    shut_down_dispatch(dispatch_meta, &on_finish);
    on_finish->complete(0);
  }

  void send(DispatchSpec* dispatch_spec) {
    auto cct = m_image_ctx->cct;
    ldout(cct, 20) << "dispatch_spec=" << dispatch_spec << dendl;

    auto dispatch_layer = dispatch_spec->dispatch_layer;

    // apply the IO request to all layers -- this method will be re-invoked
    // by the dispatch layer if continuing / restarting the IO
    while (true) {
      m_lock.lock_shared();
      dispatch_layer = dispatch_spec->dispatch_layer;
      auto it = m_dispatches.upper_bound(dispatch_layer);
      if (it == m_dispatches.end()) {
        // the request is complete if handled by all layers
        dispatch_spec->dispatch_result = DISPATCH_RESULT_COMPLETE;
        m_lock.unlock_shared();
        break;
      }

      auto& dispatch_meta = it->second;
      auto dispatch = dispatch_meta.dispatch;
      auto async_op_tracker = dispatch_meta.async_op_tracker;
      dispatch_spec->dispatch_result = DISPATCH_RESULT_INVALID;

      // prevent recursive locking back into the dispatcher while handling IO
      async_op_tracker->start_op();
      m_lock.unlock_shared();

      // advance to next layer in case we skip or continue
      dispatch_spec->dispatch_layer = dispatch->get_dispatch_layer();

      bool handled = send_dispatch(dispatch, dispatch_spec);
      async_op_tracker->finish_op();

      // handled ops will resume when the dispatch ctx is invoked
      if (handled) {
        return;
      }
    }

    // skipped through to the last layer
    dispatch_spec->dispatcher_ctx.complete(0);
  }

protected:
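  // Per-layer bookkeeping: the dispatch instance plus a tracker for its
  // in-flight operations.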
  struct DispatchMeta {
    Dispatch* dispatch = nullptr;
    AsyncOpTracker* async_op_tracker = nullptr;

    DispatchMeta() {
    }
    DispatchMeta(Dispatch* dispatch, AsyncOpTracker* async_op_tracker)
      : dispatch(dispatch), async_op_tracker(async_op_tracker) {
    }
  };

  ImageCtxT* m_image_ctx;

  ceph::shared_mutex m_lock;
  std::map<DispatchLayer, DispatchMeta> m_dispatches;

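  // Forward a request to a single layer. Implementations return true if
  // the layer took ownership of the request (it will be resumed via the
  // dispatcher context), or false to fall through to the next layer.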
  virtual bool send_dispatch(Dispatch* dispatch,
                             DispatchSpec* dispatch_spec) = 0;

protected:
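  // Context that walks the dispatch layers, invoking execute() on each.
  // A layer that handles the callback completes this context again later
  // to resume the walk; once the last layer has been visited, on_finish
  // is completed.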
  struct C_LayerIterator : public Context {
    Dispatcher* dispatcher;
    Context* on_finish;
    DispatchLayer dispatch_layer;

    C_LayerIterator(Dispatcher* dispatcher,
                    DispatchLayer start_layer,
                    Context* on_finish)
      : dispatcher(dispatcher), on_finish(on_finish), dispatch_layer(start_layer) {
    }

    void complete(int r) override {
      while (true) {
        dispatcher->m_lock.lock_shared();
        auto it = dispatcher->m_dispatches.upper_bound(dispatch_layer);
        if (it == dispatcher->m_dispatches.end()) {
          dispatcher->m_lock.unlock_shared();
          Context::complete(r);
          return;
        }

        auto& dispatch_meta = it->second;
        auto dispatch = dispatch_meta.dispatch;

        // prevent recursive locking back into the dispatcher while handling IO
        dispatch_meta.async_op_tracker->start_op();
        dispatcher->m_lock.unlock_shared();

        // next loop should start after current layer
        dispatch_layer = dispatch->get_dispatch_layer();

        auto handled = execute(dispatch, this);
        dispatch_meta.async_op_tracker->finish_op();

        if (handled) {
          break;
        }
      }
    }

    void finish(int r) override {
      on_finish->complete(0);
    }
    virtual bool execute(Dispatch* dispatch,
                         Context* on_finish) = 0;
  };

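  // Layer iterator that asks each dispatch layer to invalidate its cache.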
  struct C_InvalidateCache : public C_LayerIterator {
    C_InvalidateCache(Dispatcher* dispatcher, DispatchLayer start_layer, Context* on_finish)
      : C_LayerIterator(dispatcher, start_layer, on_finish) {
    }

    bool execute(Dispatch* dispatch,
                 Context* on_finish) override {
      return dispatch->invalidate_cache(on_finish);
    }
  };

private:
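  // Build, back to front, the shutdown sequence for a single layer: wait
  // for the layer's in-flight ops, shut the dispatch down, delete the
  // dispatch and its tracker, then continue with the previous on_finish.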
  void shut_down_dispatch(DispatchMeta& dispatch_meta,
                          Context** on_finish) {
    auto dispatch = dispatch_meta.dispatch;
    auto async_op_tracker = dispatch_meta.async_op_tracker;

    auto ctx = *on_finish;
    ctx = new LambdaContext(
      [dispatch, async_op_tracker, ctx](int r) {
        delete dispatch;
        delete async_op_tracker;

        ctx->complete(r);
      });
    ctx = new LambdaContext([dispatch, ctx](int r) {
        dispatch->shut_down(ctx);
      });
    *on_finish = new LambdaContext([async_op_tracker, ctx](int r) {
        async_op_tracker->wait_for_ops(ctx);
      });
  }

};

} // namespace io
} // namespace librbd

#undef dout_subsys
#undef dout_prefix
#define dout_prefix *_dout

#endif // CEPH_LIBRBD_IO_DISPATCHER_H