]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_process.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_process.h
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#ifndef RGW_PROCESS_H
5#define RGW_PROCESS_H
6
7#include "rgw_common.h"
7c673cae
FG
8#include "rgw_acl.h"
9#include "rgw_auth_registry.h"
10#include "rgw_user.h"
11#include "rgw_op.h"
12#include "rgw_rest.h"
20effc67 13#include "rgw_ratelimit.h"
11fdf7f2 14#include "include/ceph_assert.h"
7c673cae
FG
15
16#include "common/WorkQueue.h"
17#include "common/Throttle.h"
18
19#include <atomic>
20
21#if !defined(dout_subsys)
22#define dout_subsys ceph_subsys_rgw
23#define def_dout_subsys
24#endif
25
26#define dout_context g_ceph_context
27
28extern void signal_shutdown();
29
11fdf7f2
TL
30namespace rgw::dmclock {
31 class Scheduler;
32}
33
7c673cae 34struct RGWProcessEnv {
20effc67 35 rgw::sal::Store* store;
7c673cae 36 RGWREST *rest;
a4b75251 37 OpsLogSink *olog;
7c673cae
FG
38 int port;
39 std::string uri_prefix;
40 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
20effc67
TL
41 //maybe there is a better place to store the rate limit data structure
42 ActiveRateLimiter* ratelimiting;
7c673cae
FG
43};
44
45class RGWFrontendConfig;
f67539c2 46class RGWRequest;
7c673cae
FG
47
48class RGWProcess {
20effc67 49 std::deque<RGWRequest*> m_req_queue;
7c673cae
FG
50protected:
51 CephContext *cct;
20effc67 52 rgw::sal::Store* store;
7c673cae 53 rgw_auth_registry_ptr_t auth_registry;
a4b75251 54 OpsLogSink* olog;
7c673cae
FG
55 ThreadPool m_tp;
56 Throttle req_throttle;
57 RGWREST* rest;
58 RGWFrontendConfig* conf;
59 int sock_fd;
60 std::string uri_prefix;
61
b3b6e05e 62 struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> {
7c673cae 63 RGWProcess* process;
f67539c2
TL
64 RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
65 ThreadPool* tp)
7c673cae
FG
66 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
67 tp), process(p) {}
68
11fdf7f2 69 bool _enqueue(RGWRequest* req) override;
7c673cae
FG
70
71 void _dequeue(RGWRequest* req) override {
72 ceph_abort();
73 }
74
75 bool _empty() override {
76 return process->m_req_queue.empty();
77 }
78
11fdf7f2 79 RGWRequest* _dequeue() override;
7c673cae
FG
80
81 using ThreadPool::WorkQueue<RGWRequest>::_process;
82
11fdf7f2 83 void _process(RGWRequest *req, ThreadPool::TPHandle &) override;
7c673cae
FG
84
85 void _dump_queue();
86
87 void _clear() override {
11fdf7f2 88 ceph_assert(process->m_req_queue.empty());
7c673cae 89 }
b3b6e05e
TL
90
91 CephContext *get_cct() const override { return process->cct; }
92 unsigned get_subsys() const { return ceph_subsys_rgw; }
93 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";}
94
7c673cae
FG
95 } req_wq;
96
97public:
98 RGWProcess(CephContext* const cct,
99 RGWProcessEnv* const pe,
100 const int num_threads,
101 RGWFrontendConfig* const conf)
102 : cct(cct),
103 store(pe->store),
104 auth_registry(pe->auth_registry),
105 olog(pe->olog),
106 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
107 req_throttle(cct, "rgw_ops", num_threads * 2),
108 rest(pe->rest),
109 conf(conf),
110 sock_fd(-1),
111 uri_prefix(pe->uri_prefix),
f67539c2
TL
112 req_wq(this,
113 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
114 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
115 &m_tp) {
7c673cae
FG
116 }
117
118 virtual ~RGWProcess() = default;
119
120 virtual void run() = 0;
b3b6e05e 121 virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0;
7c673cae
FG
122
123 void pause() {
124 m_tp.pause();
125 }
126
20effc67 127 void unpause_with_new_config(rgw::sal::Store* const store,
7c673cae
FG
128 rgw_auth_registry_ptr_t auth_registry) {
129 this->store = store;
130 this->auth_registry = std::move(auth_registry);
131 m_tp.unpause();
132 }
133
134 void close_fd() {
135 if (sock_fd >= 0) {
136 ::close(sock_fd);
137 sock_fd = -1;
138 }
139 }
140}; /* RGWProcess */
141
7c673cae
FG
142class RGWProcessControlThread : public Thread {
143 RGWProcess *pprocess;
144public:
11fdf7f2 145 explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
7c673cae
FG
146
147 void *entry() override {
148 pprocess->run();
149 return NULL;
150 }
151};
152
153class RGWLoadGenProcess : public RGWProcess {
154 RGWAccessKey access_key;
155public:
156 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
157 RGWFrontendConfig* _conf) :
158 RGWProcess(cct, pe, num_threads, _conf) {}
159 void run() override;
160 void checkpoint();
b3b6e05e 161 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
20effc67 162 void gen_request(const std::string& method, const std::string& resource,
31f18b77 163 int content_length, std::atomic<bool>* fail_flag);
7c673cae
FG
164
165 void set_access_key(RGWAccessKey& key) { access_key = key; }
166};
7c673cae 167/* process stream request */
20effc67 168extern int process_request(rgw::sal::Store* store,
7c673cae
FG
169 RGWREST* rest,
170 RGWRequest* req,
171 const std::string& frontend_prefix,
172 const rgw_auth_registry_t& auth_registry,
173 RGWRestfulIO* client_io,
a4b75251 174 OpsLogSink* olog,
11fdf7f2
TL
175 optional_yield y,
176 rgw::dmclock::Scheduler *scheduler,
f67539c2
TL
177 std::string* user,
178 ceph::coarse_real_clock::duration* latency,
20effc67 179 std::shared_ptr<RateLimiter> ratelimit,
94b18763 180 int* http_ret = nullptr);
7c673cae
FG
181
182extern int rgw_process_authenticated(RGWHandler_REST* handler,
183 RGWOp*& op,
184 RGWRequest* req,
185 req_state* s,
20effc67
TL
186 optional_yield y,
187 rgw::sal::Store* store,
7c673cae
FG
188 bool skip_retarget = false);
189
190#if defined(def_dout_subsys)
191#undef def_dout_subsys
192#undef dout_subsys
193#endif
194#undef dout_context
195
196#endif /* RGW_PROCESS_H */