]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_log.cc
update sources to v12.1.0
[ceph.git] / ceph / src / rgw / rgw_log.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "common/Clock.h"
5#include "common/Timer.h"
6#include "common/utf8.h"
7#include "common/OutputDataSocket.h"
8#include "common/Formatter.h"
9
10#include "rgw_bucket.h"
11#include "rgw_log.h"
12#include "rgw_acl.h"
13#include "rgw_rados.h"
14#include "rgw_client_io.h"
15#include "rgw_rest.h"
16
17#define dout_subsys ceph_subsys_rgw
18
19static void set_param_str(struct req_state *s, const char *name, string& str)
20{
21 const char *p = s->info.env->get(name);
22 if (p)
23 str = p;
24}
25
/*
 * Expand a strftime-like `format` string into a log object name.
 *
 * Supported conversions:
 *   %%  literal '%'
 *   %Y  4-digit year             %y  2-digit year
 *   %m  month (01-12)            %d  day of month (01-31)
 *   %H  hour, 24h (00-23)        %k  hour, 24h, not zero padded
 *   %I  hour, 12h (01-12)        %l  hour, 12h, not zero padded
 *   %M  minute (00-59)
 *   %i  bucket_id                %n  bucket_name
 *
 * An unknown conversion is copied through verbatim (e.g. "%q" -> "%q"),
 * and a lone trailing '%' is emitted as-is.
 *
 * @param format       template string
 * @param dt           broken-down time used for the date/time conversions
 * @param bucket_id    substituted for %i
 * @param bucket_name  substituted for %n
 * @return the expanded name
 */
