]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_log.cc
import 15.2.9
[ceph.git] / ceph / src / rgw / rgw_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/Clock.h"
5 #include "common/Timer.h"
6 #include "common/utf8.h"
7 #include "common/OutputDataSocket.h"
8 #include "common/Formatter.h"
9
10 #include "rgw_bucket.h"
11 #include "rgw_log.h"
12 #include "rgw_acl.h"
13 #include "rgw_rados.h"
14 #include "rgw_client_io.h"
15 #include "rgw_rest.h"
16 #include "rgw_zone.h"
17
18 #include "services/svc_zone.h"
19
20 #define dout_subsys ceph_subsys_rgw
21
22 static void set_param_str(struct req_state *s, const char *name, string& str)
23 {
24 const char *p = s->info.env->get(name);
25 if (p)
26 str = p;
27 }
28
/*
 * Expand a date(1)-like template into the name of an ops-log object.
 *
 * Recognised conversions (anything else is copied through unchanged,
 * including the leading '%'):
 *   %%  literal '%'
 *   %Y  four digit year          %y  two digit year
 *   %m  month (01-12)            %d  day of month (01-31)
 *   %H  hour, 24h, zero padded   %k  hour, 24h, no padding
 *   %I  hour, 12h, zero padded   %l  hour, 12h, no padding
 *   %M  minute (00-59)
 *   %i  bucket id                %n  bucket name
 *
 * A '%' that is the very last character of `format` is emitted literally.
 *
 * @param format       template string
 * @param dt           broken-down time used for the date/time conversions
 * @param bucket_id    substituted for %i (only read, never modified)
 * @param bucket_name  substituted for %n
 * @return the expanded object name
 */
