]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_LIBRBD_IO_DISPATCHER_H | |
5 | #define CEPH_LIBRBD_IO_DISPATCHER_H | |
6 | ||
7 | #include "include/int_types.h" | |
8 | #include "include/Context.h" | |
9 | #include "common/ceph_mutex.h" | |
10 | #include "common/dout.h" | |
11 | #include "common/AsyncOpTracker.h" | |
12 | #include "librbd/Utils.h" | |
13 | #include "librbd/io/DispatcherInterface.h" | |
14 | #include "librbd/io/Types.h" | |
15 | #include <map> | |
16 | ||
17 | #define dout_subsys ceph_subsys_rbd | |
18 | #undef dout_prefix | |
19 | #define dout_prefix *_dout << "librbd::io::Dispatcher: " << this \ | |
20 | << " " << __func__ << ": " | |
21 | ||
22 | namespace librbd { | |
23 | namespace io { | |
24 | ||
25 | template <typename ImageCtxT, typename DispatchInterfaceT> | |
26 | class Dispatcher : public DispatchInterfaceT { | |
27 | public: | |
28 | typedef typename DispatchInterfaceT::Dispatch Dispatch; | |
29 | typedef typename DispatchInterfaceT::DispatchLayer DispatchLayer; | |
30 | typedef typename DispatchInterfaceT::DispatchSpec DispatchSpec; | |
31 | ||
32 | Dispatcher(ImageCtxT* image_ctx) | |
33 | : m_image_ctx(image_ctx), | |
34 | m_lock(ceph::make_shared_mutex( | |
35 | librbd::util::unique_lock_name("librbd::io::Dispatcher::lock", | |
36 | this))) { | |
37 | } | |
38 | ||
39 | virtual ~Dispatcher() { | |
40 | ceph_assert(m_dispatches.empty()); | |
41 | } | |
42 | ||
43 | void shut_down(Context* on_finish) override { | |
44 | auto cct = m_image_ctx->cct; | |
45 | ldout(cct, 5) << dendl; | |
46 | ||
47 | std::map<DispatchLayer, DispatchMeta> dispatches; | |
48 | { | |
49 | std::unique_lock locker{m_lock}; | |
50 | std::swap(dispatches, m_dispatches); | |
51 | } | |
52 | ||
53 | for (auto it : dispatches) { | |
54 | shut_down_dispatch(it.second, &on_finish); | |
55 | } | |
56 | on_finish->complete(0); | |
57 | } | |
58 | ||
59 | void register_dispatch(Dispatch* dispatch) override { | |
60 | auto cct = m_image_ctx->cct; | |
61 | auto type = dispatch->get_dispatch_layer(); | |
62 | ldout(cct, 5) << "dispatch_layer=" << type << dendl; | |
63 | ||
64 | std::unique_lock locker{m_lock}; | |
65 | ||
66 | auto result = m_dispatches.insert( | |
67 | {type, {dispatch, new AsyncOpTracker()}}); | |
68 | ceph_assert(result.second); | |
69 | } | |
70 | ||
71 | bool exists(DispatchLayer dispatch_layer) override { | |
72 | std::unique_lock locker{m_lock}; | |
73 | return m_dispatches.find(dispatch_layer) != m_dispatches.end(); | |
74 | } | |
75 | ||
76 | void shut_down_dispatch(DispatchLayer dispatch_layer, | |
77 | Context* on_finish) override { | |
78 | auto cct = m_image_ctx->cct; | |
79 | ldout(cct, 5) << "dispatch_layer=" << dispatch_layer << dendl; | |
80 | ||
81 | DispatchMeta dispatch_meta; | |
82 | { | |
83 | std::unique_lock locker{m_lock}; | |
84 | auto it = m_dispatches.find(dispatch_layer); | |
85 | if (it == m_dispatches.end()) { | |
86 | on_finish->complete(0); | |
87 | return; | |
88 | } | |
89 | ||
90 | dispatch_meta = it->second; | |
91 | m_dispatches.erase(it); | |
92 | } | |
93 | ||
94 | shut_down_dispatch(dispatch_meta, &on_finish); | |
95 | on_finish->complete(0); | |
96 | } | |
97 | ||
98 | void send(DispatchSpec* dispatch_spec) { | |
99 | auto cct = m_image_ctx->cct; | |
100 | ldout(cct, 20) << "dispatch_spec=" << dispatch_spec << dendl; | |
101 | ||
102 | auto dispatch_layer = dispatch_spec->dispatch_layer; | |
103 | ||
104 | // apply the IO request to all layers -- this method will be re-invoked | |
105 | // by the dispatch layer if continuing / restarting the IO | |
106 | while (true) { | |
107 | m_lock.lock_shared(); | |
108 | dispatch_layer = dispatch_spec->dispatch_layer; | |
109 | auto it = m_dispatches.upper_bound(dispatch_layer); | |
110 | if (it == m_dispatches.end()) { | |
111 | // the request is complete if handled by all layers | |
112 | dispatch_spec->dispatch_result = DISPATCH_RESULT_COMPLETE; | |
113 | m_lock.unlock_shared(); | |
114 | break; | |
115 | } | |
116 | ||
117 | auto& dispatch_meta = it->second; | |
118 | auto dispatch = dispatch_meta.dispatch; | |
119 | auto async_op_tracker = dispatch_meta.async_op_tracker; | |
120 | dispatch_spec->dispatch_result = DISPATCH_RESULT_INVALID; | |
121 | ||
122 | // prevent recursive locking back into the dispatcher while handling IO | |
123 | async_op_tracker->start_op(); | |
124 | m_lock.unlock_shared(); | |
125 | ||
126 | // advance to next layer in case we skip or continue | |
127 | dispatch_spec->dispatch_layer = dispatch->get_dispatch_layer(); | |
128 | ||
129 | bool handled = send_dispatch(dispatch, dispatch_spec); | |
130 | async_op_tracker->finish_op(); | |
131 | ||
132 | // handled ops will resume when the dispatch ctx is invoked | |
133 | if (handled) { | |
134 | return; | |
135 | } | |
136 | } | |
137 | ||
138 | // skipped through to the last layer | |
139 | dispatch_spec->dispatcher_ctx.complete(0); | |
140 | } | |
141 | ||
142 | protected: | |
143 | struct DispatchMeta { | |
144 | Dispatch* dispatch = nullptr; | |
145 | AsyncOpTracker* async_op_tracker = nullptr; | |
146 | ||
147 | DispatchMeta() { | |
148 | } | |
149 | DispatchMeta(Dispatch* dispatch, AsyncOpTracker* async_op_tracker) | |
150 | : dispatch(dispatch), async_op_tracker(async_op_tracker) { | |
151 | } | |
152 | }; | |
153 | ||
154 | ImageCtxT* m_image_ctx; | |
155 | ||
156 | ceph::shared_mutex m_lock; | |
157 | std::map<DispatchLayer, DispatchMeta> m_dispatches; | |
158 | ||
159 | virtual bool send_dispatch(Dispatch* dispatch, | |
160 | DispatchSpec* dispatch_spec) = 0; | |
161 | ||
162 | protected: | |
163 | struct C_LayerIterator : public Context { | |
164 | Dispatcher* dispatcher; | |
165 | Context* on_finish; | |
166 | DispatchLayer dispatch_layer; | |
167 | ||
168 | C_LayerIterator(Dispatcher* dispatcher, | |
169 | DispatchLayer start_layer, | |
170 | Context* on_finish) | |
171 | : dispatcher(dispatcher), on_finish(on_finish), dispatch_layer(start_layer) { | |
172 | } | |
173 | ||
174 | void complete(int r) override { | |
175 | while (true) { | |
176 | dispatcher->m_lock.lock_shared(); | |
177 | auto it = dispatcher->m_dispatches.upper_bound(dispatch_layer); | |
178 | if (it == dispatcher->m_dispatches.end()) { | |
179 | dispatcher->m_lock.unlock_shared(); | |
180 | Context::complete(r); | |
181 | return; | |
182 | } | |
183 | ||
184 | auto& dispatch_meta = it->second; | |
185 | auto dispatch = dispatch_meta.dispatch; | |
186 | ||
187 | // prevent recursive locking back into the dispatcher while handling IO | |
188 | dispatch_meta.async_op_tracker->start_op(); | |
189 | dispatcher->m_lock.unlock_shared(); | |
190 | ||
191 | // next loop should start after current layer | |
192 | dispatch_layer = dispatch->get_dispatch_layer(); | |
193 | ||
194 | auto handled = execute(dispatch, this); | |
195 | dispatch_meta.async_op_tracker->finish_op(); | |
196 | ||
197 | if (handled) { | |
198 | break; | |
199 | } | |
200 | } | |
201 | } | |
202 | ||
203 | void finish(int r) override { | |
204 | on_finish->complete(0); | |
205 | } | |
206 | virtual bool execute(Dispatch* dispatch, | |
207 | Context* on_finish) = 0; | |
208 | }; | |
209 | ||
210 | struct C_InvalidateCache : public C_LayerIterator { | |
211 | C_InvalidateCache(Dispatcher* dispatcher, DispatchLayer start_layer, Context* on_finish) | |
212 | : C_LayerIterator(dispatcher, start_layer, on_finish) { | |
213 | } | |
214 | ||
215 | bool execute(Dispatch* dispatch, | |
216 | Context* on_finish) override { | |
217 | return dispatch->invalidate_cache(on_finish); | |
218 | } | |
219 | }; | |
220 | ||
221 | private: | |
222 | void shut_down_dispatch(DispatchMeta& dispatch_meta, | |
223 | Context** on_finish) { | |
224 | auto dispatch = dispatch_meta.dispatch; | |
225 | auto async_op_tracker = dispatch_meta.async_op_tracker; | |
226 | ||
227 | auto ctx = *on_finish; | |
228 | ctx = new LambdaContext( | |
229 | [dispatch, async_op_tracker, ctx](int r) { | |
230 | delete dispatch; | |
231 | delete async_op_tracker; | |
232 | ||
233 | ctx->complete(r); | |
234 | }); | |
235 | ctx = new LambdaContext([dispatch, ctx](int r) { | |
236 | dispatch->shut_down(ctx); | |
237 | }); | |
238 | *on_finish = new LambdaContext([async_op_tracker, ctx](int r) { | |
239 | async_op_tracker->wait_for_ops(ctx); | |
240 | }); | |
241 | } | |
242 | ||
243 | }; | |
244 | ||
245 | } // namespace io | |
246 | } // namespace librbd | |
247 | ||
248 | #undef dout_subsys | |
249 | #undef dout_prefix | |
250 | #define dout_prefix *_dout | |
251 | ||
252 | #endif // CEPH_LIBRBD_IO_DISPATCHER_H |