std::string render_log_object_name(const std::string& format,
                                   struct tm *dt, std::string& bucket_id,
                                   const std::string& bucket_name)
{
  std::string o;
  for (size_t i = 0; i < format.size(); ++i) {
    if (format[i] != '%' || i + 1 >= format.size()) {
      o += format[i];
      continue;
    }

    ++i;
    char buf[32];
    switch (format[i]) {
    case '%':
      snprintf(buf, sizeof(buf), "%%");
      break;
    case 'Y':
      snprintf(buf, sizeof(buf), "%.4d", dt->tm_year + 1900);
      break;
    case 'y':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_year % 100);
      break;
    case 'm':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mon + 1);
      break;
    case 'd':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mday);
      break;
    case 'H':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_hour);
      break;
    case 'I':
      // 12-hour clock: hours 0 and 12 map to 12, 13 maps to 1, ...
      snprintf(buf, sizeof(buf), "%.2d", (dt->tm_hour + 11) % 12 + 1);
      break;
    case 'k':
      snprintf(buf, sizeof(buf), "%d", dt->tm_hour);
      break;
    case 'l':
      // same mapping as %I, without zero padding
      snprintf(buf, sizeof(buf), "%d", (dt->tm_hour + 11) % 12 + 1);
      break;
    case 'M':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_min);
      break;

    case 'i':
      o += bucket_id;
      continue;
    case 'n':
      o += bucket_name;
      continue;
    default:
      // unknown code: pass it through unchanged
      snprintf(buf, sizeof(buf), "%%%c", format[i]);
      break;
    }
    o += buf;
  }
  return o;
}
85
86/* usage logger */
87class UsageLogger {
88 CephContext *cct;
89 RGWRados *store;
90 map<rgw_user_bucket, RGWUsageBatch> usage_map;
91 Mutex lock;
92 int32_t num_entries;
93 Mutex timer_lock;
94 SafeTimer timer;
95 utime_t round_timestamp;
96
97 class C_UsageLogTimeout : public Context {
98 UsageLogger *logger;
99 public:
100 explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {}
101 void finish(int r) override {
102 logger->flush();
103 logger->set_timer();
104 }
105 };
106
107 void set_timer() {
108 timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this));
109 }
110public:
111
112 UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) {
113 timer.init();
114 Mutex::Locker l(timer_lock);
115 set_timer();
116 utime_t ts = ceph_clock_now();
117 recalc_round_timestamp(ts);
118 }
119
120 ~UsageLogger() {
121 Mutex::Locker l(timer_lock);
122 flush();
123 timer.cancel_all_events();
124 timer.shutdown();
125 }
126
127 void recalc_round_timestamp(utime_t& ts) {
128 round_timestamp = ts.round_to_hour();
129 }
130
131 void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
132 lock.Lock();
133 if (timestamp.sec() > round_timestamp + 3600)
134 recalc_round_timestamp(timestamp);
135 entry.epoch = round_timestamp.sec();
136 bool account;
137 string u = user.to_str();
138 rgw_user_bucket ub(u, entry.bucket);
139 real_time rt = round_timestamp.to_real_time();
140 usage_map[ub].insert(rt, entry, &account);
141 if (account)
142 num_entries++;
143 bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
144 lock.Unlock();
145 if (need_flush) {
146 Mutex::Locker l(timer_lock);
147 flush();
148 }
149 }
150
151 void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
152 if (entry.payer.empty()) {
153 insert_user(timestamp, entry.owner, entry);
154 } else {
155 insert_user(timestamp, entry.payer, entry);
156 }
157 }
158
159 void flush() {
160 map<rgw_user_bucket, RGWUsageBatch> old_map;
161 lock.Lock();
162 old_map.swap(usage_map);
163 num_entries = 0;
164 lock.Unlock();
165
166 store->log_usage(old_map);
167 }
168};
169
170static UsageLogger *usage_logger = NULL;
171
172void rgw_log_usage_init(CephContext *cct, RGWRados *store)
173{
174 usage_logger = new UsageLogger(cct, store);
175}
176
177void rgw_log_usage_finalize()
178{
179 delete usage_logger;
180 usage_logger = NULL;
181}
182
183static void log_usage(struct req_state *s, const string& op_name)
184{
185 if (s->system_request) /* don't log system user operations */
186 return;
187
188 if (!usage_logger)
189 return;
190
191 rgw_user user;
192 rgw_user payer;
193 string bucket_name;
194
195 bucket_name = s->bucket_name;
196
197 if (!bucket_name.empty()) {
198 user = s->bucket_owner.get_id();
199 if (s->bucket_info.requester_pays) {
200 payer = s->user->user_id;
201 }
202 } else {
203 user = s->user->user_id;
204 }
205
206 bool error = s->err.is_err();
207 if (error && s->err.http_ret == 404) {
208 bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */
209 }
210
211 string u = user.to_str();
212 string p = payer.to_str();
213 rgw_usage_log_entry entry(u, p, bucket_name);
214
215 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
216 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
217
218 rgw_usage_data data(bytes_sent, bytes_received);
219
220 data.ops = 1;
31f18b77 221 if (!s->is_err())
7c673cae
FG
222 data.successful_ops = 1;
223
224 entry.add(op_name, data);
225
226 utime_t ts = ceph_clock_now();
227
228 usage_logger->insert(ts, entry);
229}
230
231void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
232{
233 formatter->open_object_section("log_entry");
234 formatter->dump_string("bucket", entry.bucket);
235 entry.time.gmtime(formatter->dump_stream("time")); // UTC
236 entry.time.localtime(formatter->dump_stream("time_local"));
237 formatter->dump_string("remote_addr", entry.remote_addr);
238 string obj_owner = entry.object_owner.to_str();
239 if (obj_owner.length())
240 formatter->dump_string("object_owner", obj_owner);
241 formatter->dump_string("user", entry.user);
242 formatter->dump_string("operation", entry.op);
243 formatter->dump_string("uri", entry.uri);
244 formatter->dump_string("http_status", entry.http_status);
245 formatter->dump_string("error_code", entry.error_code);
246 formatter->dump_int("bytes_sent", entry.bytes_sent);
247 formatter->dump_int("bytes_received", entry.bytes_received);
248 formatter->dump_int("object_size", entry.obj_size);
249 uint64_t total_time = entry.total_time.sec() * 1000000LL + entry.total_time.usec();
250
251 formatter->dump_int("total_time", total_time);
252 formatter->dump_string("user_agent", entry.user_agent);
253 formatter->dump_string("referrer", entry.referrer);
254 if (entry.x_headers.size() > 0) {
255 formatter->open_array_section("http_x_headers");
256 for (const auto& iter: entry.x_headers) {
257 formatter->open_object_section(iter.first.c_str());
258 formatter->dump_string(iter.first.c_str(), iter.second);
259 formatter->close_section();
260 }
261 formatter->close_section();
262 }
263 formatter->close_section();
264}
265
266void OpsLogSocket::formatter_to_bl(bufferlist& bl)
267{
268 stringstream ss;
269 formatter->flush(ss);
270 const string& s = ss.str();
271
272 bl.append(s);
273}
274
275void OpsLogSocket::init_connection(bufferlist& bl)
276{
277 bl.append("[");
278}
279
280OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog), lock("OpsLogSocket")
281{
282 formatter = new JSONFormatter;
283 delim.append(",\n");
284}
285
286OpsLogSocket::~OpsLogSocket()
287{
288 delete formatter;
289}
290
291void OpsLogSocket::log(struct rgw_log_entry& entry)
292{
293 bufferlist bl;
294
295 lock.Lock();
296 rgw_format_ops_log_entry(entry, formatter);
297 formatter_to_bl(bl);
298 lock.Unlock();
299
300 append_output(bl);
301}
302
303int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
304 const string& op_name, OpsLogSocket *olog)
305{
306 struct rgw_log_entry entry;
307 string bucket_id;
308
309 if (s->enable_usage_log)
310 log_usage(s, op_name);
311
312 if (!s->enable_ops_log)
313 return 0;
314
315 if (s->bucket_name.empty()) {
316 ldout(s->cct, 5) << "nothing to log for operation" << dendl;
317 return -EINVAL;
318 }
319 if (s->err.ret == -ERR_NO_SUCH_BUCKET) {
320 if (!s->cct->_conf->rgw_log_nonexistent_bucket) {
321 ldout(s->cct, 5) << "bucket " << s->bucket << " doesn't exist, not logging" << dendl;
322 return 0;
323 }
324 bucket_id = "";
325 } else {
326 bucket_id = s->bucket.bucket_id;
327 }
328 rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name, entry.bucket);
329
330 if (check_utf8(s->bucket_name.c_str(), entry.bucket.size()) != 0) {
331 ldout(s->cct, 5) << "not logging op on bucket with non-utf8 name" << dendl;
332 return 0;
333 }
334
335 if (!s->object.empty()) {
336 entry.obj = s->object;
337 } else {
338 entry.obj = rgw_obj_key("-");
339 }
340
341 entry.obj_size = s->obj_size;
342
343 if (s->cct->_conf->rgw_remote_addr_param.length())
344 set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(),
345 entry.remote_addr);
346 else
347 set_param_str(s, "REMOTE_ADDR", entry.remote_addr);
348 set_param_str(s, "HTTP_USER_AGENT", entry.user_agent);
349 set_param_str(s, "HTTP_REFERRER", entry.referrer);
350 set_param_str(s, "REQUEST_URI", entry.uri);
351 set_param_str(s, "REQUEST_METHOD", entry.op);
352
353 /* custom header logging */
354 if (rest) {
355 if (rest->log_x_headers()) {
356 for (const auto& iter : s->info.env->get_map()) {
357 if (rest->log_x_header(iter.first)) {
358 entry.x_headers.insert(
359 rgw_log_entry::headers_map::value_type(iter.first, iter.second));
360 }
361 }
362 }
363 }
364
365 entry.user = s->user->user_id.to_str();
366 if (s->object_acl)
367 entry.object_owner = s->object_acl->get_owner().get_id();
368 entry.bucket_owner = s->bucket_owner.get_id();
369
370 uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
371 uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
372
373 entry.time = s->time;
374 entry.total_time = ceph_clock_now() - s->time;
375 entry.bytes_sent = bytes_sent;
376 entry.bytes_received = bytes_received;
377 if (s->err.http_ret) {
378 char buf[16];
379 snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
380 entry.http_status = buf;
381 } else
382 entry.http_status = "200"; // default
383
31f18b77 384 entry.error_code = s->err.err_code;
7c673cae
FG
385 entry.bucket_id = bucket_id;
386
387 bufferlist bl;
388 ::encode(entry, bl);
389
390 struct tm bdt;
391 time_t t = entry.time.sec();
392 if (s->cct->_conf->rgw_log_object_name_utc)
393 gmtime_r(&t, &bdt);
394 else
395 localtime_r(&t, &bdt);
396
397 int ret = 0;
398
399 if (s->cct->_conf->rgw_ops_log_rados) {
400 string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
401 s->bucket.bucket_id, entry.bucket);
402
403 rgw_raw_obj obj(store->get_zone_params().log_pool, oid);
404
405 ret = store->append_async(obj, bl.length(), bl);
406 if (ret == -ENOENT) {
407 ret = store->create_pool(store->get_zone_params().log_pool);
408 if (ret < 0)
409 goto done;
410 // retry
411 ret = store->append_async(obj, bl.length(), bl);
412 }
413 }
414
415 if (olog) {
416 olog->log(entry);
417 }
418done:
419 if (ret < 0)
420 ldout(s->cct, 0) << "ERROR: failed to log entry" << dendl;
421
422 return ret;
423}
424