// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2018 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef RGW_DMCLOCK_ASYNC_SCHEDULER_H
#define RGW_DMCLOCK_ASYNC_SCHEDULER_H

#include "common/async/completion.h"

#include <boost/asio.hpp>
#include "rgw_dmclock_scheduler.h"
#include "rgw_dmclock_scheduler_ctx.h"

namespace rgw::dmclock {
  namespace async = ceph::async;

/*
 * A dmclock request scheduling service for use with boost::asio.
 *
 * An asynchronous dmclock priority queue, where scheduled requests complete
 * on a boost::asio executor.
 */
class AsyncScheduler : public md_config_obs_t, public Scheduler {
 public:
  template <typename ...Args> // args forwarded to PullPriorityQueue ctor
  AsyncScheduler(CephContext *cct, boost::asio::io_context& context,
                 GetClientCounters&& counters, md_config_obs_t *observer,
                 Args&& ...args);
  ~AsyncScheduler();

  using executor_type = boost::asio::io_context::executor_type;

  /// return the default executor for async_request() callbacks
  executor_type get_executor() noexcept {
    return timer.get_executor();
  }

  /// submit an async request for dmclock scheduling. the given completion
  /// handler will be invoked with (error_code, PhaseType) when the request
  /// is ready or canceled. on success, this grants a throttle unit that must
  /// be returned with a call to request_complete()
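  ///
  /// illustrative usage sketch (not part of this header; `sched`, `client`,
  /// `params` and `now` are assumed to exist at the call site):
  ///
  ///   sched.async_request(client, params, now, 1 /* cost */,
  ///       [&] (boost::system::error_code ec, PhaseType phase) {
  ///         if (!ec) {
  ///           // handle the request, then return the throttle unit
  ///           sched.request_complete();
  ///         }
  ///       });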
  template <typename CompletionToken>
  auto async_request(const client_id& client, const ReqParams& params,
                     const Time& time, Cost cost, CompletionToken&& token);

  /// returns a throttle unit granted by async_request()
  void request_complete() override;

  /// cancel all queued requests, invoking their completion handlers with an
  /// operation_aborted error and default-constructed result
  void cancel();

  /// cancel all queued requests for a given client, invoking their completion
  /// handler with an operation_aborted error and default-constructed result
  void cancel(const client_id& client);

  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string>& changed) override;

 private:
  int schedule_request_impl(const client_id& client, const ReqParams& params,
                            const Time& time, const Cost& cost,
                            optional_yield yield_ctx) override;

  static constexpr bool IsDelayed = false;
  using Queue = crimson::dmclock::PullPriorityQueue<client_id, Request, IsDelayed>;
  using RequestRef = typename Queue::RequestRef;
  Queue queue; ///< dmclock priority queue

  using Signature = void(boost::system::error_code, PhaseType);
  using Completion = async::Completion<Signature, async::AsBase<Request>>;

  using Clock = ceph::coarse_real_clock;
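  // note: Boost 1.70 added an executor template parameter to
  // basic_waitable_timer, hence the version check below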
#if BOOST_VERSION < 107000
  using Timer = boost::asio::basic_waitable_timer<Clock>;
#else
  using Timer = boost::asio::basic_waitable_timer<Clock,
        boost::asio::wait_traits<Clock>, executor_type>;
#endif
  Timer timer; ///< timer for the next scheduled request

  CephContext *const cct;
  md_config_obs_t *const observer; ///< observer to update ClientInfoFunc
  GetClientCounters counters; ///< provides per-client perf counters

  /// max request throttle
  std::atomic<int64_t> max_requests;
  std::atomic<int64_t> outstanding_requests = 0;

  /// set a timer to process the next request
  void schedule(const Time& time);

  /// process ready requests, then schedule the next pending request
  void process(const Time& now);
};


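// illustrative construction sketch (names here are hypothetical): the
// trailing constructor arguments are forwarded to the PullPriorityQueue
// constructor, which is where the dmclock client-info callback and any
// queue tuning parameters are supplied, e.g.
//
//   boost::asio::io_context ioc;
//   AsyncScheduler sched(cct, ioc, std::move(get_counters), observer,
//                        client_info_func /* forwarded to the queue */);
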
template <typename ...Args>
AsyncScheduler::AsyncScheduler(CephContext *cct, boost::asio::io_context& context,
                               GetClientCounters&& counters,
                               md_config_obs_t *observer, Args&& ...args)
  : queue(std::forward<Args>(args)...),
    timer(context), cct(cct), observer(observer),
    counters(std::move(counters)),
    max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests"))
{
  if (max_requests <= 0) {
    max_requests = std::numeric_limits<int64_t>::max();
  }
  if (observer) {
    cct->_conf.add_observer(this);
  }
}

template <typename CompletionToken>
auto AsyncScheduler::async_request(const client_id& client,
                                   const ReqParams& params,
                                   const Time& time, Cost cost,
                                   CompletionToken&& token)
{
  boost::asio::async_completion<CompletionToken, Signature> init(token);

  auto ex1 = get_executor();
  auto& handler = init.completion_handler;

  // allocate the Request and add it to the queue
  auto completion = Completion::create(ex1, std::move(handler),
                                       Request{client, time, cost});
  // cast to unique_ptr<Request>
  auto req = RequestRef{std::move(completion)};
  int r = queue.add_request(std::move(req), client, params, time, cost);
  if (r == 0) {
    // schedule an immediate call to process() on the executor
    schedule(crimson::dmclock::TimeZero);
    if (auto c = counters(client)) {
      c->inc(queue_counters::l_qlen);
      c->inc(queue_counters::l_cost, cost);
    }
  } else {
    // post the error code
    boost::system::error_code ec(r, boost::system::system_category());
    // cast back to Completion
    auto completion = static_cast<Completion*>(req.release());
    async::post(std::unique_ptr<Completion>{completion},
                ec, PhaseType::priority);
    if (auto c = counters(client)) {
      c->inc(queue_counters::l_limit);
      c->inc(queue_counters::l_limit_cost, cost);
    }
  }

  return init.result.get();
}
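
// note: async_request() goes through boost::asio's completion-token
// machinery, so the token may be a plain callback, boost::asio::use_future,
// or a spawned coroutine's yield_context; the return type adapts to the token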
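/// A trivial scheduler that performs no dmclock prioritization, only
/// enforcing the rgw_max_concurrent_requests throttle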
class SimpleThrottler : public md_config_obs_t, public dmclock::Scheduler {
public:
  SimpleThrottler(CephContext *cct) :
      max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests")),
      counters(cct, "simple-throttler")
  {
    if (max_requests <= 0) {
      max_requests = std::numeric_limits<int64_t>::max();
    }
    cct->_conf.add_observer(this);
  }

  const char** get_tracked_conf_keys() const override {
    static const char* keys[] = { "rgw_max_concurrent_requests", nullptr };
    return keys;
  }

  void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string>& changed) override
  {
    if (changed.count("rgw_max_concurrent_requests")) {
      auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests");
      max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max();
    }
  }

  void request_complete() override {
    --outstanding_requests;
  }

private:
  int schedule_request_impl(const client_id&, const ReqParams&,
                            const Time&, const Cost&,
                            optional_yield) override {
    if (outstanding_requests++ >= max_requests) {
      if (auto c = counters(); c != nullptr) {
        c->inc(throttle_counters::l_throttle);
      }
      return -EAGAIN;
    }

    return 0;
  }

  std::atomic<int64_t> max_requests;
  std::atomic<int64_t> outstanding_requests = 0;
  ThrottleCounters counters;
};
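
// usage sketch (illustrative; assumes the schedule_request() entry point
// declared by the Scheduler base class):
//
//   SimpleThrottler throttler(cct);
//   // if schedule_request(...) returns 0, handle the request, then call
//   // throttler.request_complete() to release the slot; -EAGAIN means
//   // the rgw_max_concurrent_requests limit was hit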

} // namespace rgw::dmclock
#endif /* RGW_DMCLOCK_ASYNC_SCHEDULER_H */