]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef RGW_PROCESS_H | |
5 | #define RGW_PROCESS_H | |
6 | ||
7 | #include "rgw_common.h" | |
8 | #include "rgw_rados.h" | |
9 | #include "rgw_acl.h" | |
10 | #include "rgw_auth_registry.h" | |
11 | #include "rgw_user.h" | |
12 | #include "rgw_op.h" | |
13 | #include "rgw_rest.h" | |
14 | ||
15 | #include "include/assert.h" | |
16 | ||
17 | #include "common/WorkQueue.h" | |
18 | #include "common/Throttle.h" | |
19 | ||
20 | #include <atomic> | |
21 | ||
/* Supply a default logging subsystem for the dout() calls in this header
 * when the including file has not chosen one. def_dout_subsys records that
 * the default came from here, so the end of the header can undo it. */
#ifndef dout_subsys
#  define dout_subsys ceph_subsys_rgw
#  define def_dout_subsys
#endif

/* Context used by dout() inside this header. */
#define dout_context g_ceph_context

/* Defined outside this header. */
extern void signal_shutdown();
30 | ||
31 | struct RGWProcessEnv { | |
32 | RGWRados *store; | |
33 | RGWREST *rest; | |
34 | OpsLogSocket *olog; | |
35 | int port; | |
36 | std::string uri_prefix; | |
37 | std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry; | |
38 | }; | |
39 | ||
40 | class RGWFrontendConfig; | |
41 | ||
42 | class RGWProcess { | |
43 | deque<RGWRequest*> m_req_queue; | |
44 | protected: | |
45 | CephContext *cct; | |
46 | RGWRados* store; | |
47 | rgw_auth_registry_ptr_t auth_registry; | |
48 | OpsLogSocket* olog; | |
49 | ThreadPool m_tp; | |
50 | Throttle req_throttle; | |
51 | RGWREST* rest; | |
52 | RGWFrontendConfig* conf; | |
53 | int sock_fd; | |
54 | std::string uri_prefix; | |
55 | ||
56 | struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> { | |
57 | RGWProcess* process; | |
58 | RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp) | |
59 | : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, | |
60 | tp), process(p) {} | |
61 | ||
62 | bool _enqueue(RGWRequest* req) override { | |
63 | process->m_req_queue.push_back(req); | |
64 | perfcounter->inc(l_rgw_qlen); | |
65 | dout(20) << "enqueued request req=" << hex << req << dec << dendl; | |
66 | _dump_queue(); | |
67 | return true; | |
68 | } | |
69 | ||
70 | void _dequeue(RGWRequest* req) override { | |
71 | ceph_abort(); | |
72 | } | |
73 | ||
74 | bool _empty() override { | |
75 | return process->m_req_queue.empty(); | |
76 | } | |
77 | ||
78 | RGWRequest* _dequeue() override { | |
79 | if (process->m_req_queue.empty()) | |
80 | return NULL; | |
81 | RGWRequest *req = process->m_req_queue.front(); | |
82 | process->m_req_queue.pop_front(); | |
83 | dout(20) << "dequeued request req=" << hex << req << dec << dendl; | |
84 | _dump_queue(); | |
85 | perfcounter->inc(l_rgw_qlen, -1); | |
86 | return req; | |
87 | } | |
88 | ||
89 | using ThreadPool::WorkQueue<RGWRequest>::_process; | |
90 | ||
91 | void _process(RGWRequest *req, ThreadPool::TPHandle &) override { | |
92 | perfcounter->inc(l_rgw_qactive); | |
93 | process->handle_request(req); | |
94 | process->req_throttle.put(1); | |
95 | perfcounter->inc(l_rgw_qactive, -1); | |
96 | } | |
97 | ||
98 | void _dump_queue(); | |
99 | ||
100 | void _clear() override { | |
101 | assert(process->m_req_queue.empty()); | |
102 | } | |
103 | } req_wq; | |
104 | ||
105 | public: | |
106 | RGWProcess(CephContext* const cct, | |
107 | RGWProcessEnv* const pe, | |
108 | const int num_threads, | |
109 | RGWFrontendConfig* const conf) | |
110 | : cct(cct), | |
111 | store(pe->store), | |
112 | auth_registry(pe->auth_registry), | |
113 | olog(pe->olog), | |
114 | m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads), | |
115 | req_throttle(cct, "rgw_ops", num_threads * 2), | |
116 | rest(pe->rest), | |
117 | conf(conf), | |
118 | sock_fd(-1), | |
119 | uri_prefix(pe->uri_prefix), | |
120 | req_wq(this, g_conf->rgw_op_thread_timeout, | |
121 | g_conf->rgw_op_thread_suicide_timeout, &m_tp) { | |
122 | } | |
123 | ||
124 | virtual ~RGWProcess() = default; | |
125 | ||
126 | virtual void run() = 0; | |
127 | virtual void handle_request(RGWRequest *req) = 0; | |
128 | ||
129 | void pause() { | |
130 | m_tp.pause(); | |
131 | } | |
132 | ||
133 | void unpause_with_new_config(RGWRados* const store, | |
134 | rgw_auth_registry_ptr_t auth_registry) { | |
135 | this->store = store; | |
136 | this->auth_registry = std::move(auth_registry); | |
137 | m_tp.unpause(); | |
138 | } | |
139 | ||
140 | void close_fd() { | |
141 | if (sock_fd >= 0) { | |
142 | ::close(sock_fd); | |
143 | sock_fd = -1; | |
144 | } | |
145 | } | |
146 | }; /* RGWProcess */ | |
147 | ||
148 | class RGWFCGXProcess : public RGWProcess { | |
149 | int max_connections; | |
150 | public: | |
151 | ||
152 | /* have a bit more connections than threads so that requests are | |
153 | * still accepted even if we're still processing older requests */ | |
154 | RGWFCGXProcess(CephContext* const cct, | |
155 | RGWProcessEnv* const pe, | |
156 | const int num_threads, | |
157 | RGWFrontendConfig* const conf) | |
158 | : RGWProcess(cct, pe, num_threads, conf), | |
159 | max_connections(num_threads + (num_threads >> 3)) { | |
160 | } | |
161 | ||
162 | void run() override; | |
163 | void handle_request(RGWRequest* req) override; | |
164 | }; | |
165 | ||
166 | class RGWProcessControlThread : public Thread { | |
167 | RGWProcess *pprocess; | |
168 | public: | |
169 | RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} | |
170 | ||
171 | void *entry() override { | |
172 | pprocess->run(); | |
173 | return NULL; | |
174 | } | |
175 | }; | |
176 | ||
177 | class RGWLoadGenProcess : public RGWProcess { | |
178 | RGWAccessKey access_key; | |
179 | public: | |
180 | RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads, | |
181 | RGWFrontendConfig* _conf) : | |
182 | RGWProcess(cct, pe, num_threads, _conf) {} | |
183 | void run() override; | |
184 | void checkpoint(); | |
185 | void handle_request(RGWRequest* req) override; | |
186 | void gen_request(const string& method, const string& resource, | |
31f18b77 | 187 | int content_length, std::atomic<bool>* fail_flag); |
7c673cae FG |
188 | |
189 | void set_access_key(RGWAccessKey& key) { access_key = key; } | |
190 | }; | |
191 | ||
192 | /* process stream request */ | |
193 | extern int process_request(RGWRados* store, | |
194 | RGWREST* rest, | |
195 | RGWRequest* req, | |
196 | const std::string& frontend_prefix, | |
197 | const rgw_auth_registry_t& auth_registry, | |
198 | RGWRestfulIO* client_io, | |
199 | OpsLogSocket* olog); | |
200 | ||
201 | extern int rgw_process_authenticated(RGWHandler_REST* handler, | |
202 | RGWOp*& op, | |
203 | RGWRequest* req, | |
204 | req_state* s, | |
205 | bool skip_retarget = false); | |
206 | ||
/* Remove dout_subsys only if this header was the one that defined it
 * (signalled by def_dout_subsys), then drop the header-local dout_context. */
#ifdef def_dout_subsys
#  undef def_dout_subsys
#  undef dout_subsys
#endif
#undef dout_context
212 | ||
213 | #endif /* RGW_PROCESS_H */ |