1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/Clock.h"
5 #include "common/Timer.h"
6 #include "common/utf8.h"
7 #include "common/OutputDataSocket.h"
8 #include "common/Formatter.h"
10 #include "rgw_bucket.h"
13 #include "rgw_client_io.h"
17 #include "services/svc_zone.h"
19 #define dout_subsys ceph_subsys_rgw
// Looks up the request-environment entry called `name` through
// s->info.env->get() and keeps the returned C string in `p`.
// NOTE(review): the statements that consume `p` are not part of this listing;
// presumably a non-null value is copied into `str` -- confirm against the
// complete source.
21 static void set_param_str(struct req_state
*s
, const char *name
, string
& str
)
23 const char *p
= s
->info
.env
->get(name
);
/**
 * Expand a log-object name template.
 *
 * `format` is copied to the result character by character, except that each
 * two-character "%X" code is replaced as follows:
 *
 *   %%  a literal '%'
 *   %Y  year, at least four digits      %y  year modulo 100, two digits
 *   %m  month, 01-12                    %d  day of month, two digits
 *   %H  hour, 00-23                     %k  hour, 0-23, not padded
 *   %I  hour on a 12-hour clock, 01-12  %l  same, 1-12, not padded
 *   %M  minute, two digits
 *   %i  bucket_id                       %n  bucket_name
 *
 * Any other code is copied through unchanged, including its '%'. A '%' that
 * is the last character of `format` is copied as-is.
 *
 * @param format      template string
 * @param dt          broken-down time; only dereferenced when a time code is
 *                    actually present in `format`
 * @param bucket_id   substituted for %i
 * @param bucket_name substituted for %n
 * @return the expanded name
 *
 * NOTE(review): the 12-hour codes previously evaluated (tm_hour % 12) + 1,
 * which mapped 00h -> 1, 11h -> 12 and 12h -> 1. They now follow the
 * strftime(3) convention (00h and 12h -> 12, otherwise tm_hour % 12). Object
 * names rendered with %I or %l therefore change; the other codes do not.
 */
