]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_aio_throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2018 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #include "include/rados/librados.hpp"
18 #include "rgw_aio_throttle.h"
19 #include "rgw_rados.h"
// Evaluates the predicate that belongs to a Wait state:
//   Wait::Available  -> is_available()
//   Wait::Completion -> has_completion()
//   Wait::Drained    -> is_drained()
// Any other state yields false.
// NOTE(review): the embedded numbering jumps from 23 to 26, so the opening
// brace and the switch header are missing from this listing -- presumably the
// switch is over `waiter`; confirm against the original file.
23 bool Throttle::waiter_ready() const
26 case Wait::Available
: return is_available();
27 case Wait::Completion
: return has_completion();
28 case Wait::Drained
: return is_drained();
29 default: return false;
// Blocking get(): allocates a Pending entry, takes `mutex`, and returns the
// contents of `completed` (moved out) to the caller.
// Steps visible in this listing:
//  - one branch marks the entry with -EDEADLK and queues it on `completed`;
//  - otherwise pending_size grows by p->cost and, if is_available() is false,
//    the caller registers as the Wait::Available waiter and blocks on `cond`
//    until is_available() holds;
//  - the entry is then linked into `pending` and `f` is invoked with this
//    throttle and the entry viewed as an AioResult.
// NOTE(review): the embedded numbering has gaps (34, 36, 38-41, 43, 46, 53-55,
// 57, 59, 61-63), so the declaration of `f`, the initialisation of the entry's
// fields, the condition guarding the -EDEADLK branch, and anything done around
// the call to `f` or to the ownership of `p` are not visible here -- confirm
// against the original file before relying on this summary.
33 AioResultList
BlockingAioThrottle::get(const RGWSI_RADOS::Obj
& obj
,
35 uint64_t cost
, uint64_t id
)
37 auto p
= std::make_unique
<Pending
>();
42 std::unique_lock lock
{mutex
};
44 p
->result
= -EDEADLK
; // would never succeed
45 completed
.push_back(*p
);
47 // wait for the write size to become available
48 pending_size
+= p
->cost
;
// the assert below requires that no other waiter is registered
49 if (!is_available()) {
50 ceph_assert(waiter
== Wait::None
);
51 waiter
= Wait::Available
;
52 cond
.wait(lock
, [this] { return is_available(); });
56 // register the pending write and attach a completion
58 pending
.push_back(*p
);
// f is moved-from by this call; it receives the throttle and the new entry
60 std::move(f
)(this, *static_cast<AioResult
*>(p
.get()));
64 return std::move(completed
);
// Completion hook for the blocking throttle: treats `r` as the Pending entry
// it was issued as, and under `mutex` moves that entry from `pending` to
// `completed` and subtracts its cost from pending_size.
// NOTE(review): the embedded numbering skips 77-82, so whatever follows the
// size update (presumably waking a blocked waiter) is not visible in this
// listing -- confirm against the original file.
67 void BlockingAioThrottle::put(AioResult
& r
)
// downcast: assumes every AioResult handed back here is really a Pending
69 auto& p
= static_cast<Pending
&>(r
);
70 std::scoped_lock lock
{mutex
};
72 // move from pending to completed
73 pending
.erase(pending
.iterator_to(p
));
74 completed
.push_back(p
);
76 pending_size
-= p
.cost
;
// Non-waiting poll: takes `mutex` and returns the current contents of
// `completed`, moved out to the caller. No condition-variable wait appears
// in this function.
83 AioResultList
BlockingAioThrottle::poll()
85 std::unique_lock lock
{mutex
};
86 return std::move(completed
);
// Returns the contents of `completed` (moved out). If `completed` is empty
// while `pending` is not, first registers as the Wait::Completion waiter and
// blocks on `cond` until has_completion() holds.
// NOTE(review): the embedded numbering skips 96-97, so any statement between
// the wait and the closing of the if (e.g. resetting `waiter`) is not visible
// in this listing.
89 AioResultList
BlockingAioThrottle::wait()
91 std::unique_lock lock
{mutex
};
92 if (completed
.empty() && !pending
.empty()) {
// the assert below requires that no other waiter is registered
93 ceph_assert(waiter
== Wait::None
);
94 waiter
= Wait::Completion
;
95 cond
.wait(lock
, [this] { return has_completion(); });
98 return std::move(completed
);
// Returns the contents of `completed` (moved out). If `pending` is not empty,
// first registers as the Wait::Drained waiter and blocks on `cond` until
// is_drained() holds.
// NOTE(review): the embedded numbering skips 108-109, so any statement
// between the wait and the closing of the if (e.g. resetting `waiter`) is not
// visible in this listing.
101 AioResultList
BlockingAioThrottle::drain()
103 std::unique_lock lock
{mutex
};
104 if (!pending
.empty()) {
// the assert below requires that no other waiter is registered
105 ceph_assert(waiter
== Wait::None
);
106 waiter
= Wait::Drained
;
107 cond
.wait(lock
, [this] { return is_drained(); });
110 return std::move(completed
);
113 #ifdef HAVE_BOOST_CONTEXT
// Suspends the caller through a Boost.Asio completion token.
// Builds an async_completion for the signature void(error_code), stores a
// Completion created on `context`'s executor (wrapping the token's handler)
// in the `completion` member, and returns init.result.get().
// The stored `completion` is what put() posts to when waiter_ready() holds.
115 template <typename CompletionToken
>
116 auto YieldingAioThrottle::async_wait(CompletionToken
&& token
)
118 using boost::asio::async_completion
;
119 using Signature
= void(boost::system::error_code
);
120 async_completion
<CompletionToken
, Signature
> init(token
);
121 completion
= Completion::create(context
.get_executor(),
122 std::move(init
.completion_handler
));
123 return init
.result
.get();
// Yielding get(): allocates a Pending entry and returns the contents of
// `completed` (moved out). No mutex is taken in the visible lines.
// Steps visible in this listing:
//  - one branch marks the entry with -EDEADLK and queues it on `completed`;
//  - otherwise pending_size grows by p->cost and, if is_available() is false,
//    the caller registers as the Wait::Available waiter and suspends via
//    async_wait(yield[ec]);
//  - the entry is then linked into `pending` and `f` is invoked with this
//    throttle and the entry viewed as an AioResult.
// NOTE(review): the embedded numbering has gaps (127, 129, 131-135, 138, 144,
// 148-149, 153-154), so the declarations of `f` and `yield`, the
// initialisation of the entry's fields, the condition guarding the -EDEADLK
// branch, and the ownership hand-off of `p` are not visible here -- confirm
// against the original file before relying on this summary.
126 AioResultList
YieldingAioThrottle::get(const RGWSI_RADOS::Obj
& obj
,
128 uint64_t cost
, uint64_t id
)
130 auto p
= std::make_unique
<Pending
>();
136 p
->result
= -EDEADLK
; // would never succeed
137 completed
.push_back(*p
);
139 // wait for the write size to become available
140 pending_size
+= p
->cost
;
// the asserts below require no registered waiter and no stored completion
141 if (!is_available()) {
142 ceph_assert(waiter
== Wait::None
);
143 ceph_assert(!completion
);
145 boost::system::error_code ec
;
146 waiter
= Wait::Available
;
147 async_wait(yield
[ec
]);
150 // register the pending write and initiate the operation
151 pending
.push_back(*p
);
// f is moved-from by this call; it receives the throttle and the new entry
152 std::move(f
)(this, *static_cast<AioResult
*>(p
.get()));
155 return std::move(completed
);
// Completion hook for the yielding throttle: treats `r` as the Pending entry
// it was issued as, moves it from `pending` to `completed`, and subtracts its
// cost from pending_size. If waiter_ready() then holds, asserts that a
// `completion` is stored and posts it with a default-constructed error_code.
// No mutex is taken in the visible lines.
// NOTE(review): the embedded numbering skips 171-174, so anything after the
// post (e.g. resetting `waiter`) is not visible in this listing.
158 void YieldingAioThrottle::put(AioResult
& r
)
// downcast: assumes every AioResult handed back here is really a Pending
160 auto& p
= static_cast<Pending
&>(r
);
162 // move from pending to completed
163 pending
.erase(pending
.iterator_to(p
));
164 completed
.push_back(p
);
166 pending_size
-= p
.cost
;
168 if (waiter_ready()) {
169 ceph_assert(completion
);
170 ceph::async::post(std::move(completion
), boost::system::error_code
{});
// Non-waiting poll: returns the current contents of `completed`, moved out
// to the caller. No lock and no suspension appear in this function.
175 AioResultList
YieldingAioThrottle::poll()
177 return std::move(completed
);
// Returns the contents of `completed` (moved out). If has_completion() is
// false while `pending` is not empty, first registers as the Wait::Completion
// waiter and suspends via async_wait(yield[ec]).
// NOTE(review): `yield` is not declared in the visible lines, and `ec` is
// never inspected in the visible lines after the wait.
180 AioResultList
YieldingAioThrottle::wait()
182 if (!has_completion() && !pending
.empty()) {
// the asserts below require no registered waiter and no stored completion
183 ceph_assert(waiter
== Wait::None
);
184 ceph_assert(!completion
);
186 boost::system::error_code ec
;
187 waiter
= Wait::Completion
;
188 async_wait(yield
[ec
]);
190 return std::move(completed
);
// Registers as the Wait::Drained waiter, suspends via async_wait(yield[ec]),
// and returns the contents of `completed` (moved out).
// NOTE(review): the embedded numbering skips 194-195, so the condition that
// presumably guards the wait (compare the `if` in the sibling wait()) is not
// visible in this listing; `yield` is likewise not declared in view. Confirm
// against the original file.
193 AioResultList
YieldingAioThrottle::drain()
// the asserts below require no registered waiter and no stored completion
196 ceph_assert(waiter
== Wait::None
);
197 ceph_assert(!completion
);
199 boost::system::error_code ec
;
200 waiter
= Wait::Drained
;
201 async_wait(yield
[ec
]);
203 return std::move(completed
);
205 #endif // HAVE_BOOST_CONTEXT