]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #ifndef RGW_PROCESS_H | |
5 | #define RGW_PROCESS_H | |
6 | ||
7 | #include "rgw_common.h" | |
7c673cae FG |
8 | #include "rgw_acl.h" |
9 | #include "rgw_auth_registry.h" | |
10 | #include "rgw_user.h" | |
11 | #include "rgw_op.h" | |
12 | #include "rgw_rest.h" | |
20effc67 | 13 | #include "rgw_ratelimit.h" |
11fdf7f2 | 14 | #include "include/ceph_assert.h" |
7c673cae FG |
15 | |
16 | #include "common/WorkQueue.h" | |
17 | #include "common/Throttle.h" | |
18 | ||
19 | #include <atomic> | |
20 | ||
// Give this header a logging subsystem only when the includer has not
// already chosen one. def_dout_subsys records that the definition came from
// here, so the matching teardown near the end of the header can remove it.
#ifndef dout_subsys
#  define dout_subsys ceph_subsys_rgw
#  define def_dout_subsys
#endif

// Logging context used while this header is being processed; it is
// #undef'd again before the include guard closes.
#define dout_context g_ceph_context
27 | ||
// Declaration only: takes no arguments and returns nothing. The definition
// is not part of this header.
extern void signal_shutdown();
29 | ||
// Forward declaration: this header only names rgw::dmclock::Scheduler
// through a pointer parameter (see process_request), so the full class
// definition is not required here.
namespace rgw::dmclock {
  class Scheduler;
} // namespace rgw::dmclock
33 | ||
7c673cae | 34 | struct RGWProcessEnv { |
20effc67 | 35 | rgw::sal::Store* store; |
7c673cae | 36 | RGWREST *rest; |
a4b75251 | 37 | OpsLogSink *olog; |
7c673cae FG |
38 | int port; |
39 | std::string uri_prefix; | |
40 | std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry; | |
20effc67 TL |
41 | //maybe there is a better place to store the rate limit data structure |
42 | ActiveRateLimiter* ratelimiting; | |
7c673cae FG |
43 | }; |
44 | ||
// Forward declarations: in this header these types appear only behind
// pointers or as a template argument, so their definitions are not needed.
class RGWFrontendConfig;
class RGWRequest;
7c673cae FG |
47 | |
48 | class RGWProcess { | |
20effc67 | 49 | std::deque<RGWRequest*> m_req_queue; |
7c673cae FG |
50 | protected: |
51 | CephContext *cct; | |
20effc67 | 52 | rgw::sal::Store* store; |
7c673cae | 53 | rgw_auth_registry_ptr_t auth_registry; |
a4b75251 | 54 | OpsLogSink* olog; |
7c673cae FG |
55 | ThreadPool m_tp; |
56 | Throttle req_throttle; | |
57 | RGWREST* rest; | |
58 | RGWFrontendConfig* conf; | |
59 | int sock_fd; | |
60 | std::string uri_prefix; | |
61 | ||
b3b6e05e | 62 | struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> { |
7c673cae | 63 | RGWProcess* process; |
f67539c2 TL |
64 | RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout, |
65 | ThreadPool* tp) | |
7c673cae FG |
66 | : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, |
67 | tp), process(p) {} | |
68 | ||
11fdf7f2 | 69 | bool _enqueue(RGWRequest* req) override; |
7c673cae FG |
70 | |
71 | void _dequeue(RGWRequest* req) override { | |
72 | ceph_abort(); | |
73 | } | |
74 | ||
75 | bool _empty() override { | |
76 | return process->m_req_queue.empty(); | |
77 | } | |
78 | ||
11fdf7f2 | 79 | RGWRequest* _dequeue() override; |
7c673cae FG |
80 | |
81 | using ThreadPool::WorkQueue<RGWRequest>::_process; | |
82 | ||
11fdf7f2 | 83 | void _process(RGWRequest *req, ThreadPool::TPHandle &) override; |
7c673cae FG |
84 | |
85 | void _dump_queue(); | |
86 | ||
87 | void _clear() override { | |
11fdf7f2 | 88 | ceph_assert(process->m_req_queue.empty()); |
7c673cae | 89 | } |
b3b6e05e TL |
90 | |
91 | CephContext *get_cct() const override { return process->cct; } | |
92 | unsigned get_subsys() const { return ceph_subsys_rgw; } | |
93 | std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";} | |
94 | ||
7c673cae FG |
95 | } req_wq; |
96 | ||
97 | public: | |
98 | RGWProcess(CephContext* const cct, | |
99 | RGWProcessEnv* const pe, | |
100 | const int num_threads, | |
101 | RGWFrontendConfig* const conf) | |
102 | : cct(cct), | |
103 | store(pe->store), | |
104 | auth_registry(pe->auth_registry), | |
105 | olog(pe->olog), | |
106 | m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads), | |
107 | req_throttle(cct, "rgw_ops", num_threads * 2), | |
108 | rest(pe->rest), | |
109 | conf(conf), | |
110 | sock_fd(-1), | |
111 | uri_prefix(pe->uri_prefix), | |
f67539c2 TL |
112 | req_wq(this, |
113 | ceph::make_timespan(g_conf()->rgw_op_thread_timeout), | |
114 | ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout), | |
115 | &m_tp) { | |
7c673cae FG |
116 | } |
117 | ||
118 | virtual ~RGWProcess() = default; | |
119 | ||
120 | virtual void run() = 0; | |
b3b6e05e | 121 | virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0; |
7c673cae FG |
122 | |
123 | void pause() { | |
124 | m_tp.pause(); | |
125 | } | |
126 | ||
20effc67 | 127 | void unpause_with_new_config(rgw::sal::Store* const store, |
7c673cae FG |
128 | rgw_auth_registry_ptr_t auth_registry) { |
129 | this->store = store; | |
130 | this->auth_registry = std::move(auth_registry); | |
131 | m_tp.unpause(); | |
132 | } | |
133 | ||
134 | void close_fd() { | |
135 | if (sock_fd >= 0) { | |
136 | ::close(sock_fd); | |
137 | sock_fd = -1; | |
138 | } | |
139 | } | |
140 | }; /* RGWProcess */ | |
141 | ||
7c673cae FG |
142 | class RGWProcessControlThread : public Thread { |
143 | RGWProcess *pprocess; | |
144 | public: | |
11fdf7f2 | 145 | explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} |
7c673cae FG |
146 | |
147 | void *entry() override { | |
148 | pprocess->run(); | |
149 | return NULL; | |
150 | } | |
151 | }; | |
152 | ||
153 | class RGWLoadGenProcess : public RGWProcess { | |
154 | RGWAccessKey access_key; | |
155 | public: | |
156 | RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads, | |
157 | RGWFrontendConfig* _conf) : | |
158 | RGWProcess(cct, pe, num_threads, _conf) {} | |
159 | void run() override; | |
160 | void checkpoint(); | |
b3b6e05e | 161 | void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override; |
20effc67 | 162 | void gen_request(const std::string& method, const std::string& resource, |
31f18b77 | 163 | int content_length, std::atomic<bool>* fail_flag); |
7c673cae FG |
164 | |
165 | void set_access_key(RGWAccessKey& key) { access_key = key; } | |
166 | }; | |
7c673cae | 167 | /* process stream request */ |
20effc67 | 168 | extern int process_request(rgw::sal::Store* store, |
7c673cae FG |
169 | RGWREST* rest, |
170 | RGWRequest* req, | |
171 | const std::string& frontend_prefix, | |
172 | const rgw_auth_registry_t& auth_registry, | |
173 | RGWRestfulIO* client_io, | |
a4b75251 | 174 | OpsLogSink* olog, |
11fdf7f2 TL |
175 | optional_yield y, |
176 | rgw::dmclock::Scheduler *scheduler, | |
f67539c2 TL |
177 | std::string* user, |
178 | ceph::coarse_real_clock::duration* latency, | |
20effc67 | 179 | std::shared_ptr<RateLimiter> ratelimit, |
94b18763 | 180 | int* http_ret = nullptr); |
7c673cae FG |
181 | |
182 | extern int rgw_process_authenticated(RGWHandler_REST* handler, | |
183 | RGWOp*& op, | |
184 | RGWRequest* req, | |
185 | req_state* s, | |
20effc67 TL |
186 | optional_yield y, |
187 | rgw::sal::Store* store, | |
7c673cae FG |
188 | bool skip_retarget = false); |
189 | ||
// Tear down the logging macros set up near the top of this header.
// dout_subsys is removed only if this header was the one that defined it
// (signalled by def_dout_subsys); dout_context is always removed.
#ifdef def_dout_subsys
#  undef def_dout_subsys
#  undef dout_subsys
#endif
#undef dout_context
195 | ||
196 | #endif /* RGW_PROCESS_H */ |