std::string render_log_object_name(const std::string& format,
                                   struct tm *dt, const std::string& bucket_id,
                                   const std::string& bucket_name)
{
  // 24-hour -> 12-hour clock: 0 and 12 both read "12".
  auto hour12 = [](int hour24) {
    const int h = hour24 % 12;
    return h == 0 ? 12 : h;
  };

  std::string o;
  for (unsigned i = 0; i < format.size(); i++) {
    if (format[i] == '%' && i + 1 < format.size()) {
      i++;  // step onto the conversion character
      char buf[32];
      switch (format[i]) {
      case '%':
        o += '%';
        continue;
      case 'Y':
        snprintf(buf, sizeof(buf), "%.4d", dt->tm_year + 1900);
        break;
      case 'y':
        snprintf(buf, sizeof(buf), "%.2d", dt->tm_year % 100);
        break;
      case 'm':
        snprintf(buf, sizeof(buf), "%.2d", dt->tm_mon + 1);
        break;
      case 'd':
        snprintf(buf, sizeof(buf), "%.2d", dt->tm_mday);
        break;
      case 'H':
        snprintf(buf, sizeof(buf), "%.2d", dt->tm_hour);
        break;
      case 'I':
        snprintf(buf, sizeof(buf), "%.2d", hour12(dt->tm_hour));
        break;
      case 'k':
        snprintf(buf, sizeof(buf), "%d", dt->tm_hour);
        break;
      case 'l':
        snprintf(buf, sizeof(buf), "%d", hour12(dt->tm_hour));
        break;
      case 'M':
        snprintf(buf, sizeof(buf), "%.2d", dt->tm_min);
        break;

      case 'i':
        o += bucket_id;
        continue;
      case 'n':
        o += bucket_name;
        continue;
      default:
        // unknown code: keep it verbatim, '%' included
        snprintf(buf, sizeof(buf), "%%%c", format[i]);
        break;
      }
      o += buf;
      continue;
    }
    o += format[i];
  }
  return o;
}
// UsageLogger collects usage records in usage_map, keyed by rgw_user_bucket,
// and passes the collected map to RGWRados::log_usage() together with `this`.
// It derives from DoutPrefixProvider and implements get_cct(), get_subsys()
// and gen_prefix() at the bottom of the class.
// NOTE(review): several lines of this class are absent from the listing under
// review (some member declarations, method headers, closing braces and a few
// statements). The comments below describe only statements that are visible
// and mark anything inferred as such.
89 class UsageLogger
: public DoutPrefixProvider
{
// Pending usage batches, one per rgw_user_bucket key.
92 map
<rgw_user_bucket
, RGWUsageBatch
> usage_map
;
// NOTE(review): presumably protects usage_map/num_entries -- the places where
// it is taken are not visible in this listing.
93 ceph::mutex lock
= ceph::make_mutex("UsageLogger");
// Given to `timer` in the constructor's initializer list and held around the
// timer-related statements below.
95 ceph::mutex timer_lock
= ceph::make_mutex("UsageLogger::timer_lock");
// Set from utime_t::round_to_hour() by recalc_round_timestamp(); its sec()
// value becomes the epoch of inserted entries.
97 utime_t round_timestamp
;
// Timer callback object; stores a pointer back to the owning logger.
99 class C_UsageLogTimeout
: public Context
{
102 explicit C_UsageLogTimeout(UsageLogger
*_l
) : logger(_l
) {}
103 void finish(int r
) override
{
// Queues a new C_UsageLogTimeout on the timer with a delay of
// rgw_usage_log_tick_interval.
// NOTE(review): the header of the enclosing method is not visible here.
110 timer
.add_event_after(cct
->_conf
->rgw_usage_log_tick_interval
, new C_UsageLogTimeout(this));
// Constructor: stores cct and store, zeroes num_entries and constructs the
// timer with timer_lock. The visible body takes timer_lock and computes the
// initial round_timestamp from ceph_clock_now().
114 UsageLogger(CephContext
*_cct
, RGWRados
*_store
) : cct(_cct
), store(_store
), num_entries(0), timer(cct
, timer_lock
) {
116 std::lock_guard l
{timer_lock
};
118 utime_t ts
= ceph_clock_now();
119 recalc_round_timestamp(ts
);
// NOTE(review): presumably the destructor -- cancels every pending timer event
// while holding timer_lock.
123 std::lock_guard l
{timer_lock
};
125 timer
.cancel_all_events();
// Sets round_timestamp to ts.round_to_hour().
129 void recalc_round_timestamp(utime_t
& ts
) {
130 round_timestamp
= ts
.round_to_hour();
// Adds `entry` to the batch for (user, entry.bucket). entry.epoch is
// overwritten with round_timestamp.sec() before the insert.
133 void insert_user(utime_t
& timestamp
, const rgw_user
& user
, rgw_usage_log_entry
& entry
) {
// Recompute round_timestamp once the incoming timestamp is more than 3600
// seconds past it.
135 if (timestamp
.sec() > round_timestamp
+ 3600)
136 recalc_round_timestamp(timestamp
);
137 entry
.epoch
= round_timestamp
.sec();
139 string u
= user
.to_str();
140 rgw_user_bucket
ub(u
, entry
.bucket
);
141 real_time rt
= round_timestamp
.to_real_time();
// operator[] creates the batch for a key that is not in the map yet.
// NOTE(review): `account` is passed by address; its declaration and later use
// are on lines that are not visible.
142 usage_map
[ub
].insert(rt
, entry
, &account
);
// True once num_entries exceeds rgw_usage_log_flush_threshold.
145 bool need_flush
= (num_entries
> cct
->_conf
->rgw_usage_log_flush_threshold
);
148 std::lock_guard l
{timer_lock
};
// Records `entry` for entry.owner when entry.payer is empty. The second
// insert_user() call records it for entry.payer (presumably the else branch;
// the `else` line itself is not visible).
153 void insert(utime_t
& timestamp
, rgw_usage_log_entry
& entry
) {
154 if (entry
.payer
.empty()) {
155 insert_user(timestamp
, entry
.owner
, entry
);
157 insert_user(timestamp
, entry
.payer
, entry
);
// Swaps the pending batches into a local map and passes that map to
// store->log_usage().
// NOTE(review): the header of the enclosing method is not visible here.
162 map
<rgw_user_bucket
, RGWUsageBatch
> old_map
;
164 old_map
.swap(usage_map
);
168 store
->log_usage(this, old_map
);
// DoutPrefixProvider overrides.
171 CephContext
*get_cct() const override
{ return cct
; }
172 unsigned get_subsys() const override
{ return dout_subsys
; }
173 std::ostream
& gen_prefix(std::ostream
& out
) const override
{ return out
<< "rgw UsageLogger: "; }
// File-local usage logger instance. It starts out NULL and is pointed at a
// heap-allocated UsageLogger by rgw_log_usage_init().
176 static UsageLogger
*usage_logger
= NULL
;
// Allocates the file-local UsageLogger for the given context and store.
// NOTE(review): nothing visible here guards against a second call, which would
// overwrite the previous pointer -- confirm callers invoke this only once.
178 void rgw_log_usage_init(CephContext
*cct
, RGWRados
*store
)
180 usage_logger
= new UsageLogger(cct
, store
);
183 void rgw_log_usage_finalize()
// Builds one rgw_usage_log_entry for the request `s` and the operation
// `op_name`, and passes it to usage_logger->insert() with the current time.
// NOTE(review): the local declarations, the `else` line and some guards are
// absent from this listing; comments describe the visible statements only.
189 static void log_usage(struct req_state
*s
, const string
& op_name
)
191 if (s
->system_request
) /* don't log system user operations */
201 bucket_name
= s
->bucket_name
;
// With a bucket name: the entry's user is the bucket owner, and the requesting
// user becomes the payer when the bucket exists and has requester_pays set.
// NOTE(review): the assignment on original line 204 repeats the one on line 201
// and has no effect.
203 if (!bucket_name
.empty()) {
204 bucket_name
= s
->bucket_name
;
205 user
= s
->bucket_owner
.get_id();
206 if (!rgw::sal::RGWBucket::empty(s
->bucket
.get()) &&
207 s
->bucket
->get_info().requester_pays
) {
208 payer
= s
->user
->get_id();
// The entry's user is the requesting user (presumably the no-bucket-name
// branch; the `else` line is not visible).
211 user
= s
->user
->get_id();
214 bool error
= s
->err
.is_err();
215 if (error
&& s
->err
.http_ret
== 404) {
216 bucket_name
= "-"; /* bucket not found, use the invalid '-' as bucket name */
219 string u
= user
.to_str();
220 string p
= payer
.to_str();
221 rgw_usage_log_entry
entry(u
, p
, bucket_name
);
// Byte counters come from the request's accounting I/O layer.
223 uint64_t bytes_sent
= ACCOUNTING_IO(s
)->get_bytes_sent();
224 uint64_t bytes_received
= ACCOUNTING_IO(s
)->get_bytes_received();
226 rgw_usage_data
data(bytes_sent
, bytes_received
);
// NOTE(review): the condition guarding this assignment is on a line that is
// not visible; presumably it applies only when `error` is false.
230 data
.successful_ops
= 1;
232 ldpp_dout(s
, 30) << "log_usage: bucket_name=" << bucket_name
233 << " tenant=" << s
->bucket_tenant
234 << ", bytes_sent=" << bytes_sent
<< ", bytes_received="
235 << bytes_received
<< ", success=" << data
.successful_ops
<< dendl
;
// File the counters under the operation name, then queue the entry.
237 entry
.add(op_name
, data
);
239 utime_t ts
= ceph_clock_now();
241 usage_logger
->insert(ts
, entry
);
// Writes one ops-log entry to `formatter` as an object section named
// "log_entry". Fields are emitted in the order of the statements below.
// NOTE(review): lines holding only braces are absent from this listing, so the
// nesting described in the comments is inferred from the statement order.
244 void rgw_format_ops_log_entry(struct rgw_log_entry
& entry
, Formatter
*formatter
)
246 formatter
->open_object_section("log_entry");
247 formatter
->dump_string("bucket", entry
.bucket
);
// The entry time is written twice: via gmtime() as "time" and via localtime()
// as "time_local".
249 auto t
= utime_t
{entry
.time
};
250 t
.gmtime(formatter
->dump_stream("time")); // UTC
251 t
.localtime(formatter
->dump_stream("time_local"));
253 formatter
->dump_string("remote_addr", entry
.remote_addr
);
// "object_owner" is emitted only when its string form is non-empty.
254 string obj_owner
= entry
.object_owner
.to_str();
255 if (obj_owner
.length())
256 formatter
->dump_string("object_owner", obj_owner
);
257 formatter
->dump_string("user", entry
.user
);
258 formatter
->dump_string("operation", entry
.op
);
259 formatter
->dump_string("uri", entry
.uri
);
260 formatter
->dump_string("http_status", entry
.http_status
);
261 formatter
->dump_string("error_code", entry
.error_code
);
262 formatter
->dump_int("bytes_sent", entry
.bytes_sent
);
263 formatter
->dump_int("bytes_received", entry
.bytes_received
);
264 formatter
->dump_int("object_size", entry
.obj_size
);
// "total_time" is entry.total_time converted to whole milliseconds.
266 using namespace std::chrono
;
267 uint64_t total_time
= duration_cast
<milliseconds
>(entry
.total_time
).count();
268 formatter
->dump_int("total_time", total_time
);
270 formatter
->dump_string("user_agent", entry
.user_agent
);
271 formatter
->dump_string("referrer", entry
.referrer
);
// Logged custom headers go into the array "http_x_headers"; each header gets
// its own object section named after the header key, holding one string whose
// name is that same key.
272 if (entry
.x_headers
.size() > 0) {
273 formatter
->open_array_section("http_x_headers");
274 for (const auto& iter
: entry
.x_headers
) {
275 formatter
->open_object_section(iter
.first
.c_str());
276 formatter
->dump_string(iter
.first
.c_str(), iter
.second
);
277 formatter
->close_section();
279 formatter
->close_section();
281 formatter
->dump_string("trans_id", entry
.trans_id
);
// Token claims are emitted only when the first element equals "sts". Each
// element is split at its first ':' into name and value; elements without a
// ':' are not emitted.
282 if (entry
.token_claims
.size() > 0) {
283 if (entry
.token_claims
[0] == "sts") {
284 formatter
->open_object_section("sts_token_claims");
285 for (const auto& iter
: entry
.token_claims
) {
286 auto pos
= iter
.find(":");
287 if (pos
!= string::npos
) {
288 formatter
->dump_string(iter
.substr(0, pos
), iter
.substr(pos
+ 1));
291 formatter
->close_section();
295 formatter
->close_section();
// Flushes the socket's formatter into a string stream and takes a reference to
// the resulting string.
// NOTE(review): the declaration of `ss` and the statements that copy `s` into
// `bl` are not visible in this listing.
298 void OpsLogSocket::formatter_to_bl(bufferlist
& bl
)
301 formatter
->flush(ss
);
302 const string
& s
= ss
.str();
307 void OpsLogSocket::init_connection(bufferlist
& bl
)
// Forwards cct and _backlog to the OutputDataSocket base and allocates the
// JSONFormatter stored in `formatter`.
// NOTE(review): `formatter` is a raw owning pointer; the matching release is
// presumably in the destructor, whose body is not visible in this listing.
312 OpsLogSocket::OpsLogSocket(CephContext
*cct
, uint64_t _backlog
) : OutputDataSocket(cct
, _backlog
)
314 formatter
= new JSONFormatter
;
318 OpsLogSocket::~OpsLogSocket()
// Formats `entry` into the socket's formatter via rgw_format_ops_log_entry().
// NOTE(review): the statements before and after this call (locking, conversion
// to a bufferlist, sending) are not visible in this listing.
323 void OpsLogSocket::log(struct rgw_log_entry
& entry
)
328 rgw_format_ops_log_entry(entry
, formatter
);
// Logs a finished request. Calls log_usage() when s->enable_usage_log is set,
// then fills an rgw_log_entry from the request state and, when
// rgw_ops_log_rados is enabled, appends it to an object in the zone's log pool.
// NOTE(review): many lines of this function are absent from the listing under
// review (returns, `else` lines, local declarations, the encoding of the entry
// into `bl`, the use of `olog`). The comments below describe the visible
// statements only and mark inferred control flow as such.
335 int rgw_log_op(RGWRados
*store
, RGWREST
* const rest
, struct req_state
*s
,
336 const string
& op_name
, OpsLogSocket
*olog
)
338 struct rgw_log_entry entry
;
341 if (s
->enable_usage_log
)
342 log_usage(s
, op_name
);
// NOTE(review): presumably returns early when ops logging is disabled; the
// statement under this `if` is not visible.
344 if (!s
->enable_ops_log
)
// A request without a bucket name only produces the level-5 message below.
347 if (s
->bucket_name
.empty()) {
348 ldpp_dout(s
, 5) << "nothing to log for operation" << dendl
;
// Missing bucket: rgw_log_nonexistent_bucket decides whether the operation is
// logged; when it is off, the level-5 message below is emitted.
351 if (s
->err
.ret
== -ERR_NO_SUCH_BUCKET
|| rgw::sal::RGWBucket::empty(s
->bucket
.get())) {
352 if (!s
->cct
->_conf
->rgw_log_nonexistent_bucket
) {
353 ldpp_dout(s
, 5) << "bucket " << s
->bucket_name
<< " doesn't exist, not logging" << dendl
;
// Bucket id taken from the bucket handle (presumably the bucket-exists branch).
358 bucket_id
= s
->bucket
->get_bucket_id();
360 entry
.bucket
= rgw_make_bucket_entry_name(s
->bucket_tenant
, s
->bucket_name
);
// A non-zero check_utf8() result on the bucket entry name leads to the
// "not logging" message below.
362 if (check_utf8(entry
.bucket
.c_str(), entry
.bucket
.size()) != 0) {
363 ldpp_dout(s
, 5) << "not logging op on bucket with non-utf8 name" << dendl
;
// Object key from the request, or the key "-" (presumably when the request has
// no object; the `else` line is not visible).
367 if (!rgw::sal::RGWObject::empty(s
->object
.get())) {
368 entry
.obj
= s
->object
->get_key();
370 entry
.obj
= rgw_obj_key("-");
373 entry
.obj_size
= s
->obj_size
;
// Remote address: read from the env entry named by rgw_remote_addr_param when
// that option is non-empty; the REMOTE_ADDR call below is presumably the
// fallback branch.
375 if (s
->cct
->_conf
->rgw_remote_addr_param
.length())
376 set_param_str(s
, s
->cct
->_conf
->rgw_remote_addr_param
.c_str(),
379 set_param_str(s
, "REMOTE_ADDR", entry
.remote_addr
);
380 set_param_str(s
, "HTTP_USER_AGENT", entry
.user_agent
);
381 // legacy apps still send the misspelled "referer" header (e.g. curl's -e option), so HTTP_REFERRER is used when present and HTTP_REFERER is read as well
382 if (s
->info
.env
->exists("HTTP_REFERRER"))
383 set_param_str(s
, "HTTP_REFERRER", entry
.referrer
);
385 set_param_str(s
, "HTTP_REFERER", entry
.referrer
);
// The logged uri is assembled from REQUEST_METHOD, REQUEST_URI, QUERY_STRING
// and HTTP_VERSION, each appended only when present in the environment.
// NOTE(review): the separators appended between the parts are on lines that
// are not visible.
388 if (s
->info
.env
->exists("REQUEST_METHOD")) {
389 uri
.append(s
->info
.env
->get("REQUEST_METHOD"));
393 if (s
->info
.env
->exists("REQUEST_URI")) {
394 uri
.append(s
->info
.env
->get("REQUEST_URI"));
397 if (s
->info
.env
->exists("QUERY_STRING")) {
398 const char* qs
= s
->info
.env
->get("QUERY_STRING");
399 if(qs
&& (*qs
!= '\0')) {
405 if (s
->info
.env
->exists("HTTP_VERSION")) {
408 uri
.append(s
->info
.env
->get("HTTP_VERSION"));
411 entry
.uri
= std::move(uri
);
// The claims are moved out of the request state, so s->token_claims is left in
// a moved-from state after this point.
415 if (! s
->token_claims
.empty()) {
416 entry
.token_claims
= std::move(s
->token_claims
);
419 /* custom header logging */
// Copies every environment entry accepted by rest->log_x_header() into
// entry.x_headers; skipped entirely unless rest->log_x_headers() is true.
421 if (rest
->log_x_headers()) {
422 for (const auto& iter
: s
->info
.env
->get_map()) {
423 if (rest
->log_x_header(iter
.first
)) {
424 entry
.x_headers
.insert(
425 rgw_log_entry::headers_map::value_type(iter
.first
, iter
.second
));
431 entry
.user
= s
->user
->get_id().to_str();
// NOTE(review): s->object_acl is dereferenced here; any guard for it is on a
// line that is not visible.
433 entry
.object_owner
= s
->object_acl
->get_owner().get_id();
434 entry
.bucket_owner
= s
->bucket_owner
.get_id();
436 uint64_t bytes_sent
= ACCOUNTING_IO(s
)->get_bytes_sent();
437 uint64_t bytes_received
= ACCOUNTING_IO(s
)->get_bytes_received();
439 entry
.time
= s
->time
;
440 entry
.total_time
= s
->time_elapsed();
441 entry
.bytes_sent
= bytes_sent
;
442 entry
.bytes_received
= bytes_received
;
// http_status is the decimal text of s->err.http_ret when that is non-zero;
// the "200" assignment is presumably the else branch.
443 if (s
->err
.http_ret
) {
445 snprintf(buf
, sizeof(buf
), "%d", s
->err
.http_ret
);
446 entry
.http_status
= buf
;
448 entry
.http_status
= "200"; // default
450 entry
.error_code
= s
->err
.err_code
;
451 entry
.bucket_id
= bucket_id
;
452 entry
.trans_id
= s
->trans_id
;
// Break the entry time down into `bdt`, which is passed to
// render_log_object_name() below. The visible call uses local time;
// NOTE(review): the statement taken when rgw_log_object_name_utc is set is not
// visible.
458 time_t t
= req_state::Clock::to_time_t(entry
.time
);
459 if (s
->cct
->_conf
->rgw_log_object_name_utc
)
462 localtime_r(&t
, &bdt
);
// RADOS ops log: the object name is rendered from rgw_log_object_name, and the
// buffer `bl` is appended to that object in the zone's log pool.
466 if (s
->cct
->_conf
->rgw_ops_log_rados
) {
467 string oid
= render_log_object_name(s
->cct
->_conf
->rgw_log_object_name
, &bdt
,
468 entry
.bucket_id
, entry
.bucket
);
470 rgw_raw_obj
obj(store
->svc
.zone
->get_zone_params().log_pool
, oid
);
472 ret
= store
->append_async(s
, obj
, bl
.length(), bl
);
// -ENOENT from the append: create the log pool, then append again.
473 if (ret
== -ENOENT
) {
474 ret
= store
->create_pool(s
, store
->svc
.zone
->get_zone_params().log_pool
);
478 ret
= store
->append_async(s
, obj
, bl
.length(), bl
);
487 ldpp_dout(s
, 0) << "ERROR: failed to log entry" << dendl
;