]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_log.cc
c51eecded53d204b39b1cf03bea67297dc57f68f
[ceph.git] / ceph / src / rgw / rgw_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/Clock.h"
5 #include "common/Timer.h"
6 #include "common/utf8.h"
7 #include "common/OutputDataSocket.h"
8 #include "common/Formatter.h"
9
10 #include "rgw_bucket.h"
11 #include "rgw_log.h"
12 #include "rgw_acl.h"
13 #include "rgw_client_io.h"
14 #include "rgw_rest.h"
15 #include "rgw_zone.h"
16 #include "rgw_rados.h"
17
18 #include "services/svc_zone.h"
19
20 #include <chrono>
21 #include <math.h>
22
23 #define dout_subsys ceph_subsys_rgw
24
25 using namespace std;
26
27 static void set_param_str(struct req_state *s, const char *name, string& str)
28 {
29 const char *p = s->info.env->get(name);
30 if (p)
31 str = p;
32 }
33
/*
 * Expand a date(1)-style pattern into an ops-log object name.
 *
 * Supported conversions (any other character is copied verbatim):
 *   %Y  year, 4 digits            %y  year, 2 digits
 *   %m  month (01-12)             %d  day of month (01-31)
 *   %H  hour, 24h (00-23)         %k  hour, 24h, unpadded (0-23)
 *   %I  hour, 12h (01-12)         %l  hour, 12h, unpadded (1-12)
 *   %M  minute (00-59)            %%  a literal '%'
 *   %i  bucket_id                 %n  bucket_name
 * An unknown conversion "%X" is emitted unchanged, and a '%' that is the
 * last character of `format` is copied literally.
 *
 * @param format       the pattern to expand
 * @param dt           broken-down time for the date/time conversions; it is
 *                     only dereferenced when such a conversion is present
 * @param bucket_id    text substituted for %i
 * @param bucket_name  text substituted for %n
 * @return the expanded name
 */
