git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_aio_throttle.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_aio_throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "include/rados/librados.hpp"
17
18 #include "rgw_aio_throttle.h"
19 #include "rgw_rados.h"
20
21 namespace rgw {
22
23 bool Throttle::waiter_ready() const
24 {
25 switch (waiter) {
26 case Wait::Available: return is_available();
27 case Wait::Completion: return has_completion();
28 case Wait::Drained: return is_drained();
29 default: return false;
30 }
31 }
32
33 AioResultList BlockingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
34 OpFunc&& f,
35 uint64_t cost, uint64_t id)
36 {
37 auto p = std::make_unique<Pending>();
38 p->obj = obj;
39 p->id = id;
40 p->cost = cost;
41
42 std::unique_lock lock{mutex};
43 if (cost > window) {
44 p->result = -EDEADLK; // would never succeed
45 completed.push_back(*p);
46 } else {
47 // wait for the write size to become available
48 pending_size += p->cost;
49 if (!is_available()) {
50 ceph_assert(waiter == Wait::None);
51 waiter = Wait::Available;
52 cond.wait(lock, [this] { return is_available(); });
53 waiter = Wait::None;
54 }
55
56 // register the pending write and attach a completion
57 p->parent = this;
58 pending.push_back(*p);
59 lock.unlock();
60 std::move(f)(this, *static_cast<AioResult*>(p.get()));
61 lock.lock();
62 }
63 p.release();
64 return std::move(completed);
65 }
66
67 void BlockingAioThrottle::put(AioResult& r)
68 {
69 auto& p = static_cast<Pending&>(r);
70 std::scoped_lock lock{mutex};
71
72 // move from pending to completed
73 pending.erase(pending.iterator_to(p));
74 completed.push_back(p);
75
76 pending_size -= p.cost;
77
78 if (waiter_ready()) {
79 cond.notify_one();
80 }
81 }
82
83 AioResultList BlockingAioThrottle::poll()
84 {
85 std::unique_lock lock{mutex};
86 return std::move(completed);
87 }
88
89 AioResultList BlockingAioThrottle::wait()
90 {
91 std::unique_lock lock{mutex};
92 if (completed.empty() && !pending.empty()) {
93 ceph_assert(waiter == Wait::None);
94 waiter = Wait::Completion;
95 cond.wait(lock, [this] { return has_completion(); });
96 waiter = Wait::None;
97 }
98 return std::move(completed);
99 }
100
101 AioResultList BlockingAioThrottle::drain()
102 {
103 std::unique_lock lock{mutex};
104 if (!pending.empty()) {
105 ceph_assert(waiter == Wait::None);
106 waiter = Wait::Drained;
107 cond.wait(lock, [this] { return is_drained(); });
108 waiter = Wait::None;
109 }
110 return std::move(completed);
111 }
112
113 #ifdef HAVE_BOOST_CONTEXT
114
115 template <typename CompletionToken>
116 auto YieldingAioThrottle::async_wait(CompletionToken&& token)
117 {
118 using boost::asio::async_completion;
119 using Signature = void(boost::system::error_code);
120 async_completion<CompletionToken, Signature> init(token);
121 completion = Completion::create(context.get_executor(),
122 std::move(init.completion_handler));
123 return init.result.get();
124 }
125
126 AioResultList YieldingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
127 OpFunc&& f,
128 uint64_t cost, uint64_t id)
129 {
130 auto p = std::make_unique<Pending>();
131 p->obj = obj;
132 p->id = id;
133 p->cost = cost;
134
135 if (cost > window) {
136 p->result = -EDEADLK; // would never succeed
137 completed.push_back(*p);
138 } else {
139 // wait for the write size to become available
140 pending_size += p->cost;
141 if (!is_available()) {
142 ceph_assert(waiter == Wait::None);
143 ceph_assert(!completion);
144
145 boost::system::error_code ec;
146 waiter = Wait::Available;
147 async_wait(yield[ec]);
148 }
149
150 // register the pending write and initiate the operation
151 pending.push_back(*p);
152 std::move(f)(this, *static_cast<AioResult*>(p.get()));
153 }
154 p.release();
155 return std::move(completed);
156 }
157
158 void YieldingAioThrottle::put(AioResult& r)
159 {
160 auto& p = static_cast<Pending&>(r);
161
162 // move from pending to completed
163 pending.erase(pending.iterator_to(p));
164 completed.push_back(p);
165
166 pending_size -= p.cost;
167
168 if (waiter_ready()) {
169 ceph_assert(completion);
170 ceph::async::post(std::move(completion), boost::system::error_code{});
171 waiter = Wait::None;
172 }
173 }
174
175 AioResultList YieldingAioThrottle::poll()
176 {
177 return std::move(completed);
178 }
179
180 AioResultList YieldingAioThrottle::wait()
181 {
182 if (!has_completion() && !pending.empty()) {
183 ceph_assert(waiter == Wait::None);
184 ceph_assert(!completion);
185
186 boost::system::error_code ec;
187 waiter = Wait::Completion;
188 async_wait(yield[ec]);
189 }
190 return std::move(completed);
191 }
192
193 AioResultList YieldingAioThrottle::drain()
194 {
195 if (!is_drained()) {
196 ceph_assert(waiter == Wait::None);
197 ceph_assert(!completion);
198
199 boost::system::error_code ec;
200 waiter = Wait::Drained;
201 async_wait(yield[ec]);
202 }
203 return std::move(completed);
204 }
205 #endif // HAVE_BOOST_CONTEXT
206
207 } // namespace rgw