]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_aio_throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2018 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #include "include/rados/librados.hpp"
18 #include "rgw_aio_throttle.h"
22 bool Throttle::waiter_ready() const
25 case Wait::Available
: return is_available();
26 case Wait::Completion
: return has_completion();
27 case Wait::Drained
: return is_drained();
28 default: return false;
32 AioResultList
BlockingAioThrottle::get(const RGWSI_RADOS::Obj
& obj
,
34 uint64_t cost
, uint64_t id
)
36 auto p
= std::make_unique
<Pending
>();
41 std::unique_lock lock
{mutex
};
43 p
->result
= -EDEADLK
; // would never succeed
44 completed
.push_back(*p
);
46 // wait for the write size to become available
47 pending_size
+= p
->cost
;
48 if (!is_available()) {
49 ceph_assert(waiter
== Wait::None
);
50 waiter
= Wait::Available
;
51 cond
.wait(lock
, [this] { return is_available(); });
55 // register the pending write and attach a completion
57 pending
.push_back(*p
);
59 std::move(f
)(this, *static_cast<AioResult
*>(p
.get()));
63 return std::move(completed
);
66 void BlockingAioThrottle::put(AioResult
& r
)
68 auto& p
= static_cast<Pending
&>(r
);
69 std::scoped_lock lock
{mutex
};
71 // move from pending to completed
72 pending
.erase(pending
.iterator_to(p
));
73 completed
.push_back(p
);
75 pending_size
-= p
.cost
;
82 AioResultList
BlockingAioThrottle::poll()
84 std::unique_lock lock
{mutex
};
85 return std::move(completed
);
88 AioResultList
BlockingAioThrottle::wait()
90 std::unique_lock lock
{mutex
};
91 if (completed
.empty() && !pending
.empty()) {
92 ceph_assert(waiter
== Wait::None
);
93 waiter
= Wait::Completion
;
94 cond
.wait(lock
, [this] { return has_completion(); });
97 return std::move(completed
);
100 AioResultList
BlockingAioThrottle::drain()
102 std::unique_lock lock
{mutex
};
103 if (!pending
.empty()) {
104 ceph_assert(waiter
== Wait::None
);
105 waiter
= Wait::Drained
;
106 cond
.wait(lock
, [this] { return is_drained(); });
109 return std::move(completed
);
112 template <typename CompletionToken
>
113 auto YieldingAioThrottle::async_wait(CompletionToken
&& token
)
115 using boost::asio::async_completion
;
116 using Signature
= void(boost::system::error_code
);
117 async_completion
<CompletionToken
, Signature
> init(token
);
118 completion
= Completion::create(context
.get_executor(),
119 std::move(init
.completion_handler
));
120 return init
.result
.get();
123 AioResultList
YieldingAioThrottle::get(const RGWSI_RADOS::Obj
& obj
,
125 uint64_t cost
, uint64_t id
)
127 auto p
= std::make_unique
<Pending
>();
133 p
->result
= -EDEADLK
; // would never succeed
134 completed
.push_back(*p
);
136 // wait for the write size to become available
137 pending_size
+= p
->cost
;
138 if (!is_available()) {
139 ceph_assert(waiter
== Wait::None
);
140 ceph_assert(!completion
);
142 boost::system::error_code ec
;
143 waiter
= Wait::Available
;
144 async_wait(yield
[ec
]);
147 // register the pending write and initiate the operation
148 pending
.push_back(*p
);
149 std::move(f
)(this, *static_cast<AioResult
*>(p
.get()));
152 return std::move(completed
);
155 void YieldingAioThrottle::put(AioResult
& r
)
157 auto& p
= static_cast<Pending
&>(r
);
159 // move from pending to completed
160 pending
.erase(pending
.iterator_to(p
));
161 completed
.push_back(p
);
163 pending_size
-= p
.cost
;
165 if (waiter_ready()) {
166 ceph_assert(completion
);
167 ceph::async::post(std::move(completion
), boost::system::error_code
{});
172 AioResultList
YieldingAioThrottle::poll()
174 return std::move(completed
);
177 AioResultList
YieldingAioThrottle::wait()
179 if (!has_completion() && !pending
.empty()) {
180 ceph_assert(waiter
== Wait::None
);
181 ceph_assert(!completion
);
183 boost::system::error_code ec
;
184 waiter
= Wait::Completion
;
185 async_wait(yield
[ec
]);
187 return std::move(completed
);
190 AioResultList
YieldingAioThrottle::drain()
193 ceph_assert(waiter
== Wait::None
);
194 ceph_assert(!completion
);
196 boost::system::error_code ec
;
197 waiter
= Wait::Drained
;
198 async_wait(yield
[ec
]);
200 return std::move(completed
);