]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_process.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_process.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/errno.h"
5 #include "common/Throttle.h"
6 #include "common/WorkQueue.h"
7 #include "include/scope_guard.h"
8
9 #include <utility>
10 #include "rgw_auth_registry.h"
11 #include "rgw_dmclock_scheduler.h"
12 #include "rgw_rest.h"
13 #include "rgw_frontend.h"
14 #include "rgw_request.h"
15 #include "rgw_process.h"
16 #include "rgw_loadgen.h"
17 #include "rgw_client_io.h"
18 #include "rgw_opa.h"
19 #include "rgw_perf_counters.h"
20 #include "rgw_lua.h"
21 #include "rgw_lua_request.h"
22 #include "rgw_tracer.h"
23 #include "rgw_ratelimit.h"
24
25 #include "services/svc_zone_utils.h"
26
27 #define dout_subsys ceph_subsys_rgw
28
29 using namespace std;
30 using rgw::dmclock::Scheduler;
31
32 void RGWProcess::RGWWQ::_dump_queue()
33 {
34 if (!g_conf()->subsys.should_gather<ceph_subsys_rgw, 20>()) {
35 return;
36 }
37 deque<RGWRequest *>::iterator iter;
38 if (process->m_req_queue.empty()) {
39 dout(20) << "RGWWQ: empty" << dendl;
40 return;
41 }
42 dout(20) << "RGWWQ:" << dendl;
43 for (iter = process->m_req_queue.begin();
44 iter != process->m_req_queue.end(); ++iter) {
45 dout(20) << "req: " << hex << *iter << dec << dendl;
46 }
47 } /* RGWProcess::RGWWQ::_dump_queue */
48
49 auto schedule_request(Scheduler *scheduler, req_state *s, RGWOp *op)
50 {
51 using rgw::dmclock::SchedulerCompleter;
52 if (!scheduler)
53 return std::make_pair(0,SchedulerCompleter{});
54
55 const auto client = op->dmclock_client();
56 const auto cost = op->dmclock_cost();
57 if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 10)) {
58 ldpp_dout(op,10) << "scheduling with "
59 << s->cct->_conf.get_val<std::string>("rgw_scheduler_type")
60 << " client=" << static_cast<int>(client)
61 << " cost=" << cost << dendl;
62 }
63 return scheduler->schedule_request(client, {},
64 req_state::Clock::to_double(s->time),
65 cost,
66 s->yield);
67 }
68
69 bool RGWProcess::RGWWQ::_enqueue(RGWRequest* req) {
70 process->m_req_queue.push_back(req);
71 perfcounter->inc(l_rgw_qlen);
72 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
73 _dump_queue();
74 return true;
75 }
76
77 RGWRequest* RGWProcess::RGWWQ::_dequeue() {
78 if (process->m_req_queue.empty())
79 return NULL;
80 RGWRequest *req = process->m_req_queue.front();
81 process->m_req_queue.pop_front();
82 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
83 _dump_queue();
84 perfcounter->inc(l_rgw_qlen, -1);
85 return req;
86 }
87
88 void RGWProcess::RGWWQ::_process(RGWRequest *req, ThreadPool::TPHandle &) {
89 perfcounter->inc(l_rgw_qactive);
90 process->handle_request(this, req);
91 process->req_throttle.put(1);
92 perfcounter->inc(l_rgw_qactive, -1);
93 }
94 bool rate_limit(rgw::sal::Driver* driver, req_state* s) {
95 // we dont want to limit health check or system or admin requests
96 const auto& is_admin_or_system = s->user->get_info();
97 if ((s->op_type == RGW_OP_GET_HEALTH_CHECK) || is_admin_or_system.admin || is_admin_or_system.system)
98 return false;
99 std::string userfind;
100 RGWRateLimitInfo global_user;
101 RGWRateLimitInfo global_bucket;
102 RGWRateLimitInfo global_anon;
103 RGWRateLimitInfo* bucket_ratelimit;
104 RGWRateLimitInfo* user_ratelimit;
105 driver->get_ratelimit(global_bucket, global_user, global_anon);
106 bucket_ratelimit = &global_bucket;
107 user_ratelimit = &global_user;
108 s->user->get_id().to_str(userfind);
109 userfind = "u" + userfind;
110 s->ratelimit_user_name = userfind;
111 std::string bucketfind = !rgw::sal::Bucket::empty(s->bucket.get()) ? "b" + s->bucket->get_marker() : "";
112 s->ratelimit_bucket_marker = bucketfind;
113 const char *method = s->info.method;
114
115 auto iter = s->user->get_attrs().find(RGW_ATTR_RATELIMIT);
116 if(iter != s->user->get_attrs().end()) {
117 try {
118 RGWRateLimitInfo user_ratelimit_temp;
119 bufferlist& bl = iter->second;
120 auto biter = bl.cbegin();
121 decode(user_ratelimit_temp, biter);
122 // override global rate limiting only if local rate limiting is enabled
123 if (user_ratelimit_temp.enabled)
124 *user_ratelimit = user_ratelimit_temp;
125 } catch (buffer::error& err) {
126 ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
127 return -EIO;
128 }
129 }
130 if (s->user->get_id().id == RGW_USER_ANON_ID && global_anon.enabled) {
131 *user_ratelimit = global_anon;
132 }
133 bool limit_bucket = false;
134 bool limit_user = s->ratelimit_data->should_rate_limit(method, s->ratelimit_user_name, s->time, user_ratelimit);
135
136 if(!rgw::sal::Bucket::empty(s->bucket.get()))
137 {
138 iter = s->bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
139 if(iter != s->bucket->get_attrs().end()) {
140 try {
141 RGWRateLimitInfo bucket_ratelimit_temp;
142 bufferlist& bl = iter->second;
143 auto biter = bl.cbegin();
144 decode(bucket_ratelimit_temp, biter);
145 // override global rate limiting only if local rate limiting is enabled
146 if (bucket_ratelimit_temp.enabled)
147 *bucket_ratelimit = bucket_ratelimit_temp;
148 } catch (buffer::error& err) {
149 ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
150 return -EIO;
151 }
152 }
153 if (!limit_user) {
154 limit_bucket = s->ratelimit_data->should_rate_limit(method, s->ratelimit_bucket_marker, s->time, bucket_ratelimit);
155 }
156 }
157 if(limit_bucket && !limit_user) {
158 s->ratelimit_data->giveback_tokens(method, s->ratelimit_user_name);
159 }
160 s->user_ratelimit = *user_ratelimit;
161 s->bucket_ratelimit = *bucket_ratelimit;
162 return (limit_user || limit_bucket);
163 }
164
165 int rgw_process_authenticated(RGWHandler_REST * const handler,
166 RGWOp *& op,
167 RGWRequest * const req,
168 req_state * const s,
169 optional_yield y,
170 rgw::sal::Driver* driver,
171 const bool skip_retarget)
172 {
173 ldpp_dout(op, 2) << "init permissions" << dendl;
174 int ret = handler->init_permissions(op, y);
175 if (ret < 0) {
176 return ret;
177 }
178
179 /**
180 * Only some accesses support website mode, and website mode does NOT apply
181 * if you are using the REST endpoint either (ergo, no authenticated access)
182 */
183 if (! skip_retarget) {
184 ldpp_dout(op, 2) << "recalculating target" << dendl;
185 ret = handler->retarget(op, &op, y);
186 if (ret < 0) {
187 return ret;
188 }
189 req->op = op;
190 } else {
191 ldpp_dout(op, 2) << "retargeting skipped because of SubOp mode" << dendl;
192 }
193
194 /* If necessary extract object ACL and put them into req_state. */
195 ldpp_dout(op, 2) << "reading permissions" << dendl;
196 ret = handler->read_permissions(op, y);
197 if (ret < 0) {
198 return ret;
199 }
200
201 ldpp_dout(op, 2) << "init op" << dendl;
202 ret = op->init_processing(y);
203 if (ret < 0) {
204 return ret;
205 }
206
207 ldpp_dout(op, 2) << "verifying op mask" << dendl;
208 ret = op->verify_op_mask();
209 if (ret < 0) {
210 return ret;
211 }
212
213 /* Check if OPA is used to authorize requests */
214 if (s->cct->_conf->rgw_use_opa_authz) {
215 ret = rgw_opa_authorize(op, s);
216 if (ret < 0) {
217 return ret;
218 }
219 }
220
221 ldpp_dout(op, 2) << "verifying op permissions" << dendl;
222 {
223 auto span = tracing::rgw::tracer.add_span("verify_permission", s->trace);
224 std::swap(span, s->trace);
225 ret = op->verify_permission(y);
226 std::swap(span, s->trace);
227 }
228 if (ret < 0) {
229 if (s->system_request) {
230 dout(2) << "overriding permissions due to system operation" << dendl;
231 } else if (s->auth.identity->is_admin_of(s->user->get_id())) {
232 dout(2) << "overriding permissions due to admin operation" << dendl;
233 } else {
234 return ret;
235 }
236 }
237
238 ldpp_dout(op, 2) << "verifying op params" << dendl;
239 ret = op->verify_params();
240 if (ret < 0) {
241 return ret;
242 }
243
244 ldpp_dout(op, 2) << "pre-executing" << dendl;
245 op->pre_exec();
246
247 ldpp_dout(op, 2) << "check rate limiting" << dendl;
248 if (rate_limit(driver, s)) {
249 return -ERR_RATE_LIMITED;
250 }
251 ldpp_dout(op, 2) << "executing" << dendl;
252 {
253 auto span = tracing::rgw::tracer.add_span("execute", s->trace);
254 std::swap(span, s->trace);
255 op->execute(y);
256 std::swap(span, s->trace);
257 }
258
259 ldpp_dout(op, 2) << "completing" << dendl;
260 op->complete();
261
262 return 0;
263 }
264
265 int process_request(const RGWProcessEnv& penv,
266 RGWRequest* const req,
267 const std::string& frontend_prefix,
268 RGWRestfulIO* const client_io,
269 optional_yield yield,
270 rgw::dmclock::Scheduler *scheduler,
271 string* user,
272 ceph::coarse_real_clock::duration* latency,
273 int* http_ret)
274 {
275 int ret = client_io->init(g_ceph_context);
276 dout(1) << "====== starting new request req=" << hex << req << dec
277 << " =====" << dendl;
278 perfcounter->inc(l_rgw_req);
279
280 RGWEnv& rgw_env = client_io->get_env();
281
282 req_state rstate(g_ceph_context, penv, &rgw_env, req->id);
283 req_state *s = &rstate;
284
285 s->ratelimit_data = penv.ratelimiting->get_active();
286
287 rgw::sal::Driver* driver = penv.driver;
288 std::unique_ptr<rgw::sal::User> u = driver->get_user(rgw_user());
289 s->set_user(u);
290
291 if (ret < 0) {
292 s->cio = client_io;
293 abort_early(s, nullptr, ret, nullptr, yield);
294 return ret;
295 }
296
297 s->req_id = driver->zone_unique_id(req->id);
298 s->trans_id = driver->zone_unique_trans_id(req->id);
299 s->host_id = driver->get_host_id();
300 s->yield = yield;
301
302 ldpp_dout(s, 2) << "initializing for trans_id = " << s->trans_id << dendl;
303
304 RGWOp* op = nullptr;
305 int init_error = 0;
306 bool should_log = false;
307 RGWREST* rest = penv.rest;
308 RGWRESTMgr *mgr;
309 RGWHandler_REST *handler = rest->get_handler(driver, s,
310 *penv.auth_registry,
311 frontend_prefix,
312 client_io, &mgr, &init_error);
313 rgw::dmclock::SchedulerCompleter c;
314
315 if (init_error != 0) {
316 abort_early(s, nullptr, init_error, nullptr, yield);
317 goto done;
318 }
319 ldpp_dout(s, 10) << "handler=" << typeid(*handler).name() << dendl;
320
321 should_log = mgr->get_logging();
322
323 ldpp_dout(s, 2) << "getting op " << s->op << dendl;
324 op = handler->get_op();
325 if (!op) {
326 abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED, handler, yield);
327 goto done;
328 }
329 {
330 s->trace_enabled = tracing::rgw::tracer.is_enabled();
331 std::string script;
332 auto rc = rgw::lua::read_script(s, penv.lua.manager.get(), s->bucket_tenant, s->yield, rgw::lua::context::preRequest, script);
333 if (rc == -ENOENT) {
334 // no script, nothing to do
335 } else if (rc < 0) {
336 ldpp_dout(op, 5) << "WARNING: failed to read pre request script. error: " << rc << dendl;
337 } else {
338 rc = rgw::lua::request::execute(driver, rest, penv.olog, s, op, script);
339 if (rc < 0) {
340 ldpp_dout(op, 5) << "WARNING: failed to execute pre request script. error: " << rc << dendl;
341 }
342 }
343 }
344 std::tie(ret,c) = schedule_request(scheduler, s, op);
345 if (ret < 0) {
346 if (ret == -EAGAIN) {
347 ret = -ERR_RATE_LIMITED;
348 }
349 ldpp_dout(op,0) << "Scheduling request failed with " << ret << dendl;
350 abort_early(s, op, ret, handler, yield);
351 goto done;
352 }
353 req->op = op;
354 ldpp_dout(op, 10) << "op=" << typeid(*op).name() << dendl;
355 s->op_type = op->get_type();
356
357 try {
358 ldpp_dout(op, 2) << "verifying requester" << dendl;
359 ret = op->verify_requester(*penv.auth_registry, yield);
360 if (ret < 0) {
361 dout(10) << "failed to authorize request" << dendl;
362 abort_early(s, op, ret, handler, yield);
363 goto done;
364 }
365
366 /* FIXME: remove this after switching all handlers to the new authentication
367 * infrastructure. */
368 if (nullptr == s->auth.identity) {
369 s->auth.identity = rgw::auth::transform_old_authinfo(s);
370 }
371
372 ldpp_dout(op, 2) << "normalizing buckets and tenants" << dendl;
373 ret = handler->postauth_init(yield);
374 if (ret < 0) {
375 dout(10) << "failed to run post-auth init" << dendl;
376 abort_early(s, op, ret, handler, yield);
377 goto done;
378 }
379
380 if (s->user->get_info().suspended) {
381 dout(10) << "user is suspended, uid=" << s->user->get_id() << dendl;
382 abort_early(s, op, -ERR_USER_SUSPENDED, handler, yield);
383 goto done;
384 }
385
386
387 const auto trace_name = std::string(op->name()) + " " + s->trans_id;
388 s->trace = tracing::rgw::tracer.start_trace(trace_name, s->trace_enabled);
389 s->trace->SetAttribute(tracing::rgw::OP, op->name());
390 s->trace->SetAttribute(tracing::rgw::TYPE, tracing::rgw::REQUEST);
391
392 ret = rgw_process_authenticated(handler, op, req, s, yield, driver);
393 if (ret < 0) {
394 abort_early(s, op, ret, handler, yield);
395 goto done;
396 }
397 } catch (const ceph::crypto::DigestException& e) {
398 dout(0) << "authentication failed" << e.what() << dendl;
399 abort_early(s, op, -ERR_INVALID_SECRET_KEY, handler, yield);
400 }
401
402 done:
403 if (op) {
404 if (s->trace) {
405 s->trace->SetAttribute(tracing::rgw::RETURN, op->get_ret());
406 if (!rgw::sal::User::empty(s->user)) {
407 s->trace->SetAttribute(tracing::rgw::USER_ID, s->user->get_id().id);
408 }
409 if (!rgw::sal::Bucket::empty(s->bucket)) {
410 s->trace->SetAttribute(tracing::rgw::BUCKET_NAME, s->bucket->get_name());
411 }
412 if (!rgw::sal::Object::empty(s->object)) {
413 s->trace->SetAttribute(tracing::rgw::OBJECT_NAME, s->object->get_name());
414 }
415 }
416 std::string script;
417 auto rc = rgw::lua::read_script(s, penv.lua.manager.get(), s->bucket_tenant, s->yield, rgw::lua::context::postRequest, script);
418 if (rc == -ENOENT) {
419 // no script, nothing to do
420 } else if (rc < 0) {
421 ldpp_dout(op, 5) << "WARNING: failed to read post request script. error: " << rc << dendl;
422 } else {
423 rc = rgw::lua::request::execute(driver, rest, penv.olog, s, op, script);
424 if (rc < 0) {
425 ldpp_dout(op, 5) << "WARNING: failed to execute post request script. error: " << rc << dendl;
426 }
427 }
428 }
429
430 try {
431 client_io->complete_request();
432 } catch (rgw::io::Exception& e) {
433 dout(0) << "ERROR: client_io->complete_request() returned "
434 << e.what() << dendl;
435 }
436 if (should_log) {
437 rgw_log_op(rest, s, op, penv.olog);
438 }
439
440 if (http_ret != nullptr) {
441 *http_ret = s->err.http_ret;
442 }
443 int op_ret = 0;
444
445 if (user && !rgw::sal::User::empty(s->user.get())) {
446 *user = s->user->get_id().to_str();
447 }
448
449 if (op) {
450 op_ret = op->get_ret();
451 ldpp_dout(op, 2) << "op status=" << op_ret << dendl;
452 ldpp_dout(op, 2) << "http status=" << s->err.http_ret << dendl;
453 } else {
454 ldpp_dout(s, 2) << "http status=" << s->err.http_ret << dendl;
455 }
456 if (handler)
457 handler->put_op(op);
458 rest->put_handler(handler);
459
460 const auto lat = s->time_elapsed();
461 if (latency) {
462 *latency = lat;
463 }
464 dout(1) << "====== req done req=" << hex << req << dec
465 << " op status=" << op_ret
466 << " http_status=" << s->err.http_ret
467 << " latency=" << lat
468 << " ======"
469 << dendl;
470
471 return (ret < 0 ? ret : s->err.ret);
472 } /* process_request */