1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include "rgw_common.h"
10 #include "include/ceph_assert.h"
12 #include "common/WorkQueue.h"
13 #include "common/Throttle.h"
17 #define dout_context g_ceph_context
20 namespace rgw::dmclock
{
25 class RGWFrontendConfig
;
29 std::deque
<RGWRequest
*> m_req_queue
;
34 Throttle req_throttle
;
35 RGWFrontendConfig
* conf
;
37 std::string uri_prefix
;
39 struct RGWWQ
: public DoutPrefixProvider
, public ThreadPool::WorkQueue
<RGWRequest
> {
41 RGWWQ(RGWProcess
* p
, ceph::timespan timeout
, ceph::timespan suicide_timeout
,
43 : ThreadPool::WorkQueue
<RGWRequest
>("RGWWQ", timeout
, suicide_timeout
,
46 bool _enqueue(RGWRequest
* req
) override
;
48 void _dequeue(RGWRequest
* req
) override
{
52 bool _empty() override
{
53 return process
->m_req_queue
.empty();
56 RGWRequest
* _dequeue() override
;
58 using ThreadPool::WorkQueue
<RGWRequest
>::_process
;
60 void _process(RGWRequest
*req
, ThreadPool::TPHandle
&) override
;
64 void _clear() override
{
65 ceph_assert(process
->m_req_queue
.empty());
68 CephContext
*get_cct() const override
{ return process
->cct
; }
69 unsigned get_subsys() const { return ceph_subsys_rgw
; }
70 std::ostream
& gen_prefix(std::ostream
& out
) const { return out
<< "rgw request work queue: ";}
75 RGWProcess(CephContext
* const cct
,
77 const int num_threads
,
78 std::string uri_prefix
,
79 RGWFrontendConfig
* const conf
)
81 m_tp(cct
, "RGWProcess::m_tp", "tp_rgw_process", num_threads
),
82 req_throttle(cct
, "rgw_ops", num_threads
* 2),
85 uri_prefix(std::move(uri_prefix
)),
87 ceph::make_timespan(g_conf()->rgw_op_thread_timeout
),
88 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout
),
92 virtual ~RGWProcess() = default;
94 const RGWProcessEnv
& get_env() const { return env
; }
96 virtual void run() = 0;
97 virtual void handle_request(const DoutPrefixProvider
*dpp
, RGWRequest
*req
) = 0;
103 void unpause_with_new_config() {
115 class RGWProcessControlThread
: public Thread
{
116 RGWProcess
*pprocess
;
118 explicit RGWProcessControlThread(RGWProcess
*_pprocess
) : pprocess(_pprocess
) {}
120 void *entry() override
{
126 class RGWLoadGenProcess
: public RGWProcess
{
127 RGWAccessKey access_key
;
129 RGWLoadGenProcess(CephContext
* cct
, RGWProcessEnv
& env
, int num_threads
,
130 std::string uri_prefix
, RGWFrontendConfig
* _conf
)
131 : RGWProcess(cct
, env
, num_threads
, std::move(uri_prefix
), _conf
) {}
134 void handle_request(const DoutPrefixProvider
*dpp
, RGWRequest
* req
) override
;
135 void gen_request(const std::string
& method
, const std::string
& resource
,
136 int content_length
, std::atomic
<bool>* fail_flag
);
138 void set_access_key(RGWAccessKey
& key
) { access_key
= key
; }
140 /* process stream request */
141 extern int process_request(const RGWProcessEnv
& penv
,
143 const std::string
& frontend_prefix
,
144 RGWRestfulIO
* client_io
,
146 rgw::dmclock::Scheduler
*scheduler
,
148 ceph::coarse_real_clock::duration
* latency
,
149 int* http_ret
= nullptr);
151 extern int rgw_process_authenticated(RGWHandler_REST
* handler
,
156 rgw::sal::Driver
* driver
,
157 bool skip_retarget
= false);