1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "rgw_common.h"
9 #include "rgw_auth_registry.h"
13 #include "rgw_ratelimit.h"
14 #include "include/ceph_assert.h"
16 #include "common/WorkQueue.h"
17 #include "common/Throttle.h"
21 #if !defined(dout_subsys)
22 #define dout_subsys ceph_subsys_rgw
23 #define def_dout_subsys
26 #define dout_context g_ceph_context
// Free function with external linkage that takes no arguments and returns
// nothing. This header only declares it.
28 extern void signal_shutdown();
30 namespace rgw::dmclock
{
// Bundle of handles and settings handed to a frontend process. The
// RGWProcess constructor receives a pointer to one of these and reads
// auth_registry and uri_prefix from it.
34 struct RGWProcessEnv
{
// Raw pointer to the rgw::sal::Store in use.
// NOTE(review): ownership and lifetime are not expressed by the type — confirm.
35 rgw::sal::Store
* store
;
// URI prefix string; RGWProcess copies it into its own uri_prefix member.
39 std::string uri_prefix
;
// Shared handle to the rgw::auth::StrategyRegistry; RGWProcess initialises
// its own auth_registry member from this.
40 std::shared_ptr
<rgw::auth::StrategyRegistry
> auth_registry
;
41 // Maybe there is a better place to store the rate-limit data structure.
42 ActiveRateLimiter
* ratelimiting
;
// Forward declaration: this header only refers to RGWFrontendConfig through
// pointers (the `conf` member and the constructor parameters).
45 class RGWFrontendConfig
;
// Queue of RGWRequest pointers. RGWWQ::_empty() reports whether it is empty
// and RGWWQ::_clear() asserts that it is.
49 std::deque
<RGWRequest
*> m_req_queue
;
// Raw pointer to the rgw::sal::Store in use.
52 rgw::sal::Store
* store
;
// Auth registry handle: initialised from RGWProcessEnv::auth_registry in the
// constructor and replaced by unpause_with_new_config().
53 rgw_auth_registry_ptr_t auth_registry
;
// Throttle constructed in the constructor as (cct, "rgw_ops", num_threads * 2).
56 Throttle req_throttle
;
// Raw pointer to the frontend configuration.
58 RGWFrontendConfig
* conf
;
// Copy of RGWProcessEnv::uri_prefix taken at construction.
60 std::string uri_prefix
;
// Work queue of RGWRequest objects. It derives from both
// ThreadPool::WorkQueue<RGWRequest> and DoutPrefixProvider, and its members
// reach the owning RGWProcess through `process`.
62 struct RGWWQ
: public DoutPrefixProvider
, public ThreadPool::WorkQueue
<RGWRequest
> {
64 RGWWQ(RGWProcess
* p
, ceph::timespan timeout
, ceph::timespan suicide_timeout
,
66 : ThreadPool::WorkQueue
<RGWRequest
>("RGWWQ", timeout
, suicide_timeout
,
// Enqueue hook overriding a base-class virtual; takes the request by pointer
// and returns bool. Only declared here, so the body is defined out of line.
69 bool _enqueue(RGWRequest
* req
) override
;
71 void _dequeue(RGWRequest
* req
) override
{
// Returns true when the owning process's m_req_queue holds no requests.
75 bool _empty() override
{
76 return process
->m_req_queue
.empty();
// Zero-argument dequeue hook overriding a base-class virtual; returns an
// RGWRequest pointer. Only declared here, so the body is defined out of line.
79 RGWRequest
* _dequeue() override
;
// Re-expose the base-class _process overloads; without this using-declaration
// the _process declared just below would hide them.
81 using ThreadPool::WorkQueue
<RGWRequest
>::_process
;
// Per-request processing hook overriding a base-class virtual. The
// ThreadPool::TPHandle reference is left unnamed in this declaration.
// Only declared here, so the body is defined out of line.
83 void _process(RGWRequest
*req
, ThreadPool::TPHandle
&) override
;
// Asserts (via ceph_assert) that the owning process's m_req_queue is already
// empty.
87 void _clear() override
{
88 ceph_assert(process
->m_req_queue
.empty());
91 CephContext
*get_cct() const override
{ return process
->cct
; }
92 unsigned get_subsys() const { return ceph_subsys_rgw
; }
93 std::ostream
& gen_prefix(std::ostream
& out
) const { return out
<< "rgw request work queue: ";}
98 RGWProcess(CephContext
* const cct
,
99 RGWProcessEnv
* const pe
,
100 const int num_threads
,
101 RGWFrontendConfig
* const conf
)
104 auth_registry(pe
->auth_registry
),
106 m_tp(cct
, "RGWProcess::m_tp", "tp_rgw_process", num_threads
),
107 req_throttle(cct
, "rgw_ops", num_threads
* 2),
111 uri_prefix(pe
->uri_prefix
),
113 ceph::make_timespan(g_conf()->rgw_op_thread_timeout
),
114 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout
),
// Virtual, defaulted destructor: derived processes can be destroyed through a
// pointer to this base class.
118 virtual ~RGWProcess() = default;
// Pure virtual: each concrete process must supply run().
120 virtual void run() = 0;
// Pure virtual: each concrete process must supply the handling of a single
// request. `dpp` is the DoutPrefixProvider passed along with the request.
121 virtual void handle_request(const DoutPrefixProvider
*dpp
, RGWRequest
*req
) = 0;
127 void unpause_with_new_config(rgw::sal::Store
* const store
,
128 rgw_auth_registry_ptr_t auth_registry
) {
130 this->auth_registry
= std::move(auth_registry
);
// Thread subclass that holds a raw pointer to an RGWProcess.
142 class RGWProcessControlThread
: public Thread
{
// The RGWProcess pointer supplied to the constructor; stored as-is.
143 RGWProcess
*pprocess
;
145 explicit RGWProcessControlThread(RGWProcess
*_pprocess
) : pprocess(_pprocess
) {}
147 void *entry() override
{
// RGWProcess subclass that additionally carries an RGWAccessKey.
153 class RGWLoadGenProcess
: public RGWProcess
{
// Access key held by this process; assigned through set_access_key().
154 RGWAccessKey access_key
;
156 RGWLoadGenProcess(CephContext
* cct
, RGWProcessEnv
* pe
, int num_threads
,
157 RGWFrontendConfig
* _conf
) :
158 RGWProcess(cct
, pe
, num_threads
, _conf
) {}
// This subclass's implementation of the pure virtual
// RGWProcess::handle_request. Only declared here, so the body is defined out
// of line.
161 void handle_request(const DoutPrefixProvider
*dpp
, RGWRequest
* req
) override
;
// Only declared here, so the body is defined out of line.
// NOTE(review): presumably issues one generated request using `method` on
// `resource` with the given `content_length`, and reports failure through
// the atomic `fail_flag` — confirm against the definition.
162 void gen_request(const std::string
& method
, const std::string
& resource
,
163 int content_length
, std::atomic
<bool>* fail_flag
);
165 void set_access_key(RGWAccessKey
& key
) { access_key
= key
; }
167 /* process stream request */
168 extern int process_request(rgw::sal::Store
* store
,
171 const std::string
& frontend_prefix
,
172 const rgw_auth_registry_t
& auth_registry
,
173 RGWRestfulIO
* client_io
,
176 rgw::dmclock::Scheduler
*scheduler
,
178 ceph::coarse_real_clock::duration
* latency
,
179 std::shared_ptr
<RateLimiter
> ratelimit
,
180 int* http_ret
= nullptr);
182 extern int rgw_process_authenticated(RGWHandler_REST
* handler
,
187 rgw::sal::Store
* store
,
188 bool skip_retarget
= false);
190 #if defined(def_dout_subsys)
191 #undef def_dout_subsys
196 #endif /* RGW_PROCESS_H */