]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_log.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/Clock.h"
5 #include "common/Timer.h"
6 #include "common/utf8.h"
7 #include "common/OutputDataSocket.h"
8 #include "common/Formatter.h"
9
10 #include "rgw_bucket.h"
11 #include "rgw_log.h"
12 #include "rgw_acl.h"
13 #include "rgw_client_io.h"
14 #include "rgw_rest.h"
15 #include "rgw_zone.h"
16 #include "rgw_rados.h"
17
18 #include "services/svc_zone.h"
19
20 #include <chrono>
21 #include <math.h>
22
23 #define dout_subsys ceph_subsys_rgw
24
25 using namespace std;
26
27 static void set_param_str(struct req_state *s, const char *name, string& str)
28 {
29 const char *p = s->info.env->get(name);
30 if (p)
31 str = p;
32 }
33
/**
 * Expand a strftime-like template into a log object name.
 *
 * Supported conversions:
 *   %%  literal '%'
 *   %Y  year, at least four digits     %y  year modulo 100, two digits
 *   %m  month, two digits              %d  day of month, two digits
 *   %H  hour (24h), two digits         %k  hour (24h), unpadded
 *   %I  hour (12h, 01-12), two digits  %l  hour (12h, 1-12), unpadded
 *   %M  minute, two digits
 *   %i  bucket id                      %n  bucket name
 *
 * An unknown conversion is copied through verbatim (including its '%'),
 * and a lone '%' as the last character of the format is emitted as-is.
 *
 * @param format       template string
 * @param dt           broken-down time supplying the date/time fields
 * @param bucket_id    text substituted for %i
 * @param bucket_name  text substituted for %n
 * @return the rendered object name
 */
