]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_log.cc
bump version to 16.2.6-pve2
[ceph.git] / ceph / src / rgw / rgw_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/Clock.h"
5 #include "common/Timer.h"
6 #include "common/utf8.h"
7 #include "common/OutputDataSocket.h"
8 #include "common/Formatter.h"
9
10 #include "rgw_bucket.h"
11 #include "rgw_log.h"
12 #include "rgw_acl.h"
13 #include "rgw_client_io.h"
14 #include "rgw_rest.h"
15 #include "rgw_zone.h"
16
17 #include "services/svc_zone.h"
18
19 #define dout_subsys ceph_subsys_rgw
20
21 static void set_param_str(struct req_state *s, const char *name, string& str)
22 {
23 const char *p = s->info.env->get(name);
24 if (p)
25 str = p;
26 }
27
/*
 * Expand the log object name template `format` (rgw_log_object_name).
 *
 * Supported conversions (a subset of date(1)/strftime(3), plus two RGW
 * specific ones):
 *   %%  literal '%'
 *   %Y  year, 4 digits            %y  year, last 2 digits
 *   %m  month (01..12)            %d  day of month (01..31)
 *   %H  hour (00..23)             %I  hour (01..12)
 *   %k  hour (0..23, unpadded)    %l  hour (1..12, unpadded)
 *   %M  minute (00..59)
 *   %i  bucket id                 %n  bucket name
 * Any other conversion is copied through unchanged (e.g. "%q" stays "%q"),
 * and a '%' that is the last character of `format` is copied literally.
 *
 * `dt` is only dereferenced when a time conversion is present.
 */
