]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_log.cc
9220917c99a63f2395715ebb65d91cad75f85363
[ceph.git] / ceph / src / rgw / rgw_rest_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include "common/ceph_json.h"
15 #include "common/strtol.h"
16 #include "rgw_rest.h"
17 #include "rgw_op.h"
18 #include "rgw_rest_s3.h"
19 #include "rgw_rest_log.h"
20 #include "rgw_client_io.h"
21 #include "rgw_sync.h"
22 #include "rgw_data_sync.h"
23 #include "rgw_common.h"
24 #include "common/errno.h"
25 #include "include/assert.h"
26
27 #define dout_context g_ceph_context
28 #define LOG_CLASS_LIST_MAX_ENTRIES (1000)
29 #define dout_subsys ceph_subsys_rgw
30
31 static int parse_date_str(string& in, real_time& out) {
32 uint64_t epoch = 0;
33 uint64_t nsec = 0;
34
35 if (!in.empty()) {
36 if (utime_t::parse_date(in, &epoch, &nsec) < 0) {
37 dout(5) << "Error parsing date " << in << dendl;
38 return -EINVAL;
39 }
40 }
41 out = utime_t(epoch, nsec).to_real_time();
42 return 0;
43 }
44
45 void RGWOp_MDLog_List::execute() {
46 string period = s->info.args.get("period");
47 string shard = s->info.args.get("id");
48 string max_entries_str = s->info.args.get("max-entries");
49 string st = s->info.args.get("start-time"),
50 et = s->info.args.get("end-time"),
51 marker = s->info.args.get("marker"),
52 err;
53 real_time ut_st,
54 ut_et;
55 void *handle;
56 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
57
58 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
59 if (!err.empty()) {
60 dout(5) << "Error parsing shard_id " << shard << dendl;
61 http_ret = -EINVAL;
62 return;
63 }
64
65 if (parse_date_str(st, ut_st) < 0) {
66 http_ret = -EINVAL;
67 return;
68 }
69
70 if (parse_date_str(et, ut_et) < 0) {
71 http_ret = -EINVAL;
72 return;
73 }
74
75 if (!max_entries_str.empty()) {
76 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
77 if (!err.empty()) {
78 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
79 http_ret = -EINVAL;
80 return;
81 }
82 }
83
84 if (period.empty()) {
85 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
86 period = store->get_current_period_id();
87 if (period.empty()) {
88 ldout(s->cct, 5) << "Missing period id" << dendl;
89 http_ret = -EINVAL;
90 return;
91 }
92 }
93
94 RGWMetadataLog meta_log{s->cct, store, period};
95
96 meta_log.init_list_entries(shard_id, ut_st, ut_et, marker, &handle);
97
98 do {
99 http_ret = meta_log.list_entries(handle, max_entries, entries,
100 &last_marker, &truncated);
101 if (http_ret < 0)
102 break;
103
104 if (!max_entries_str.empty())
105 max_entries -= entries.size();
106 } while (truncated && (max_entries > 0));
107
108 meta_log.complete_list_entries(handle);
109 }
110
111 void RGWOp_MDLog_List::send_response() {
112 set_req_state_err(s, http_ret);
113 dump_errno(s);
114 end_header(s);
115
116 if (http_ret < 0)
117 return;
118
119 s->formatter->open_object_section("log_entries");
120 s->formatter->dump_string("marker", last_marker);
121 s->formatter->dump_bool("truncated", truncated);
122 {
123 s->formatter->open_array_section("entries");
124 for (list<cls_log_entry>::iterator iter = entries.begin();
125 iter != entries.end(); ++iter) {
126 cls_log_entry& entry = *iter;
127 store->meta_mgr->dump_log_entry(entry, s->formatter);
128 flusher.flush();
129 }
130 s->formatter->close_section();
131 }
132 s->formatter->close_section();
133 flusher.flush();
134 }
135
136 void RGWOp_MDLog_Info::execute() {
137 num_objects = s->cct->_conf->rgw_md_log_max_shards;
138 period = store->meta_mgr->read_oldest_log_period();
139 http_ret = period.get_error();
140 }
141
142 void RGWOp_MDLog_Info::send_response() {
143 set_req_state_err(s, http_ret);
144 dump_errno(s);
145 end_header(s);
146
147 s->formatter->open_object_section("mdlog");
148 s->formatter->dump_unsigned("num_objects", num_objects);
149 if (period) {
150 s->formatter->dump_string("period", period.get_period().get_id());
151 s->formatter->dump_unsigned("realm_epoch", period.get_epoch());
152 }
153 s->formatter->close_section();
154 flusher.flush();
155 }
156
157 void RGWOp_MDLog_ShardInfo::execute() {
158 string period = s->info.args.get("period");
159 string shard = s->info.args.get("id");
160 string err;
161
162 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
163 if (!err.empty()) {
164 dout(5) << "Error parsing shard_id " << shard << dendl;
165 http_ret = -EINVAL;
166 return;
167 }
168
169 if (period.empty()) {
170 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
171 period = store->get_current_period_id();
172
173 if (period.empty()) {
174 ldout(s->cct, 5) << "Missing period id" << dendl;
175 http_ret = -EINVAL;
176 return;
177 }
178 }
179 RGWMetadataLog meta_log{s->cct, store, period};
180
181 http_ret = meta_log.get_info(shard_id, &info);
182 }
183
184 void RGWOp_MDLog_ShardInfo::send_response() {
185 set_req_state_err(s, http_ret);
186 dump_errno(s);
187 end_header(s);
188
189 encode_json("info", info, s->formatter);
190 flusher.flush();
191 }
192
193 void RGWOp_MDLog_Delete::execute() {
194 string st = s->info.args.get("start-time"),
195 et = s->info.args.get("end-time"),
196 start_marker = s->info.args.get("start-marker"),
197 end_marker = s->info.args.get("end-marker"),
198 period = s->info.args.get("period"),
199 shard = s->info.args.get("id"),
200 err;
201 real_time ut_st,
202 ut_et;
203 unsigned shard_id;
204
205 http_ret = 0;
206
207 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
208 if (!err.empty()) {
209 dout(5) << "Error parsing shard_id " << shard << dendl;
210 http_ret = -EINVAL;
211 return;
212 }
213 if (et.empty() && end_marker.empty()) { /* bounding end */
214 http_ret = -EINVAL;
215 return;
216 }
217
218 if (parse_date_str(st, ut_st) < 0) {
219 http_ret = -EINVAL;
220 return;
221 }
222
223 if (parse_date_str(et, ut_et) < 0) {
224 http_ret = -EINVAL;
225 return;
226 }
227
228 if (period.empty()) {
229 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
230 period = store->get_current_period_id();
231
232 if (period.empty()) {
233 ldout(s->cct, 5) << "Missing period id" << dendl;
234 http_ret = -EINVAL;
235 return;
236 }
237 }
238 RGWMetadataLog meta_log{s->cct, store, period};
239
240 http_ret = meta_log.trim(shard_id, ut_st, ut_et, start_marker, end_marker);
241 }
242
243 void RGWOp_MDLog_Lock::execute() {
244 string period, shard_id_str, duration_str, locker_id, zone_id;
245 unsigned shard_id;
246
247 http_ret = 0;
248
249 period = s->info.args.get("period");
250 shard_id_str = s->info.args.get("id");
251 duration_str = s->info.args.get("length");
252 locker_id = s->info.args.get("locker-id");
253 zone_id = s->info.args.get("zone-id");
254
255 if (period.empty()) {
256 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
257 period = store->get_current_period_id();
258 }
259
260 if (period.empty() ||
261 shard_id_str.empty() ||
262 (duration_str.empty()) ||
263 locker_id.empty() ||
264 zone_id.empty()) {
265 dout(5) << "Error invalid parameter list" << dendl;
266 http_ret = -EINVAL;
267 return;
268 }
269
270 string err;
271 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
272 if (!err.empty()) {
273 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
274 http_ret = -EINVAL;
275 return;
276 }
277
278 RGWMetadataLog meta_log{s->cct, store, period};
279 unsigned dur;
280 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
281 if (!err.empty() || dur <= 0) {
282 dout(5) << "invalid length param " << duration_str << dendl;
283 http_ret = -EINVAL;
284 return;
285 }
286 http_ret = meta_log.lock_exclusive(shard_id, make_timespan(dur), zone_id,
287 locker_id);
288 if (http_ret == -EBUSY)
289 http_ret = -ERR_LOCKED;
290 }
291
292 void RGWOp_MDLog_Unlock::execute() {
293 string period, shard_id_str, locker_id, zone_id;
294 unsigned shard_id;
295
296 http_ret = 0;
297
298 period = s->info.args.get("period");
299 shard_id_str = s->info.args.get("id");
300 locker_id = s->info.args.get("locker-id");
301 zone_id = s->info.args.get("zone-id");
302
303 if (period.empty()) {
304 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
305 period = store->get_current_period_id();
306 }
307
308 if (period.empty() ||
309 shard_id_str.empty() ||
310 locker_id.empty() ||
311 zone_id.empty()) {
312 dout(5) << "Error invalid parameter list" << dendl;
313 http_ret = -EINVAL;
314 return;
315 }
316
317 string err;
318 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
319 if (!err.empty()) {
320 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
321 http_ret = -EINVAL;
322 return;
323 }
324
325 RGWMetadataLog meta_log{s->cct, store, period};
326 http_ret = meta_log.unlock(shard_id, zone_id, locker_id);
327 }
328
329 void RGWOp_MDLog_Notify::execute() {
330 char *data;
331 int len = 0;
332 #define LARGE_ENOUGH_BUF (128 * 1024)
333 int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
334 if (r < 0) {
335 http_ret = r;
336 return;
337 }
338
339 ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
340
341 JSONParser p;
342 r = p.parse(data, len);
343 free(data);
344 if (r < 0) {
345 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
346 http_ret = r;
347 return;
348 }
349
350 set<int> updated_shards;
351 try {
352 decode_json_obj(updated_shards, &p);
353 } catch (JSONDecoder::err& err) {
354 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
355 http_ret = -EINVAL;
356 return;
357 }
358
359 if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
360 for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
361 ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl;
362 }
363 }
364
365 store->wakeup_meta_sync_shards(updated_shards);
366
367 http_ret = 0;
368 }
369
370 void RGWOp_BILog_List::execute() {
371 string tenant_name = s->info.args.get("tenant"),
372 bucket_name = s->info.args.get("bucket"),
373 marker = s->info.args.get("marker"),
374 max_entries_str = s->info.args.get("max-entries"),
375 bucket_instance = s->info.args.get("bucket-instance");
376 RGWBucketInfo bucket_info;
377 unsigned max_entries;
378
379 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
380
381 if (bucket_name.empty() && bucket_instance.empty()) {
382 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
383 http_ret = -EINVAL;
384 return;
385 }
386
387 int shard_id;
388 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
389 if (http_ret < 0) {
390 return;
391 }
392
393 if (!bucket_instance.empty()) {
394 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
395 if (http_ret < 0) {
396 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
397 return;
398 }
399 } else { /* !bucket_name.empty() */
400 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
401 if (http_ret < 0) {
402 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
403 return;
404 }
405 }
406
407 bool truncated;
408 unsigned count = 0;
409 string err;
410
411 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
412 if (!err.empty())
413 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
414
415 send_response();
416 do {
417 list<rgw_bi_log_entry> entries;
418 int ret = store->list_bi_log_entries(bucket_info, shard_id,
419 marker, max_entries - count,
420 entries, &truncated);
421 if (ret < 0) {
422 dout(5) << "ERROR: list_bi_log_entries()" << dendl;
423 return;
424 }
425
426 count += entries.size();
427
428 send_response(entries, marker);
429 } while (truncated && count < max_entries);
430
431 send_response_end();
432 }
433
434 void RGWOp_BILog_List::send_response() {
435 if (sent_header)
436 return;
437
438 set_req_state_err(s, http_ret);
439 dump_errno(s);
440 end_header(s);
441
442 sent_header = true;
443
444 if (http_ret < 0)
445 return;
446
447 s->formatter->open_array_section("entries");
448 }
449
450 void RGWOp_BILog_List::send_response(list<rgw_bi_log_entry>& entries, string& marker)
451 {
452 for (list<rgw_bi_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
453 rgw_bi_log_entry& entry = *iter;
454 encode_json("entry", entry, s->formatter);
455
456 marker = entry.id;
457 flusher.flush();
458 }
459 }
460
461 void RGWOp_BILog_List::send_response_end() {
462 s->formatter->close_section();
463 flusher.flush();
464 }
465
466 void RGWOp_BILog_Info::execute() {
467 string tenant_name = s->info.args.get("tenant"),
468 bucket_name = s->info.args.get("bucket"),
469 bucket_instance = s->info.args.get("bucket-instance");
470 RGWBucketInfo bucket_info;
471
472 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
473
474 if (bucket_name.empty() && bucket_instance.empty()) {
475 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
476 http_ret = -EINVAL;
477 return;
478 }
479
480 int shard_id;
481 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
482 if (http_ret < 0) {
483 return;
484 }
485
486 if (!bucket_instance.empty()) {
487 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
488 if (http_ret < 0) {
489 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
490 return;
491 }
492 } else { /* !bucket_name.empty() */
493 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
494 if (http_ret < 0) {
495 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
496 return;
497 }
498 }
499 map<RGWObjCategory, RGWStorageStats> stats;
500 int ret = store->get_bucket_stats(bucket_info, shard_id, &bucket_ver, &master_ver, stats, &max_marker);
501 if (ret < 0 && ret != -ENOENT) {
502 http_ret = ret;
503 return;
504 }
505 }
506
507 void RGWOp_BILog_Info::send_response() {
508 set_req_state_err(s, http_ret);
509 dump_errno(s);
510 end_header(s);
511
512 if (http_ret < 0)
513 return;
514
515 s->formatter->open_object_section("info");
516 encode_json("bucket_ver", bucket_ver, s->formatter);
517 encode_json("master_ver", master_ver, s->formatter);
518 encode_json("max_marker", max_marker, s->formatter);
519 s->formatter->close_section();
520
521 flusher.flush();
522 }
523
524 void RGWOp_BILog_Delete::execute() {
525 string tenant_name = s->info.args.get("tenant"),
526 bucket_name = s->info.args.get("bucket"),
527 start_marker = s->info.args.get("start-marker"),
528 end_marker = s->info.args.get("end-marker"),
529 bucket_instance = s->info.args.get("bucket-instance");
530
531 RGWBucketInfo bucket_info;
532
533 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
534
535 http_ret = 0;
536 if ((bucket_name.empty() && bucket_instance.empty()) ||
537 end_marker.empty()) {
538 dout(5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl;
539 http_ret = -EINVAL;
540 return;
541 }
542
543 int shard_id;
544 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
545 if (http_ret < 0) {
546 return;
547 }
548
549 if (!bucket_instance.empty()) {
550 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
551 if (http_ret < 0) {
552 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
553 return;
554 }
555 } else { /* !bucket_name.empty() */
556 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
557 if (http_ret < 0) {
558 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
559 return;
560 }
561 }
562 http_ret = store->trim_bi_log_entries(bucket_info, shard_id, start_marker, end_marker);
563 if (http_ret < 0) {
564 dout(5) << "ERROR: trim_bi_log_entries() " << dendl;
565 }
566 return;
567 }
568
569 void RGWOp_DATALog_List::execute() {
570 string shard = s->info.args.get("id");
571
572 string st = s->info.args.get("start-time"),
573 et = s->info.args.get("end-time"),
574 max_entries_str = s->info.args.get("max-entries"),
575 marker = s->info.args.get("marker"),
576 err;
577 real_time ut_st,
578 ut_et;
579 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
580
581 s->info.args.get_bool("extra-info", &extra_info, false);
582
583 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
584 if (!err.empty()) {
585 dout(5) << "Error parsing shard_id " << shard << dendl;
586 http_ret = -EINVAL;
587 return;
588 }
589
590 if (parse_date_str(st, ut_st) < 0) {
591 http_ret = -EINVAL;
592 return;
593 }
594
595 if (parse_date_str(et, ut_et) < 0) {
596 http_ret = -EINVAL;
597 return;
598 }
599
600 if (!max_entries_str.empty()) {
601 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
602 if (!err.empty()) {
603 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
604 http_ret = -EINVAL;
605 return;
606 }
607 }
608
609 do {
610 // Note that last_marker is updated to be the marker of the last
611 // entry listed
612 http_ret = store->data_log->list_entries(shard_id, ut_st, ut_et,
613 max_entries, entries, marker,
614 &last_marker, &truncated);
615 if (http_ret < 0)
616 break;
617
618 if (!max_entries_str.empty())
619 max_entries -= entries.size();
620 } while (truncated && (max_entries > 0));
621 }
622
623 void RGWOp_DATALog_List::send_response() {
624 set_req_state_err(s, http_ret);
625 dump_errno(s);
626 end_header(s);
627
628 if (http_ret < 0)
629 return;
630
631 s->formatter->open_object_section("log_entries");
632 s->formatter->dump_string("marker", last_marker);
633 s->formatter->dump_bool("truncated", truncated);
634 {
635 s->formatter->open_array_section("entries");
636 for (list<rgw_data_change_log_entry>::iterator iter = entries.begin();
637 iter != entries.end(); ++iter) {
638 rgw_data_change_log_entry& entry = *iter;
639 if (!extra_info) {
640 encode_json("entry", entry.entry, s->formatter);
641 } else {
642 encode_json("entry", entry, s->formatter);
643 }
644 flusher.flush();
645 }
646 s->formatter->close_section();
647 }
648 s->formatter->close_section();
649 flusher.flush();
650 }
651
652
653 void RGWOp_DATALog_Info::execute() {
654 num_objects = s->cct->_conf->rgw_data_log_num_shards;
655 http_ret = 0;
656 }
657
658 void RGWOp_DATALog_Info::send_response() {
659 set_req_state_err(s, http_ret);
660 dump_errno(s);
661 end_header(s);
662
663 s->formatter->open_object_section("num_objects");
664 s->formatter->dump_unsigned("num_objects", num_objects);
665 s->formatter->close_section();
666 flusher.flush();
667 }
668
669 void RGWOp_DATALog_ShardInfo::execute() {
670 string shard = s->info.args.get("id");
671 string err;
672
673 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
674 if (!err.empty()) {
675 dout(5) << "Error parsing shard_id " << shard << dendl;
676 http_ret = -EINVAL;
677 return;
678 }
679
680 http_ret = store->data_log->get_info(shard_id, &info);
681 }
682
683 void RGWOp_DATALog_ShardInfo::send_response() {
684 set_req_state_err(s, http_ret);
685 dump_errno(s);
686 end_header(s);
687
688 encode_json("info", info, s->formatter);
689 flusher.flush();
690 }
691
692 void RGWOp_DATALog_Lock::execute() {
693 string shard_id_str, duration_str, locker_id, zone_id;
694 unsigned shard_id;
695
696 http_ret = 0;
697
698 shard_id_str = s->info.args.get("id");
699 duration_str = s->info.args.get("length");
700 locker_id = s->info.args.get("locker-id");
701 zone_id = s->info.args.get("zone-id");
702
703 if (shard_id_str.empty() ||
704 (duration_str.empty()) ||
705 locker_id.empty() ||
706 zone_id.empty()) {
707 dout(5) << "Error invalid parameter list" << dendl;
708 http_ret = -EINVAL;
709 return;
710 }
711
712 string err;
713 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
714 if (!err.empty()) {
715 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
716 http_ret = -EINVAL;
717 return;
718 }
719
720 unsigned dur;
721 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
722 if (!err.empty() || dur <= 0) {
723 dout(5) << "invalid length param " << duration_str << dendl;
724 http_ret = -EINVAL;
725 return;
726 }
727 http_ret = store->data_log->lock_exclusive(shard_id, make_timespan(dur), zone_id, locker_id);
728 if (http_ret == -EBUSY)
729 http_ret = -ERR_LOCKED;
730 }
731
732 void RGWOp_DATALog_Unlock::execute() {
733 string shard_id_str, locker_id, zone_id;
734 unsigned shard_id;
735
736 http_ret = 0;
737
738 shard_id_str = s->info.args.get("id");
739 locker_id = s->info.args.get("locker-id");
740 zone_id = s->info.args.get("zone-id");
741
742 if (shard_id_str.empty() ||
743 locker_id.empty() ||
744 zone_id.empty()) {
745 dout(5) << "Error invalid parameter list" << dendl;
746 http_ret = -EINVAL;
747 return;
748 }
749
750 string err;
751 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
752 if (!err.empty()) {
753 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
754 http_ret = -EINVAL;
755 return;
756 }
757
758 http_ret = store->data_log->unlock(shard_id, zone_id, locker_id);
759 }
760
761 void RGWOp_DATALog_Notify::execute() {
762 string source_zone = s->info.args.get("source-zone");
763 char *data;
764 int len = 0;
765 #define LARGE_ENOUGH_BUF (128 * 1024)
766 int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
767 if (r < 0) {
768 http_ret = r;
769 return;
770 }
771
772 ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
773
774 JSONParser p;
775 r = p.parse(data, len);
776 free(data);
777 if (r < 0) {
778 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
779 http_ret = r;
780 return;
781 }
782
783 map<int, set<string> > updated_shards;
784 try {
785 decode_json_obj(updated_shards, &p);
786 } catch (JSONDecoder::err& err) {
787 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
788 http_ret = -EINVAL;
789 return;
790 }
791
792 if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
793 for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
794 ldout(s->cct, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
795 set<string>& keys = iter->second;
796 for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
797 ldout(s->cct, 20) << __func__ << "(): modified key=" << *kiter << dendl;
798 }
799 }
800 }
801
802 store->wakeup_data_sync_shards(source_zone, updated_shards);
803
804 http_ret = 0;
805 }
806
807 void RGWOp_DATALog_Delete::execute() {
808 string st = s->info.args.get("start-time"),
809 et = s->info.args.get("end-time"),
810 start_marker = s->info.args.get("start-marker"),
811 end_marker = s->info.args.get("end-marker"),
812 shard = s->info.args.get("id"),
813 err;
814 real_time ut_st,
815 ut_et;
816 unsigned shard_id;
817
818 http_ret = 0;
819
820 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
821 if (!err.empty()) {
822 dout(5) << "Error parsing shard_id " << shard << dendl;
823 http_ret = -EINVAL;
824 return;
825 }
826 if (et.empty() && end_marker.empty()) { /* bounding end */
827 http_ret = -EINVAL;
828 return;
829 }
830
831 if (parse_date_str(st, ut_st) < 0) {
832 http_ret = -EINVAL;
833 return;
834 }
835
836 if (parse_date_str(et, ut_et) < 0) {
837 http_ret = -EINVAL;
838 return;
839 }
840
841 http_ret = store->data_log->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
842 }
843
844 // not in header to avoid pulling in rgw_sync.h
845 class RGWOp_MDLog_Status : public RGWRESTOp {
846 rgw_meta_sync_status status;
847 public:
848 int check_caps(RGWUserCaps& caps) override {
849 return caps.check_cap("mdlog", RGW_CAP_READ);
850 }
851 int verify_permission() override {
852 return check_caps(s->user->caps);
853 }
854 void execute() override;
855 void send_response() override;
856 const string name() override { return "get_metadata_log_status"; }
857 };
858
859 void RGWOp_MDLog_Status::execute()
860 {
861 auto sync = store->get_meta_sync_manager();
862 if (sync == nullptr) {
863 ldout(s->cct, 1) << "no sync manager" << dendl;
864 http_ret = -ENOENT;
865 return;
866 }
867 http_ret = sync->read_sync_status(&status);
868 }
869
870 void RGWOp_MDLog_Status::send_response()
871 {
872 set_req_state_err(s, http_ret);
873 dump_errno(s);
874 end_header(s);
875
876 if (http_ret >= 0) {
877 encode_json("status", status, s->formatter);
878 }
879 flusher.flush();
880 }
881
882 // not in header to avoid pulling in rgw_data_sync.h
883 class RGWOp_DATALog_Status : public RGWRESTOp {
884 rgw_data_sync_status status;
885 public:
886 int check_caps(RGWUserCaps& caps) override {
887 return caps.check_cap("datalog", RGW_CAP_READ);
888 }
889 int verify_permission() override {
890 return check_caps(s->user->caps);
891 }
892 void execute() override ;
893 void send_response() override;
894 const string name() override { return "get_data_changes_log_status"; }
895 };
896
897 void RGWOp_DATALog_Status::execute()
898 {
899 const auto source_zone = s->info.args.get("source-zone");
900 auto sync = store->get_data_sync_manager(source_zone);
901 if (sync == nullptr) {
902 ldout(s->cct, 1) << "no sync manager for source-zone " << source_zone << dendl;
903 http_ret = -ENOENT;
904 return;
905 }
906 http_ret = sync->read_sync_status(&status);
907 }
908
909 void RGWOp_DATALog_Status::send_response()
910 {
911 set_req_state_err(s, http_ret);
912 dump_errno(s);
913 end_header(s);
914
915 if (http_ret >= 0) {
916 encode_json("status", status, s->formatter);
917 }
918 flusher.flush();
919 }
920
921
922 RGWOp *RGWHandler_Log::op_get() {
923 bool exists;
924 string type = s->info.args.get("type", &exists);
925
926 if (!exists) {
927 return NULL;
928 }
929
930 if (type.compare("metadata") == 0) {
931 if (s->info.args.exists("id")) {
932 if (s->info.args.exists("info")) {
933 return new RGWOp_MDLog_ShardInfo;
934 } else {
935 return new RGWOp_MDLog_List;
936 }
937 } else if (s->info.args.exists("status")) {
938 return new RGWOp_MDLog_Status;
939 } else {
940 return new RGWOp_MDLog_Info;
941 }
942 } else if (type.compare("bucket-index") == 0) {
943 if (s->info.args.exists("info")) {
944 return new RGWOp_BILog_Info;
945 } else {
946 return new RGWOp_BILog_List;
947 }
948 } else if (type.compare("data") == 0) {
949 if (s->info.args.exists("id")) {
950 if (s->info.args.exists("info")) {
951 return new RGWOp_DATALog_ShardInfo;
952 } else {
953 return new RGWOp_DATALog_List;
954 }
955 } else if (s->info.args.exists("status")) {
956 return new RGWOp_DATALog_Status;
957 } else {
958 return new RGWOp_DATALog_Info;
959 }
960 }
961 return NULL;
962 }
963
964 RGWOp *RGWHandler_Log::op_delete() {
965 bool exists;
966 string type = s->info.args.get("type", &exists);
967
968 if (!exists) {
969 return NULL;
970 }
971
972 if (type.compare("metadata") == 0)
973 return new RGWOp_MDLog_Delete;
974 else if (type.compare("bucket-index") == 0)
975 return new RGWOp_BILog_Delete;
976 else if (type.compare("data") == 0)
977 return new RGWOp_DATALog_Delete;
978 return NULL;
979 }
980
981 RGWOp *RGWHandler_Log::op_post() {
982 bool exists;
983 string type = s->info.args.get("type", &exists);
984
985 if (!exists) {
986 return NULL;
987 }
988
989 if (type.compare("metadata") == 0) {
990 if (s->info.args.exists("lock"))
991 return new RGWOp_MDLog_Lock;
992 else if (s->info.args.exists("unlock"))
993 return new RGWOp_MDLog_Unlock;
994 else if (s->info.args.exists("notify"))
995 return new RGWOp_MDLog_Notify;
996 } else if (type.compare("data") == 0) {
997 if (s->info.args.exists("lock"))
998 return new RGWOp_DATALog_Lock;
999 else if (s->info.args.exists("unlock"))
1000 return new RGWOp_DATALog_Unlock;
1001 else if (s->info.args.exists("notify"))
1002 return new RGWOp_DATALog_Notify;
1003 }
1004 return NULL;
1005 }
1006