std::string render_log_object_name(const std::string& format,
                                   struct tm *dt, std::string& bucket_id,
                                   const std::string& bucket_name)
{
  std::string o;
  for (size_t i = 0; i < format.size(); ++i) {
    // plain characters, and a dangling '%' at the very end, are copied as-is
    if (format[i] != '%' || i + 1 >= format.size()) {
      o += format[i];
      continue;
    }

    const char code = format[++i];

    // 12-hour clock value in the range 1..12: midnight and noon are 12,
    // 13:00 is 1.  (The naive "(hour % 12) + 1" is off by one for every hour.)
    const int hour12 = (dt->tm_hour + 11) % 12 + 1;

    char buf[32];
    switch (code) {
    case '%':
      o += '%';
      continue;
    case 'Y':
      snprintf(buf, sizeof(buf), "%.4d", dt->tm_year + 1900);
      break;
    case 'y':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_year % 100);
      break;
    case 'm':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mon + 1);
      break;
    case 'd':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mday);
      break;
    case 'H':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_hour);
      break;
    case 'I':
      snprintf(buf, sizeof(buf), "%.2d", hour12);
      break;
    case 'k':
      snprintf(buf, sizeof(buf), "%d", dt->tm_hour);
      break;
    case 'l':
      snprintf(buf, sizeof(buf), "%d", hour12);
      break;
    case 'M':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_min);
      break;
    case 'i':
      o += bucket_id;
      continue;
    case 'n':
      o += bucket_name;
      continue;
    default:
      // unknown code: keep the two characters verbatim
      o += '%';
      o += code;
      continue;
    }
    o += buf;
  }
  return o;
}
88
89 /* usage logger */
90 class UsageLogger {
91 CephContext *cct;
92 RGWRados *store;
93 map<rgw_user_bucket, RGWUsageBatch> usage_map;
94 ceph::mutex lock = ceph::make_mutex("UsageLogger");
95 int32_t num_entries;
96 ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock");
97 SafeTimer timer;
98 utime_t round_timestamp;
99
100 class C_UsageLogTimeout : public Context {
101 UsageLogger *logger;
102 public:
103 explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {}
104 void finish(int r) override {
105 logger->flush();
106 logger->set_timer();
107 }
108 };
109
110 void set_timer() {
111 timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this));
112 }
113 public:
114
115 UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), num_entries(0), timer(cct, timer_lock) {
116 timer.init();
117 std::lock_guard l{timer_lock};
118 set_timer();
119 utime_t ts = ceph_clock_now();
120 recalc_round_timestamp(ts);
121 }
122
123 ~UsageLogger() {
124 std::lock_guard l{timer_lock};
125 flush();
126 timer.cancel_all_events();
127 timer.shutdown();
128 }
129
130 void recalc_round_timestamp(utime_t& ts) {
131 round_timestamp = ts.round_to_hour();
132 }
133
134 void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
135 lock.lock();
136 if (timestamp.sec() > round_timestamp + 3600)
137 recalc_round_timestamp(timestamp);
138 entry.epoch = round_timestamp.sec();
139 bool account;
140 string u = user.to_str();
141 rgw_user_bucket ub(u, entry.bucket);
142 real_time rt = round_timestamp.to_real_time();
143 usage_map[ub].insert(rt, entry, &account);
144 if (account)
145 num_entries++;
146 bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
147 lock.unlock();
148 if (need_flush) {
149 std::lock_guard l{timer_lock};
150 flush();
151 }
152 }
153
154 void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
155 if (entry.payer.empty()) {
156 insert_user(timestamp, entry.owner, entry);
157 } else {
158 insert_user(timestamp, entry.payer, entry);
159 }
160 }
161
162 void flush() {
163 map<rgw_user_bucket, RGWUsageBatch> old_map;
164 lock.lock();
165 old_map.swap(usage_map);
166 num_entries = 0;
167 lock.unlock();
168
169 store->log_usage(old_map);
170 }
171 };
172
173 static UsageLogger *usage_logger = NULL;
174
175 void rgw_log_usage_init(CephContext *cct, RGWRados *store)
176 {
177 usage_logger = new UsageLogger(cct, store);
178 }
179
180 void rgw_log_usage_finalize()
181 {
182 delete usage_logger;
183 usage_logger = NULL;
184 }
185
186 static void log_usage(struct req_state *s, const string& op_name)
187 {
188 if (s->system_request) /* don't log system user operations */
189 return;
190
191 if (!usage_logger)
192 return;
193
194 rgw_user user;
195 rgw_user payer;
196 string bucket_name;
197
198 bucket_name = s->bucket_name;
199
200 if (!bucket_name.empty()) {
201 user = s->bucket_owner.get_id();
202 if (s->bucket_info.requester_pays) {
203 payer = s->user->get_id();
204 }
205 } else {
206 user = s->user->get_id();
207 }
208
209 bool error = s->err.is_err();
210 if (error && s->err.http_ret == 404) {
211 bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */
212 }
213
214 string u = user.to_str();
215 string p = payer.to_str();
216 rgw_usage_log_entry entry(u, p, bucket_name);
217
218 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
219 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
220
221 rgw_usage_data data(bytes_sent, bytes_received);
222
223 data.ops = 1;
224 if (!s->is_err())
225 data.successful_ops = 1;
226
227 ldout(s->cct, 30) << "log_usage: bucket_name=" << bucket_name
228 << " tenant=" << s->bucket_tenant
229 << ", bytes_sent=" << bytes_sent << ", bytes_received="
230 << bytes_received << ", success=" << data.successful_ops << dendl;
231
232 entry.add(op_name, data);
233
234 utime_t ts = ceph_clock_now();
235
236 usage_logger->insert(ts, entry);
237 }
238
239 void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
240 {
241 formatter->open_object_section("log_entry");
242 formatter->dump_string("bucket", entry.bucket);
243 {
244 auto t = utime_t{entry.time};
245 t.gmtime(formatter->dump_stream("time")); // UTC
246 t.localtime(formatter->dump_stream("time_local"));
247 }
248 formatter->dump_string("remote_addr", entry.remote_addr);
249 string obj_owner = entry.object_owner.to_str();
250 if (obj_owner.length())
251 formatter->dump_string("object_owner", obj_owner);
252 formatter->dump_string("user", entry.user);
253 formatter->dump_string("operation", entry.op);
254 formatter->dump_string("uri", entry.uri);
255 formatter->dump_string("http_status", entry.http_status);
256 formatter->dump_string("error_code", entry.error_code);
257 formatter->dump_int("bytes_sent", entry.bytes_sent);
258 formatter->dump_int("bytes_received", entry.bytes_received);
259 formatter->dump_int("object_size", entry.obj_size);
260 {
261 using namespace std::chrono;
262 uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count();
263 formatter->dump_int("total_time", total_time);
264 }
265 formatter->dump_string("user_agent", entry.user_agent);
266 formatter->dump_string("referrer", entry.referrer);
267 if (entry.x_headers.size() > 0) {
268 formatter->open_array_section("http_x_headers");
269 for (const auto& iter: entry.x_headers) {
270 formatter->open_object_section(iter.first.c_str());
271 formatter->dump_string(iter.first.c_str(), iter.second);
272 formatter->close_section();
273 }
274 formatter->close_section();
275 }
276 formatter->dump_string("trans_id", entry.trans_id);
277 if (entry.token_claims.size() > 0) {
278 if (entry.token_claims[0] == "sts") {
279 formatter->open_object_section("sts_token_claims");
280 for (const auto& iter: entry.token_claims) {
281 auto pos = iter.find(":");
282 if (pos != string::npos) {
283 formatter->dump_string(iter.substr(0, pos), iter.substr(pos + 1));
284 }
285 }
286 formatter->close_section();
287 }
288 }
289
290 formatter->close_section();
291 }
292
293 void OpsLogSocket::formatter_to_bl(bufferlist& bl)
294 {
295 stringstream ss;
296 formatter->flush(ss);
297 const string& s = ss.str();
298
299 bl.append(s);
300 }
301
302 void OpsLogSocket::init_connection(bufferlist& bl)
303 {
304 bl.append("[");
305 }
306
307 OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
308 {
309 formatter = new JSONFormatter;
310 delim.append(",\n");
311 }
312
313 OpsLogSocket::~OpsLogSocket()
314 {
315 delete formatter;
316 }
317
318 void OpsLogSocket::log(struct rgw_log_entry& entry)
319 {
320 bufferlist bl;
321
322 lock.lock();
323 rgw_format_ops_log_entry(entry, formatter);
324 formatter_to_bl(bl);
325 lock.unlock();
326
327 append_output(bl);
328 }
329
330 int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
331 const string& op_name, OpsLogSocket *olog)
332 {
333 struct rgw_log_entry entry;
334 string bucket_id;
335
336 if (s->enable_usage_log)
337 log_usage(s, op_name);
338
339 if (!s->enable_ops_log)
340 return 0;
341
342 if (s->bucket_name.empty()) {
343 ldout(s->cct, 5) << "nothing to log for operation" << dendl;
344 return -EINVAL;
345 }
346 if (s->err.ret == -ERR_NO_SUCH_BUCKET) {
347 if (!s->cct->_conf->rgw_log_nonexistent_bucket) {
348 ldout(s->cct, 5) << "bucket " << s->bucket << " doesn't exist, not logging" << dendl;
349 return 0;
350 }
351 bucket_id = "";
352 } else {
353 bucket_id = s->bucket.bucket_id;
354 }
355 entry.bucket = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name);
356
357 if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) {
358 ldout(s->cct, 5) << "not logging op on bucket with non-utf8 name" << dendl;
359 return 0;
360 }
361
362 if (!s->object.empty()) {
363 entry.obj = s->object;
364 } else {
365 entry.obj = rgw_obj_key("-");
366 }
367
368 entry.obj_size = s->obj_size;
369
370 if (s->cct->_conf->rgw_remote_addr_param.length())
371 set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(),
372 entry.remote_addr);
373 else
374 set_param_str(s, "REMOTE_ADDR", entry.remote_addr);
375 set_param_str(s, "HTTP_USER_AGENT", entry.user_agent);
376 // legacy apps are still using misspelling referer, such as curl -e option
377 if (s->info.env->exists("HTTP_REFERRER"))
378 set_param_str(s, "HTTP_REFERRER", entry.referrer);
379 else
380 set_param_str(s, "HTTP_REFERER", entry.referrer);
381
382 std::string uri;
383 if (s->info.env->exists("REQUEST_METHOD")) {
384 uri.append(s->info.env->get("REQUEST_METHOD"));
385 uri.append(" ");
386 }
387
388 if (s->info.env->exists("REQUEST_URI")) {
389 uri.append(s->info.env->get("REQUEST_URI"));
390 }
391
392 if (s->info.env->exists("QUERY_STRING")) {
393 const char* qs = s->info.env->get("QUERY_STRING");
394 if(qs && (*qs != '\0')) {
395 uri.append("?");
396 uri.append(qs);
397 }
398 }
399
400 if (s->info.env->exists("HTTP_VERSION")) {
401 uri.append(" ");
402 uri.append("HTTP/");
403 uri.append(s->info.env->get("HTTP_VERSION"));
404 }
405
406 entry.uri = std::move(uri);
407
408 entry.op = op_name;
409
410 if (! s->token_claims.empty()) {
411 entry.token_claims = std::move(s->token_claims);
412 }
413
414 /* custom header logging */
415 if (rest) {
416 if (rest->log_x_headers()) {
417 for (const auto& iter : s->info.env->get_map()) {
418 if (rest->log_x_header(iter.first)) {
419 entry.x_headers.insert(
420 rgw_log_entry::headers_map::value_type(iter.first, iter.second));
421 }
422 }
423 }
424 }
425
426 entry.user = s->user->get_id().to_str();
427 if (s->object_acl)
428 entry.object_owner = s->object_acl->get_owner().get_id();
429 entry.bucket_owner = s->bucket_owner.get_id();
430
431 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
432 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
433
434 entry.time = s->time;
435 entry.total_time = s->time_elapsed();
436 entry.bytes_sent = bytes_sent;
437 entry.bytes_received = bytes_received;
438 if (s->err.http_ret) {
439 char buf[16];
440 snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
441 entry.http_status = buf;
442 } else
443 entry.http_status = "200"; // default
444
445 entry.error_code = s->err.err_code;
446 entry.bucket_id = bucket_id;
447 entry.trans_id = s->trans_id;
448
449 bufferlist bl;
450 encode(entry, bl);
451
452 struct tm bdt;
453 time_t t = req_state::Clock::to_time_t(entry.time);
454 if (s->cct->_conf->rgw_log_object_name_utc)
455 gmtime_r(&t, &bdt);
456 else
457 localtime_r(&t, &bdt);
458
459 int ret = 0;
460
461 if (s->cct->_conf->rgw_ops_log_rados) {
462 string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
463 s->bucket.bucket_id, entry.bucket);
464
465 rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, oid);
466
467 ret = store->append_async(obj, bl.length(), bl);
468 if (ret == -ENOENT) {
469 ret = store->create_pool(store->svc.zone->get_zone_params().log_pool);
470 if (ret < 0)
471 goto done;
472 // retry
473 ret = store->append_async(obj, bl.length(), bl);
474 }
475 }
476
477 if (olog) {
478 olog->log(entry);
479 }
480 done:
481 if (ret < 0)
482 ldout(s->cct, 0) << "ERROR: failed to log entry" << dendl;
483
484 return ret;
485 }
486