git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_process.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_process.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #pragma once
5
6 #include "rgw_common.h"
7 #include "rgw_acl.h"
8 #include "rgw_user.h"
9 #include "rgw_rest.h"
10 #include "include/ceph_assert.h"
11
12 #include "common/WorkQueue.h"
13 #include "common/Throttle.h"
14
15 #include <atomic>
16
// dout_context is defined as g_ceph_context for the body of this header only;
// the matching #undef is the last line of the file.
#define dout_context g_ceph_context

// Forward declarations: only the names are needed by the declarations in this
// header, so the full definitions are not pulled in here.
namespace rgw::dmclock { class Scheduler; }

struct RGWProcessEnv;
class RGWFrontendConfig;
class RGWRequest;
27
28 class RGWProcess {
29 std::deque<RGWRequest*> m_req_queue;
30 protected:
31 CephContext *cct;
32 RGWProcessEnv& env;
33 ThreadPool m_tp;
34 Throttle req_throttle;
35 RGWFrontendConfig* conf;
36 int sock_fd;
37 std::string uri_prefix;
38
39 struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> {
40 RGWProcess* process;
41 RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
42 ThreadPool* tp)
43 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
44 tp), process(p) {}
45
46 bool _enqueue(RGWRequest* req) override;
47
48 void _dequeue(RGWRequest* req) override {
49 ceph_abort();
50 }
51
52 bool _empty() override {
53 return process->m_req_queue.empty();
54 }
55
56 RGWRequest* _dequeue() override;
57
58 using ThreadPool::WorkQueue<RGWRequest>::_process;
59
60 void _process(RGWRequest *req, ThreadPool::TPHandle &) override;
61
62 void _dump_queue();
63
64 void _clear() override {
65 ceph_assert(process->m_req_queue.empty());
66 }
67
68 CephContext *get_cct() const override { return process->cct; }
69 unsigned get_subsys() const { return ceph_subsys_rgw; }
70 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";}
71
72 } req_wq;
73
74 public:
75 RGWProcess(CephContext* const cct,
76 RGWProcessEnv& env,
77 const int num_threads,
78 std::string uri_prefix,
79 RGWFrontendConfig* const conf)
80 : cct(cct), env(env),
81 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
82 req_throttle(cct, "rgw_ops", num_threads * 2),
83 conf(conf),
84 sock_fd(-1),
85 uri_prefix(std::move(uri_prefix)),
86 req_wq(this,
87 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
88 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
89 &m_tp) {
90 }
91
92 virtual ~RGWProcess() = default;
93
94 const RGWProcessEnv& get_env() const { return env; }
95
96 virtual void run() = 0;
97 virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0;
98
99 void pause() {
100 m_tp.pause();
101 }
102
103 void unpause_with_new_config() {
104 m_tp.unpause();
105 }
106
107 void close_fd() {
108 if (sock_fd >= 0) {
109 ::close(sock_fd);
110 sock_fd = -1;
111 }
112 }
113 }; /* RGWProcess */
114
115 class RGWProcessControlThread : public Thread {
116 RGWProcess *pprocess;
117 public:
118 explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
119
120 void *entry() override {
121 pprocess->run();
122 return NULL;
123 }
124 };
125
126 class RGWLoadGenProcess : public RGWProcess {
127 RGWAccessKey access_key;
128 public:
129 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv& env, int num_threads,
130 std::string uri_prefix, RGWFrontendConfig* _conf)
131 : RGWProcess(cct, env, num_threads, std::move(uri_prefix), _conf) {}
132 void run() override;
133 void checkpoint();
134 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
135 void gen_request(const std::string& method, const std::string& resource,
136 int content_length, std::atomic<bool>* fail_flag);
137
138 void set_access_key(RGWAccessKey& key) { access_key = key; }
139 };
140 /* process stream request */
141 extern int process_request(const RGWProcessEnv& penv,
142 RGWRequest* req,
143 const std::string& frontend_prefix,
144 RGWRestfulIO* client_io,
145 optional_yield y,
146 rgw::dmclock::Scheduler *scheduler,
147 std::string* user,
148 ceph::coarse_real_clock::duration* latency,
149 int* http_ret = nullptr);
150
151 extern int rgw_process_authenticated(RGWHandler_REST* handler,
152 RGWOp*& op,
153 RGWRequest* req,
154 req_state* s,
155 optional_yield y,
156 rgw::sal::Driver* driver,
157 bool skip_retarget = false);
158
// Undo the dout_context definition made near the top of this header so the
// macro does not remain defined for files that include it.
#undef dout_context