]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_log.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / rgw_log.cc
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#include "common/Clock.h"
5#include "common/Timer.h"
6#include "common/utf8.h"
7#include "common/OutputDataSocket.h"
8#include "common/Formatter.h"
9
10#include "rgw_bucket.h"
11#include "rgw_log.h"
12#include "rgw_acl.h"
7c673cae
FG
13#include "rgw_client_io.h"
14#include "rgw_rest.h"
11fdf7f2
TL
15#include "rgw_zone.h"
16
17#include "services/svc_zone.h"
7c673cae
FG
18
19#define dout_subsys ceph_subsys_rgw
20
21static void set_param_str(struct req_state *s, const char *name, string& str)
22{
23 const char *p = s->info.env->get(name);
24 if (p)
25 str = p;
26}
27
28string render_log_object_name(const string& format,
f67539c2 29 struct tm *dt, const string& bucket_id,
7c673cae
FG
30 const string& bucket_name)
31{
32 string o;
33 for (unsigned i=0; i<format.size(); i++) {
34 if (format[i] == '%' && i+1 < format.size()) {
35 i++;
36 char buf[32];
37 switch (format[i]) {
38 case '%':
39 strcpy(buf, "%");
40 break;
41 case 'Y':
42 sprintf(buf, "%.4d", dt->tm_year + 1900);
43 break;
44 case 'y':
45 sprintf(buf, "%.2d", dt->tm_year % 100);
46 break;
47 case 'm':
48 sprintf(buf, "%.2d", dt->tm_mon + 1);
49 break;
50 case 'd':
51 sprintf(buf, "%.2d", dt->tm_mday);
52 break;
53 case 'H':
54 sprintf(buf, "%.2d", dt->tm_hour);
55 break;
56 case 'I':
57 sprintf(buf, "%.2d", (dt->tm_hour % 12) + 1);
58 break;
59 case 'k':
60 sprintf(buf, "%d", dt->tm_hour);
61 break;
62 case 'l':
63 sprintf(buf, "%d", (dt->tm_hour % 12) + 1);
64 break;
65 case 'M':
66 sprintf(buf, "%.2d", dt->tm_min);
67 break;
68
69 case 'i':
70 o += bucket_id;
71 continue;
72 case 'n':
73 o += bucket_name;
74 continue;
75 default:
76 // unknown code
77 sprintf(buf, "%%%c", format[i]);
78 break;
79 }
80 o += buf;
81 continue;
82 }
83 o += format[i];
84 }
85 return o;
86}
87
88/* usage logger */
89class UsageLogger {
90 CephContext *cct;
91 RGWRados *store;
92 map<rgw_user_bucket, RGWUsageBatch> usage_map;
9f95a23c 93 ceph::mutex lock = ceph::make_mutex("UsageLogger");
7c673cae 94 int32_t num_entries;
9f95a23c 95 ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock");
7c673cae
FG
96 SafeTimer timer;
97 utime_t round_timestamp;
98
99 class C_UsageLogTimeout : public Context {
100 UsageLogger *logger;
101 public:
102 explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {}
103 void finish(int r) override {
104 logger->flush();
105 logger->set_timer();
106 }
107 };
108
109 void set_timer() {
110 timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this));
111 }
112public:
113
9f95a23c 114 UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), num_entries(0), timer(cct, timer_lock) {
7c673cae 115 timer.init();
9f95a23c 116 std::lock_guard l{timer_lock};
7c673cae
FG
117 set_timer();
118 utime_t ts = ceph_clock_now();
119 recalc_round_timestamp(ts);
120 }
121
122 ~UsageLogger() {
9f95a23c 123 std::lock_guard l{timer_lock};
7c673cae
FG
124 flush();
125 timer.cancel_all_events();
126 timer.shutdown();
127 }
128
129 void recalc_round_timestamp(utime_t& ts) {
130 round_timestamp = ts.round_to_hour();
131 }
132
133 void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
9f95a23c 134 lock.lock();
7c673cae
FG
135 if (timestamp.sec() > round_timestamp + 3600)
136 recalc_round_timestamp(timestamp);
137 entry.epoch = round_timestamp.sec();
138 bool account;
139 string u = user.to_str();
140 rgw_user_bucket ub(u, entry.bucket);
141 real_time rt = round_timestamp.to_real_time();
142 usage_map[ub].insert(rt, entry, &account);
143 if (account)
144 num_entries++;
145 bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
9f95a23c 146 lock.unlock();
7c673cae 147 if (need_flush) {
9f95a23c 148 std::lock_guard l{timer_lock};
7c673cae
FG
149 flush();
150 }
151 }
152
153 void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
154 if (entry.payer.empty()) {
155 insert_user(timestamp, entry.owner, entry);
156 } else {
157 insert_user(timestamp, entry.payer, entry);
158 }
159 }
160
161 void flush() {
162 map<rgw_user_bucket, RGWUsageBatch> old_map;
9f95a23c 163 lock.lock();
7c673cae
FG
164 old_map.swap(usage_map);
165 num_entries = 0;
9f95a23c 166 lock.unlock();
7c673cae
FG
167
168 store->log_usage(old_map);
169 }
170};
171
172static UsageLogger *usage_logger = NULL;
173
174void rgw_log_usage_init(CephContext *cct, RGWRados *store)
175{
176 usage_logger = new UsageLogger(cct, store);
177}
178
179void rgw_log_usage_finalize()
180{
181 delete usage_logger;
182 usage_logger = NULL;
183}
184
185static void log_usage(struct req_state *s, const string& op_name)
186{
187 if (s->system_request) /* don't log system user operations */
188 return;
189
190 if (!usage_logger)
191 return;
192
193 rgw_user user;
194 rgw_user payer;
195 string bucket_name;
196
197 bucket_name = s->bucket_name;
198
199 if (!bucket_name.empty()) {
f67539c2 200 bucket_name = s->bucket_name;
7c673cae 201 user = s->bucket_owner.get_id();
f67539c2
TL
202 if (!rgw::sal::RGWBucket::empty(s->bucket.get()) &&
203 s->bucket->get_info().requester_pays) {
9f95a23c 204 payer = s->user->get_id();
7c673cae
FG
205 }
206 } else {
9f95a23c 207 user = s->user->get_id();
7c673cae
FG
208 }
209
210 bool error = s->err.is_err();
211 if (error && s->err.http_ret == 404) {
212 bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */
213 }
214
215 string u = user.to_str();
216 string p = payer.to_str();
217 rgw_usage_log_entry entry(u, p, bucket_name);
218
219 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
220 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
221
222 rgw_usage_data data(bytes_sent, bytes_received);
223
224 data.ops = 1;
31f18b77 225 if (!s->is_err())
7c673cae
FG
226 data.successful_ops = 1;
227
181888fb
FG
228 ldout(s->cct, 30) << "log_usage: bucket_name=" << bucket_name
229 << " tenant=" << s->bucket_tenant
230 << ", bytes_sent=" << bytes_sent << ", bytes_received="
231 << bytes_received << ", success=" << data.successful_ops << dendl;
232
7c673cae
FG
233 entry.add(op_name, data);
234
235 utime_t ts = ceph_clock_now();
236
237 usage_logger->insert(ts, entry);
238}
239
240void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
241{
242 formatter->open_object_section("log_entry");
243 formatter->dump_string("bucket", entry.bucket);
11fdf7f2
TL
244 {
245 auto t = utime_t{entry.time};
246 t.gmtime(formatter->dump_stream("time")); // UTC
247 t.localtime(formatter->dump_stream("time_local"));
248 }
7c673cae
FG
249 formatter->dump_string("remote_addr", entry.remote_addr);
250 string obj_owner = entry.object_owner.to_str();
251 if (obj_owner.length())
252 formatter->dump_string("object_owner", obj_owner);
253 formatter->dump_string("user", entry.user);
254 formatter->dump_string("operation", entry.op);
255 formatter->dump_string("uri", entry.uri);
256 formatter->dump_string("http_status", entry.http_status);
257 formatter->dump_string("error_code", entry.error_code);
258 formatter->dump_int("bytes_sent", entry.bytes_sent);
259 formatter->dump_int("bytes_received", entry.bytes_received);
260 formatter->dump_int("object_size", entry.obj_size);
11fdf7f2
TL
261 {
262 using namespace std::chrono;
263 uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count();
264 formatter->dump_int("total_time", total_time);
265 }
7c673cae
FG
266 formatter->dump_string("user_agent", entry.user_agent);
267 formatter->dump_string("referrer", entry.referrer);
268 if (entry.x_headers.size() > 0) {
269 formatter->open_array_section("http_x_headers");
270 for (const auto& iter: entry.x_headers) {
271 formatter->open_object_section(iter.first.c_str());
272 formatter->dump_string(iter.first.c_str(), iter.second);
273 formatter->close_section();
274 }
275 formatter->close_section();
276 }
9f95a23c 277 formatter->dump_string("trans_id", entry.trans_id);
adb31ebb
TL
278 if (entry.token_claims.size() > 0) {
279 if (entry.token_claims[0] == "sts") {
280 formatter->open_object_section("sts_token_claims");
281 for (const auto& iter: entry.token_claims) {
282 auto pos = iter.find(":");
283 if (pos != string::npos) {
284 formatter->dump_string(iter.substr(0, pos), iter.substr(pos + 1));
285 }
286 }
287 formatter->close_section();
288 }
289 }
290
7c673cae
FG
291 formatter->close_section();
292}
293
294void OpsLogSocket::formatter_to_bl(bufferlist& bl)
295{
296 stringstream ss;
297 formatter->flush(ss);
298 const string& s = ss.str();
299
300 bl.append(s);
301}
302
303void OpsLogSocket::init_connection(bufferlist& bl)
304{
305 bl.append("[");
306}
307
9f95a23c 308OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
7c673cae
FG
309{
310 formatter = new JSONFormatter;
311 delim.append(",\n");
312}
313
314OpsLogSocket::~OpsLogSocket()
315{
316 delete formatter;
317}
318
319void OpsLogSocket::log(struct rgw_log_entry& entry)
320{
321 bufferlist bl;
322
9f95a23c 323 lock.lock();
7c673cae
FG
324 rgw_format_ops_log_entry(entry, formatter);
325 formatter_to_bl(bl);
9f95a23c 326 lock.unlock();
7c673cae
FG
327
328 append_output(bl);
329}
330
331int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
332 const string& op_name, OpsLogSocket *olog)
333{
334 struct rgw_log_entry entry;
335 string bucket_id;
336
337 if (s->enable_usage_log)
338 log_usage(s, op_name);
339
340 if (!s->enable_ops_log)
341 return 0;
342
343 if (s->bucket_name.empty()) {
344 ldout(s->cct, 5) << "nothing to log for operation" << dendl;
345 return -EINVAL;
346 }
f67539c2 347 if (s->err.ret == -ERR_NO_SUCH_BUCKET || rgw::sal::RGWBucket::empty(s->bucket.get())) {
7c673cae 348 if (!s->cct->_conf->rgw_log_nonexistent_bucket) {
f67539c2 349 ldout(s->cct, 5) << "bucket " << s->bucket_name << " doesn't exist, not logging" << dendl;
7c673cae
FG
350 return 0;
351 }
352 bucket_id = "";
353 } else {
f67539c2 354 bucket_id = s->bucket->get_bucket_id();
7c673cae 355 }
9f95a23c 356 entry.bucket = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name);
7c673cae 357
c07f9fc5 358 if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) {
7c673cae
FG
359 ldout(s->cct, 5) << "not logging op on bucket with non-utf8 name" << dendl;
360 return 0;
361 }
362
f67539c2
TL
363 if (!rgw::sal::RGWObject::empty(s->object.get())) {
364 entry.obj = s->object->get_key();
7c673cae
FG
365 } else {
366 entry.obj = rgw_obj_key("-");
367 }
368
369 entry.obj_size = s->obj_size;
370
371 if (s->cct->_conf->rgw_remote_addr_param.length())
372 set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(),
373 entry.remote_addr);
374 else
375 set_param_str(s, "REMOTE_ADDR", entry.remote_addr);
376 set_param_str(s, "HTTP_USER_AGENT", entry.user_agent);
b32b8144
FG
377 // legacy apps are still using misspelling referer, such as curl -e option
378 if (s->info.env->exists("HTTP_REFERRER"))
379 set_param_str(s, "HTTP_REFERRER", entry.referrer);
380 else
381 set_param_str(s, "HTTP_REFERER", entry.referrer);
382
94b18763
FG
383 std::string uri;
384 if (s->info.env->exists("REQUEST_METHOD")) {
385 uri.append(s->info.env->get("REQUEST_METHOD"));
386 uri.append(" ");
b32b8144
FG
387 }
388
94b18763
FG
389 if (s->info.env->exists("REQUEST_URI")) {
390 uri.append(s->info.env->get("REQUEST_URI"));
391 }
392
393 if (s->info.env->exists("QUERY_STRING")) {
394 const char* qs = s->info.env->get("QUERY_STRING");
395 if(qs && (*qs != '\0')) {
396 uri.append("?");
397 uri.append(qs);
398 }
399 }
400
401 if (s->info.env->exists("HTTP_VERSION")) {
402 uri.append(" ");
403 uri.append("HTTP/");
404 uri.append(s->info.env->get("HTTP_VERSION"));
405 }
b32b8144
FG
406
407 entry.uri = std::move(uri);
408
92f5a8d4 409 entry.op = op_name;
7c673cae 410
adb31ebb
TL
411 if (! s->token_claims.empty()) {
412 entry.token_claims = std::move(s->token_claims);
413 }
414
7c673cae
FG
415 /* custom header logging */
416 if (rest) {
417 if (rest->log_x_headers()) {
418 for (const auto& iter : s->info.env->get_map()) {
419 if (rest->log_x_header(iter.first)) {
420 entry.x_headers.insert(
421 rgw_log_entry::headers_map::value_type(iter.first, iter.second));
422 }
423 }
424 }
425 }
426
9f95a23c 427 entry.user = s->user->get_id().to_str();
7c673cae
FG
428 if (s->object_acl)
429 entry.object_owner = s->object_acl->get_owner().get_id();
430 entry.bucket_owner = s->bucket_owner.get_id();
431
432 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
433 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
434
435 entry.time = s->time;
11fdf7f2 436 entry.total_time = s->time_elapsed();
7c673cae
FG
437 entry.bytes_sent = bytes_sent;
438 entry.bytes_received = bytes_received;
439 if (s->err.http_ret) {
440 char buf[16];
441 snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
442 entry.http_status = buf;
443 } else
444 entry.http_status = "200"; // default
445
31f18b77 446 entry.error_code = s->err.err_code;
7c673cae 447 entry.bucket_id = bucket_id;
9f95a23c 448 entry.trans_id = s->trans_id;
7c673cae
FG
449
450 bufferlist bl;
11fdf7f2 451 encode(entry, bl);
7c673cae
FG
452
453 struct tm bdt;
11fdf7f2 454 time_t t = req_state::Clock::to_time_t(entry.time);
7c673cae
FG
455 if (s->cct->_conf->rgw_log_object_name_utc)
456 gmtime_r(&t, &bdt);
457 else
458 localtime_r(&t, &bdt);
459
460 int ret = 0;
461
462 if (s->cct->_conf->rgw_ops_log_rados) {
463 string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
f67539c2 464 entry.bucket_id, entry.bucket);
7c673cae 465
11fdf7f2 466 rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, oid);
7c673cae
FG
467
468 ret = store->append_async(obj, bl.length(), bl);
469 if (ret == -ENOENT) {
11fdf7f2 470 ret = store->create_pool(store->svc.zone->get_zone_params().log_pool);
7c673cae
FG
471 if (ret < 0)
472 goto done;
473 // retry
474 ret = store->append_async(obj, bl.length(), bl);
475 }
476 }
477
478 if (olog) {
479 olog->log(entry);
480 }
481done:
482 if (ret < 0)
483 ldout(s->cct, 0) << "ERROR: failed to log entry" << dendl;
484
485 return ret;
486}
487