]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_process.h
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / rgw / rgw_process.h
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae 3
1e59de90 4#pragma once
7c673cae
FG
5
6#include "rgw_common.h"
7c673cae 7#include "rgw_acl.h"
7c673cae 8#include "rgw_user.h"
7c673cae 9#include "rgw_rest.h"
11fdf7f2 10#include "include/ceph_assert.h"
7c673cae
FG
11
12#include "common/WorkQueue.h"
13#include "common/Throttle.h"
14
15#include <atomic>
16
7c673cae
FG
17#define dout_context g_ceph_context
18
7c673cae 19
11fdf7f2
TL
20namespace rgw::dmclock {
21 class Scheduler;
22}
23
1e59de90 24struct RGWProcessEnv;
7c673cae 25class RGWFrontendConfig;
f67539c2 26class RGWRequest;
7c673cae
FG
27
28class RGWProcess {
20effc67 29 std::deque<RGWRequest*> m_req_queue;
7c673cae
FG
30protected:
31 CephContext *cct;
1e59de90 32 RGWProcessEnv& env;
7c673cae
FG
33 ThreadPool m_tp;
34 Throttle req_throttle;
7c673cae
FG
35 RGWFrontendConfig* conf;
36 int sock_fd;
37 std::string uri_prefix;
38
b3b6e05e 39 struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> {
7c673cae 40 RGWProcess* process;
f67539c2
TL
41 RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
42 ThreadPool* tp)
7c673cae
FG
43 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
44 tp), process(p) {}
45
11fdf7f2 46 bool _enqueue(RGWRequest* req) override;
7c673cae
FG
47
48 void _dequeue(RGWRequest* req) override {
49 ceph_abort();
50 }
51
52 bool _empty() override {
53 return process->m_req_queue.empty();
54 }
55
11fdf7f2 56 RGWRequest* _dequeue() override;
7c673cae
FG
57
58 using ThreadPool::WorkQueue<RGWRequest>::_process;
59
11fdf7f2 60 void _process(RGWRequest *req, ThreadPool::TPHandle &) override;
7c673cae
FG
61
62 void _dump_queue();
63
64 void _clear() override {
11fdf7f2 65 ceph_assert(process->m_req_queue.empty());
7c673cae 66 }
b3b6e05e
TL
67
68 CephContext *get_cct() const override { return process->cct; }
69 unsigned get_subsys() const { return ceph_subsys_rgw; }
70 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";}
71
7c673cae
FG
72 } req_wq;
73
74public:
75 RGWProcess(CephContext* const cct,
1e59de90 76 RGWProcessEnv& env,
7c673cae 77 const int num_threads,
1e59de90 78 std::string uri_prefix,
7c673cae 79 RGWFrontendConfig* const conf)
1e59de90 80 : cct(cct), env(env),
7c673cae
FG
81 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
82 req_throttle(cct, "rgw_ops", num_threads * 2),
7c673cae
FG
83 conf(conf),
84 sock_fd(-1),
1e59de90 85 uri_prefix(std::move(uri_prefix)),
f67539c2
TL
86 req_wq(this,
87 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
88 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
89 &m_tp) {
7c673cae 90 }
1e59de90 91
7c673cae
FG
92 virtual ~RGWProcess() = default;
93
1e59de90
TL
94 const RGWProcessEnv& get_env() const { return env; }
95
7c673cae 96 virtual void run() = 0;
b3b6e05e 97 virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0;
7c673cae
FG
98
99 void pause() {
100 m_tp.pause();
101 }
102
1e59de90 103 void unpause_with_new_config() {
7c673cae
FG
104 m_tp.unpause();
105 }
106
107 void close_fd() {
108 if (sock_fd >= 0) {
109 ::close(sock_fd);
110 sock_fd = -1;
111 }
112 }
113}; /* RGWProcess */
114
7c673cae
FG
115class RGWProcessControlThread : public Thread {
116 RGWProcess *pprocess;
117public:
11fdf7f2 118 explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
7c673cae
FG
119
120 void *entry() override {
121 pprocess->run();
122 return NULL;
123 }
124};
125
126class RGWLoadGenProcess : public RGWProcess {
127 RGWAccessKey access_key;
128public:
1e59de90
TL
129 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv& env, int num_threads,
130 std::string uri_prefix, RGWFrontendConfig* _conf)
131 : RGWProcess(cct, env, num_threads, std::move(uri_prefix), _conf) {}
7c673cae
FG
132 void run() override;
133 void checkpoint();
b3b6e05e 134 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
20effc67 135 void gen_request(const std::string& method, const std::string& resource,
31f18b77 136 int content_length, std::atomic<bool>* fail_flag);
7c673cae
FG
137
138 void set_access_key(RGWAccessKey& key) { access_key = key; }
139};
7c673cae 140/* process stream request */
1e59de90 141extern int process_request(const RGWProcessEnv& penv,
7c673cae
FG
142 RGWRequest* req,
143 const std::string& frontend_prefix,
7c673cae 144 RGWRestfulIO* client_io,
11fdf7f2
TL
145 optional_yield y,
146 rgw::dmclock::Scheduler *scheduler,
f67539c2
TL
147 std::string* user,
148 ceph::coarse_real_clock::duration* latency,
94b18763 149 int* http_ret = nullptr);
7c673cae
FG
150
151extern int rgw_process_authenticated(RGWHandler_REST* handler,
152 RGWOp*& op,
153 RGWRequest* req,
154 req_state* s,
20effc67 155 optional_yield y,
1e59de90 156 rgw::sal::Driver* driver,
7c673cae
FG
157 bool skip_retarget = false);
158
7c673cae 159#undef dout_context