]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/async/detail/shared_mutex.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / common / async / detail / shared_mutex.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #pragma once
16
17 #include <condition_variable>
18 #include <mutex>
19 #include <optional>
20 #include <shared_mutex> // for std::shared_lock
21
22 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
23 #include <boost/intrusive_ptr.hpp>
24 #include <boost/intrusive/list.hpp>
25
26 #include "include/ceph_assert.h"
27
28 #include "common/async/completion.h"
29
30 namespace ceph::async::detail {
31
32 struct LockRequest : public boost::intrusive::list_base_hook<> {
33 virtual ~LockRequest() {}
34 virtual void complete(boost::system::error_code ec) = 0;
35 virtual void destroy() = 0;
36 };
37
38 class SharedMutexImpl : public boost::intrusive_ref_counter<SharedMutexImpl> {
39 public:
40 ~SharedMutexImpl();
41
42 template <typename Mutex, typename CompletionToken>
43 auto async_lock(Mutex& mtx, CompletionToken&& token);
44 void lock();
45 void lock(boost::system::error_code& ec);
46 bool try_lock();
47 void unlock();
48 template <typename Mutex, typename CompletionToken>
49 auto async_lock_shared(Mutex& mtx, CompletionToken&& token);
50 void lock_shared();
51 void lock_shared(boost::system::error_code& ec);
52 bool try_lock_shared();
53 void unlock_shared();
54 void cancel();
55
56 private:
57 using RequestList = boost::intrusive::list<LockRequest>;
58
59 RequestList shared_queue; //< requests waiting on a shared lock
60 RequestList exclusive_queue; //< requests waiting on an exclusive lock
61
62 /// lock state encodes the number of shared lockers, or 'max' for exclusive
63 using LockState = uint16_t;
64 static constexpr LockState Unlocked = 0;
65 static constexpr LockState Exclusive = std::numeric_limits<LockState>::max();
66 static constexpr LockState MaxShared = Exclusive - 1;
67 LockState state = Unlocked; //< current lock state
68
69 std::mutex mutex; //< protects lock state and wait queues
70
71 void complete(RequestList&& requests, boost::system::error_code ec);
72 };
73
74 // sync requests live on the stack and wait on a condition variable
75 class SyncRequest : public LockRequest {
76 std::condition_variable cond;
77 std::optional<boost::system::error_code> ec;
78 public:
79 boost::system::error_code wait(std::unique_lock<std::mutex>& lock) {
80 // return the error code once its been set
81 cond.wait(lock, [this] { return ec; });
82 return *ec;
83 }
84 void complete(boost::system::error_code ec) override {
85 this->ec = ec;
86 cond.notify_one();
87 }
88 void destroy() override {
89 // nothing, SyncRequests live on the stack
90 }
91 };
92
93 // async requests use async::Completion to invoke a handler on its executor
94 template <typename Mutex, template <typename> typename Lock>
95 class AsyncRequest : public LockRequest {
96 Mutex& mutex; //< mutex argument for lock guard
97 public:
98 explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {}
99
100 using Signature = void(boost::system::error_code, Lock<Mutex>);
101 using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>;
102
103 void complete(boost::system::error_code ec) override {
104 auto r = static_cast<LockCompletion*>(this);
105 // pass ownership of ourselves to post(). on error, pass an empty lock
106 post(std::unique_ptr<LockCompletion>{r}, ec,
107 ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock});
108 }
109 void destroy() override {
110 delete static_cast<LockCompletion*>(this);
111 }
112 };
113
114 inline SharedMutexImpl::~SharedMutexImpl()
115 {
116 ceph_assert(state == Unlocked);
117 ceph_assert(shared_queue.empty());
118 ceph_assert(exclusive_queue.empty());
119 }
120
121 template <typename Mutex, typename CompletionToken>
122 auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token)
123 {
124 using Request = AsyncRequest<Mutex, std::unique_lock>;
125 using Signature = typename Request::Signature;
126 boost::asio::async_completion<CompletionToken, Signature> init(token);
127 auto& handler = init.completion_handler;
128 auto ex1 = mtx.get_executor();
129 {
130 std::lock_guard lock{mutex};
131
132 boost::system::error_code ec;
133 if (state == Unlocked) {
134 state = Exclusive;
135
136 // post a successful completion
137 auto ex2 = boost::asio::get_associated_executor(handler, ex1);
138 auto alloc2 = boost::asio::get_associated_allocator(handler);
139 auto b = bind_handler(std::move(handler), ec,
140 std::unique_lock{mtx, std::adopt_lock});
141 ex2.post(forward_handler(std::move(b)), alloc2);
142 } else {
143 // create a request and add it to the exclusive list
144 using LockCompletion = typename Request::LockCompletion;
145 auto request = LockCompletion::create(ex1, std::move(handler), mtx);
146 exclusive_queue.push_back(*request.release());
147 }
148 }
149 return init.result.get();
150 }
151
152 inline void SharedMutexImpl::lock()
153 {
154 boost::system::error_code ec;
155 lock(ec);
156 if (ec) {
157 throw boost::system::system_error(ec);
158 }
159 }
160
161 void SharedMutexImpl::lock(boost::system::error_code& ec)
162 {
163 std::unique_lock lock{mutex};
164
165 if (state == Unlocked) {
166 state = Exclusive;
167 ec.clear();
168 } else {
169 SyncRequest request;
170 exclusive_queue.push_back(request);
171 ec = request.wait(lock);
172 }
173 }
174
175 inline bool SharedMutexImpl::try_lock()
176 {
177 std::lock_guard lock{mutex};
178
179 if (state == Unlocked) {
180 state = Exclusive;
181 return true;
182 }
183 return false;
184 }
185
186 void SharedMutexImpl::unlock()
187 {
188 RequestList granted;
189 {
190 std::lock_guard lock{mutex};
191 ceph_assert(state == Exclusive);
192
193 if (!exclusive_queue.empty()) {
194 // grant next exclusive lock
195 auto& request = exclusive_queue.front();
196 exclusive_queue.pop_front();
197 granted.push_back(request);
198 } else {
199 // grant shared locks, if any
200 state = shared_queue.size();
201 if (state > MaxShared) {
202 state = MaxShared;
203 auto end = std::next(shared_queue.begin(), MaxShared);
204 granted.splice(granted.end(), shared_queue,
205 shared_queue.begin(), end, MaxShared);
206 } else {
207 granted.splice(granted.end(), shared_queue);
208 }
209 }
210 }
211 complete(std::move(granted), boost::system::error_code{});
212 }
213
214 template <typename Mutex, typename CompletionToken>
215 auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token)
216 {
217 using Request = AsyncRequest<Mutex, std::shared_lock>;
218 using Signature = typename Request::Signature;
219 boost::asio::async_completion<CompletionToken, Signature> init(token);
220 auto& handler = init.completion_handler;
221 auto ex1 = mtx.get_executor();
222 {
223 std::lock_guard lock{mutex};
224
225 boost::system::error_code ec;
226 if (exclusive_queue.empty() && state < MaxShared) {
227 state++;
228
229 auto ex2 = boost::asio::get_associated_executor(handler, ex1);
230 auto alloc2 = boost::asio::get_associated_allocator(handler);
231 auto b = bind_handler(std::move(handler), ec,
232 std::shared_lock{mtx, std::adopt_lock});
233 ex2.post(forward_handler(std::move(b)), alloc2);
234 } else {
235 using LockCompletion = typename Request::LockCompletion;
236 auto request = LockCompletion::create(ex1, std::move(handler), mtx);
237 shared_queue.push_back(*request.release());
238 }
239 }
240 return init.result.get();
241 }
242
243 inline void SharedMutexImpl::lock_shared()
244 {
245 boost::system::error_code ec;
246 lock_shared(ec);
247 if (ec) {
248 throw boost::system::system_error(ec);
249 }
250 }
251
252 void SharedMutexImpl::lock_shared(boost::system::error_code& ec)
253 {
254 std::unique_lock lock{mutex};
255
256 if (exclusive_queue.empty() && state < MaxShared) {
257 state++;
258 ec.clear();
259 } else {
260 SyncRequest request;
261 shared_queue.push_back(request);
262 ec = request.wait(lock);
263 }
264 }
265
266 inline bool SharedMutexImpl::try_lock_shared()
267 {
268 std::lock_guard lock{mutex};
269
270 if (exclusive_queue.empty() && state < MaxShared) {
271 state++;
272 return true;
273 }
274 return false;
275 }
276
277 inline void SharedMutexImpl::unlock_shared()
278 {
279 std::lock_guard lock{mutex};
280 ceph_assert(state != Unlocked && state <= MaxShared);
281
282 if (state == 1 && !exclusive_queue.empty()) {
283 // grant next exclusive lock
284 state = Exclusive;
285 auto& request = exclusive_queue.front();
286 exclusive_queue.pop_front();
287 request.complete(boost::system::error_code{});
288 } else if (state == MaxShared && !shared_queue.empty() &&
289 exclusive_queue.empty()) {
290 // grant next shared lock
291 auto& request = shared_queue.front();
292 shared_queue.pop_front();
293 request.complete(boost::system::error_code{});
294 } else {
295 state--;
296 }
297 }
298
299 inline void SharedMutexImpl::cancel()
300 {
301 RequestList canceled;
302 {
303 std::lock_guard lock{mutex};
304 canceled.splice(canceled.end(), shared_queue);
305 canceled.splice(canceled.end(), exclusive_queue);
306 }
307 complete(std::move(canceled), boost::asio::error::operation_aborted);
308 }
309
310 void SharedMutexImpl::complete(RequestList&& requests,
311 boost::system::error_code ec)
312 {
313 while (!requests.empty()) {
314 auto& request = requests.front();
315 requests.pop_front();
316 try {
317 request.complete(ec);
318 } catch (...) {
319 // clean up any remaining completions and rethrow
320 requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); });
321 throw;
322 }
323 }
324 }
325
326 } // namespace ceph::async::detail