]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_aio_throttle.cc
import quincy 17.2.0
[ceph.git] / ceph / src / rgw / rgw_aio_throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "include/rados/librados.hpp"
17
18 #include "rgw_aio_throttle.h"
19
20 namespace rgw {
21
22 bool Throttle::waiter_ready() const
23 {
24 switch (waiter) {
25 case Wait::Available: return is_available();
26 case Wait::Completion: return has_completion();
27 case Wait::Drained: return is_drained();
28 default: return false;
29 }
30 }
31
32 AioResultList BlockingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
33 OpFunc&& f,
34 uint64_t cost, uint64_t id)
35 {
36 auto p = std::make_unique<Pending>();
37 p->obj = obj;
38 p->id = id;
39 p->cost = cost;
40
41 std::unique_lock lock{mutex};
42 if (cost > window) {
43 p->result = -EDEADLK; // would never succeed
44 completed.push_back(*p);
45 } else {
46 // wait for the write size to become available
47 pending_size += p->cost;
48 if (!is_available()) {
49 ceph_assert(waiter == Wait::None);
50 waiter = Wait::Available;
51 cond.wait(lock, [this] { return is_available(); });
52 waiter = Wait::None;
53 }
54
55 // register the pending write and attach a completion
56 p->parent = this;
57 pending.push_back(*p);
58 lock.unlock();
59 std::move(f)(this, *static_cast<AioResult*>(p.get()));
60 lock.lock();
61 }
62 p.release();
63 return std::move(completed);
64 }
65
66 void BlockingAioThrottle::put(AioResult& r)
67 {
68 auto& p = static_cast<Pending&>(r);
69 std::scoped_lock lock{mutex};
70
71 // move from pending to completed
72 pending.erase(pending.iterator_to(p));
73 completed.push_back(p);
74
75 pending_size -= p.cost;
76
77 if (waiter_ready()) {
78 cond.notify_one();
79 }
80 }
81
82 AioResultList BlockingAioThrottle::poll()
83 {
84 std::unique_lock lock{mutex};
85 return std::move(completed);
86 }
87
88 AioResultList BlockingAioThrottle::wait()
89 {
90 std::unique_lock lock{mutex};
91 if (completed.empty() && !pending.empty()) {
92 ceph_assert(waiter == Wait::None);
93 waiter = Wait::Completion;
94 cond.wait(lock, [this] { return has_completion(); });
95 waiter = Wait::None;
96 }
97 return std::move(completed);
98 }
99
100 AioResultList BlockingAioThrottle::drain()
101 {
102 std::unique_lock lock{mutex};
103 if (!pending.empty()) {
104 ceph_assert(waiter == Wait::None);
105 waiter = Wait::Drained;
106 cond.wait(lock, [this] { return is_drained(); });
107 waiter = Wait::None;
108 }
109 return std::move(completed);
110 }
111
112 template <typename CompletionToken>
113 auto YieldingAioThrottle::async_wait(CompletionToken&& token)
114 {
115 using boost::asio::async_completion;
116 using Signature = void(boost::system::error_code);
117 async_completion<CompletionToken, Signature> init(token);
118 completion = Completion::create(context.get_executor(),
119 std::move(init.completion_handler));
120 return init.result.get();
121 }
122
123 AioResultList YieldingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
124 OpFunc&& f,
125 uint64_t cost, uint64_t id)
126 {
127 auto p = std::make_unique<Pending>();
128 p->obj = obj;
129 p->id = id;
130 p->cost = cost;
131
132 if (cost > window) {
133 p->result = -EDEADLK; // would never succeed
134 completed.push_back(*p);
135 } else {
136 // wait for the write size to become available
137 pending_size += p->cost;
138 if (!is_available()) {
139 ceph_assert(waiter == Wait::None);
140 ceph_assert(!completion);
141
142 boost::system::error_code ec;
143 waiter = Wait::Available;
144 async_wait(yield[ec]);
145 }
146
147 // register the pending write and initiate the operation
148 pending.push_back(*p);
149 std::move(f)(this, *static_cast<AioResult*>(p.get()));
150 }
151 p.release();
152 return std::move(completed);
153 }
154
155 void YieldingAioThrottle::put(AioResult& r)
156 {
157 auto& p = static_cast<Pending&>(r);
158
159 // move from pending to completed
160 pending.erase(pending.iterator_to(p));
161 completed.push_back(p);
162
163 pending_size -= p.cost;
164
165 if (waiter_ready()) {
166 ceph_assert(completion);
167 ceph::async::post(std::move(completion), boost::system::error_code{});
168 waiter = Wait::None;
169 }
170 }
171
172 AioResultList YieldingAioThrottle::poll()
173 {
174 return std::move(completed);
175 }
176
177 AioResultList YieldingAioThrottle::wait()
178 {
179 if (!has_completion() && !pending.empty()) {
180 ceph_assert(waiter == Wait::None);
181 ceph_assert(!completion);
182
183 boost::system::error_code ec;
184 waiter = Wait::Completion;
185 async_wait(yield[ec]);
186 }
187 return std::move(completed);
188 }
189
190 AioResultList YieldingAioThrottle::drain()
191 {
192 if (!is_drained()) {
193 ceph_assert(waiter == Wait::None);
194 ceph_assert(!completion);
195
196 boost::system::error_code ec;
197 waiter = Wait::Drained;
198 async_wait(yield[ec]);
199 }
200 return std::move(completed);
201 }
202 } // namespace rgw