1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/errno.h"
5 #include "common/Throttle.h"
6 #include "common/WorkQueue.h"
7 #include "include/scope_guard.h"
10 #include "rgw_dmclock_scheduler.h"
12 #include "rgw_frontend.h"
13 #include "rgw_request.h"
14 #include "rgw_process.h"
15 #include "rgw_loadgen.h"
16 #include "rgw_client_io.h"
18 #include "rgw_perf_counters.h"
20 #include "rgw_lua_request.h"
21 #include "rgw_tracer.h"
22 #include "rgw_ratelimit.h"
24 #include "services/svc_zone_utils.h"
26 #define dout_subsys ceph_subsys_rgw
29 using rgw::dmclock::Scheduler
;
31 void RGWProcess::RGWWQ::_dump_queue()
33 if (!g_conf()->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
36 deque
<RGWRequest
*>::iterator iter
;
37 if (process
->m_req_queue
.empty()) {
38 dout(20) << "RGWWQ: empty" << dendl
;
41 dout(20) << "RGWWQ:" << dendl
;
42 for (iter
= process
->m_req_queue
.begin();
43 iter
!= process
->m_req_queue
.end(); ++iter
) {
44 dout(20) << "req: " << hex
<< *iter
<< dec
<< dendl
;
46 } /* RGWProcess::RGWWQ::_dump_queue */
48 auto schedule_request(Scheduler
*scheduler
, req_state
*s
, RGWOp
*op
)
50 using rgw::dmclock::SchedulerCompleter
;
52 return std::make_pair(0,SchedulerCompleter
{});
54 const auto client
= op
->dmclock_client();
55 const auto cost
= op
->dmclock_cost();
56 if (s
->cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 10)) {
57 ldpp_dout(op
,10) << "scheduling with "
58 << s
->cct
->_conf
.get_val
<std::string
>("rgw_scheduler_type")
59 << " client=" << static_cast<int>(client
)
60 << " cost=" << cost
<< dendl
;
62 return scheduler
->schedule_request(client
, {},
63 req_state::Clock::to_double(s
->time
),
68 bool RGWProcess::RGWWQ::_enqueue(RGWRequest
* req
) {
69 process
->m_req_queue
.push_back(req
);
70 perfcounter
->inc(l_rgw_qlen
);
71 dout(20) << "enqueued request req=" << hex
<< req
<< dec
<< dendl
;
76 RGWRequest
* RGWProcess::RGWWQ::_dequeue() {
77 if (process
->m_req_queue
.empty())
79 RGWRequest
*req
= process
->m_req_queue
.front();
80 process
->m_req_queue
.pop_front();
81 dout(20) << "dequeued request req=" << hex
<< req
<< dec
<< dendl
;
83 perfcounter
->inc(l_rgw_qlen
, -1);
87 void RGWProcess::RGWWQ::_process(RGWRequest
*req
, ThreadPool::TPHandle
&) {
88 perfcounter
->inc(l_rgw_qactive
);
89 process
->handle_request(this, req
);
90 process
->req_throttle
.put(1);
91 perfcounter
->inc(l_rgw_qactive
, -1);
93 bool rate_limit(rgw::sal::Store
* store
, req_state
* s
) {
94 // we dont want to limit health check or system or admin requests
95 const auto& is_admin_or_system
= s
->user
->get_info();
96 if ((s
->op_type
== RGW_OP_GET_HEALTH_CHECK
) || is_admin_or_system
.admin
|| is_admin_or_system
.system
)
99 RGWRateLimitInfo global_user
;
100 RGWRateLimitInfo global_bucket
;
101 RGWRateLimitInfo global_anon
;
102 RGWRateLimitInfo
* bucket_ratelimit
;
103 RGWRateLimitInfo
* user_ratelimit
;
104 store
->get_ratelimit(global_bucket
, global_user
, global_anon
);
105 bucket_ratelimit
= &global_bucket
;
106 user_ratelimit
= &global_user
;
107 s
->user
->get_id().to_str(userfind
);
108 userfind
= "u" + userfind
;
109 s
->ratelimit_user_name
= userfind
;
110 std::string bucketfind
= !rgw::sal::Bucket::empty(s
->bucket
.get()) ? "b" + s
->bucket
->get_marker() : "";
111 s
->ratelimit_bucket_marker
= bucketfind
;
112 const char *method
= s
->info
.method
;
114 auto iter
= s
->user
->get_attrs().find(RGW_ATTR_RATELIMIT
);
115 if(iter
!= s
->user
->get_attrs().end()) {
117 RGWRateLimitInfo user_ratelimit_temp
;
118 bufferlist
& bl
= iter
->second
;
119 auto biter
= bl
.cbegin();
120 decode(user_ratelimit_temp
, biter
);
121 // override global rate limiting only if local rate limiting is enabled
122 if (user_ratelimit_temp
.enabled
)
123 *user_ratelimit
= user_ratelimit_temp
;
124 } catch (buffer::error
& err
) {
125 ldpp_dout(s
, 0) << "ERROR: failed to decode rate limit" << dendl
;
129 if (s
->user
->get_id().id
== RGW_USER_ANON_ID
&& global_anon
.enabled
) {
130 *user_ratelimit
= global_anon
;
132 bool limit_bucket
= false;
133 bool limit_user
= s
->ratelimit_data
->should_rate_limit(method
, s
->ratelimit_user_name
, s
->time
, user_ratelimit
);
135 if(!rgw::sal::Bucket::empty(s
->bucket
.get()))
137 iter
= s
->bucket
->get_attrs().find(RGW_ATTR_RATELIMIT
);
138 if(iter
!= s
->bucket
->get_attrs().end()) {
140 RGWRateLimitInfo bucket_ratelimit_temp
;
141 bufferlist
& bl
= iter
->second
;
142 auto biter
= bl
.cbegin();
143 decode(bucket_ratelimit_temp
, biter
);
144 // override global rate limiting only if local rate limiting is enabled
145 if (bucket_ratelimit_temp
.enabled
)
146 *bucket_ratelimit
= bucket_ratelimit_temp
;
147 } catch (buffer::error
& err
) {
148 ldpp_dout(s
, 0) << "ERROR: failed to decode rate limit" << dendl
;
153 limit_bucket
= s
->ratelimit_data
->should_rate_limit(method
, s
->ratelimit_bucket_marker
, s
->time
, bucket_ratelimit
);
156 if(limit_bucket
&& !limit_user
) {
157 s
->ratelimit_data
->giveback_tokens(method
, s
->ratelimit_user_name
);
159 s
->user_ratelimit
= *user_ratelimit
;
160 s
->bucket_ratelimit
= *bucket_ratelimit
;
161 return (limit_user
|| limit_bucket
);
164 int rgw_process_authenticated(RGWHandler_REST
* const handler
,
166 RGWRequest
* const req
,
169 rgw::sal::Store
* store
,
170 const bool skip_retarget
)
172 ldpp_dout(op
, 2) << "init permissions" << dendl
;
173 int ret
= handler
->init_permissions(op
, y
);
179 * Only some accesses support website mode, and website mode does NOT apply
180 * if you are using the REST endpoint either (ergo, no authenticated access)
182 if (! skip_retarget
) {
183 ldpp_dout(op
, 2) << "recalculating target" << dendl
;
184 ret
= handler
->retarget(op
, &op
, y
);
190 ldpp_dout(op
, 2) << "retargeting skipped because of SubOp mode" << dendl
;
193 /* If necessary extract object ACL and put them into req_state. */
194 ldpp_dout(op
, 2) << "reading permissions" << dendl
;
195 ret
= handler
->read_permissions(op
, y
);
200 ldpp_dout(op
, 2) << "init op" << dendl
;
201 ret
= op
->init_processing(y
);
206 ldpp_dout(op
, 2) << "verifying op mask" << dendl
;
207 ret
= op
->verify_op_mask();
212 /* Check if OPA is used to authorize requests */
213 if (s
->cct
->_conf
->rgw_use_opa_authz
) {
214 ret
= rgw_opa_authorize(op
, s
);
220 ldpp_dout(op
, 2) << "verifying op permissions" << dendl
;
222 auto span
= tracing::rgw::tracer
.add_span("verify_permission", s
->trace
);
223 std::swap(span
, s
->trace
);
224 ret
= op
->verify_permission(y
);
225 std::swap(span
, s
->trace
);
228 if (s
->system_request
) {
229 dout(2) << "overriding permissions due to system operation" << dendl
;
230 } else if (s
->auth
.identity
->is_admin_of(s
->user
->get_id())) {
231 dout(2) << "overriding permissions due to admin operation" << dendl
;
237 ldpp_dout(op
, 2) << "verifying op params" << dendl
;
238 ret
= op
->verify_params();
243 ldpp_dout(op
, 2) << "pre-executing" << dendl
;
246 ldpp_dout(op
, 2) << "check rate limiting" << dendl
;
247 if (rate_limit(store
, s
)) {
248 return -ERR_RATE_LIMITED
;
250 ldpp_dout(op
, 2) << "executing" << dendl
;
252 auto span
= tracing::rgw::tracer
.add_span("execute", s
->trace
);
253 std::swap(span
, s
->trace
);
255 std::swap(span
, s
->trace
);
258 ldpp_dout(op
, 2) << "completing" << dendl
;
264 int process_request(rgw::sal::Store
* const store
,
266 RGWRequest
* const req
,
267 const std::string
& frontend_prefix
,
268 const rgw_auth_registry_t
& auth_registry
,
269 RGWRestfulIO
* const client_io
,
270 OpsLogSink
* const olog
,
271 optional_yield yield
,
272 rgw::dmclock::Scheduler
*scheduler
,
274 ceph::coarse_real_clock::duration
* latency
,
275 std::shared_ptr
<RateLimiter
> ratelimit
,
278 int ret
= client_io
->init(g_ceph_context
);
280 dout(1) << "====== starting new request req=" << hex
<< req
<< dec
281 << " =====" << dendl
;
282 perfcounter
->inc(l_rgw_req
);
284 RGWEnv
& rgw_env
= client_io
->get_env();
286 struct req_state
rstate(g_ceph_context
, &rgw_env
, req
->id
);
287 struct req_state
*s
= &rstate
;
289 s
->ratelimit_data
= ratelimit
;
290 std::unique_ptr
<rgw::sal::User
> u
= store
->get_user(rgw_user());
293 RGWObjectCtx
rados_ctx(store
, s
);
294 s
->obj_ctx
= &rados_ctx
;
298 abort_early(s
, nullptr, ret
, nullptr, yield
);
302 s
->req_id
= store
->zone_unique_id(req
->id
);
303 s
->trans_id
= store
->zone_unique_trans_id(req
->id
);
304 s
->host_id
= store
->get_host_id();
307 ldpp_dout(s
, 2) << "initializing for trans_id = " << s
->trans_id
<< dendl
;
311 bool should_log
= false;
313 RGWHandler_REST
*handler
= rest
->get_handler(store
, s
,
316 client_io
, &mgr
, &init_error
);
317 rgw::dmclock::SchedulerCompleter c
;
319 if (init_error
!= 0) {
320 abort_early(s
, nullptr, init_error
, nullptr, yield
);
323 ldpp_dout(s
, 10) << "handler=" << typeid(*handler
).name() << dendl
;
325 should_log
= mgr
->get_logging();
327 ldpp_dout(s
, 2) << "getting op " << s
->op
<< dendl
;
328 op
= handler
->get_op();
330 abort_early(s
, NULL
, -ERR_METHOD_NOT_ALLOWED
, handler
, yield
);
335 auto rc
= rgw::lua::read_script(s
, store
, s
->bucket_tenant
, s
->yield
, rgw::lua::context::preRequest
, script
);
337 // no script, nothing to do
339 ldpp_dout(op
, 5) << "WARNING: failed to read pre request script. error: " << rc
<< dendl
;
341 rc
= rgw::lua::request::execute(store
, rest
, olog
, s
, op
->name(), script
);
343 ldpp_dout(op
, 5) << "WARNING: failed to execute pre request script. error: " << rc
<< dendl
;
347 std::tie(ret
,c
) = schedule_request(scheduler
, s
, op
);
349 if (ret
== -EAGAIN
) {
350 ret
= -ERR_RATE_LIMITED
;
352 ldpp_dout(op
,0) << "Scheduling request failed with " << ret
<< dendl
;
353 abort_early(s
, op
, ret
, handler
, yield
);
357 ldpp_dout(op
, 10) << "op=" << typeid(*op
).name() << dendl
;
358 s
->op_type
= op
->get_type();
361 ldpp_dout(op
, 2) << "verifying requester" << dendl
;
362 ret
= op
->verify_requester(auth_registry
, yield
);
364 dout(10) << "failed to authorize request" << dendl
;
365 abort_early(s
, op
, ret
, handler
, yield
);
369 /* FIXME: remove this after switching all handlers to the new authentication
371 if (nullptr == s
->auth
.identity
) {
372 s
->auth
.identity
= rgw::auth::transform_old_authinfo(s
);
375 ldpp_dout(op
, 2) << "normalizing buckets and tenants" << dendl
;
376 ret
= handler
->postauth_init(yield
);
378 dout(10) << "failed to run post-auth init" << dendl
;
379 abort_early(s
, op
, ret
, handler
, yield
);
383 if (s
->user
->get_info().suspended
) {
384 dout(10) << "user is suspended, uid=" << s
->user
->get_id() << dendl
;
385 abort_early(s
, op
, -ERR_USER_SUSPENDED
, handler
, yield
);
389 const auto trace_name
= std::string(op
->name()) + " " + s
->trans_id
;
390 s
->trace
= tracing::rgw::tracer
.start_trace(trace_name
);
391 s
->trace
->SetAttribute(tracing::rgw::OP
, op
->name());
392 s
->trace
->SetAttribute(tracing::rgw::TYPE
, tracing::rgw::REQUEST
);
394 ret
= rgw_process_authenticated(handler
, op
, req
, s
, yield
, store
);
396 abort_early(s
, op
, ret
, handler
, yield
);
399 } catch (const ceph::crypto::DigestException
& e
) {
400 dout(0) << "authentication failed" << e
.what() << dendl
;
401 abort_early(s
, op
, -ERR_INVALID_SECRET_KEY
, handler
, yield
);
406 s
->trace
->SetAttribute(tracing::rgw::RETURN
, op
->get_ret());
408 s
->trace
->SetAttribute(tracing::rgw::USER_ID
, s
->user
->get_id().id
);
411 s
->trace
->SetAttribute(tracing::rgw::BUCKET_NAME
, s
->bucket
->get_name());
414 s
->trace
->SetAttribute(tracing::rgw::OBJECT_NAME
, s
->object
->get_name());
417 auto rc
= rgw::lua::read_script(s
, store
, s
->bucket_tenant
, s
->yield
, rgw::lua::context::postRequest
, script
);
419 // no script, nothing to do
421 ldpp_dout(op
, 5) << "WARNING: failed to read post request script. error: " << rc
<< dendl
;
423 rc
= rgw::lua::request::execute(store
, rest
, olog
, s
, op
->name(), script
);
425 ldpp_dout(op
, 5) << "WARNING: failed to execute post request script. error: " << rc
<< dendl
;
431 client_io
->complete_request();
432 } catch (rgw::io::Exception
& e
) {
433 dout(0) << "ERROR: client_io->complete_request() returned "
434 << e
.what() << dendl
;
437 rgw_log_op(rest
, s
, (op
? op
->name() : "unknown"), olog
);
440 if (http_ret
!= nullptr) {
441 *http_ret
= s
->err
.http_ret
;
445 if (user
&& !rgw::sal::User::empty(s
->user
.get())) {
446 *user
= s
->user
->get_id().to_str();
450 op_ret
= op
->get_ret();
451 ldpp_dout(op
, 2) << "op status=" << op_ret
<< dendl
;
452 ldpp_dout(op
, 2) << "http status=" << s
->err
.http_ret
<< dendl
;
454 ldpp_dout(s
, 2) << "http status=" << s
->err
.http_ret
<< dendl
;
458 rest
->put_handler(handler
);
460 const auto lat
= s
->time_elapsed();
464 dout(1) << "====== req done req=" << hex
<< req
<< dec
465 << " op status=" << op_ret
466 << " http_status=" << s
->err
.http_ret
467 << " latency=" << lat
471 return (ret
< 0 ? ret
: s
->err
.ret
);
472 } /* process_request */