std::string render_log_object_name(const std::string& format,
                                   struct tm *dt, const std::string& bucket_id,
                                   const std::string& bucket_name)
{
  // 12-hour clock as defined by date(1)/strftime: hours 0 and 12 map to 12,
  // 13 maps to 1, etc. (The old "(h % 12) + 1" was off by one for every hour.)
  const auto hour12 = [dt]() {
    const int h = dt->tm_hour % 12;
    return h == 0 ? 12 : h;
  };

  std::string o;
  for (std::string::size_type i = 0; i < format.size(); ++i) {
    // Plain character, or a trailing '%' with nothing after it: copy as-is.
    if (format[i] != '%' || i + 1 >= format.size()) {
      o += format[i];
      continue;
    }

    const char spec = format[++i];
    char buf[32];
    switch (spec) {
    case '%':
      o += '%';
      continue;
    case 'Y':
      snprintf(buf, sizeof(buf), "%.4d", dt->tm_year + 1900);
      break;
    case 'y':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_year % 100);
      break;
    case 'm':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mon + 1);
      break;
    case 'd':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mday);
      break;
    case 'H':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_hour);
      break;
    case 'I':
      snprintf(buf, sizeof(buf), "%.2d", hour12());
      break;
    case 'k':
      snprintf(buf, sizeof(buf), "%d", dt->tm_hour);
      break;
    case 'l':
      snprintf(buf, sizeof(buf), "%d", hour12());
      break;
    case 'M':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_min);
      break;
    case 'i':
      o += bucket_id;
      continue;
    case 'n':
      o += bucket_name;
      continue;
    default:
      // Unknown conversion: keep it verbatim.
      snprintf(buf, sizeof(buf), "%%%c", spec);
      break;
    }
    o += buf;
  }
  return o;
}
93
94 /* usage logger */
95 class UsageLogger : public DoutPrefixProvider {
96 CephContext *cct;
97 rgw::sal::Store* store;
98 map<rgw_user_bucket, RGWUsageBatch> usage_map;
99 ceph::mutex lock = ceph::make_mutex("UsageLogger");
100 int32_t num_entries;
101 ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock");
102 SafeTimer timer;
103 utime_t round_timestamp;
104
105 class C_UsageLogTimeout : public Context {
106 UsageLogger *logger;
107 public:
108 explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {}
109 void finish(int r) override {
110 logger->flush();
111 logger->set_timer();
112 }
113 };
114
115 void set_timer() {
116 timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this));
117 }
118 public:
119
120 UsageLogger(CephContext *_cct, rgw::sal::Store* _store) : cct(_cct), store(_store), num_entries(0), timer(cct, timer_lock) {
121 timer.init();
122 std::lock_guard l{timer_lock};
123 set_timer();
124 utime_t ts = ceph_clock_now();
125 recalc_round_timestamp(ts);
126 }
127
128 ~UsageLogger() {
129 std::lock_guard l{timer_lock};
130 flush();
131 timer.cancel_all_events();
132 timer.shutdown();
133 }
134
135 void recalc_round_timestamp(utime_t& ts) {
136 round_timestamp = ts.round_to_hour();
137 }
138
139 void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
140 lock.lock();
141 if (timestamp.sec() > round_timestamp + 3600)
142 recalc_round_timestamp(timestamp);
143 entry.epoch = round_timestamp.sec();
144 bool account;
145 string u = user.to_str();
146 rgw_user_bucket ub(u, entry.bucket);
147 real_time rt = round_timestamp.to_real_time();
148 usage_map[ub].insert(rt, entry, &account);
149 if (account)
150 num_entries++;
151 bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
152 lock.unlock();
153 if (need_flush) {
154 std::lock_guard l{timer_lock};
155 flush();
156 }
157 }
158
159 void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
160 if (entry.payer.empty()) {
161 insert_user(timestamp, entry.owner, entry);
162 } else {
163 insert_user(timestamp, entry.payer, entry);
164 }
165 }
166
167 void flush() {
168 map<rgw_user_bucket, RGWUsageBatch> old_map;
169 lock.lock();
170 old_map.swap(usage_map);
171 num_entries = 0;
172 lock.unlock();
173
174 store->log_usage(this, old_map);
175 }
176
177 CephContext *get_cct() const override { return cct; }
178 unsigned get_subsys() const override { return dout_subsys; }
179 std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw UsageLogger: "; }
180 };
181
182 static UsageLogger *usage_logger = NULL;
183
184 void rgw_log_usage_init(CephContext *cct, rgw::sal::Store* store)
185 {
186 usage_logger = new UsageLogger(cct, store);
187 }
188
189 void rgw_log_usage_finalize()
190 {
191 delete usage_logger;
192 usage_logger = NULL;
193 }
194
195 static void log_usage(struct req_state *s, const string& op_name)
196 {
197 if (s->system_request) /* don't log system user operations */
198 return;
199
200 if (!usage_logger)
201 return;
202
203 rgw_user user;
204 rgw_user payer;
205 string bucket_name;
206
207 bucket_name = s->bucket_name;
208
209 if (!bucket_name.empty()) {
210 bucket_name = s->bucket_name;
211 user = s->bucket_owner.get_id();
212 if (!rgw::sal::Bucket::empty(s->bucket.get()) &&
213 s->bucket->get_info().requester_pays) {
214 payer = s->user->get_id();
215 }
216 } else {
217 user = s->user->get_id();
218 }
219
220 bool error = s->err.is_err();
221 if (error && s->err.http_ret == 404) {
222 bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */
223 }
224
225 string u = user.to_str();
226 string p = payer.to_str();
227 rgw_usage_log_entry entry(u, p, bucket_name);
228
229 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
230 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
231
232 rgw_usage_data data(bytes_sent, bytes_received);
233
234 data.ops = 1;
235 if (!s->is_err())
236 data.successful_ops = 1;
237
238 ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name
239 << " tenant=" << s->bucket_tenant
240 << ", bytes_sent=" << bytes_sent << ", bytes_received="
241 << bytes_received << ", success=" << data.successful_ops << dendl;
242
243 entry.add(op_name, data);
244
245 utime_t ts = ceph_clock_now();
246
247 usage_logger->insert(ts, entry);
248 }
249
250 void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
251 {
252 formatter->open_object_section("log_entry");
253 formatter->dump_string("bucket", entry.bucket);
254 {
255 auto t = utime_t{entry.time};
256 t.gmtime(formatter->dump_stream("time")); // UTC
257 t.localtime(formatter->dump_stream("time_local"));
258 }
259 formatter->dump_string("remote_addr", entry.remote_addr);
260 string obj_owner = entry.object_owner.to_str();
261 if (obj_owner.length())
262 formatter->dump_string("object_owner", obj_owner);
263 formatter->dump_string("user", entry.user);
264 formatter->dump_string("operation", entry.op);
265 formatter->dump_string("uri", entry.uri);
266 formatter->dump_string("http_status", entry.http_status);
267 formatter->dump_string("error_code", entry.error_code);
268 formatter->dump_int("bytes_sent", entry.bytes_sent);
269 formatter->dump_int("bytes_received", entry.bytes_received);
270 formatter->dump_int("object_size", entry.obj_size);
271 {
272 using namespace std::chrono;
273 uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count();
274 formatter->dump_int("total_time", total_time);
275 }
276 formatter->dump_string("user_agent", entry.user_agent);
277 formatter->dump_string("referrer", entry.referrer);
278 if (entry.x_headers.size() > 0) {
279 formatter->open_array_section("http_x_headers");
280 for (const auto& iter: entry.x_headers) {
281 formatter->open_object_section(iter.first.c_str());
282 formatter->dump_string(iter.first.c_str(), iter.second);
283 formatter->close_section();
284 }
285 formatter->close_section();
286 }
287 formatter->dump_string("trans_id", entry.trans_id);
288 switch(entry.identity_type) {
289 case TYPE_RGW:
290 formatter->dump_string("authentication_type","Local");
291 break;
292 case TYPE_LDAP:
293 formatter->dump_string("authentication_type","LDAP");
294 break;
295 case TYPE_KEYSTONE:
296 formatter->dump_string("authentication_type","Keystone");
297 break;
298 case TYPE_WEB:
299 formatter->dump_string("authentication_type","OIDC Provider");
300 break;
301 case TYPE_ROLE:
302 formatter->dump_string("authentication_type","STS");
303 break;
304 default:
305 break;
306 }
307 if (entry.token_claims.size() > 0) {
308 if (entry.token_claims[0] == "sts") {
309 formatter->open_object_section("sts_info");
310 for (const auto& iter: entry.token_claims) {
311 auto pos = iter.find(":");
312 if (pos != string::npos) {
313 formatter->dump_string(iter.substr(0, pos), iter.substr(pos + 1));
314 }
315 }
316 formatter->close_section();
317 }
318 }
319 if (!entry.access_key_id.empty()) {
320 formatter->dump_string("access_key_id", entry.access_key_id);
321 }
322 if (!entry.subuser.empty()) {
323 formatter->dump_string("subuser", entry.subuser);
324 }
325 formatter->dump_bool("temp_url", entry.temp_url);
326 formatter->close_section();
327 }
328
329 OpsLogManifold::~OpsLogManifold()
330 {
331 for (const auto &sink : sinks) {
332 delete sink;
333 }
334 }
335
336 void OpsLogManifold::add_sink(OpsLogSink* sink)
337 {
338 sinks.push_back(sink);
339 }
340
341 int OpsLogManifold::log(struct req_state* s, struct rgw_log_entry& entry)
342 {
343 int ret = 0;
344 for (const auto &sink : sinks) {
345 if (sink->log(s, entry) < 0) {
346 ret = -1;
347 }
348 }
349 return ret;
350 }
351
352 OpsLogFile::OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size) :
353 cct(cct), data_size(0), max_data_size(max_data_size), path(path), need_reopen(false)
354 {
355 }
356
357 void OpsLogFile::reopen() {
358 need_reopen = true;
359 }
360
361 void OpsLogFile::flush()
362 {
363 {
364 std::scoped_lock log_lock(mutex);
365 assert(flush_buffer.empty());
366 flush_buffer.swap(log_buffer);
367 data_size = 0;
368 }
369 for (auto bl : flush_buffer) {
370 int try_num = 0;
371 while (true) {
372 if (!file.is_open() || need_reopen) {
373 need_reopen = false;
374 file.close();
375 file.open(path, std::ofstream::app);
376 }
377 bl.write_stream(file);
378 if (!file) {
379 ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl;
380 file.clear();
381 if (stopped) {
382 break;
383 }
384 int sleep_time_secs = std::min((int) pow(2, try_num), 60);
385 std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs));
386 try_num++;
387 } else {
388 break;
389 }
390 }
391 }
392 flush_buffer.clear();
393 file << std::endl;
394 }
395
396 void* OpsLogFile::entry() {
397 std::unique_lock lock(mutex);
398 while (!stopped) {
399 if (!log_buffer.empty()) {
400 lock.unlock();
401 flush();
402 lock.lock();
403 continue;
404 }
405 cond.wait(lock);
406 }
407 lock.unlock();
408 flush();
409 return NULL;
410 }
411
412 void OpsLogFile::start() {
413 stopped = false;
414 create("ops_log_file");
415 }
416
417 void OpsLogFile::stop() {
418 {
419 std::unique_lock lock(mutex);
420 cond.notify_one();
421 stopped = true;
422 }
423 join();
424 }
425
426 OpsLogFile::~OpsLogFile()
427 {
428 if (!stopped) {
429 stop();
430 }
431 file.close();
432 }
433
434 int OpsLogFile::log_json(struct req_state* s, bufferlist& bl)
435 {
436 std::unique_lock lock(mutex);
437 if (data_size + bl.length() >= max_data_size) {
438 ldout(s->cct, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s->trans_id << dendl;
439 return -1;
440 }
441 log_buffer.push_back(bl);
442 data_size += bl.length();
443 cond.notify_all();
444 return 0;
445 }
446
447 JsonOpsLogSink::JsonOpsLogSink() {
448 formatter = new JSONFormatter;
449 }
450
451 JsonOpsLogSink::~JsonOpsLogSink() {
452 delete formatter;
453 }
454
455 void JsonOpsLogSink::formatter_to_bl(bufferlist& bl)
456 {
457 stringstream ss;
458 formatter->flush(ss);
459 const string& s = ss.str();
460 bl.append(s);
461 }
462
463 int JsonOpsLogSink::log(struct req_state* s, struct rgw_log_entry& entry)
464 {
465 bufferlist bl;
466
467 lock.lock();
468 rgw_format_ops_log_entry(entry, formatter);
469 formatter_to_bl(bl);
470 lock.unlock();
471
472 return log_json(s, bl);
473 }
474
475 void OpsLogSocket::init_connection(bufferlist& bl)
476 {
477 bl.append("[");
478 }
479
480 OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
481 {
482 delim.append(",\n");
483 }
484
485 int OpsLogSocket::log_json(struct req_state* s, bufferlist& bl)
486 {
487 append_output(bl);
488 return 0;
489 }
490
491 OpsLogRados::OpsLogRados(rgw::sal::Store* const& store): store(store)
492 {
493 }
494
495 int OpsLogRados::log(struct req_state* s, struct rgw_log_entry& entry)
496 {
497 if (!s->cct->_conf->rgw_ops_log_rados) {
498 return 0;
499 }
500 bufferlist bl;
501 encode(entry, bl);
502
503 struct tm bdt;
504 time_t t = req_state::Clock::to_time_t(entry.time);
505 if (s->cct->_conf->rgw_log_object_name_utc)
506 gmtime_r(&t, &bdt);
507 else
508 localtime_r(&t, &bdt);
509 string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
510 entry.bucket_id, entry.bucket);
511 if (store->log_op(s, oid, bl) < 0) {
512 ldpp_dout(s, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s->trans_id << dendl;
513 return -1;
514 }
515 return 0;
516 }
517
518 int rgw_log_op(RGWREST* const rest, struct req_state *s, const string& op_name, OpsLogSink *olog)
519 {
520 struct rgw_log_entry entry;
521 string bucket_id;
522
523 if (s->enable_usage_log)
524 log_usage(s, op_name);
525
526 if (!s->enable_ops_log)
527 return 0;
528
529 if (s->bucket_name.empty()) {
530 /* this case is needed for, e.g., list_buckets */
531 } else {
532 if (s->err.ret == -ERR_NO_SUCH_BUCKET ||
533 rgw::sal::Bucket::empty(s->bucket.get())) {
534 if (!s->cct->_conf->rgw_log_nonexistent_bucket) {
535 ldout(s->cct, 5) << "bucket " << s->bucket_name << " doesn't exist, not logging" << dendl;
536 return 0;
537 }
538 bucket_id = "";
539 } else {
540 bucket_id = s->bucket->get_bucket_id();
541 }
542 entry.bucket = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name);
543
544 if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) {
545 ldpp_dout(s, 5) << "not logging op on bucket with non-utf8 name" << dendl;
546 return 0;
547 }
548
549 if (!rgw::sal::Object::empty(s->object.get())) {
550 entry.obj = s->object->get_key();
551 } else {
552 entry.obj = rgw_obj_key("-");
553 }
554
555 entry.obj_size = s->obj_size;
556 } /* !bucket empty */
557
558 if (s->cct->_conf->rgw_remote_addr_param.length())
559 set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(),
560 entry.remote_addr);
561 else
562 set_param_str(s, "REMOTE_ADDR", entry.remote_addr);
563 set_param_str(s, "HTTP_USER_AGENT", entry.user_agent);
564 // legacy apps are still using misspelling referer, such as curl -e option
565 if (s->info.env->exists("HTTP_REFERRER"))
566 set_param_str(s, "HTTP_REFERRER", entry.referrer);
567 else
568 set_param_str(s, "HTTP_REFERER", entry.referrer);
569
570 std::string uri;
571 if (s->info.env->exists("REQUEST_METHOD")) {
572 uri.append(s->info.env->get("REQUEST_METHOD"));
573 uri.append(" ");
574 }
575
576 if (s->info.env->exists("REQUEST_URI")) {
577 uri.append(s->info.env->get("REQUEST_URI"));
578 }
579
580 if (s->info.env->exists("QUERY_STRING")) {
581 const char* qs = s->info.env->get("QUERY_STRING");
582 if(qs && (*qs != '\0')) {
583 uri.append("?");
584 uri.append(qs);
585 }
586 }
587
588 if (s->info.env->exists("HTTP_VERSION")) {
589 uri.append(" ");
590 uri.append("HTTP/");
591 uri.append(s->info.env->get("HTTP_VERSION"));
592 }
593
594 entry.uri = std::move(uri);
595
596 entry.op = op_name;
597
598 if (s->auth.identity) {
599 entry.identity_type = s->auth.identity->get_identity_type();
600 s->auth.identity->write_ops_log_entry(entry);
601 } else {
602 entry.identity_type = TYPE_NONE;
603 }
604
605 if (! s->token_claims.empty()) {
606 entry.token_claims = std::move(s->token_claims);
607 }
608
609 /* custom header logging */
610 if (rest) {
611 if (rest->log_x_headers()) {
612 for (const auto& iter : s->info.env->get_map()) {
613 if (rest->log_x_header(iter.first)) {
614 entry.x_headers.insert(
615 rgw_log_entry::headers_map::value_type(iter.first, iter.second));
616 }
617 }
618 }
619 }
620
621 entry.user = s->user->get_id().to_str();
622 if (s->object_acl)
623 entry.object_owner = s->object_acl->get_owner().get_id();
624 entry.bucket_owner = s->bucket_owner.get_id();
625
626 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
627 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
628
629 entry.time = s->time;
630 entry.total_time = s->time_elapsed();
631 entry.bytes_sent = bytes_sent;
632 entry.bytes_received = bytes_received;
633 if (s->err.http_ret) {
634 char buf[16];
635 snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
636 entry.http_status = buf;
637 } else {
638 entry.http_status = "200"; // default
639 }
640 entry.error_code = s->err.err_code;
641 entry.bucket_id = bucket_id;
642 entry.trans_id = s->trans_id;
643 if (olog) {
644 return olog->log(s, entry);
645 }
646 return 0;
647 }
648
649 void rgw_log_entry::generate_test_instances(list<rgw_log_entry*>& o)
650 {
651 rgw_log_entry *e = new rgw_log_entry;
652 e->object_owner = "object_owner";
653 e->bucket_owner = "bucket_owner";
654 e->bucket = "bucket";
655 e->remote_addr = "1.2.3.4";
656 e->user = "user";
657 e->obj = rgw_obj_key("obj");
658 e->uri = "http://uri/bucket/obj";
659 e->http_status = "200";
660 e->error_code = "error_code";
661 e->bytes_sent = 1024;
662 e->bytes_received = 512;
663 e->obj_size = 2048;
664 e->user_agent = "user_agent";
665 e->referrer = "referrer";
666 e->bucket_id = "10";
667 e->trans_id = "trans_id";
668 e->identity_type = TYPE_RGW;
669 o.push_back(e);
670 o.push_back(new rgw_log_entry);
671 }
672
673 void rgw_log_entry::dump(Formatter *f) const
674 {
675 f->dump_string("object_owner", object_owner.to_str());
676 f->dump_string("bucket_owner", bucket_owner.to_str());
677 f->dump_string("bucket", bucket);
678 f->dump_stream("time") << time;
679 f->dump_string("remote_addr", remote_addr);
680 f->dump_string("user", user);
681 f->dump_stream("obj") << obj;
682 f->dump_string("op", op);
683 f->dump_string("uri", uri);
684 f->dump_string("http_status", http_status);
685 f->dump_string("error_code", error_code);
686 f->dump_unsigned("bytes_sent", bytes_sent);
687 f->dump_unsigned("bytes_received", bytes_received);
688 f->dump_unsigned("obj_size", obj_size);
689 f->dump_stream("total_time") << total_time;
690 f->dump_string("user_agent", user_agent);
691 f->dump_string("referrer", referrer);
692 f->dump_string("bucket_id", bucket_id);
693 f->dump_string("trans_id", trans_id);
694 f->dump_unsigned("identity_type", identity_type);
695 }
696