std::string render_log_object_name(const std::string& format,
                                   struct tm *dt, const std::string& bucket_id,
                                   const std::string& bucket_name)
{
  std::string o;
  for (size_t i = 0; i < format.size(); i++) {
    // Ordinary character, or a trailing '%' with nothing after it.
    if (format[i] != '%' || i + 1 >= format.size()) {
      o += format[i];
      continue;
    }

    i++;  // step onto the conversion character

    // 12-hour clock value: 0 and 12 both map to 12, 13 maps to 1, etc.
    // (The previous "(hour % 12) + 1" was off by one for every hour.)
    const int hour12 = (dt->tm_hour % 12 == 0) ? 12 : dt->tm_hour % 12;

    char buf[32];
    switch (format[i]) {
    case '%':
      snprintf(buf, sizeof(buf), "%%");
      break;
    case 'Y':
      snprintf(buf, sizeof(buf), "%.4d", dt->tm_year + 1900);
      break;
    case 'y':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_year % 100);
      break;
    case 'm':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mon + 1);
      break;
    case 'd':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mday);
      break;
    case 'H':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_hour);
      break;
    case 'I':
      snprintf(buf, sizeof(buf), "%.2d", hour12);
      break;
    case 'k':
      snprintf(buf, sizeof(buf), "%d", dt->tm_hour);
      break;
    case 'l':
      snprintf(buf, sizeof(buf), "%d", hour12);
      break;
    case 'M':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_min);
      break;

    case 'i':
      // Appended directly: may be longer than the scratch buffer.
      o += bucket_id;
      continue;
    case 'n':
      o += bucket_name;
      continue;
    default:
      // unknown code: keep it verbatim
      snprintf(buf, sizeof(buf), "%%%c", format[i]);
      break;
    }
    o += buf;
  }
  return o;
}
93
94 /* usage logger */
95 class UsageLogger : public DoutPrefixProvider {
96 CephContext *cct;
97 rgw::sal::Store* store;
98 map<rgw_user_bucket, RGWUsageBatch> usage_map;
99 ceph::mutex lock = ceph::make_mutex("UsageLogger");
100 int32_t num_entries;
101 ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock");
102 SafeTimer timer;
103 utime_t round_timestamp;
104
105 class C_UsageLogTimeout : public Context {
106 UsageLogger *logger;
107 public:
108 explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {}
109 void finish(int r) override {
110 logger->flush();
111 logger->set_timer();
112 }
113 };
114
115 void set_timer() {
116 timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this));
117 }
118 public:
119
120 UsageLogger(CephContext *_cct, rgw::sal::Store* _store) : cct(_cct), store(_store), num_entries(0), timer(cct, timer_lock) {
121 timer.init();
122 std::lock_guard l{timer_lock};
123 set_timer();
124 utime_t ts = ceph_clock_now();
125 recalc_round_timestamp(ts);
126 }
127
128 ~UsageLogger() {
129 std::lock_guard l{timer_lock};
130 flush();
131 timer.cancel_all_events();
132 timer.shutdown();
133 }
134
135 void recalc_round_timestamp(utime_t& ts) {
136 round_timestamp = ts.round_to_hour();
137 }
138
139 void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
140 lock.lock();
141 if (timestamp.sec() > round_timestamp + 3600)
142 recalc_round_timestamp(timestamp);
143 entry.epoch = round_timestamp.sec();
144 bool account;
145 string u = user.to_str();
146 rgw_user_bucket ub(u, entry.bucket);
147 real_time rt = round_timestamp.to_real_time();
148 usage_map[ub].insert(rt, entry, &account);
149 if (account)
150 num_entries++;
151 bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
152 lock.unlock();
153 if (need_flush) {
154 std::lock_guard l{timer_lock};
155 flush();
156 }
157 }
158
159 void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
160 if (entry.payer.empty()) {
161 insert_user(timestamp, entry.owner, entry);
162 } else {
163 insert_user(timestamp, entry.payer, entry);
164 }
165 }
166
167 void flush() {
168 map<rgw_user_bucket, RGWUsageBatch> old_map;
169 lock.lock();
170 old_map.swap(usage_map);
171 num_entries = 0;
172 lock.unlock();
173
174 store->log_usage(this, old_map);
175 }
176
177 CephContext *get_cct() const override { return cct; }
178 unsigned get_subsys() const override { return dout_subsys; }
179 std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw UsageLogger: "; }
180 };
181
182 static UsageLogger *usage_logger = NULL;
183
184 void rgw_log_usage_init(CephContext *cct, rgw::sal::Store* store)
185 {
186 usage_logger = new UsageLogger(cct, store);
187 }
188
189 void rgw_log_usage_finalize()
190 {
191 delete usage_logger;
192 usage_logger = NULL;
193 }
194
195 static void log_usage(struct req_state *s, const string& op_name)
196 {
197 if (s->system_request) /* don't log system user operations */
198 return;
199
200 if (!usage_logger)
201 return;
202
203 rgw_user user;
204 rgw_user payer;
205 string bucket_name;
206
207 bucket_name = s->bucket_name;
208
209 if (!bucket_name.empty()) {
210 bucket_name = s->bucket_name;
211 user = s->bucket_owner.get_id();
212 if (!rgw::sal::Bucket::empty(s->bucket.get()) &&
213 s->bucket->get_info().requester_pays) {
214 payer = s->user->get_id();
215 }
216 } else {
217 user = s->user->get_id();
218 }
219
220 bool error = s->err.is_err();
221 if (error && s->err.http_ret == 404) {
222 bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */
223 }
224
225 string u = user.to_str();
226 string p = payer.to_str();
227 rgw_usage_log_entry entry(u, p, bucket_name);
228
229 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
230 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
231
232 rgw_usage_data data(bytes_sent, bytes_received);
233
234 data.ops = 1;
235 if (!s->is_err())
236 data.successful_ops = 1;
237
238 ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name
239 << " tenant=" << s->bucket_tenant
240 << ", bytes_sent=" << bytes_sent << ", bytes_received="
241 << bytes_received << ", success=" << data.successful_ops << dendl;
242
243 entry.add(op_name, data);
244
245 utime_t ts = ceph_clock_now();
246
247 usage_logger->insert(ts, entry);
248 }
249
250 void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
251 {
252 formatter->open_object_section("log_entry");
253 formatter->dump_string("bucket", entry.bucket);
254 {
255 auto t = utime_t{entry.time};
256 t.gmtime(formatter->dump_stream("time")); // UTC
257 t.localtime(formatter->dump_stream("time_local"));
258 }
259 formatter->dump_string("remote_addr", entry.remote_addr);
260 string obj_owner = entry.object_owner.to_str();
261 if (obj_owner.length())
262 formatter->dump_string("object_owner", obj_owner);
263 formatter->dump_string("user", entry.user);
264 formatter->dump_string("operation", entry.op);
265 formatter->dump_string("uri", entry.uri);
266 formatter->dump_string("http_status", entry.http_status);
267 formatter->dump_string("error_code", entry.error_code);
268 formatter->dump_int("bytes_sent", entry.bytes_sent);
269 formatter->dump_int("bytes_received", entry.bytes_received);
270 formatter->dump_int("object_size", entry.obj_size);
271 {
272 using namespace std::chrono;
273 uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count();
274 formatter->dump_int("total_time", total_time);
275 }
276 formatter->dump_string("user_agent", entry.user_agent);
277 formatter->dump_string("referrer", entry.referrer);
278 if (entry.x_headers.size() > 0) {
279 formatter->open_array_section("http_x_headers");
280 for (const auto& iter: entry.x_headers) {
281 formatter->open_object_section(iter.first.c_str());
282 formatter->dump_string(iter.first.c_str(), iter.second);
283 formatter->close_section();
284 }
285 formatter->close_section();
286 }
287 formatter->dump_string("trans_id", entry.trans_id);
288 switch(entry.identity_type) {
289 case TYPE_RGW:
290 formatter->dump_string("authentication_type","Local");
291 break;
292 case TYPE_LDAP:
293 formatter->dump_string("authentication_type","LDAP");
294 break;
295 case TYPE_KEYSTONE:
296 formatter->dump_string("authentication_type","Keystone");
297 break;
298 case TYPE_WEB:
299 formatter->dump_string("authentication_type","OIDC Provider");
300 break;
301 case TYPE_ROLE:
302 formatter->dump_string("authentication_type","STS");
303 break;
304 default:
305 break;
306 }
307 if (entry.token_claims.size() > 0) {
308 if (entry.token_claims[0] == "sts") {
309 formatter->open_object_section("sts_info");
310 for (const auto& iter: entry.token_claims) {
311 auto pos = iter.find(":");
312 if (pos != string::npos) {
313 formatter->dump_string(iter.substr(0, pos), iter.substr(pos + 1));
314 }
315 }
316 formatter->close_section();
317 }
318 }
319
320 formatter->close_section();
321 }
322
323 OpsLogManifold::~OpsLogManifold()
324 {
325 for (const auto &sink : sinks) {
326 delete sink;
327 }
328 }
329
330 void OpsLogManifold::add_sink(OpsLogSink* sink)
331 {
332 sinks.push_back(sink);
333 }
334
335 int OpsLogManifold::log(struct req_state* s, struct rgw_log_entry& entry)
336 {
337 int ret = 0;
338 for (const auto &sink : sinks) {
339 if (sink->log(s, entry) < 0) {
340 ret = -1;
341 }
342 }
343 return ret;
344 }
345
346 OpsLogFile::OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size) :
347 cct(cct), file(path, std::ofstream::app), data_size(0), max_data_size(max_data_size)
348 {
349 }
350
351 void OpsLogFile::flush()
352 {
353 std::scoped_lock flush_lock(flush_mutex);
354 {
355 std::scoped_lock log_lock(log_mutex);
356 assert(flush_buffer.empty());
357 flush_buffer.swap(log_buffer);
358 data_size = 0;
359 }
360 for (auto bl : flush_buffer) {
361 int try_num = 0;
362 while (true) {
363 bl.write_stream(file);
364 if (!file) {
365 ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl;
366 file.clear();
367 if (stopped) {
368 break;
369 }
370 int sleep_time_secs = std::min((int) pow(2, try_num), 60);
371 std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs));
372 try_num++;
373 } else {
374 break;
375 }
376 }
377 }
378 flush_buffer.clear();
379 file << std::endl;
380 }
381
382 void* OpsLogFile::entry() {
383 std::unique_lock lock(log_mutex);
384 while (!stopped) {
385 if (!log_buffer.empty()) {
386 lock.unlock();
387 flush();
388 lock.lock();
389 continue;
390 }
391 cond_flush.wait(lock);
392 }
393 flush();
394 return NULL;
395 }
396
397 void OpsLogFile::start() {
398 stopped = false;
399 create("ops_log_file");
400 }
401
402 void OpsLogFile::stop() {
403 {
404 cond_flush.notify_one();
405 stopped = true;
406 }
407 join();
408 }
409
410 OpsLogFile::~OpsLogFile()
411 {
412 if (!stopped) {
413 stop();
414 }
415 file.close();
416 }
417
418 int OpsLogFile::log_json(struct req_state* s, bufferlist& bl)
419 {
420 std::unique_lock lock(log_mutex);
421 if (data_size + bl.length() >= max_data_size) {
422 ldout(s->cct, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s->trans_id << dendl;
423 return -1;
424 }
425 log_buffer.push_back(bl);
426 data_size += bl.length();
427 cond_flush.notify_all();
428 return 0;
429 }
430
431 JsonOpsLogSink::JsonOpsLogSink() {
432 formatter = new JSONFormatter;
433 }
434
435 JsonOpsLogSink::~JsonOpsLogSink() {
436 delete formatter;
437 }
438
439 void JsonOpsLogSink::formatter_to_bl(bufferlist& bl)
440 {
441 stringstream ss;
442 formatter->flush(ss);
443 const string& s = ss.str();
444 bl.append(s);
445 }
446
447 int JsonOpsLogSink::log(struct req_state* s, struct rgw_log_entry& entry)
448 {
449 bufferlist bl;
450
451 lock.lock();
452 rgw_format_ops_log_entry(entry, formatter);
453 formatter_to_bl(bl);
454 lock.unlock();
455
456 return log_json(s, bl);
457 }
458
459 void OpsLogSocket::init_connection(bufferlist& bl)
460 {
461 bl.append("[");
462 }
463
464 OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
465 {
466 delim.append(",\n");
467 }
468
469 int OpsLogSocket::log_json(struct req_state* s, bufferlist& bl)
470 {
471 append_output(bl);
472 return 0;
473 }
474
475 OpsLogRados::OpsLogRados(rgw::sal::Store* const& store): store(store)
476 {
477 }
478
479 int OpsLogRados::log(struct req_state* s, struct rgw_log_entry& entry)
480 {
481 if (!s->cct->_conf->rgw_ops_log_rados) {
482 return 0;
483 }
484 bufferlist bl;
485 encode(entry, bl);
486
487 struct tm bdt;
488 time_t t = req_state::Clock::to_time_t(entry.time);
489 if (s->cct->_conf->rgw_log_object_name_utc)
490 gmtime_r(&t, &bdt);
491 else
492 localtime_r(&t, &bdt);
493 string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
494 entry.bucket_id, entry.bucket);
495 if (store->log_op(s, oid, bl) < 0) {
496 ldpp_dout(s, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s->trans_id << dendl;
497 return -1;
498 }
499 return 0;
500 }
501
502 int rgw_log_op(RGWREST* const rest, struct req_state *s, const string& op_name, OpsLogSink *olog)
503 {
504 struct rgw_log_entry entry;
505 string bucket_id;
506
507 if (s->enable_usage_log)
508 log_usage(s, op_name);
509
510 if (!s->enable_ops_log)
511 return 0;
512
513 if (s->bucket_name.empty()) {
514 /* this case is needed for, e.g., list_buckets */
515 } else {
516 if (s->err.ret == -ERR_NO_SUCH_BUCKET ||
517 rgw::sal::Bucket::empty(s->bucket.get())) {
518 if (!s->cct->_conf->rgw_log_nonexistent_bucket) {
519 ldout(s->cct, 5) << "bucket " << s->bucket_name << " doesn't exist, not logging" << dendl;
520 return 0;
521 }
522 bucket_id = "";
523 } else {
524 bucket_id = s->bucket->get_bucket_id();
525 }
526 entry.bucket = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name);
527
528 if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) {
529 ldpp_dout(s, 5) << "not logging op on bucket with non-utf8 name" << dendl;
530 return 0;
531 }
532
533 if (!rgw::sal::Object::empty(s->object.get())) {
534 entry.obj = s->object->get_key();
535 } else {
536 entry.obj = rgw_obj_key("-");
537 }
538
539 entry.obj_size = s->obj_size;
540 } /* !bucket empty */
541
542 if (s->cct->_conf->rgw_remote_addr_param.length())
543 set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(),
544 entry.remote_addr);
545 else
546 set_param_str(s, "REMOTE_ADDR", entry.remote_addr);
547 set_param_str(s, "HTTP_USER_AGENT", entry.user_agent);
548 // legacy apps are still using misspelling referer, such as curl -e option
549 if (s->info.env->exists("HTTP_REFERRER"))
550 set_param_str(s, "HTTP_REFERRER", entry.referrer);
551 else
552 set_param_str(s, "HTTP_REFERER", entry.referrer);
553
554 std::string uri;
555 if (s->info.env->exists("REQUEST_METHOD")) {
556 uri.append(s->info.env->get("REQUEST_METHOD"));
557 uri.append(" ");
558 }
559
560 if (s->info.env->exists("REQUEST_URI")) {
561 uri.append(s->info.env->get("REQUEST_URI"));
562 }
563
564 if (s->info.env->exists("QUERY_STRING")) {
565 const char* qs = s->info.env->get("QUERY_STRING");
566 if(qs && (*qs != '\0')) {
567 uri.append("?");
568 uri.append(qs);
569 }
570 }
571
572 if (s->info.env->exists("HTTP_VERSION")) {
573 uri.append(" ");
574 uri.append("HTTP/");
575 uri.append(s->info.env->get("HTTP_VERSION"));
576 }
577
578 entry.uri = std::move(uri);
579
580 entry.op = op_name;
581
582 if (s->auth.identity) {
583 entry.identity_type = s->auth.identity->get_identity_type();
584 } else {
585 entry.identity_type = TYPE_NONE;
586 }
587
588 if (! s->token_claims.empty()) {
589 entry.token_claims = std::move(s->token_claims);
590 }
591
592 /* custom header logging */
593 if (rest) {
594 if (rest->log_x_headers()) {
595 for (const auto& iter : s->info.env->get_map()) {
596 if (rest->log_x_header(iter.first)) {
597 entry.x_headers.insert(
598 rgw_log_entry::headers_map::value_type(iter.first, iter.second));
599 }
600 }
601 }
602 }
603
604 entry.user = s->user->get_id().to_str();
605 if (s->object_acl)
606 entry.object_owner = s->object_acl->get_owner().get_id();
607 entry.bucket_owner = s->bucket_owner.get_id();
608
609 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
610 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
611
612 entry.time = s->time;
613 entry.total_time = s->time_elapsed();
614 entry.bytes_sent = bytes_sent;
615 entry.bytes_received = bytes_received;
616 if (s->err.http_ret) {
617 char buf[16];
618 snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
619 entry.http_status = buf;
620 } else {
621 entry.http_status = "200"; // default
622 }
623 entry.error_code = s->err.err_code;
624 entry.bucket_id = bucket_id;
625 entry.trans_id = s->trans_id;
626 if (olog) {
627 return olog->log(s, entry);
628 }
629 return 0;
630 }
631
632 void rgw_log_entry::generate_test_instances(list<rgw_log_entry*>& o)
633 {
634 rgw_log_entry *e = new rgw_log_entry;
635 e->object_owner = "object_owner";
636 e->bucket_owner = "bucket_owner";
637 e->bucket = "bucket";
638 e->remote_addr = "1.2.3.4";
639 e->user = "user";
640 e->obj = rgw_obj_key("obj");
641 e->uri = "http://uri/bucket/obj";
642 e->http_status = "200";
643 e->error_code = "error_code";
644 e->bytes_sent = 1024;
645 e->bytes_received = 512;
646 e->obj_size = 2048;
647 e->user_agent = "user_agent";
648 e->referrer = "referrer";
649 e->bucket_id = "10";
650 e->trans_id = "trans_id";
651 e->identity_type = TYPE_RGW;
652 o.push_back(e);
653 o.push_back(new rgw_log_entry);
654 }
655
656 void rgw_log_entry::dump(Formatter *f) const
657 {
658 f->dump_string("object_owner", object_owner.to_str());
659 f->dump_string("bucket_owner", bucket_owner.to_str());
660 f->dump_string("bucket", bucket);
661 f->dump_stream("time") << time;
662 f->dump_string("remote_addr", remote_addr);
663 f->dump_string("user", user);
664 f->dump_stream("obj") << obj;
665 f->dump_string("op", op);
666 f->dump_string("uri", uri);
667 f->dump_string("http_status", http_status);
668 f->dump_string("error_code", error_code);
669 f->dump_unsigned("bytes_sent", bytes_sent);
670 f->dump_unsigned("bytes_received", bytes_received);
671 f->dump_unsigned("obj_size", obj_size);
672 f->dump_stream("total_time") << total_time;
673 f->dump_string("user_agent", user_agent);
674 f->dump_string("referrer", referrer);
675 f->dump_string("bucket_id", bucket_id);
676 f->dump_string("trans_id", trans_id);
677 f->dump_unsigned("identity_type", identity_type);
678 }
679