string render_log_object_name(const string& format,
			      struct tm *dt, const string& bucket_id,
			      const string& bucket_name)
{
  // 12-hour clock as date(1) prints it: hours 0 and 12 are "12", 13 is "1".
  // (The previous "(hour % 12) + 1" mapped midnight to 1 and 13:00 to 2.)
  auto hour12 = [](int hour) { return hour % 12 == 0 ? 12 : hour % 12; };

  string o;
  for (string::size_type i = 0; i < format.size(); i++) {
    // ordinary character, or a trailing '%' with nothing left to expand
    if (format[i] != '%' || i + 1 >= format.size()) {
      o += format[i];
      continue;
    }

    const char code = format[++i];
    char buf[32];
    switch (code) {
    case '%':
      snprintf(buf, sizeof(buf), "%%");
      break;
    case 'Y':
      snprintf(buf, sizeof(buf), "%.4d", dt->tm_year + 1900);
      break;
    case 'y':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_year % 100);
      break;
    case 'm':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mon + 1);
      break;
    case 'd':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mday);
      break;
    case 'H':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_hour);
      break;
    case 'I':
      snprintf(buf, sizeof(buf), "%.2d", hour12(dt->tm_hour));
      break;
    case 'k':
      snprintf(buf, sizeof(buf), "%d", dt->tm_hour);
      break;
    case 'l':
      snprintf(buf, sizeof(buf), "%d", hour12(dt->tm_hour));
      break;
    case 'M':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_min);
      break;
    case 'i':
      o += bucket_id;
      continue;
    case 'n':
      o += bucket_name;
      continue;
    default:
      // unknown code: keep it verbatim
      snprintf(buf, sizeof(buf), "%%%c", code);
      break;
    }
    o += buf;
  }
  return o;
}
87
88 /* usage logger */
89 class UsageLogger : public DoutPrefixProvider {
90 CephContext *cct;
91 RGWRados *store;
92 map<rgw_user_bucket, RGWUsageBatch> usage_map;
93 ceph::mutex lock = ceph::make_mutex("UsageLogger");
94 int32_t num_entries;
95 ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock");
96 SafeTimer timer;
97 utime_t round_timestamp;
98
99 class C_UsageLogTimeout : public Context {
100 UsageLogger *logger;
101 public:
102 explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {}
103 void finish(int r) override {
104 logger->flush();
105 logger->set_timer();
106 }
107 };
108
109 void set_timer() {
110 timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this));
111 }
112 public:
113
114 UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), num_entries(0), timer(cct, timer_lock) {
115 timer.init();
116 std::lock_guard l{timer_lock};
117 set_timer();
118 utime_t ts = ceph_clock_now();
119 recalc_round_timestamp(ts);
120 }
121
122 ~UsageLogger() {
123 std::lock_guard l{timer_lock};
124 flush();
125 timer.cancel_all_events();
126 timer.shutdown();
127 }
128
129 void recalc_round_timestamp(utime_t& ts) {
130 round_timestamp = ts.round_to_hour();
131 }
132
133 void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
134 lock.lock();
135 if (timestamp.sec() > round_timestamp + 3600)
136 recalc_round_timestamp(timestamp);
137 entry.epoch = round_timestamp.sec();
138 bool account;
139 string u = user.to_str();
140 rgw_user_bucket ub(u, entry.bucket);
141 real_time rt = round_timestamp.to_real_time();
142 usage_map[ub].insert(rt, entry, &account);
143 if (account)
144 num_entries++;
145 bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
146 lock.unlock();
147 if (need_flush) {
148 std::lock_guard l{timer_lock};
149 flush();
150 }
151 }
152
153 void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
154 if (entry.payer.empty()) {
155 insert_user(timestamp, entry.owner, entry);
156 } else {
157 insert_user(timestamp, entry.payer, entry);
158 }
159 }
160
161 void flush() {
162 map<rgw_user_bucket, RGWUsageBatch> old_map;
163 lock.lock();
164 old_map.swap(usage_map);
165 num_entries = 0;
166 lock.unlock();
167
168 store->log_usage(this, old_map);
169 }
170
171 CephContext *get_cct() const override { return cct; }
172 unsigned get_subsys() const override { return dout_subsys; }
173 std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw UsageLogger: "; }
174 };
175
176 static UsageLogger *usage_logger = NULL;
177
178 void rgw_log_usage_init(CephContext *cct, RGWRados *store)
179 {
180 usage_logger = new UsageLogger(cct, store);
181 }
182
183 void rgw_log_usage_finalize()
184 {
185 delete usage_logger;
186 usage_logger = NULL;
187 }
188
189 static void log_usage(struct req_state *s, const string& op_name)
190 {
191 if (s->system_request) /* don't log system user operations */
192 return;
193
194 if (!usage_logger)
195 return;
196
197 rgw_user user;
198 rgw_user payer;
199 string bucket_name;
200
201 bucket_name = s->bucket_name;
202
203 if (!bucket_name.empty()) {
204 bucket_name = s->bucket_name;
205 user = s->bucket_owner.get_id();
206 if (!rgw::sal::RGWBucket::empty(s->bucket.get()) &&
207 s->bucket->get_info().requester_pays) {
208 payer = s->user->get_id();
209 }
210 } else {
211 user = s->user->get_id();
212 }
213
214 bool error = s->err.is_err();
215 if (error && s->err.http_ret == 404) {
216 bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */
217 }
218
219 string u = user.to_str();
220 string p = payer.to_str();
221 rgw_usage_log_entry entry(u, p, bucket_name);
222
223 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
224 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
225
226 rgw_usage_data data(bytes_sent, bytes_received);
227
228 data.ops = 1;
229 if (!s->is_err())
230 data.successful_ops = 1;
231
232 ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name
233 << " tenant=" << s->bucket_tenant
234 << ", bytes_sent=" << bytes_sent << ", bytes_received="
235 << bytes_received << ", success=" << data.successful_ops << dendl;
236
237 entry.add(op_name, data);
238
239 utime_t ts = ceph_clock_now();
240
241 usage_logger->insert(ts, entry);
242 }
243
244 void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
245 {
246 formatter->open_object_section("log_entry");
247 formatter->dump_string("bucket", entry.bucket);
248 {
249 auto t = utime_t{entry.time};
250 t.gmtime(formatter->dump_stream("time")); // UTC
251 t.localtime(formatter->dump_stream("time_local"));
252 }
253 formatter->dump_string("remote_addr", entry.remote_addr);
254 string obj_owner = entry.object_owner.to_str();
255 if (obj_owner.length())
256 formatter->dump_string("object_owner", obj_owner);
257 formatter->dump_string("user", entry.user);
258 formatter->dump_string("operation", entry.op);
259 formatter->dump_string("uri", entry.uri);
260 formatter->dump_string("http_status", entry.http_status);
261 formatter->dump_string("error_code", entry.error_code);
262 formatter->dump_int("bytes_sent", entry.bytes_sent);
263 formatter->dump_int("bytes_received", entry.bytes_received);
264 formatter->dump_int("object_size", entry.obj_size);
265 {
266 using namespace std::chrono;
267 uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count();
268 formatter->dump_int("total_time", total_time);
269 }
270 formatter->dump_string("user_agent", entry.user_agent);
271 formatter->dump_string("referrer", entry.referrer);
272 if (entry.x_headers.size() > 0) {
273 formatter->open_array_section("http_x_headers");
274 for (const auto& iter: entry.x_headers) {
275 formatter->open_object_section(iter.first.c_str());
276 formatter->dump_string(iter.first.c_str(), iter.second);
277 formatter->close_section();
278 }
279 formatter->close_section();
280 }
281 formatter->dump_string("trans_id", entry.trans_id);
282 if (entry.token_claims.size() > 0) {
283 if (entry.token_claims[0] == "sts") {
284 formatter->open_object_section("sts_token_claims");
285 for (const auto& iter: entry.token_claims) {
286 auto pos = iter.find(":");
287 if (pos != string::npos) {
288 formatter->dump_string(iter.substr(0, pos), iter.substr(pos + 1));
289 }
290 }
291 formatter->close_section();
292 }
293 }
294
295 formatter->close_section();
296 }
297
298 void OpsLogSocket::formatter_to_bl(bufferlist& bl)
299 {
300 stringstream ss;
301 formatter->flush(ss);
302 const string& s = ss.str();
303
304 bl.append(s);
305 }
306
307 void OpsLogSocket::init_connection(bufferlist& bl)
308 {
309 bl.append("[");
310 }
311
312 OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
313 {
314 formatter = new JSONFormatter;
315 delim.append(",\n");
316 }
317
318 OpsLogSocket::~OpsLogSocket()
319 {
320 delete formatter;
321 }
322
323 void OpsLogSocket::log(struct rgw_log_entry& entry)
324 {
325 bufferlist bl;
326
327 lock.lock();
328 rgw_format_ops_log_entry(entry, formatter);
329 formatter_to_bl(bl);
330 lock.unlock();
331
332 append_output(bl);
333 }
334
335 int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
336 const string& op_name, OpsLogSocket *olog)
337 {
338 struct rgw_log_entry entry;
339 string bucket_id;
340
341 if (s->enable_usage_log)
342 log_usage(s, op_name);
343
344 if (!s->enable_ops_log)
345 return 0;
346
347 if (s->bucket_name.empty()) {
348 ldpp_dout(s, 5) << "nothing to log for operation" << dendl;
349 return -EINVAL;
350 }
351 if (s->err.ret == -ERR_NO_SUCH_BUCKET || rgw::sal::RGWBucket::empty(s->bucket.get())) {
352 if (!s->cct->_conf->rgw_log_nonexistent_bucket) {
353 ldpp_dout(s, 5) << "bucket " << s->bucket_name << " doesn't exist, not logging" << dendl;
354 return 0;
355 }
356 bucket_id = "";
357 } else {
358 bucket_id = s->bucket->get_bucket_id();
359 }
360 entry.bucket = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name);
361
362 if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) {
363 ldpp_dout(s, 5) << "not logging op on bucket with non-utf8 name" << dendl;
364 return 0;
365 }
366
367 if (!rgw::sal::RGWObject::empty(s->object.get())) {
368 entry.obj = s->object->get_key();
369 } else {
370 entry.obj = rgw_obj_key("-");
371 }
372
373 entry.obj_size = s->obj_size;
374
375 if (s->cct->_conf->rgw_remote_addr_param.length())
376 set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(),
377 entry.remote_addr);
378 else
379 set_param_str(s, "REMOTE_ADDR", entry.remote_addr);
380 set_param_str(s, "HTTP_USER_AGENT", entry.user_agent);
381 // legacy apps are still using misspelling referer, such as curl -e option
382 if (s->info.env->exists("HTTP_REFERRER"))
383 set_param_str(s, "HTTP_REFERRER", entry.referrer);
384 else
385 set_param_str(s, "HTTP_REFERER", entry.referrer);
386
387 std::string uri;
388 if (s->info.env->exists("REQUEST_METHOD")) {
389 uri.append(s->info.env->get("REQUEST_METHOD"));
390 uri.append(" ");
391 }
392
393 if (s->info.env->exists("REQUEST_URI")) {
394 uri.append(s->info.env->get("REQUEST_URI"));
395 }
396
397 if (s->info.env->exists("QUERY_STRING")) {
398 const char* qs = s->info.env->get("QUERY_STRING");
399 if(qs && (*qs != '\0')) {
400 uri.append("?");
401 uri.append(qs);
402 }
403 }
404
405 if (s->info.env->exists("HTTP_VERSION")) {
406 uri.append(" ");
407 uri.append("HTTP/");
408 uri.append(s->info.env->get("HTTP_VERSION"));
409 }
410
411 entry.uri = std::move(uri);
412
413 entry.op = op_name;
414
415 if (! s->token_claims.empty()) {
416 entry.token_claims = std::move(s->token_claims);
417 }
418
419 /* custom header logging */
420 if (rest) {
421 if (rest->log_x_headers()) {
422 for (const auto& iter : s->info.env->get_map()) {
423 if (rest->log_x_header(iter.first)) {
424 entry.x_headers.insert(
425 rgw_log_entry::headers_map::value_type(iter.first, iter.second));
426 }
427 }
428 }
429 }
430
431 entry.user = s->user->get_id().to_str();
432 if (s->object_acl)
433 entry.object_owner = s->object_acl->get_owner().get_id();
434 entry.bucket_owner = s->bucket_owner.get_id();
435
436 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
437 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
438
439 entry.time = s->time;
440 entry.total_time = s->time_elapsed();
441 entry.bytes_sent = bytes_sent;
442 entry.bytes_received = bytes_received;
443 if (s->err.http_ret) {
444 char buf[16];
445 snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
446 entry.http_status = buf;
447 } else
448 entry.http_status = "200"; // default
449
450 entry.error_code = s->err.err_code;
451 entry.bucket_id = bucket_id;
452 entry.trans_id = s->trans_id;
453
454 bufferlist bl;
455 encode(entry, bl);
456
457 struct tm bdt;
458 time_t t = req_state::Clock::to_time_t(entry.time);
459 if (s->cct->_conf->rgw_log_object_name_utc)
460 gmtime_r(&t, &bdt);
461 else
462 localtime_r(&t, &bdt);
463
464 int ret = 0;
465
466 if (s->cct->_conf->rgw_ops_log_rados) {
467 string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
468 entry.bucket_id, entry.bucket);
469
470 rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, oid);
471
472 ret = store->append_async(s, obj, bl.length(), bl);
473 if (ret == -ENOENT) {
474 ret = store->create_pool(s, store->svc.zone->get_zone_params().log_pool);
475 if (ret < 0)
476 goto done;
477 // retry
478 ret = store->append_async(s, obj, bl.length(), bl);
479 }
480 }
481
482 if (olog) {
483 olog->log(entry);
484 }
485 done:
486 if (ret < 0)
487 ldpp_dout(s, 0) << "ERROR: failed to log entry" << dendl;
488
489 return ret;
490 }
491