]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_process.h
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_process.h
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#ifndef RGW_PROCESS_H
5#define RGW_PROCESS_H
6
7#include "rgw_common.h"
7c673cae
FG
8#include "rgw_acl.h"
9#include "rgw_auth_registry.h"
10#include "rgw_user.h"
11#include "rgw_op.h"
12#include "rgw_rest.h"
13
11fdf7f2 14#include "include/ceph_assert.h"
7c673cae
FG
15
16#include "common/WorkQueue.h"
17#include "common/Throttle.h"
18
19#include <atomic>
20
21#if !defined(dout_subsys)
22#define dout_subsys ceph_subsys_rgw
23#define def_dout_subsys
24#endif
25
26#define dout_context g_ceph_context
27
28extern void signal_shutdown();
29
11fdf7f2
TL
30namespace rgw::dmclock {
31 class Scheduler;
32}
33
7c673cae 34struct RGWProcessEnv {
9f95a23c 35 rgw::sal::RGWRadosStore *store;
7c673cae
FG
36 RGWREST *rest;
37 OpsLogSocket *olog;
38 int port;
39 std::string uri_prefix;
40 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
41};
42
43class RGWFrontendConfig;
f67539c2 44class RGWRequest;
7c673cae
FG
45
46class RGWProcess {
47 deque<RGWRequest*> m_req_queue;
48protected:
49 CephContext *cct;
9f95a23c 50 rgw::sal::RGWRadosStore* store;
7c673cae
FG
51 rgw_auth_registry_ptr_t auth_registry;
52 OpsLogSocket* olog;
53 ThreadPool m_tp;
54 Throttle req_throttle;
55 RGWREST* rest;
56 RGWFrontendConfig* conf;
57 int sock_fd;
58 std::string uri_prefix;
59
b3b6e05e 60 struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> {
7c673cae 61 RGWProcess* process;
f67539c2
TL
62 RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
63 ThreadPool* tp)
7c673cae
FG
64 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
65 tp), process(p) {}
66
11fdf7f2 67 bool _enqueue(RGWRequest* req) override;
7c673cae
FG
68
69 void _dequeue(RGWRequest* req) override {
70 ceph_abort();
71 }
72
73 bool _empty() override {
74 return process->m_req_queue.empty();
75 }
76
11fdf7f2 77 RGWRequest* _dequeue() override;
7c673cae
FG
78
79 using ThreadPool::WorkQueue<RGWRequest>::_process;
80
11fdf7f2 81 void _process(RGWRequest *req, ThreadPool::TPHandle &) override;
7c673cae
FG
82
83 void _dump_queue();
84
85 void _clear() override {
11fdf7f2 86 ceph_assert(process->m_req_queue.empty());
7c673cae 87 }
b3b6e05e
TL
88
89 CephContext *get_cct() const override { return process->cct; }
90 unsigned get_subsys() const { return ceph_subsys_rgw; }
91 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";}
92
7c673cae
FG
93 } req_wq;
94
95public:
96 RGWProcess(CephContext* const cct,
97 RGWProcessEnv* const pe,
98 const int num_threads,
99 RGWFrontendConfig* const conf)
100 : cct(cct),
101 store(pe->store),
102 auth_registry(pe->auth_registry),
103 olog(pe->olog),
104 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
105 req_throttle(cct, "rgw_ops", num_threads * 2),
106 rest(pe->rest),
107 conf(conf),
108 sock_fd(-1),
109 uri_prefix(pe->uri_prefix),
f67539c2
TL
110 req_wq(this,
111 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
112 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
113 &m_tp) {
7c673cae
FG
114 }
115
116 virtual ~RGWProcess() = default;
117
118 virtual void run() = 0;
b3b6e05e 119 virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0;
7c673cae
FG
120
121 void pause() {
122 m_tp.pause();
123 }
124
9f95a23c 125 void unpause_with_new_config(rgw::sal::RGWRadosStore* const store,
7c673cae
FG
126 rgw_auth_registry_ptr_t auth_registry) {
127 this->store = store;
128 this->auth_registry = std::move(auth_registry);
129 m_tp.unpause();
130 }
131
132 void close_fd() {
133 if (sock_fd >= 0) {
134 ::close(sock_fd);
135 sock_fd = -1;
136 }
137 }
138}; /* RGWProcess */
139
140class RGWFCGXProcess : public RGWProcess {
141 int max_connections;
142public:
143
144 /* have a bit more connections than threads so that requests are
145 * still accepted even if we're still processing older requests */
146 RGWFCGXProcess(CephContext* const cct,
147 RGWProcessEnv* const pe,
148 const int num_threads,
149 RGWFrontendConfig* const conf)
150 : RGWProcess(cct, pe, num_threads, conf),
151 max_connections(num_threads + (num_threads >> 3)) {
152 }
153
154 void run() override;
b3b6e05e 155 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
7c673cae
FG
156};
157
158class RGWProcessControlThread : public Thread {
159 RGWProcess *pprocess;
160public:
11fdf7f2 161 explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
7c673cae
FG
162
163 void *entry() override {
164 pprocess->run();
165 return NULL;
166 }
167};
168
169class RGWLoadGenProcess : public RGWProcess {
170 RGWAccessKey access_key;
171public:
172 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
173 RGWFrontendConfig* _conf) :
174 RGWProcess(cct, pe, num_threads, _conf) {}
175 void run() override;
176 void checkpoint();
b3b6e05e 177 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
7c673cae 178 void gen_request(const string& method, const string& resource,
31f18b77 179 int content_length, std::atomic<bool>* fail_flag);
7c673cae
FG
180
181 void set_access_key(RGWAccessKey& key) { access_key = key; }
182};
7c673cae 183/* process stream request */
9f95a23c 184extern int process_request(rgw::sal::RGWRadosStore* store,
7c673cae
FG
185 RGWREST* rest,
186 RGWRequest* req,
187 const std::string& frontend_prefix,
188 const rgw_auth_registry_t& auth_registry,
189 RGWRestfulIO* client_io,
94b18763 190 OpsLogSocket* olog,
11fdf7f2
TL
191 optional_yield y,
192 rgw::dmclock::Scheduler *scheduler,
f67539c2
TL
193 std::string* user,
194 ceph::coarse_real_clock::duration* latency,
94b18763 195 int* http_ret = nullptr);
7c673cae
FG
196
197extern int rgw_process_authenticated(RGWHandler_REST* handler,
198 RGWOp*& op,
199 RGWRequest* req,
200 req_state* s,
f67539c2 201 optional_yield y,
7c673cae
FG
202 bool skip_retarget = false);
203
204#if defined(def_dout_subsys)
205#undef def_dout_subsys
206#undef dout_subsys
207#endif
208#undef dout_context
209
210#endif /* RGW_PROCESS_H */