]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_loadgen_process.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_loadgen_process.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/errno.h"
5 #include "common/Throttle.h"
6 #include "common/WorkQueue.h"
7
8 #include "rgw_rest.h"
9 #include "rgw_frontend.h"
10 #include "rgw_request.h"
11 #include "rgw_process.h"
12 #include "rgw_loadgen.h"
13 #include "rgw_client_io.h"
14
15 #include <atomic>
16
17 #define dout_subsys ceph_subsys_rgw
18
19 using namespace std;
20
21 extern void signal_shutdown();
22
23 void RGWLoadGenProcess::checkpoint()
24 {
25 m_tp.drain(&req_wq);
26 }
27
28 void RGWLoadGenProcess::run()
29 {
30 m_tp.start(); /* start thread pool */
31
32 int i;
33
34 int num_objs;
35
36 conf->get_val("num_objs", 1000, &num_objs);
37
38 int num_buckets;
39 conf->get_val("num_buckets", 1, &num_buckets);
40
41 vector<string> buckets(num_buckets);
42
43 std::atomic<bool> failed = { false };
44
45 for (i = 0; i < num_buckets; i++) {
46 buckets[i] = "/loadgen";
47 string& bucket = buckets[i];
48 append_rand_alpha(cct, bucket, bucket, 16);
49
50 /* first create a bucket */
51 gen_request("PUT", bucket, 0, &failed);
52 checkpoint();
53 }
54
55 string *objs = new string[num_objs];
56
57 if (failed) {
58 derr << "ERROR: bucket creation failed" << dendl;
59 goto done;
60 }
61
62 for (i = 0; i < num_objs; i++) {
63 char buf[16 + 1];
64 gen_rand_alphanumeric(cct, buf, sizeof(buf));
65 buf[16] = '\0';
66 objs[i] = buckets[i % num_buckets] + "/" + buf;
67 }
68
69 for (i = 0; i < num_objs; i++) {
70 gen_request("PUT", objs[i], 4096, &failed);
71 }
72
73 checkpoint();
74
75 if (failed) {
76 derr << "ERROR: bucket creation failed" << dendl;
77 goto done;
78 }
79
80 for (i = 0; i < num_objs; i++) {
81 gen_request("GET", objs[i], 4096, NULL);
82 }
83
84 checkpoint();
85
86 for (i = 0; i < num_objs; i++) {
87 gen_request("DELETE", objs[i], 0, NULL);
88 }
89
90 checkpoint();
91
92 for (i = 0; i < num_buckets; i++) {
93 gen_request("DELETE", buckets[i], 0, NULL);
94 }
95
96 done:
97 checkpoint();
98
99 m_tp.stop();
100
101 delete[] objs;
102
103 signal_shutdown();
104 } /* RGWLoadGenProcess::run() */
105
106 void RGWLoadGenProcess::gen_request(const string& method,
107 const string& resource,
108 int content_length, std::atomic<bool>* fail_flag)
109 {
110 RGWLoadGenRequest* req =
111 new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
112 content_length, fail_flag);
113 dout(10) << "allocated request req=" << hex << req << dec << dendl;
114 req_throttle.get(1);
115 req_wq.queue(req);
116 } /* RGWLoadGenProcess::gen_request */
117
118 void RGWLoadGenProcess::handle_request(const DoutPrefixProvider *dpp, RGWRequest* r)
119 {
120 RGWLoadGenRequest* req = static_cast<RGWLoadGenRequest*>(r);
121
122 RGWLoadGenRequestEnv env;
123
124 utime_t tm = ceph_clock_now();
125
126 env.port = 80;
127 env.content_length = req->content_length;
128 env.content_type = "binary/octet-stream";
129 env.request_method = req->method;
130 env.uri = req->resource;
131 env.set_date(tm);
132 env.sign(dpp, access_key);
133
134 RGWLoadGenIO real_client_io(&env);
135 RGWRestfulIO client_io(cct, &real_client_io);
136 ActiveRateLimiter ratelimit(cct);
137 int ret = process_request(store, rest, req, uri_prefix,
138 *auth_registry, &client_io, olog,
139 null_yield, nullptr, nullptr, nullptr,
140 ratelimit.get_active());
141 if (ret < 0) {
142 /* we don't really care about return code */
143 dout(20) << "process_request() returned " << ret << dendl;
144
145 if (req->fail_flag) {
146 req->fail_flag++;
147 }
148 }
149
150 delete req;
151 } /* RGWLoadGenProcess::handle_request */