// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2018 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#pragma once

#include "common/async/completion.h"

#include <boost/asio.hpp>
#include "rgw_dmclock_scheduler.h"
#include "rgw_dmclock_scheduler_ctx.h"

namespace rgw::dmclock {
  namespace async = ceph::async;

/*
 * A dmclock request scheduling service for use with boost::asio.
 *
 * An asynchronous dmclock priority queue, where scheduled requests complete
 * on a boost::asio executor.
 */
class AsyncScheduler : public md_config_obs_t, public Scheduler {
 public:
  template <typename ...Args> // args forwarded to PullPriorityQueue ctor
  AsyncScheduler(CephContext *cct, boost::asio::io_context& context,
                 GetClientCounters&& counters, md_config_obs_t *observer,
                 Args&& ...args);
  ~AsyncScheduler();

  using executor_type = boost::asio::io_context::executor_type;

  /// return the default executor for async_request() callbacks
  executor_type get_executor() noexcept {
    return timer.get_executor();
  }

  /// submit an async request for dmclock scheduling. the given completion
  /// handler will be invoked with (error_code, PhaseType) when the request
  /// is ready or canceled. on success, this grants a throttle unit that must
  /// be returned with a call to request_complete()
  template <typename CompletionToken>
  auto async_request(const client_id& client, const ReqParams& params,
                     const Time& time, Cost cost, CompletionToken&& token);

  /// returns a throttle unit granted by async_request()
  void request_complete() override;

  /// cancel all queued requests, invoking their completion handlers with an
  /// operation_aborted error and default-constructed result
  void cancel();

  /// cancel all queued requests for a given client, invoking their completion
  /// handler with an operation_aborted error and default-constructed result
  void cancel(const client_id& client);

  /// md_config_obs_t interface: config keys this scheduler reacts to
  const char** get_tracked_conf_keys() const override;
  /// md_config_obs_t interface: called when a tracked key changes
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string>& changed) override;

 private:
  /// Scheduler interface: queue the request and wait for its turn
  int schedule_request_impl(const client_id& client, const ReqParams& params,
                            const Time& time, const Cost& cost,
                            optional_yield yield_ctx) override;

  static constexpr bool IsDelayed = false;
  using Queue = crimson::dmclock::PullPriorityQueue<client_id, Request, IsDelayed>;
  using RequestRef = typename Queue::RequestRef;
  Queue queue; ///< dmclock priority queue

  using Signature = void(boost::system::error_code, PhaseType);
  // Completion derives from Request (via AsBase), so a Completion* can be
  // stored in the queue's RequestRef (unique_ptr<Request>)
  using Completion = async::Completion<Signature, async::AsBase<Request>>;

  using Clock = ceph::coarse_real_clock;
  using Timer = boost::asio::basic_waitable_timer<Clock,
        boost::asio::wait_traits<Clock>, executor_type>;
  Timer timer; ///< timer for the next scheduled request

  CephContext *const cct;
  md_config_obs_t *const observer; ///< observer to update ClientInfoFunc
  GetClientCounters counters; ///< provides per-client perf counters

  /// max request throttle
  std::atomic<int64_t> max_requests;
  std::atomic<int64_t> outstanding_requests = 0;

  /// set a timer to process the next request
  void schedule(const Time& time);

  /// process ready requests, then schedule the next pending request
  void process(const Time& now);
};
102
103
104template <typename ...Args>
105AsyncScheduler::AsyncScheduler(CephContext *cct, boost::asio::io_context& context,
106 GetClientCounters&& counters,
107 md_config_obs_t *observer, Args&& ...args)
108 : queue(std::forward<Args>(args)...),
109 timer(context), cct(cct), observer(observer),
110 counters(std::move(counters)),
111 max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests"))
112{
113 if (max_requests <= 0) {
114 max_requests = std::numeric_limits<int64_t>::max();
115 }
116 if (observer) {
117 cct->_conf.add_observer(this);
118 }
119}
120
/// Submit a request for dmclock scheduling. The completion handler is either
/// dispatched later by process() when the request reaches the head of the
/// queue, or posted immediately with an error if the queue rejects it.
template <typename CompletionToken>
auto AsyncScheduler::async_request(const client_id& client,
                                   const ReqParams& params,
                                   const Time& time, Cost cost,
                                   CompletionToken&& token)
{
  boost::asio::async_completion<CompletionToken, Signature> init(token);

  auto ex1 = get_executor();
  auto& handler = init.completion_handler;

  // allocate the Request and add it to the queue
  auto completion = Completion::create(ex1, std::move(handler),
                                       Request{client, time, cost});
  // cast to unique_ptr<Request>; Completion derives from Request, so the
  // queue can own it through its RequestRef without knowing about Completion
  auto req = RequestRef{std::move(completion)};
  int r = queue.add_request(std::move(req), client, params, time, cost);
  if (r == 0) {
    // schedule an immediate call to process() on the executor
    schedule(crimson::dmclock::TimeZero);
    if (auto c = counters(client)) {
      c->inc(queue_counters::l_qlen);
      c->inc(queue_counters::l_cost, cost);
    }
  } else {
    // post the error code
    // NOTE(review): on failure add_request leaves ownership in req —
    // presumed by this recovery path; verify against PullPriorityQueue
    boost::system::error_code ec(r, boost::system::system_category());
    // cast back to Completion to regain ownership and complete with the error
    auto completion = static_cast<Completion*>(req.release());
    async::post(std::unique_ptr<Completion>{completion},
                ec, PhaseType::priority);
    if (auto c = counters(client)) {
      c->inc(queue_counters::l_limit);
      c->inc(queue_counters::l_limit_cost, cost);
    }
  }

  return init.result.get();
}
160
161class SimpleThrottler : public md_config_obs_t, public dmclock::Scheduler {
162public:
163 SimpleThrottler(CephContext *cct) :
164 max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests")),
165 counters(cct, "simple-throttler")
166 {
167 if (max_requests <= 0) {
168 max_requests = std::numeric_limits<int64_t>::max();
169 }
170 cct->_conf.add_observer(this);
171 }
172
173 const char** get_tracked_conf_keys() const override {
174 static const char* keys[] = { "rgw_max_concurrent_requests", nullptr };
175 return keys;
176 }
177
178 void handle_conf_change(const ConfigProxy& conf,
179 const std::set<std::string>& changed) override
180 {
181 if (changed.count("rgw_max_concurrent_requests")) {
182 auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests");
183 max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max();
184 }
185 }
186
187 void request_complete() override {
188 --outstanding_requests;
f67539c2
TL
189 if (auto c = counters();
190 c != nullptr) {
191 c->inc(throttle_counters::l_outstanding, -1);
192 }
193
11fdf7f2
TL
194 }
195
196private:
197 int schedule_request_impl(const client_id&, const ReqParams&,
198 const Time&, const Cost&,
199 optional_yield) override {
200 if (outstanding_requests++ >= max_requests) {
201 if (auto c = counters();
202 c != nullptr) {
f67539c2 203 c->inc(throttle_counters::l_outstanding);
11fdf7f2
TL
204 c->inc(throttle_counters::l_throttle);
205 }
206 return -EAGAIN;
207 }
208
209 return 0 ;
210 }
211
212 std::atomic<int64_t> max_requests;
213 std::atomic<int64_t> outstanding_requests = 0;
214 ThrottleCounters counters;
215};
} // namespace rgw::dmclock