1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "rgw_common.h"
9 #include "rgw_auth_registry.h"
14 #include "include/ceph_assert.h"
16 #include "common/WorkQueue.h"
17 #include "common/Throttle.h"
21 #if !defined(dout_subsys)
22 #define dout_subsys ceph_subsys_rgw
23 #define def_dout_subsys
26 #define dout_context g_ceph_context
28 extern void signal_shutdown();
30 namespace rgw::dmclock
{
34 struct RGWProcessEnv
{
35 rgw::sal::RGWRadosStore
*store
;
39 std::string uri_prefix
;
40 std::shared_ptr
<rgw::auth::StrategyRegistry
> auth_registry
;
43 class RGWFrontendConfig
;
47 deque
<RGWRequest
*> m_req_queue
;
50 rgw::sal::RGWRadosStore
* store
;
51 rgw_auth_registry_ptr_t auth_registry
;
54 Throttle req_throttle
;
56 RGWFrontendConfig
* conf
;
58 std::string uri_prefix
;
60 struct RGWWQ
: public ThreadPool::WorkQueue
<RGWRequest
> {
62 RGWWQ(RGWProcess
* p
, ceph::timespan timeout
, ceph::timespan suicide_timeout
,
64 : ThreadPool::WorkQueue
<RGWRequest
>("RGWWQ", timeout
, suicide_timeout
,
67 bool _enqueue(RGWRequest
* req
) override
;
69 void _dequeue(RGWRequest
* req
) override
{
73 bool _empty() override
{
74 return process
->m_req_queue
.empty();
77 RGWRequest
* _dequeue() override
;
79 using ThreadPool::WorkQueue
<RGWRequest
>::_process
;
81 void _process(RGWRequest
*req
, ThreadPool::TPHandle
&) override
;
85 void _clear() override
{
86 ceph_assert(process
->m_req_queue
.empty());
91 RGWProcess(CephContext
* const cct
,
92 RGWProcessEnv
* const pe
,
93 const int num_threads
,
94 RGWFrontendConfig
* const conf
)
97 auth_registry(pe
->auth_registry
),
99 m_tp(cct
, "RGWProcess::m_tp", "tp_rgw_process", num_threads
),
100 req_throttle(cct
, "rgw_ops", num_threads
* 2),
104 uri_prefix(pe
->uri_prefix
),
106 ceph::make_timespan(g_conf()->rgw_op_thread_timeout
),
107 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout
),
111 virtual ~RGWProcess() = default;
113 virtual void run() = 0;
114 virtual void handle_request(RGWRequest
*req
) = 0;
120 void unpause_with_new_config(rgw::sal::RGWRadosStore
* const store
,
121 rgw_auth_registry_ptr_t auth_registry
) {
123 this->auth_registry
= std::move(auth_registry
);
135 class RGWFCGXProcess
: public RGWProcess
{
139 /* have a bit more connections than threads so that requests are
140 * still accepted even if we're still processing older requests */
141 RGWFCGXProcess(CephContext
* const cct
,
142 RGWProcessEnv
* const pe
,
143 const int num_threads
,
144 RGWFrontendConfig
* const conf
)
145 : RGWProcess(cct
, pe
, num_threads
, conf
),
146 max_connections(num_threads
+ (num_threads
>> 3)) {
150 void handle_request(RGWRequest
* req
) override
;
153 class RGWProcessControlThread
: public Thread
{
154 RGWProcess
*pprocess
;
156 explicit RGWProcessControlThread(RGWProcess
*_pprocess
) : pprocess(_pprocess
) {}
158 void *entry() override
{
164 class RGWLoadGenProcess
: public RGWProcess
{
165 RGWAccessKey access_key
;
167 RGWLoadGenProcess(CephContext
* cct
, RGWProcessEnv
* pe
, int num_threads
,
168 RGWFrontendConfig
* _conf
) :
169 RGWProcess(cct
, pe
, num_threads
, _conf
) {}
172 void handle_request(RGWRequest
* req
) override
;
173 void gen_request(const string
& method
, const string
& resource
,
174 int content_length
, std::atomic
<bool>* fail_flag
);
176 void set_access_key(RGWAccessKey
& key
) { access_key
= key
; }
178 /* process stream request */
179 extern int process_request(rgw::sal::RGWRadosStore
* store
,
182 const std::string
& frontend_prefix
,
183 const rgw_auth_registry_t
& auth_registry
,
184 RGWRestfulIO
* client_io
,
187 rgw::dmclock::Scheduler
*scheduler
,
189 ceph::coarse_real_clock::duration
* latency
,
190 int* http_ret
= nullptr);
192 extern int rgw_process_authenticated(RGWHandler_REST
* handler
,
197 bool skip_retarget
= false);
199 #if defined(def_dout_subsys)
200 #undef def_dout_subsys
205 #endif /* RGW_PROCESS_H */