1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2018 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <condition_variable>
20 #include <shared_mutex> // for std::shared_lock
22 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
23 #include <boost/intrusive_ptr.hpp>
24 #include <boost/intrusive/list.hpp>
26 #include "include/ceph_assert.h"
28 #include "common/async/completion.h"
30 namespace ceph::async::detail
{
32 struct LockRequest
: public boost::intrusive::list_base_hook
<> {
33 virtual ~LockRequest() {}
34 virtual void complete(boost::system::error_code ec
) = 0;
35 virtual void destroy() = 0;
38 class SharedMutexImpl
: public boost::intrusive_ref_counter
<SharedMutexImpl
> {
42 template <typename Mutex
, typename CompletionToken
>
43 auto async_lock(Mutex
& mtx
, CompletionToken
&& token
);
45 void lock(boost::system::error_code
& ec
);
48 template <typename Mutex
, typename CompletionToken
>
49 auto async_lock_shared(Mutex
& mtx
, CompletionToken
&& token
);
51 void lock_shared(boost::system::error_code
& ec
);
52 bool try_lock_shared();
57 using RequestList
= boost::intrusive::list
<LockRequest
>;
59 RequestList shared_queue
; //< requests waiting on a shared lock
60 RequestList exclusive_queue
; //< requests waiting on an exclusive lock
62 /// lock state encodes the number of shared lockers, or 'max' for exclusive
63 using LockState
= uint16_t;
64 static constexpr LockState Unlocked
= 0;
65 static constexpr LockState Exclusive
= std::numeric_limits
<LockState
>::max();
66 static constexpr LockState MaxShared
= Exclusive
- 1;
67 LockState state
= Unlocked
; //< current lock state
69 std::mutex mutex
; //< protects lock state and wait queues
71 void complete(RequestList
&& requests
, boost::system::error_code ec
);
74 // sync requests live on the stack and wait on a condition variable
75 class SyncRequest
: public LockRequest
{
76 std::condition_variable cond
;
77 std::optional
<boost::system::error_code
> ec
;
79 boost::system::error_code
wait(std::unique_lock
<std::mutex
>& lock
) {
80 // return the error code once its been set
81 cond
.wait(lock
, [this] { return ec
; });
84 void complete(boost::system::error_code ec
) override
{
88 void destroy() override
{
89 // nothing, SyncRequests live on the stack
93 // async requests use async::Completion to invoke a handler on its executor
94 template <typename Mutex
, template <typename
> typename Lock
>
95 class AsyncRequest
: public LockRequest
{
96 Mutex
& mutex
; //< mutex argument for lock guard
98 explicit AsyncRequest(Mutex
& mutex
) : mutex(mutex
) {}
100 using Signature
= void(boost::system::error_code
, Lock
<Mutex
>);
101 using LockCompletion
= Completion
<Signature
, AsBase
<AsyncRequest
>>;
103 void complete(boost::system::error_code ec
) override
{
104 auto r
= static_cast<LockCompletion
*>(this);
105 // pass ownership of ourselves to post(). on error, pass an empty lock
106 post(std::unique_ptr
<LockCompletion
>{r
}, ec
,
107 ec
? Lock
{mutex
, std::defer_lock
} : Lock
{mutex
, std::adopt_lock
});
109 void destroy() override
{
110 delete static_cast<LockCompletion
*>(this);
114 inline SharedMutexImpl::~SharedMutexImpl()
116 ceph_assert(state
== Unlocked
);
117 ceph_assert(shared_queue
.empty());
118 ceph_assert(exclusive_queue
.empty());
121 template <typename Mutex
, typename CompletionToken
>
122 auto SharedMutexImpl::async_lock(Mutex
& mtx
, CompletionToken
&& token
)
124 using Request
= AsyncRequest
<Mutex
, std::unique_lock
>;
125 using Signature
= typename
Request::Signature
;
126 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
127 auto& handler
= init
.completion_handler
;
128 auto ex1
= mtx
.get_executor();
130 std::lock_guard lock
{mutex
};
132 boost::system::error_code ec
;
133 if (state
== Unlocked
) {
136 // post a successful completion
137 auto ex2
= boost::asio::get_associated_executor(handler
, ex1
);
138 auto alloc2
= boost::asio::get_associated_allocator(handler
);
139 auto b
= bind_handler(std::move(handler
), ec
,
140 std::unique_lock
{mtx
, std::adopt_lock
});
141 ex2
.post(forward_handler(std::move(b
)), alloc2
);
143 // create a request and add it to the exclusive list
144 using LockCompletion
= typename
Request::LockCompletion
;
145 auto request
= LockCompletion::create(ex1
, std::move(handler
), mtx
);
146 exclusive_queue
.push_back(*request
.release());
149 return init
.result
.get();
152 inline void SharedMutexImpl::lock()
154 boost::system::error_code ec
;
157 throw boost::system::system_error(ec
);
161 void SharedMutexImpl::lock(boost::system::error_code
& ec
)
163 std::unique_lock lock
{mutex
};
165 if (state
== Unlocked
) {
170 exclusive_queue
.push_back(request
);
171 ec
= request
.wait(lock
);
175 inline bool SharedMutexImpl::try_lock()
177 std::lock_guard lock
{mutex
};
179 if (state
== Unlocked
) {
186 void SharedMutexImpl::unlock()
190 std::lock_guard lock
{mutex
};
191 ceph_assert(state
== Exclusive
);
193 if (!exclusive_queue
.empty()) {
194 // grant next exclusive lock
195 auto& request
= exclusive_queue
.front();
196 exclusive_queue
.pop_front();
197 granted
.push_back(request
);
199 // grant shared locks, if any
200 state
= shared_queue
.size();
201 if (state
> MaxShared
) {
203 auto end
= std::next(shared_queue
.begin(), MaxShared
);
204 granted
.splice(granted
.end(), shared_queue
,
205 shared_queue
.begin(), end
, MaxShared
);
207 granted
.splice(granted
.end(), shared_queue
);
211 complete(std::move(granted
), boost::system::error_code
{});
214 template <typename Mutex
, typename CompletionToken
>
215 auto SharedMutexImpl::async_lock_shared(Mutex
& mtx
, CompletionToken
&& token
)
217 using Request
= AsyncRequest
<Mutex
, std::shared_lock
>;
218 using Signature
= typename
Request::Signature
;
219 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
220 auto& handler
= init
.completion_handler
;
221 auto ex1
= mtx
.get_executor();
223 std::lock_guard lock
{mutex
};
225 boost::system::error_code ec
;
226 if (exclusive_queue
.empty() && state
< MaxShared
) {
229 auto ex2
= boost::asio::get_associated_executor(handler
, ex1
);
230 auto alloc2
= boost::asio::get_associated_allocator(handler
);
231 auto b
= bind_handler(std::move(handler
), ec
,
232 std::shared_lock
{mtx
, std::adopt_lock
});
233 ex2
.post(forward_handler(std::move(b
)), alloc2
);
235 using LockCompletion
= typename
Request::LockCompletion
;
236 auto request
= LockCompletion::create(ex1
, std::move(handler
), mtx
);
237 shared_queue
.push_back(*request
.release());
240 return init
.result
.get();
243 inline void SharedMutexImpl::lock_shared()
245 boost::system::error_code ec
;
248 throw boost::system::system_error(ec
);
252 void SharedMutexImpl::lock_shared(boost::system::error_code
& ec
)
254 std::unique_lock lock
{mutex
};
256 if (exclusive_queue
.empty() && state
< MaxShared
) {
261 shared_queue
.push_back(request
);
262 ec
= request
.wait(lock
);
266 inline bool SharedMutexImpl::try_lock_shared()
268 std::lock_guard lock
{mutex
};
270 if (exclusive_queue
.empty() && state
< MaxShared
) {
277 inline void SharedMutexImpl::unlock_shared()
279 std::lock_guard lock
{mutex
};
280 ceph_assert(state
!= Unlocked
&& state
<= MaxShared
);
282 if (state
== 1 && !exclusive_queue
.empty()) {
283 // grant next exclusive lock
285 auto& request
= exclusive_queue
.front();
286 exclusive_queue
.pop_front();
287 request
.complete(boost::system::error_code
{});
288 } else if (state
== MaxShared
&& !shared_queue
.empty() &&
289 exclusive_queue
.empty()) {
290 // grant next shared lock
291 auto& request
= shared_queue
.front();
292 shared_queue
.pop_front();
293 request
.complete(boost::system::error_code
{});
299 inline void SharedMutexImpl::cancel()
301 RequestList canceled
;
303 std::lock_guard lock
{mutex
};
304 canceled
.splice(canceled
.end(), shared_queue
);
305 canceled
.splice(canceled
.end(), exclusive_queue
);
307 complete(std::move(canceled
), boost::asio::error::operation_aborted
);
310 void SharedMutexImpl::complete(RequestList
&& requests
,
311 boost::system::error_code ec
)
313 while (!requests
.empty()) {
314 auto& request
= requests
.front();
315 requests
.pop_front();
317 request
.complete(ec
);
319 // clean up any remaining completions and rethrow
320 requests
.clear_and_dispose([] (LockRequest
*r
) { r
->destroy(); });
326 } // namespace ceph::async::detail