1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/errno.h"
5 #include "common/Throttle.h"
6 #include "common/WorkQueue.h"
7 #include "include/scope_guard.h"
10 #include "rgw_auth_registry.h"
11 #include "rgw_dmclock_scheduler.h"
13 #include "rgw_frontend.h"
14 #include "rgw_request.h"
15 #include "rgw_process.h"
16 #include "rgw_loadgen.h"
17 #include "rgw_client_io.h"
19 #include "rgw_perf_counters.h"
21 #include "rgw_lua_request.h"
22 #include "rgw_tracer.h"
23 #include "rgw_ratelimit.h"
25 #include "services/svc_zone_utils.h"
27 #define dout_subsys ceph_subsys_rgw
30 using rgw::dmclock::Scheduler
;
32 void RGWProcess::RGWWQ::_dump_queue()
34 if (!g_conf()->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
37 deque
<RGWRequest
*>::iterator iter
;
38 if (process
->m_req_queue
.empty()) {
39 dout(20) << "RGWWQ: empty" << dendl
;
42 dout(20) << "RGWWQ:" << dendl
;
43 for (iter
= process
->m_req_queue
.begin();
44 iter
!= process
->m_req_queue
.end(); ++iter
) {
45 dout(20) << "req: " << hex
<< *iter
<< dec
<< dendl
;
47 } /* RGWProcess::RGWWQ::_dump_queue */
49 auto schedule_request(Scheduler
*scheduler
, req_state
*s
, RGWOp
*op
)
51 using rgw::dmclock::SchedulerCompleter
;
53 return std::make_pair(0,SchedulerCompleter
{});
55 const auto client
= op
->dmclock_client();
56 const auto cost
= op
->dmclock_cost();
57 if (s
->cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 10)) {
58 ldpp_dout(op
,10) << "scheduling with "
59 << s
->cct
->_conf
.get_val
<std::string
>("rgw_scheduler_type")
60 << " client=" << static_cast<int>(client
)
61 << " cost=" << cost
<< dendl
;
63 return scheduler
->schedule_request(client
, {},
64 req_state::Clock::to_double(s
->time
),
69 bool RGWProcess::RGWWQ::_enqueue(RGWRequest
* req
) {
70 process
->m_req_queue
.push_back(req
);
71 perfcounter
->inc(l_rgw_qlen
);
72 dout(20) << "enqueued request req=" << hex
<< req
<< dec
<< dendl
;
77 RGWRequest
* RGWProcess::RGWWQ::_dequeue() {
78 if (process
->m_req_queue
.empty())
80 RGWRequest
*req
= process
->m_req_queue
.front();
81 process
->m_req_queue
.pop_front();
82 dout(20) << "dequeued request req=" << hex
<< req
<< dec
<< dendl
;
84 perfcounter
->inc(l_rgw_qlen
, -1);
88 void RGWProcess::RGWWQ::_process(RGWRequest
*req
, ThreadPool::TPHandle
&) {
89 perfcounter
->inc(l_rgw_qactive
);
90 process
->handle_request(this, req
);
91 process
->req_throttle
.put(1);
92 perfcounter
->inc(l_rgw_qactive
, -1);
94 bool rate_limit(rgw::sal::Driver
* driver
, req_state
* s
) {
95 // we dont want to limit health check or system or admin requests
96 const auto& is_admin_or_system
= s
->user
->get_info();
97 if ((s
->op_type
== RGW_OP_GET_HEALTH_CHECK
) || is_admin_or_system
.admin
|| is_admin_or_system
.system
)
100 RGWRateLimitInfo global_user
;
101 RGWRateLimitInfo global_bucket
;
102 RGWRateLimitInfo global_anon
;
103 RGWRateLimitInfo
* bucket_ratelimit
;
104 RGWRateLimitInfo
* user_ratelimit
;
105 driver
->get_ratelimit(global_bucket
, global_user
, global_anon
);
106 bucket_ratelimit
= &global_bucket
;
107 user_ratelimit
= &global_user
;
108 s
->user
->get_id().to_str(userfind
);
109 userfind
= "u" + userfind
;
110 s
->ratelimit_user_name
= userfind
;
111 std::string bucketfind
= !rgw::sal::Bucket::empty(s
->bucket
.get()) ? "b" + s
->bucket
->get_marker() : "";
112 s
->ratelimit_bucket_marker
= bucketfind
;
113 const char *method
= s
->info
.method
;
115 auto iter
= s
->user
->get_attrs().find(RGW_ATTR_RATELIMIT
);
116 if(iter
!= s
->user
->get_attrs().end()) {
118 RGWRateLimitInfo user_ratelimit_temp
;
119 bufferlist
& bl
= iter
->second
;
120 auto biter
= bl
.cbegin();
121 decode(user_ratelimit_temp
, biter
);
122 // override global rate limiting only if local rate limiting is enabled
123 if (user_ratelimit_temp
.enabled
)
124 *user_ratelimit
= user_ratelimit_temp
;
125 } catch (buffer::error
& err
) {
126 ldpp_dout(s
, 0) << "ERROR: failed to decode rate limit" << dendl
;
130 if (s
->user
->get_id().id
== RGW_USER_ANON_ID
&& global_anon
.enabled
) {
131 *user_ratelimit
= global_anon
;
133 bool limit_bucket
= false;
134 bool limit_user
= s
->ratelimit_data
->should_rate_limit(method
, s
->ratelimit_user_name
, s
->time
, user_ratelimit
);
136 if(!rgw::sal::Bucket::empty(s
->bucket
.get()))
138 iter
= s
->bucket
->get_attrs().find(RGW_ATTR_RATELIMIT
);
139 if(iter
!= s
->bucket
->get_attrs().end()) {
141 RGWRateLimitInfo bucket_ratelimit_temp
;
142 bufferlist
& bl
= iter
->second
;
143 auto biter
= bl
.cbegin();
144 decode(bucket_ratelimit_temp
, biter
);
145 // override global rate limiting only if local rate limiting is enabled
146 if (bucket_ratelimit_temp
.enabled
)
147 *bucket_ratelimit
= bucket_ratelimit_temp
;
148 } catch (buffer::error
& err
) {
149 ldpp_dout(s
, 0) << "ERROR: failed to decode rate limit" << dendl
;
154 limit_bucket
= s
->ratelimit_data
->should_rate_limit(method
, s
->ratelimit_bucket_marker
, s
->time
, bucket_ratelimit
);
157 if(limit_bucket
&& !limit_user
) {
158 s
->ratelimit_data
->giveback_tokens(method
, s
->ratelimit_user_name
);
160 s
->user_ratelimit
= *user_ratelimit
;
161 s
->bucket_ratelimit
= *bucket_ratelimit
;
162 return (limit_user
|| limit_bucket
);
165 int rgw_process_authenticated(RGWHandler_REST
* const handler
,
167 RGWRequest
* const req
,
170 rgw::sal::Driver
* driver
,
171 const bool skip_retarget
)
173 ldpp_dout(op
, 2) << "init permissions" << dendl
;
174 int ret
= handler
->init_permissions(op
, y
);
180 * Only some accesses support website mode, and website mode does NOT apply
181 * if you are using the REST endpoint either (ergo, no authenticated access)
183 if (! skip_retarget
) {
184 ldpp_dout(op
, 2) << "recalculating target" << dendl
;
185 ret
= handler
->retarget(op
, &op
, y
);
191 ldpp_dout(op
, 2) << "retargeting skipped because of SubOp mode" << dendl
;
194 /* If necessary extract object ACL and put them into req_state. */
195 ldpp_dout(op
, 2) << "reading permissions" << dendl
;
196 ret
= handler
->read_permissions(op
, y
);
201 ldpp_dout(op
, 2) << "init op" << dendl
;
202 ret
= op
->init_processing(y
);
207 ldpp_dout(op
, 2) << "verifying op mask" << dendl
;
208 ret
= op
->verify_op_mask();
213 /* Check if OPA is used to authorize requests */
214 if (s
->cct
->_conf
->rgw_use_opa_authz
) {
215 ret
= rgw_opa_authorize(op
, s
);
221 ldpp_dout(op
, 2) << "verifying op permissions" << dendl
;
223 auto span
= tracing::rgw::tracer
.add_span("verify_permission", s
->trace
);
224 std::swap(span
, s
->trace
);
225 ret
= op
->verify_permission(y
);
226 std::swap(span
, s
->trace
);
229 if (s
->system_request
) {
230 dout(2) << "overriding permissions due to system operation" << dendl
;
231 } else if (s
->auth
.identity
->is_admin_of(s
->user
->get_id())) {
232 dout(2) << "overriding permissions due to admin operation" << dendl
;
238 ldpp_dout(op
, 2) << "verifying op params" << dendl
;
239 ret
= op
->verify_params();
244 ldpp_dout(op
, 2) << "pre-executing" << dendl
;
247 ldpp_dout(op
, 2) << "check rate limiting" << dendl
;
248 if (rate_limit(driver
, s
)) {
249 return -ERR_RATE_LIMITED
;
251 ldpp_dout(op
, 2) << "executing" << dendl
;
253 auto span
= tracing::rgw::tracer
.add_span("execute", s
->trace
);
254 std::swap(span
, s
->trace
);
256 std::swap(span
, s
->trace
);
259 ldpp_dout(op
, 2) << "completing" << dendl
;
265 int process_request(const RGWProcessEnv
& penv
,
266 RGWRequest
* const req
,
267 const std::string
& frontend_prefix
,
268 RGWRestfulIO
* const client_io
,
269 optional_yield yield
,
270 rgw::dmclock::Scheduler
*scheduler
,
272 ceph::coarse_real_clock::duration
* latency
,
275 int ret
= client_io
->init(g_ceph_context
);
276 dout(1) << "====== starting new request req=" << hex
<< req
<< dec
277 << " =====" << dendl
;
278 perfcounter
->inc(l_rgw_req
);
280 RGWEnv
& rgw_env
= client_io
->get_env();
282 req_state
rstate(g_ceph_context
, penv
, &rgw_env
, req
->id
);
283 req_state
*s
= &rstate
;
285 s
->ratelimit_data
= penv
.ratelimiting
->get_active();
287 rgw::sal::Driver
* driver
= penv
.driver
;
288 std::unique_ptr
<rgw::sal::User
> u
= driver
->get_user(rgw_user());
293 abort_early(s
, nullptr, ret
, nullptr, yield
);
297 s
->req_id
= driver
->zone_unique_id(req
->id
);
298 s
->trans_id
= driver
->zone_unique_trans_id(req
->id
);
299 s
->host_id
= driver
->get_host_id();
302 ldpp_dout(s
, 2) << "initializing for trans_id = " << s
->trans_id
<< dendl
;
306 bool should_log
= false;
307 RGWREST
* rest
= penv
.rest
;
309 RGWHandler_REST
*handler
= rest
->get_handler(driver
, s
,
312 client_io
, &mgr
, &init_error
);
313 rgw::dmclock::SchedulerCompleter c
;
315 if (init_error
!= 0) {
316 abort_early(s
, nullptr, init_error
, nullptr, yield
);
319 ldpp_dout(s
, 10) << "handler=" << typeid(*handler
).name() << dendl
;
321 should_log
= mgr
->get_logging();
323 ldpp_dout(s
, 2) << "getting op " << s
->op
<< dendl
;
324 op
= handler
->get_op();
326 abort_early(s
, NULL
, -ERR_METHOD_NOT_ALLOWED
, handler
, yield
);
330 s
->trace_enabled
= tracing::rgw::tracer
.is_enabled();
332 auto rc
= rgw::lua::read_script(s
, penv
.lua
.manager
.get(), s
->bucket_tenant
, s
->yield
, rgw::lua::context::preRequest
, script
);
334 // no script, nothing to do
336 ldpp_dout(op
, 5) << "WARNING: failed to read pre request script. error: " << rc
<< dendl
;
338 rc
= rgw::lua::request::execute(driver
, rest
, penv
.olog
, s
, op
, script
);
340 ldpp_dout(op
, 5) << "WARNING: failed to execute pre request script. error: " << rc
<< dendl
;
344 std::tie(ret
,c
) = schedule_request(scheduler
, s
, op
);
346 if (ret
== -EAGAIN
) {
347 ret
= -ERR_RATE_LIMITED
;
349 ldpp_dout(op
,0) << "Scheduling request failed with " << ret
<< dendl
;
350 abort_early(s
, op
, ret
, handler
, yield
);
354 ldpp_dout(op
, 10) << "op=" << typeid(*op
).name() << dendl
;
355 s
->op_type
= op
->get_type();
358 ldpp_dout(op
, 2) << "verifying requester" << dendl
;
359 ret
= op
->verify_requester(*penv
.auth_registry
, yield
);
361 dout(10) << "failed to authorize request" << dendl
;
362 abort_early(s
, op
, ret
, handler
, yield
);
366 /* FIXME: remove this after switching all handlers to the new authentication
368 if (nullptr == s
->auth
.identity
) {
369 s
->auth
.identity
= rgw::auth::transform_old_authinfo(s
);
372 ldpp_dout(op
, 2) << "normalizing buckets and tenants" << dendl
;
373 ret
= handler
->postauth_init(yield
);
375 dout(10) << "failed to run post-auth init" << dendl
;
376 abort_early(s
, op
, ret
, handler
, yield
);
380 if (s
->user
->get_info().suspended
) {
381 dout(10) << "user is suspended, uid=" << s
->user
->get_id() << dendl
;
382 abort_early(s
, op
, -ERR_USER_SUSPENDED
, handler
, yield
);
387 const auto trace_name
= std::string(op
->name()) + " " + s
->trans_id
;
388 s
->trace
= tracing::rgw::tracer
.start_trace(trace_name
, s
->trace_enabled
);
389 s
->trace
->SetAttribute(tracing::rgw::OP
, op
->name());
390 s
->trace
->SetAttribute(tracing::rgw::TYPE
, tracing::rgw::REQUEST
);
392 ret
= rgw_process_authenticated(handler
, op
, req
, s
, yield
, driver
);
394 abort_early(s
, op
, ret
, handler
, yield
);
397 } catch (const ceph::crypto::DigestException
& e
) {
398 dout(0) << "authentication failed" << e
.what() << dendl
;
399 abort_early(s
, op
, -ERR_INVALID_SECRET_KEY
, handler
, yield
);
405 s
->trace
->SetAttribute(tracing::rgw::RETURN
, op
->get_ret());
406 if (!rgw::sal::User::empty(s
->user
)) {
407 s
->trace
->SetAttribute(tracing::rgw::USER_ID
, s
->user
->get_id().id
);
409 if (!rgw::sal::Bucket::empty(s
->bucket
)) {
410 s
->trace
->SetAttribute(tracing::rgw::BUCKET_NAME
, s
->bucket
->get_name());
412 if (!rgw::sal::Object::empty(s
->object
)) {
413 s
->trace
->SetAttribute(tracing::rgw::OBJECT_NAME
, s
->object
->get_name());
417 auto rc
= rgw::lua::read_script(s
, penv
.lua
.manager
.get(), s
->bucket_tenant
, s
->yield
, rgw::lua::context::postRequest
, script
);
419 // no script, nothing to do
421 ldpp_dout(op
, 5) << "WARNING: failed to read post request script. error: " << rc
<< dendl
;
423 rc
= rgw::lua::request::execute(driver
, rest
, penv
.olog
, s
, op
, script
);
425 ldpp_dout(op
, 5) << "WARNING: failed to execute post request script. error: " << rc
<< dendl
;
431 client_io
->complete_request();
432 } catch (rgw::io::Exception
& e
) {
433 dout(0) << "ERROR: client_io->complete_request() returned "
434 << e
.what() << dendl
;
437 rgw_log_op(rest
, s
, op
, penv
.olog
);
440 if (http_ret
!= nullptr) {
441 *http_ret
= s
->err
.http_ret
;
445 if (user
&& !rgw::sal::User::empty(s
->user
.get())) {
446 *user
= s
->user
->get_id().to_str();
450 op_ret
= op
->get_ret();
451 ldpp_dout(op
, 2) << "op status=" << op_ret
<< dendl
;
452 ldpp_dout(op
, 2) << "http status=" << s
->err
.http_ret
<< dendl
;
454 ldpp_dout(s
, 2) << "http status=" << s
->err
.http_ret
<< dendl
;
458 rest
->put_handler(handler
);
460 const auto lat
= s
->time_elapsed();
464 dout(1) << "====== req done req=" << hex
<< req
<< dec
465 << " op status=" << op_ret
466 << " http_status=" << s
->err
.http_ret
467 << " latency=" << lat
471 return (ret
< 0 ? ret
: s
->err
.ret
);
472 } /* process_request */