]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #ifndef RGW_PROCESS_H | |
5 | #define RGW_PROCESS_H | |
6 | ||
7 | #include "rgw_common.h" | |
7c673cae FG |
8 | #include "rgw_acl.h" |
9 | #include "rgw_auth_registry.h" | |
10 | #include "rgw_user.h" | |
11 | #include "rgw_op.h" | |
12 | #include "rgw_rest.h" | |
13 | ||
11fdf7f2 | 14 | #include "include/ceph_assert.h" |
7c673cae FG |
15 | |
16 | #include "common/WorkQueue.h" | |
17 | #include "common/Throttle.h" | |
18 | ||
19 | #include <atomic> | |
20 | ||
21 | #if !defined(dout_subsys) | |
22 | #define dout_subsys ceph_subsys_rgw | |
23 | #define def_dout_subsys | |
24 | #endif | |
25 | ||
26 | #define dout_context g_ceph_context | |
27 | ||
28 | extern void signal_shutdown(); | |
29 | ||
11fdf7f2 TL |
30 | namespace rgw::dmclock { |
31 | class Scheduler; | |
32 | } | |
33 | ||
7c673cae | 34 | struct RGWProcessEnv { |
9f95a23c | 35 | rgw::sal::RGWRadosStore *store; |
7c673cae FG |
36 | RGWREST *rest; |
37 | OpsLogSocket *olog; | |
38 | int port; | |
39 | std::string uri_prefix; | |
40 | std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry; | |
41 | }; | |
42 | ||
43 | class RGWFrontendConfig; | |
f67539c2 | 44 | class RGWRequest; |
7c673cae FG |
45 | |
46 | class RGWProcess { | |
47 | deque<RGWRequest*> m_req_queue; | |
48 | protected: | |
49 | CephContext *cct; | |
9f95a23c | 50 | rgw::sal::RGWRadosStore* store; |
7c673cae FG |
51 | rgw_auth_registry_ptr_t auth_registry; |
52 | OpsLogSocket* olog; | |
53 | ThreadPool m_tp; | |
54 | Throttle req_throttle; | |
55 | RGWREST* rest; | |
56 | RGWFrontendConfig* conf; | |
57 | int sock_fd; | |
58 | std::string uri_prefix; | |
59 | ||
b3b6e05e | 60 | struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> { |
7c673cae | 61 | RGWProcess* process; |
f67539c2 TL |
62 | RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout, |
63 | ThreadPool* tp) | |
7c673cae FG |
64 | : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, |
65 | tp), process(p) {} | |
66 | ||
11fdf7f2 | 67 | bool _enqueue(RGWRequest* req) override; |
7c673cae FG |
68 | |
69 | void _dequeue(RGWRequest* req) override { | |
70 | ceph_abort(); | |
71 | } | |
72 | ||
73 | bool _empty() override { | |
74 | return process->m_req_queue.empty(); | |
75 | } | |
76 | ||
11fdf7f2 | 77 | RGWRequest* _dequeue() override; |
7c673cae FG |
78 | |
79 | using ThreadPool::WorkQueue<RGWRequest>::_process; | |
80 | ||
11fdf7f2 | 81 | void _process(RGWRequest *req, ThreadPool::TPHandle &) override; |
7c673cae FG |
82 | |
83 | void _dump_queue(); | |
84 | ||
85 | void _clear() override { | |
11fdf7f2 | 86 | ceph_assert(process->m_req_queue.empty()); |
7c673cae | 87 | } |
b3b6e05e TL |
88 | |
89 | CephContext *get_cct() const override { return process->cct; } | |
90 | unsigned get_subsys() const { return ceph_subsys_rgw; } | |
91 | std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";} | |
92 | ||
7c673cae FG |
93 | } req_wq; |
94 | ||
95 | public: | |
96 | RGWProcess(CephContext* const cct, | |
97 | RGWProcessEnv* const pe, | |
98 | const int num_threads, | |
99 | RGWFrontendConfig* const conf) | |
100 | : cct(cct), | |
101 | store(pe->store), | |
102 | auth_registry(pe->auth_registry), | |
103 | olog(pe->olog), | |
104 | m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads), | |
105 | req_throttle(cct, "rgw_ops", num_threads * 2), | |
106 | rest(pe->rest), | |
107 | conf(conf), | |
108 | sock_fd(-1), | |
109 | uri_prefix(pe->uri_prefix), | |
f67539c2 TL |
110 | req_wq(this, |
111 | ceph::make_timespan(g_conf()->rgw_op_thread_timeout), | |
112 | ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout), | |
113 | &m_tp) { | |
7c673cae FG |
114 | } |
115 | ||
116 | virtual ~RGWProcess() = default; | |
117 | ||
118 | virtual void run() = 0; | |
b3b6e05e | 119 | virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0; |
7c673cae FG |
120 | |
121 | void pause() { | |
122 | m_tp.pause(); | |
123 | } | |
124 | ||
9f95a23c | 125 | void unpause_with_new_config(rgw::sal::RGWRadosStore* const store, |
7c673cae FG |
126 | rgw_auth_registry_ptr_t auth_registry) { |
127 | this->store = store; | |
128 | this->auth_registry = std::move(auth_registry); | |
129 | m_tp.unpause(); | |
130 | } | |
131 | ||
132 | void close_fd() { | |
133 | if (sock_fd >= 0) { | |
134 | ::close(sock_fd); | |
135 | sock_fd = -1; | |
136 | } | |
137 | } | |
138 | }; /* RGWProcess */ | |
139 | ||
140 | class RGWFCGXProcess : public RGWProcess { | |
141 | int max_connections; | |
142 | public: | |
143 | ||
144 | /* have a bit more connections than threads so that requests are | |
145 | * still accepted even if we're still processing older requests */ | |
146 | RGWFCGXProcess(CephContext* const cct, | |
147 | RGWProcessEnv* const pe, | |
148 | const int num_threads, | |
149 | RGWFrontendConfig* const conf) | |
150 | : RGWProcess(cct, pe, num_threads, conf), | |
151 | max_connections(num_threads + (num_threads >> 3)) { | |
152 | } | |
153 | ||
154 | void run() override; | |
b3b6e05e | 155 | void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override; |
7c673cae FG |
156 | }; |
157 | ||
158 | class RGWProcessControlThread : public Thread { | |
159 | RGWProcess *pprocess; | |
160 | public: | |
11fdf7f2 | 161 | explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} |
7c673cae FG |
162 | |
163 | void *entry() override { | |
164 | pprocess->run(); | |
165 | return NULL; | |
166 | } | |
167 | }; | |
168 | ||
169 | class RGWLoadGenProcess : public RGWProcess { | |
170 | RGWAccessKey access_key; | |
171 | public: | |
172 | RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads, | |
173 | RGWFrontendConfig* _conf) : | |
174 | RGWProcess(cct, pe, num_threads, _conf) {} | |
175 | void run() override; | |
176 | void checkpoint(); | |
b3b6e05e | 177 | void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override; |
7c673cae | 178 | void gen_request(const string& method, const string& resource, |
31f18b77 | 179 | int content_length, std::atomic<bool>* fail_flag); |
7c673cae FG |
180 | |
181 | void set_access_key(RGWAccessKey& key) { access_key = key; } | |
182 | }; | |
7c673cae | 183 | /* process stream request */ |
9f95a23c | 184 | extern int process_request(rgw::sal::RGWRadosStore* store, |
7c673cae FG |
185 | RGWREST* rest, |
186 | RGWRequest* req, | |
187 | const std::string& frontend_prefix, | |
188 | const rgw_auth_registry_t& auth_registry, | |
189 | RGWRestfulIO* client_io, | |
94b18763 | 190 | OpsLogSocket* olog, |
11fdf7f2 TL |
191 | optional_yield y, |
192 | rgw::dmclock::Scheduler *scheduler, | |
f67539c2 TL |
193 | std::string* user, |
194 | ceph::coarse_real_clock::duration* latency, | |
94b18763 | 195 | int* http_ret = nullptr); |
7c673cae FG |
196 | |
197 | extern int rgw_process_authenticated(RGWHandler_REST* handler, | |
198 | RGWOp*& op, | |
199 | RGWRequest* req, | |
200 | req_state* s, | |
f67539c2 | 201 | optional_yield y, |
7c673cae FG |
202 | bool skip_retarget = false); |
203 | ||
204 | #if defined(def_dout_subsys) | |
205 | #undef def_dout_subsys | |
206 | #undef dout_subsys | |
207 | #endif | |
208 | #undef dout_context | |
209 | ||
210 | #endif /* RGW_PROCESS_H */ |