1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/Clock.h"
5 #include "common/Timer.h"
6 #include "common/utf8.h"
7 #include "common/OutputDataSocket.h"
8 #include "common/Formatter.h"
10 #include "rgw_bucket.h"
13 #include "rgw_client_io.h"
16 #include "rgw_rados.h"
18 #include "services/svc_zone.h"
23 #define dout_subsys ceph_subsys_rgw
27 static void set_param_str(struct req_state
*s
, const char *name
, string
& str
)
29 const char *p
= s
->info
.env
->get(name
);
34 string
render_log_object_name(const string
& format
,
35 struct tm
*dt
, const string
& bucket_id
,
36 const string
& bucket_name
)
39 for (unsigned i
=0; i
<format
.size(); i
++) {
40 if (format
[i
] == '%' && i
+1 < format
.size()) {
48 sprintf(buf
, "%.4d", dt
->tm_year
+ 1900);
51 sprintf(buf
, "%.2d", dt
->tm_year
% 100);
54 sprintf(buf
, "%.2d", dt
->tm_mon
+ 1);
57 sprintf(buf
, "%.2d", dt
->tm_mday
);
60 sprintf(buf
, "%.2d", dt
->tm_hour
);
63 sprintf(buf
, "%.2d", (dt
->tm_hour
% 12) + 1);
66 sprintf(buf
, "%d", dt
->tm_hour
);
69 sprintf(buf
, "%d", (dt
->tm_hour
% 12) + 1);
72 sprintf(buf
, "%.2d", dt
->tm_min
);
83 sprintf(buf
, "%%%c", format
[i
]);
95 class UsageLogger
: public DoutPrefixProvider
{
97 rgw::sal::Store
* store
;
98 map
<rgw_user_bucket
, RGWUsageBatch
> usage_map
;
99 ceph::mutex lock
= ceph::make_mutex("UsageLogger");
101 ceph::mutex timer_lock
= ceph::make_mutex("UsageLogger::timer_lock");
103 utime_t round_timestamp
;
105 class C_UsageLogTimeout
: public Context
{
108 explicit C_UsageLogTimeout(UsageLogger
*_l
) : logger(_l
) {}
109 void finish(int r
) override
{
116 timer
.add_event_after(cct
->_conf
->rgw_usage_log_tick_interval
, new C_UsageLogTimeout(this));
120 UsageLogger(CephContext
*_cct
, rgw::sal::Store
* _store
) : cct(_cct
), store(_store
), num_entries(0), timer(cct
, timer_lock
) {
122 std::lock_guard l
{timer_lock
};
124 utime_t ts
= ceph_clock_now();
125 recalc_round_timestamp(ts
);
129 std::lock_guard l
{timer_lock
};
131 timer
.cancel_all_events();
135 void recalc_round_timestamp(utime_t
& ts
) {
136 round_timestamp
= ts
.round_to_hour();
139 void insert_user(utime_t
& timestamp
, const rgw_user
& user
, rgw_usage_log_entry
& entry
) {
141 if (timestamp
.sec() > round_timestamp
+ 3600)
142 recalc_round_timestamp(timestamp
);
143 entry
.epoch
= round_timestamp
.sec();
145 string u
= user
.to_str();
146 rgw_user_bucket
ub(u
, entry
.bucket
);
147 real_time rt
= round_timestamp
.to_real_time();
148 usage_map
[ub
].insert(rt
, entry
, &account
);
151 bool need_flush
= (num_entries
> cct
->_conf
->rgw_usage_log_flush_threshold
);
154 std::lock_guard l
{timer_lock
};
159 void insert(utime_t
& timestamp
, rgw_usage_log_entry
& entry
) {
160 if (entry
.payer
.empty()) {
161 insert_user(timestamp
, entry
.owner
, entry
);
163 insert_user(timestamp
, entry
.payer
, entry
);
168 map
<rgw_user_bucket
, RGWUsageBatch
> old_map
;
170 old_map
.swap(usage_map
);
174 store
->log_usage(this, old_map
);
177 CephContext
*get_cct() const override
{ return cct
; }
178 unsigned get_subsys() const override
{ return dout_subsys
; }
179 std::ostream
& gen_prefix(std::ostream
& out
) const override
{ return out
<< "rgw UsageLogger: "; }
182 static UsageLogger
*usage_logger
= NULL
;
184 void rgw_log_usage_init(CephContext
*cct
, rgw::sal::Store
* store
)
186 usage_logger
= new UsageLogger(cct
, store
);
189 void rgw_log_usage_finalize()
195 static void log_usage(struct req_state
*s
, const string
& op_name
)
197 if (s
->system_request
) /* don't log system user operations */
207 bucket_name
= s
->bucket_name
;
209 if (!bucket_name
.empty()) {
210 bucket_name
= s
->bucket_name
;
211 user
= s
->bucket_owner
.get_id();
212 if (!rgw::sal::Bucket::empty(s
->bucket
.get()) &&
213 s
->bucket
->get_info().requester_pays
) {
214 payer
= s
->user
->get_id();
217 user
= s
->user
->get_id();
220 bool error
= s
->err
.is_err();
221 if (error
&& s
->err
.http_ret
== 404) {
222 bucket_name
= "-"; /* bucket not found, use the invalid '-' as bucket name */
225 string u
= user
.to_str();
226 string p
= payer
.to_str();
227 rgw_usage_log_entry
entry(u
, p
, bucket_name
);
229 uint64_t bytes_sent
= ACCOUNTING_IO(s
)->get_bytes_sent();
230 uint64_t bytes_received
= ACCOUNTING_IO(s
)->get_bytes_received();
232 rgw_usage_data
data(bytes_sent
, bytes_received
);
236 data
.successful_ops
= 1;
238 ldpp_dout(s
, 30) << "log_usage: bucket_name=" << bucket_name
239 << " tenant=" << s
->bucket_tenant
240 << ", bytes_sent=" << bytes_sent
<< ", bytes_received="
241 << bytes_received
<< ", success=" << data
.successful_ops
<< dendl
;
243 entry
.add(op_name
, data
);
245 utime_t ts
= ceph_clock_now();
247 usage_logger
->insert(ts
, entry
);
250 void rgw_format_ops_log_entry(struct rgw_log_entry
& entry
, Formatter
*formatter
)
252 formatter
->open_object_section("log_entry");
253 formatter
->dump_string("bucket", entry
.bucket
);
255 auto t
= utime_t
{entry
.time
};
256 t
.gmtime(formatter
->dump_stream("time")); // UTC
257 t
.localtime(formatter
->dump_stream("time_local"));
259 formatter
->dump_string("remote_addr", entry
.remote_addr
);
260 string obj_owner
= entry
.object_owner
.to_str();
261 if (obj_owner
.length())
262 formatter
->dump_string("object_owner", obj_owner
);
263 formatter
->dump_string("user", entry
.user
);
264 formatter
->dump_string("operation", entry
.op
);
265 formatter
->dump_string("uri", entry
.uri
);
266 formatter
->dump_string("http_status", entry
.http_status
);
267 formatter
->dump_string("error_code", entry
.error_code
);
268 formatter
->dump_int("bytes_sent", entry
.bytes_sent
);
269 formatter
->dump_int("bytes_received", entry
.bytes_received
);
270 formatter
->dump_int("object_size", entry
.obj_size
);
272 using namespace std::chrono
;
273 uint64_t total_time
= duration_cast
<milliseconds
>(entry
.total_time
).count();
274 formatter
->dump_int("total_time", total_time
);
276 formatter
->dump_string("user_agent", entry
.user_agent
);
277 formatter
->dump_string("referrer", entry
.referrer
);
278 if (entry
.x_headers
.size() > 0) {
279 formatter
->open_array_section("http_x_headers");
280 for (const auto& iter
: entry
.x_headers
) {
281 formatter
->open_object_section(iter
.first
.c_str());
282 formatter
->dump_string(iter
.first
.c_str(), iter
.second
);
283 formatter
->close_section();
285 formatter
->close_section();
287 formatter
->dump_string("trans_id", entry
.trans_id
);
288 switch(entry
.identity_type
) {
290 formatter
->dump_string("authentication_type","Local");
293 formatter
->dump_string("authentication_type","LDAP");
296 formatter
->dump_string("authentication_type","Keystone");
299 formatter
->dump_string("authentication_type","OIDC Provider");
302 formatter
->dump_string("authentication_type","STS");
307 if (entry
.token_claims
.size() > 0) {
308 if (entry
.token_claims
[0] == "sts") {
309 formatter
->open_object_section("sts_info");
310 for (const auto& iter
: entry
.token_claims
) {
311 auto pos
= iter
.find(":");
312 if (pos
!= string::npos
) {
313 formatter
->dump_string(iter
.substr(0, pos
), iter
.substr(pos
+ 1));
316 formatter
->close_section();
319 if (!entry
.access_key_id
.empty()) {
320 formatter
->dump_string("access_key_id", entry
.access_key_id
);
322 if (!entry
.subuser
.empty()) {
323 formatter
->dump_string("subuser", entry
.subuser
);
325 formatter
->dump_bool("temp_url", entry
.temp_url
);
326 formatter
->close_section();
329 OpsLogManifold::~OpsLogManifold()
331 for (const auto &sink
: sinks
) {
336 void OpsLogManifold::add_sink(OpsLogSink
* sink
)
338 sinks
.push_back(sink
);
341 int OpsLogManifold::log(struct req_state
* s
, struct rgw_log_entry
& entry
)
344 for (const auto &sink
: sinks
) {
345 if (sink
->log(s
, entry
) < 0) {
352 OpsLogFile::OpsLogFile(CephContext
* cct
, std::string
& path
, uint64_t max_data_size
) :
353 cct(cct
), data_size(0), max_data_size(max_data_size
), path(path
), need_reopen(false)
357 void OpsLogFile::reopen() {
361 void OpsLogFile::flush()
364 std::scoped_lock
log_lock(mutex
);
365 assert(flush_buffer
.empty());
366 flush_buffer
.swap(log_buffer
);
369 for (auto bl
: flush_buffer
) {
372 if (!file
.is_open() || need_reopen
) {
375 file
.open(path
, std::ofstream::app
);
377 bl
.write_stream(file
);
379 ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl
;
384 int sleep_time_secs
= std::min((int) pow(2, try_num
), 60);
385 std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs
));
392 flush_buffer
.clear();
396 void* OpsLogFile::entry() {
397 std::unique_lock
lock(mutex
);
399 if (!log_buffer
.empty()) {
412 void OpsLogFile::start() {
414 create("ops_log_file");
417 void OpsLogFile::stop() {
419 std::unique_lock
lock(mutex
);
426 OpsLogFile::~OpsLogFile()
434 int OpsLogFile::log_json(struct req_state
* s
, bufferlist
& bl
)
436 std::unique_lock
lock(mutex
);
437 if (data_size
+ bl
.length() >= max_data_size
) {
438 ldout(s
->cct
, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s
->trans_id
<< dendl
;
441 log_buffer
.push_back(bl
);
442 data_size
+= bl
.length();
447 JsonOpsLogSink::JsonOpsLogSink() {
448 formatter
= new JSONFormatter
;
451 JsonOpsLogSink::~JsonOpsLogSink() {
455 void JsonOpsLogSink::formatter_to_bl(bufferlist
& bl
)
458 formatter
->flush(ss
);
459 const string
& s
= ss
.str();
463 int JsonOpsLogSink::log(struct req_state
* s
, struct rgw_log_entry
& entry
)
468 rgw_format_ops_log_entry(entry
, formatter
);
472 return log_json(s
, bl
);
475 void OpsLogSocket::init_connection(bufferlist
& bl
)
480 OpsLogSocket::OpsLogSocket(CephContext
*cct
, uint64_t _backlog
) : OutputDataSocket(cct
, _backlog
)
485 int OpsLogSocket::log_json(struct req_state
* s
, bufferlist
& bl
)
491 OpsLogRados::OpsLogRados(rgw::sal::Store
* const& store
): store(store
)
495 int OpsLogRados::log(struct req_state
* s
, struct rgw_log_entry
& entry
)
497 if (!s
->cct
->_conf
->rgw_ops_log_rados
) {
504 time_t t
= req_state::Clock::to_time_t(entry
.time
);
505 if (s
->cct
->_conf
->rgw_log_object_name_utc
)
508 localtime_r(&t
, &bdt
);
509 string oid
= render_log_object_name(s
->cct
->_conf
->rgw_log_object_name
, &bdt
,
510 entry
.bucket_id
, entry
.bucket
);
511 if (store
->log_op(s
, oid
, bl
) < 0) {
512 ldpp_dout(s
, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s
->trans_id
<< dendl
;
518 int rgw_log_op(RGWREST
* const rest
, struct req_state
*s
, const string
& op_name
, OpsLogSink
*olog
)
520 struct rgw_log_entry entry
;
523 if (s
->enable_usage_log
)
524 log_usage(s
, op_name
);
526 if (!s
->enable_ops_log
)
529 if (s
->bucket_name
.empty()) {
530 /* this case is needed for, e.g., list_buckets */
532 if (s
->err
.ret
== -ERR_NO_SUCH_BUCKET
||
533 rgw::sal::Bucket::empty(s
->bucket
.get())) {
534 if (!s
->cct
->_conf
->rgw_log_nonexistent_bucket
) {
535 ldout(s
->cct
, 5) << "bucket " << s
->bucket_name
<< " doesn't exist, not logging" << dendl
;
540 bucket_id
= s
->bucket
->get_bucket_id();
542 entry
.bucket
= rgw_make_bucket_entry_name(s
->bucket_tenant
, s
->bucket_name
);
544 if (check_utf8(entry
.bucket
.c_str(), entry
.bucket
.size()) != 0) {
545 ldpp_dout(s
, 5) << "not logging op on bucket with non-utf8 name" << dendl
;
549 if (!rgw::sal::Object::empty(s
->object
.get())) {
550 entry
.obj
= s
->object
->get_key();
552 entry
.obj
= rgw_obj_key("-");
555 entry
.obj_size
= s
->obj_size
;
556 } /* !bucket empty */
558 if (s
->cct
->_conf
->rgw_remote_addr_param
.length())
559 set_param_str(s
, s
->cct
->_conf
->rgw_remote_addr_param
.c_str(),
562 set_param_str(s
, "REMOTE_ADDR", entry
.remote_addr
);
563 set_param_str(s
, "HTTP_USER_AGENT", entry
.user_agent
);
564 // legacy apps are still using misspelling referer, such as curl -e option
565 if (s
->info
.env
->exists("HTTP_REFERRER"))
566 set_param_str(s
, "HTTP_REFERRER", entry
.referrer
);
568 set_param_str(s
, "HTTP_REFERER", entry
.referrer
);
571 if (s
->info
.env
->exists("REQUEST_METHOD")) {
572 uri
.append(s
->info
.env
->get("REQUEST_METHOD"));
576 if (s
->info
.env
->exists("REQUEST_URI")) {
577 uri
.append(s
->info
.env
->get("REQUEST_URI"));
580 if (s
->info
.env
->exists("QUERY_STRING")) {
581 const char* qs
= s
->info
.env
->get("QUERY_STRING");
582 if(qs
&& (*qs
!= '\0')) {
588 if (s
->info
.env
->exists("HTTP_VERSION")) {
591 uri
.append(s
->info
.env
->get("HTTP_VERSION"));
594 entry
.uri
= std::move(uri
);
598 if (s
->auth
.identity
) {
599 entry
.identity_type
= s
->auth
.identity
->get_identity_type();
600 s
->auth
.identity
->write_ops_log_entry(entry
);
602 entry
.identity_type
= TYPE_NONE
;
605 if (! s
->token_claims
.empty()) {
606 entry
.token_claims
= std::move(s
->token_claims
);
609 /* custom header logging */
611 if (rest
->log_x_headers()) {
612 for (const auto& iter
: s
->info
.env
->get_map()) {
613 if (rest
->log_x_header(iter
.first
)) {
614 entry
.x_headers
.insert(
615 rgw_log_entry::headers_map::value_type(iter
.first
, iter
.second
));
621 entry
.user
= s
->user
->get_id().to_str();
623 entry
.object_owner
= s
->object_acl
->get_owner().get_id();
624 entry
.bucket_owner
= s
->bucket_owner
.get_id();
626 uint64_t bytes_sent
= ACCOUNTING_IO(s
)->get_bytes_sent();
627 uint64_t bytes_received
= ACCOUNTING_IO(s
)->get_bytes_received();
629 entry
.time
= s
->time
;
630 entry
.total_time
= s
->time_elapsed();
631 entry
.bytes_sent
= bytes_sent
;
632 entry
.bytes_received
= bytes_received
;
633 if (s
->err
.http_ret
) {
635 snprintf(buf
, sizeof(buf
), "%d", s
->err
.http_ret
);
636 entry
.http_status
= buf
;
638 entry
.http_status
= "200"; // default
640 entry
.error_code
= s
->err
.err_code
;
641 entry
.bucket_id
= bucket_id
;
642 entry
.trans_id
= s
->trans_id
;
644 return olog
->log(s
, entry
);
649 void rgw_log_entry::generate_test_instances(list
<rgw_log_entry
*>& o
)
651 rgw_log_entry
*e
= new rgw_log_entry
;
652 e
->object_owner
= "object_owner";
653 e
->bucket_owner
= "bucket_owner";
654 e
->bucket
= "bucket";
655 e
->remote_addr
= "1.2.3.4";
657 e
->obj
= rgw_obj_key("obj");
658 e
->uri
= "http://uri/bucket/obj";
659 e
->http_status
= "200";
660 e
->error_code
= "error_code";
661 e
->bytes_sent
= 1024;
662 e
->bytes_received
= 512;
664 e
->user_agent
= "user_agent";
665 e
->referrer
= "referrer";
667 e
->trans_id
= "trans_id";
668 e
->identity_type
= TYPE_RGW
;
670 o
.push_back(new rgw_log_entry
);
673 void rgw_log_entry::dump(Formatter
*f
) const
675 f
->dump_string("object_owner", object_owner
.to_str());
676 f
->dump_string("bucket_owner", bucket_owner
.to_str());
677 f
->dump_string("bucket", bucket
);
678 f
->dump_stream("time") << time
;
679 f
->dump_string("remote_addr", remote_addr
);
680 f
->dump_string("user", user
);
681 f
->dump_stream("obj") << obj
;
682 f
->dump_string("op", op
);
683 f
->dump_string("uri", uri
);
684 f
->dump_string("http_status", http_status
);
685 f
->dump_string("error_code", error_code
);
686 f
->dump_unsigned("bytes_sent", bytes_sent
);
687 f
->dump_unsigned("bytes_received", bytes_received
);
688 f
->dump_unsigned("obj_size", obj_size
);
689 f
->dump_stream("total_time") << total_time
;
690 f
->dump_string("user_agent", user_agent
);
691 f
->dump_string("referrer", referrer
);
692 f
->dump_string("bucket_id", bucket_id
);
693 f
->dump_string("trans_id", trans_id
);
694 f
->dump_unsigned("identity_type", identity_type
);