git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_process.h
update sources to v12.2.5
[ceph.git] / ceph / src / rgw / rgw_process.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef RGW_PROCESS_H
5#define RGW_PROCESS_H
6
7#include "rgw_common.h"
8#include "rgw_rados.h"
9#include "rgw_acl.h"
10#include "rgw_auth_registry.h"
11#include "rgw_user.h"
12#include "rgw_op.h"
13#include "rgw_rest.h"
14
15#include "include/assert.h"
16
17#include "common/WorkQueue.h"
18#include "common/Throttle.h"
19
20#include <atomic>
21
22#if !defined(dout_subsys)
23#define dout_subsys ceph_subsys_rgw
24#define def_dout_subsys
25#endif
26
27#define dout_context g_ceph_context
28
/* Declared here, defined in another translation unit.
 * NOTE(review): presumably asks the running process to shut down — confirm
 * against the definition. */
void signal_shutdown();
30
31struct RGWProcessEnv {
32 RGWRados *store;
33 RGWREST *rest;
34 OpsLogSocket *olog;
35 int port;
36 std::string uri_prefix;
37 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
38};
39
40class RGWFrontendConfig;
41
42class RGWProcess {
43 deque<RGWRequest*> m_req_queue;
44protected:
45 CephContext *cct;
46 RGWRados* store;
47 rgw_auth_registry_ptr_t auth_registry;
48 OpsLogSocket* olog;
49 ThreadPool m_tp;
50 Throttle req_throttle;
51 RGWREST* rest;
52 RGWFrontendConfig* conf;
53 int sock_fd;
54 std::string uri_prefix;
55
56 struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
57 RGWProcess* process;
58 RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
59 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
60 tp), process(p) {}
61
62 bool _enqueue(RGWRequest* req) override {
63 process->m_req_queue.push_back(req);
64 perfcounter->inc(l_rgw_qlen);
65 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
66 _dump_queue();
67 return true;
68 }
69
70 void _dequeue(RGWRequest* req) override {
71 ceph_abort();
72 }
73
74 bool _empty() override {
75 return process->m_req_queue.empty();
76 }
77
78 RGWRequest* _dequeue() override {
79 if (process->m_req_queue.empty())
80 return NULL;
81 RGWRequest *req = process->m_req_queue.front();
82 process->m_req_queue.pop_front();
83 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
84 _dump_queue();
85 perfcounter->inc(l_rgw_qlen, -1);
86 return req;
87 }
88
89 using ThreadPool::WorkQueue<RGWRequest>::_process;
90
91 void _process(RGWRequest *req, ThreadPool::TPHandle &) override {
92 perfcounter->inc(l_rgw_qactive);
93 process->handle_request(req);
94 process->req_throttle.put(1);
95 perfcounter->inc(l_rgw_qactive, -1);
96 }
97
98 void _dump_queue();
99
100 void _clear() override {
101 assert(process->m_req_queue.empty());
102 }
103 } req_wq;
104
105public:
106 RGWProcess(CephContext* const cct,
107 RGWProcessEnv* const pe,
108 const int num_threads,
109 RGWFrontendConfig* const conf)
110 : cct(cct),
111 store(pe->store),
112 auth_registry(pe->auth_registry),
113 olog(pe->olog),
114 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
115 req_throttle(cct, "rgw_ops", num_threads * 2),
116 rest(pe->rest),
117 conf(conf),
118 sock_fd(-1),
119 uri_prefix(pe->uri_prefix),
120 req_wq(this, g_conf->rgw_op_thread_timeout,
121 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
122 }
123
124 virtual ~RGWProcess() = default;
125
126 virtual void run() = 0;
127 virtual void handle_request(RGWRequest *req) = 0;
128
129 void pause() {
130 m_tp.pause();
131 }
132
133 void unpause_with_new_config(RGWRados* const store,
134 rgw_auth_registry_ptr_t auth_registry) {
135 this->store = store;
136 this->auth_registry = std::move(auth_registry);
137 m_tp.unpause();
138 }
139
140 void close_fd() {
141 if (sock_fd >= 0) {
142 ::close(sock_fd);
143 sock_fd = -1;
144 }
145 }
146}; /* RGWProcess */
147
148class RGWFCGXProcess : public RGWProcess {
149 int max_connections;
150public:
151
152 /* have a bit more connections than threads so that requests are
153 * still accepted even if we're still processing older requests */
154 RGWFCGXProcess(CephContext* const cct,
155 RGWProcessEnv* const pe,
156 const int num_threads,
157 RGWFrontendConfig* const conf)
158 : RGWProcess(cct, pe, num_threads, conf),
159 max_connections(num_threads + (num_threads >> 3)) {
160 }
161
162 void run() override;
163 void handle_request(RGWRequest* req) override;
164};
165
166class RGWProcessControlThread : public Thread {
167 RGWProcess *pprocess;
168public:
169 RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
170
171 void *entry() override {
172 pprocess->run();
173 return NULL;
174 }
175};
176
177class RGWLoadGenProcess : public RGWProcess {
178 RGWAccessKey access_key;
179public:
180 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
181 RGWFrontendConfig* _conf) :
182 RGWProcess(cct, pe, num_threads, _conf) {}
183 void run() override;
184 void checkpoint();
185 void handle_request(RGWRequest* req) override;
186 void gen_request(const string& method, const string& resource,
31f18b77 187 int content_length, std::atomic<bool>* fail_flag);
7c673cae
FG
188
189 void set_access_key(RGWAccessKey& key) { access_key = key; }
190};
191
192/* process stream request */
193extern int process_request(RGWRados* store,
194 RGWREST* rest,
195 RGWRequest* req,
196 const std::string& frontend_prefix,
197 const rgw_auth_registry_t& auth_registry,
198 RGWRestfulIO* client_io,
94b18763
FG
199 OpsLogSocket* olog,
200 int* http_ret = nullptr);
7c673cae
FG
201
202extern int rgw_process_authenticated(RGWHandler_REST* handler,
203 RGWOp*& op,
204 RGWRequest* req,
205 req_state* s,
206 bool skip_retarget = false);
207
208#if defined(def_dout_subsys)
209#undef def_dout_subsys
210#undef dout_subsys
211#endif
212#undef dout_context
213
214#endif /* RGW_PROCESS_H */