]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_rest_log.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_rest_log.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "common/ceph_json.h"
15#include "common/strtol.h"
16#include "rgw_rest.h"
17#include "rgw_op.h"
18#include "rgw_rest_s3.h"
19#include "rgw_rest_log.h"
20#include "rgw_client_io.h"
21#include "rgw_sync.h"
22#include "rgw_data_sync.h"
31f18b77 23#include "rgw_common.h"
7c673cae
FG
24#include "common/errno.h"
25#include "include/assert.h"
26
27#define dout_context g_ceph_context
28#define LOG_CLASS_LIST_MAX_ENTRIES (1000)
29#define dout_subsys ceph_subsys_rgw
30
31static int parse_date_str(string& in, real_time& out) {
32 uint64_t epoch = 0;
33 uint64_t nsec = 0;
34
35 if (!in.empty()) {
36 if (utime_t::parse_date(in, &epoch, &nsec) < 0) {
37 dout(5) << "Error parsing date " << in << dendl;
38 return -EINVAL;
39 }
40 }
41 out = utime_t(epoch, nsec).to_real_time();
42 return 0;
43}
44
45void RGWOp_MDLog_List::execute() {
46 string period = s->info.args.get("period");
47 string shard = s->info.args.get("id");
48 string max_entries_str = s->info.args.get("max-entries");
49 string st = s->info.args.get("start-time"),
50 et = s->info.args.get("end-time"),
51 marker = s->info.args.get("marker"),
52 err;
53 real_time ut_st,
54 ut_et;
55 void *handle;
56 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
57
58 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
59 if (!err.empty()) {
60 dout(5) << "Error parsing shard_id " << shard << dendl;
61 http_ret = -EINVAL;
62 return;
63 }
64
65 if (parse_date_str(st, ut_st) < 0) {
66 http_ret = -EINVAL;
67 return;
68 }
69
70 if (parse_date_str(et, ut_et) < 0) {
71 http_ret = -EINVAL;
72 return;
73 }
74
75 if (!max_entries_str.empty()) {
76 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
77 if (!err.empty()) {
78 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
79 http_ret = -EINVAL;
80 return;
81 }
224ce89b
WB
82 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
83 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
84 }
7c673cae
FG
85 }
86
87 if (period.empty()) {
88 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
89 period = store->get_current_period_id();
90 if (period.empty()) {
91 ldout(s->cct, 5) << "Missing period id" << dendl;
92 http_ret = -EINVAL;
93 return;
94 }
95 }
96
97 RGWMetadataLog meta_log{s->cct, store, period};
98
99 meta_log.init_list_entries(shard_id, ut_st, ut_et, marker, &handle);
100
224ce89b
WB
101 http_ret = meta_log.list_entries(handle, max_entries, entries,
102 &last_marker, &truncated);
7c673cae
FG
103
104 meta_log.complete_list_entries(handle);
105}
106
107void RGWOp_MDLog_List::send_response() {
108 set_req_state_err(s, http_ret);
109 dump_errno(s);
110 end_header(s);
111
112 if (http_ret < 0)
113 return;
114
115 s->formatter->open_object_section("log_entries");
116 s->formatter->dump_string("marker", last_marker);
117 s->formatter->dump_bool("truncated", truncated);
118 {
119 s->formatter->open_array_section("entries");
120 for (list<cls_log_entry>::iterator iter = entries.begin();
121 iter != entries.end(); ++iter) {
122 cls_log_entry& entry = *iter;
123 store->meta_mgr->dump_log_entry(entry, s->formatter);
124 flusher.flush();
125 }
126 s->formatter->close_section();
127 }
128 s->formatter->close_section();
129 flusher.flush();
130}
131
132void RGWOp_MDLog_Info::execute() {
133 num_objects = s->cct->_conf->rgw_md_log_max_shards;
134 period = store->meta_mgr->read_oldest_log_period();
135 http_ret = period.get_error();
136}
137
138void RGWOp_MDLog_Info::send_response() {
139 set_req_state_err(s, http_ret);
140 dump_errno(s);
141 end_header(s);
142
143 s->formatter->open_object_section("mdlog");
144 s->formatter->dump_unsigned("num_objects", num_objects);
145 if (period) {
146 s->formatter->dump_string("period", period.get_period().get_id());
147 s->formatter->dump_unsigned("realm_epoch", period.get_epoch());
148 }
149 s->formatter->close_section();
150 flusher.flush();
151}
152
153void RGWOp_MDLog_ShardInfo::execute() {
154 string period = s->info.args.get("period");
155 string shard = s->info.args.get("id");
156 string err;
157
158 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
159 if (!err.empty()) {
160 dout(5) << "Error parsing shard_id " << shard << dendl;
161 http_ret = -EINVAL;
162 return;
163 }
164
165 if (period.empty()) {
166 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
167 period = store->get_current_period_id();
168
169 if (period.empty()) {
170 ldout(s->cct, 5) << "Missing period id" << dendl;
171 http_ret = -EINVAL;
172 return;
173 }
174 }
175 RGWMetadataLog meta_log{s->cct, store, period};
176
177 http_ret = meta_log.get_info(shard_id, &info);
178}
179
180void RGWOp_MDLog_ShardInfo::send_response() {
181 set_req_state_err(s, http_ret);
182 dump_errno(s);
183 end_header(s);
184
185 encode_json("info", info, s->formatter);
186 flusher.flush();
187}
188
189void RGWOp_MDLog_Delete::execute() {
190 string st = s->info.args.get("start-time"),
191 et = s->info.args.get("end-time"),
192 start_marker = s->info.args.get("start-marker"),
193 end_marker = s->info.args.get("end-marker"),
194 period = s->info.args.get("period"),
195 shard = s->info.args.get("id"),
196 err;
197 real_time ut_st,
198 ut_et;
199 unsigned shard_id;
200
201 http_ret = 0;
202
203 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
204 if (!err.empty()) {
205 dout(5) << "Error parsing shard_id " << shard << dendl;
206 http_ret = -EINVAL;
207 return;
208 }
209 if (et.empty() && end_marker.empty()) { /* bounding end */
210 http_ret = -EINVAL;
211 return;
212 }
213
214 if (parse_date_str(st, ut_st) < 0) {
215 http_ret = -EINVAL;
216 return;
217 }
218
219 if (parse_date_str(et, ut_et) < 0) {
220 http_ret = -EINVAL;
221 return;
222 }
223
224 if (period.empty()) {
225 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
226 period = store->get_current_period_id();
227
228 if (period.empty()) {
229 ldout(s->cct, 5) << "Missing period id" << dendl;
230 http_ret = -EINVAL;
231 return;
232 }
233 }
234 RGWMetadataLog meta_log{s->cct, store, period};
235
236 http_ret = meta_log.trim(shard_id, ut_st, ut_et, start_marker, end_marker);
237}
238
239void RGWOp_MDLog_Lock::execute() {
240 string period, shard_id_str, duration_str, locker_id, zone_id;
241 unsigned shard_id;
242
243 http_ret = 0;
244
245 period = s->info.args.get("period");
246 shard_id_str = s->info.args.get("id");
247 duration_str = s->info.args.get("length");
248 locker_id = s->info.args.get("locker-id");
249 zone_id = s->info.args.get("zone-id");
250
251 if (period.empty()) {
252 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
253 period = store->get_current_period_id();
254 }
255
256 if (period.empty() ||
257 shard_id_str.empty() ||
258 (duration_str.empty()) ||
259 locker_id.empty() ||
260 zone_id.empty()) {
261 dout(5) << "Error invalid parameter list" << dendl;
262 http_ret = -EINVAL;
263 return;
264 }
265
266 string err;
267 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
268 if (!err.empty()) {
269 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
270 http_ret = -EINVAL;
271 return;
272 }
273
274 RGWMetadataLog meta_log{s->cct, store, period};
275 unsigned dur;
276 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
277 if (!err.empty() || dur <= 0) {
278 dout(5) << "invalid length param " << duration_str << dendl;
279 http_ret = -EINVAL;
280 return;
281 }
282 http_ret = meta_log.lock_exclusive(shard_id, make_timespan(dur), zone_id,
283 locker_id);
284 if (http_ret == -EBUSY)
285 http_ret = -ERR_LOCKED;
286}
287
288void RGWOp_MDLog_Unlock::execute() {
289 string period, shard_id_str, locker_id, zone_id;
290 unsigned shard_id;
291
292 http_ret = 0;
293
294 period = s->info.args.get("period");
295 shard_id_str = s->info.args.get("id");
296 locker_id = s->info.args.get("locker-id");
297 zone_id = s->info.args.get("zone-id");
298
299 if (period.empty()) {
300 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
301 period = store->get_current_period_id();
302 }
303
304 if (period.empty() ||
305 shard_id_str.empty() ||
306 locker_id.empty() ||
307 zone_id.empty()) {
308 dout(5) << "Error invalid parameter list" << dendl;
309 http_ret = -EINVAL;
310 return;
311 }
312
313 string err;
314 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
315 if (!err.empty()) {
316 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
317 http_ret = -EINVAL;
318 return;
319 }
320
321 RGWMetadataLog meta_log{s->cct, store, period};
322 http_ret = meta_log.unlock(shard_id, zone_id, locker_id);
323}
324
325void RGWOp_MDLog_Notify::execute() {
326 char *data;
327 int len = 0;
328#define LARGE_ENOUGH_BUF (128 * 1024)
329 int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
330 if (r < 0) {
331 http_ret = r;
332 return;
333 }
334
335 ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
336
337 JSONParser p;
338 r = p.parse(data, len);
339 free(data);
340 if (r < 0) {
341 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
342 http_ret = r;
343 return;
344 }
345
346 set<int> updated_shards;
347 try {
348 decode_json_obj(updated_shards, &p);
349 } catch (JSONDecoder::err& err) {
350 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
351 http_ret = -EINVAL;
352 return;
353 }
354
355 if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
356 for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
357 ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl;
358 }
359 }
360
361 store->wakeup_meta_sync_shards(updated_shards);
362
363 http_ret = 0;
364}
365
366void RGWOp_BILog_List::execute() {
367 string tenant_name = s->info.args.get("tenant"),
368 bucket_name = s->info.args.get("bucket"),
369 marker = s->info.args.get("marker"),
370 max_entries_str = s->info.args.get("max-entries"),
371 bucket_instance = s->info.args.get("bucket-instance");
372 RGWBucketInfo bucket_info;
373 unsigned max_entries;
374
375 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
376
377 if (bucket_name.empty() && bucket_instance.empty()) {
378 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
379 http_ret = -EINVAL;
380 return;
381 }
382
383 int shard_id;
384 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
385 if (http_ret < 0) {
386 return;
387 }
388
389 if (!bucket_instance.empty()) {
390 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
391 if (http_ret < 0) {
392 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
393 return;
394 }
395 } else { /* !bucket_name.empty() */
396 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
397 if (http_ret < 0) {
398 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
399 return;
400 }
401 }
402
403 bool truncated;
404 unsigned count = 0;
405 string err;
406
407 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
408 if (!err.empty())
409 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
410
411 send_response();
412 do {
413 list<rgw_bi_log_entry> entries;
414 int ret = store->list_bi_log_entries(bucket_info, shard_id,
415 marker, max_entries - count,
416 entries, &truncated);
417 if (ret < 0) {
418 dout(5) << "ERROR: list_bi_log_entries()" << dendl;
419 return;
420 }
421
422 count += entries.size();
423
424 send_response(entries, marker);
425 } while (truncated && count < max_entries);
426
427 send_response_end();
428}
429
430void RGWOp_BILog_List::send_response() {
431 if (sent_header)
432 return;
433
434 set_req_state_err(s, http_ret);
435 dump_errno(s);
436 end_header(s);
437
438 sent_header = true;
439
440 if (http_ret < 0)
441 return;
442
443 s->formatter->open_array_section("entries");
444}
445
446void RGWOp_BILog_List::send_response(list<rgw_bi_log_entry>& entries, string& marker)
447{
448 for (list<rgw_bi_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
449 rgw_bi_log_entry& entry = *iter;
450 encode_json("entry", entry, s->formatter);
451
452 marker = entry.id;
453 flusher.flush();
454 }
455}
456
457void RGWOp_BILog_List::send_response_end() {
458 s->formatter->close_section();
459 flusher.flush();
460}
461
462void RGWOp_BILog_Info::execute() {
463 string tenant_name = s->info.args.get("tenant"),
464 bucket_name = s->info.args.get("bucket"),
465 bucket_instance = s->info.args.get("bucket-instance");
466 RGWBucketInfo bucket_info;
467
468 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
469
470 if (bucket_name.empty() && bucket_instance.empty()) {
471 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
472 http_ret = -EINVAL;
473 return;
474 }
475
476 int shard_id;
477 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
478 if (http_ret < 0) {
479 return;
480 }
481
482 if (!bucket_instance.empty()) {
483 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
484 if (http_ret < 0) {
485 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
486 return;
487 }
488 } else { /* !bucket_name.empty() */
489 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
490 if (http_ret < 0) {
491 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
492 return;
493 }
494 }
495 map<RGWObjCategory, RGWStorageStats> stats;
c07f9fc5 496 int ret = store->get_bucket_stats(bucket_info, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
7c673cae
FG
497 if (ret < 0 && ret != -ENOENT) {
498 http_ret = ret;
499 return;
500 }
501}
502
503void RGWOp_BILog_Info::send_response() {
504 set_req_state_err(s, http_ret);
505 dump_errno(s);
506 end_header(s);
507
508 if (http_ret < 0)
509 return;
510
511 s->formatter->open_object_section("info");
512 encode_json("bucket_ver", bucket_ver, s->formatter);
513 encode_json("master_ver", master_ver, s->formatter);
514 encode_json("max_marker", max_marker, s->formatter);
c07f9fc5 515 encode_json("syncstopped", syncstopped, s->formatter);
7c673cae
FG
516 s->formatter->close_section();
517
518 flusher.flush();
519}
520
521void RGWOp_BILog_Delete::execute() {
522 string tenant_name = s->info.args.get("tenant"),
523 bucket_name = s->info.args.get("bucket"),
524 start_marker = s->info.args.get("start-marker"),
525 end_marker = s->info.args.get("end-marker"),
526 bucket_instance = s->info.args.get("bucket-instance");
527
528 RGWBucketInfo bucket_info;
529
530 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
531
532 http_ret = 0;
533 if ((bucket_name.empty() && bucket_instance.empty()) ||
534 end_marker.empty()) {
535 dout(5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl;
536 http_ret = -EINVAL;
537 return;
538 }
539
540 int shard_id;
541 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
542 if (http_ret < 0) {
543 return;
544 }
545
546 if (!bucket_instance.empty()) {
547 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
548 if (http_ret < 0) {
549 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
550 return;
551 }
552 } else { /* !bucket_name.empty() */
553 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
554 if (http_ret < 0) {
555 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
556 return;
557 }
558 }
559 http_ret = store->trim_bi_log_entries(bucket_info, shard_id, start_marker, end_marker);
560 if (http_ret < 0) {
561 dout(5) << "ERROR: trim_bi_log_entries() " << dendl;
562 }
563 return;
564}
565
566void RGWOp_DATALog_List::execute() {
567 string shard = s->info.args.get("id");
568
569 string st = s->info.args.get("start-time"),
570 et = s->info.args.get("end-time"),
571 max_entries_str = s->info.args.get("max-entries"),
572 marker = s->info.args.get("marker"),
573 err;
574 real_time ut_st,
575 ut_et;
576 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
577
578 s->info.args.get_bool("extra-info", &extra_info, false);
579
580 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
581 if (!err.empty()) {
582 dout(5) << "Error parsing shard_id " << shard << dendl;
583 http_ret = -EINVAL;
584 return;
585 }
586
587 if (parse_date_str(st, ut_st) < 0) {
588 http_ret = -EINVAL;
589 return;
590 }
591
592 if (parse_date_str(et, ut_et) < 0) {
593 http_ret = -EINVAL;
594 return;
595 }
596
597 if (!max_entries_str.empty()) {
598 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
599 if (!err.empty()) {
600 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
601 http_ret = -EINVAL;
602 return;
603 }
224ce89b
WB
604 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
605 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
606 }
607 }
608
609 // Note that last_marker is updated to be the marker of the last
610 // entry listed
611 http_ret = store->data_log->list_entries(shard_id, ut_st, ut_et,
612 max_entries, entries, marker,
613 &last_marker, &truncated);
7c673cae
FG
614}
615
616void RGWOp_DATALog_List::send_response() {
617 set_req_state_err(s, http_ret);
618 dump_errno(s);
619 end_header(s);
620
621 if (http_ret < 0)
622 return;
623
624 s->formatter->open_object_section("log_entries");
625 s->formatter->dump_string("marker", last_marker);
626 s->formatter->dump_bool("truncated", truncated);
627 {
628 s->formatter->open_array_section("entries");
629 for (list<rgw_data_change_log_entry>::iterator iter = entries.begin();
630 iter != entries.end(); ++iter) {
631 rgw_data_change_log_entry& entry = *iter;
632 if (!extra_info) {
633 encode_json("entry", entry.entry, s->formatter);
634 } else {
635 encode_json("entry", entry, s->formatter);
636 }
637 flusher.flush();
638 }
639 s->formatter->close_section();
640 }
641 s->formatter->close_section();
642 flusher.flush();
643}
644
645
646void RGWOp_DATALog_Info::execute() {
647 num_objects = s->cct->_conf->rgw_data_log_num_shards;
648 http_ret = 0;
649}
650
651void RGWOp_DATALog_Info::send_response() {
652 set_req_state_err(s, http_ret);
653 dump_errno(s);
654 end_header(s);
655
656 s->formatter->open_object_section("num_objects");
657 s->formatter->dump_unsigned("num_objects", num_objects);
658 s->formatter->close_section();
659 flusher.flush();
660}
661
662void RGWOp_DATALog_ShardInfo::execute() {
663 string shard = s->info.args.get("id");
664 string err;
665
666 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
667 if (!err.empty()) {
668 dout(5) << "Error parsing shard_id " << shard << dendl;
669 http_ret = -EINVAL;
670 return;
671 }
672
673 http_ret = store->data_log->get_info(shard_id, &info);
674}
675
676void RGWOp_DATALog_ShardInfo::send_response() {
677 set_req_state_err(s, http_ret);
678 dump_errno(s);
679 end_header(s);
680
681 encode_json("info", info, s->formatter);
682 flusher.flush();
683}
684
685void RGWOp_DATALog_Lock::execute() {
686 string shard_id_str, duration_str, locker_id, zone_id;
687 unsigned shard_id;
688
689 http_ret = 0;
690
691 shard_id_str = s->info.args.get("id");
692 duration_str = s->info.args.get("length");
693 locker_id = s->info.args.get("locker-id");
694 zone_id = s->info.args.get("zone-id");
695
696 if (shard_id_str.empty() ||
697 (duration_str.empty()) ||
698 locker_id.empty() ||
699 zone_id.empty()) {
700 dout(5) << "Error invalid parameter list" << dendl;
701 http_ret = -EINVAL;
702 return;
703 }
704
705 string err;
706 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
707 if (!err.empty()) {
708 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
709 http_ret = -EINVAL;
710 return;
711 }
712
713 unsigned dur;
714 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
715 if (!err.empty() || dur <= 0) {
716 dout(5) << "invalid length param " << duration_str << dendl;
717 http_ret = -EINVAL;
718 return;
719 }
720 http_ret = store->data_log->lock_exclusive(shard_id, make_timespan(dur), zone_id, locker_id);
721 if (http_ret == -EBUSY)
722 http_ret = -ERR_LOCKED;
723}
724
725void RGWOp_DATALog_Unlock::execute() {
726 string shard_id_str, locker_id, zone_id;
727 unsigned shard_id;
728
729 http_ret = 0;
730
731 shard_id_str = s->info.args.get("id");
732 locker_id = s->info.args.get("locker-id");
733 zone_id = s->info.args.get("zone-id");
734
735 if (shard_id_str.empty() ||
736 locker_id.empty() ||
737 zone_id.empty()) {
738 dout(5) << "Error invalid parameter list" << dendl;
739 http_ret = -EINVAL;
740 return;
741 }
742
743 string err;
744 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
745 if (!err.empty()) {
746 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
747 http_ret = -EINVAL;
748 return;
749 }
750
751 http_ret = store->data_log->unlock(shard_id, zone_id, locker_id);
752}
753
754void RGWOp_DATALog_Notify::execute() {
755 string source_zone = s->info.args.get("source-zone");
756 char *data;
757 int len = 0;
758#define LARGE_ENOUGH_BUF (128 * 1024)
759 int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
760 if (r < 0) {
761 http_ret = r;
762 return;
763 }
764
765 ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
766
767 JSONParser p;
768 r = p.parse(data, len);
769 free(data);
770 if (r < 0) {
771 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
772 http_ret = r;
773 return;
774 }
775
776 map<int, set<string> > updated_shards;
777 try {
778 decode_json_obj(updated_shards, &p);
779 } catch (JSONDecoder::err& err) {
780 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
781 http_ret = -EINVAL;
782 return;
783 }
784
785 if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
786 for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
787 ldout(s->cct, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
788 set<string>& keys = iter->second;
789 for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
790 ldout(s->cct, 20) << __func__ << "(): modified key=" << *kiter << dendl;
791 }
792 }
793 }
794
795 store->wakeup_data_sync_shards(source_zone, updated_shards);
796
797 http_ret = 0;
798}
799
800void RGWOp_DATALog_Delete::execute() {
801 string st = s->info.args.get("start-time"),
802 et = s->info.args.get("end-time"),
803 start_marker = s->info.args.get("start-marker"),
804 end_marker = s->info.args.get("end-marker"),
805 shard = s->info.args.get("id"),
806 err;
807 real_time ut_st,
808 ut_et;
809 unsigned shard_id;
810
811 http_ret = 0;
812
813 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
814 if (!err.empty()) {
815 dout(5) << "Error parsing shard_id " << shard << dendl;
816 http_ret = -EINVAL;
817 return;
818 }
819 if (et.empty() && end_marker.empty()) { /* bounding end */
820 http_ret = -EINVAL;
821 return;
822 }
823
824 if (parse_date_str(st, ut_st) < 0) {
825 http_ret = -EINVAL;
826 return;
827 }
828
829 if (parse_date_str(et, ut_et) < 0) {
830 http_ret = -EINVAL;
831 return;
832 }
833
834 http_ret = store->data_log->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
835}
836
837// not in header to avoid pulling in rgw_sync.h
838class RGWOp_MDLog_Status : public RGWRESTOp {
839 rgw_meta_sync_status status;
840public:
841 int check_caps(RGWUserCaps& caps) override {
842 return caps.check_cap("mdlog", RGW_CAP_READ);
843 }
844 int verify_permission() override {
845 return check_caps(s->user->caps);
846 }
847 void execute() override;
848 void send_response() override;
849 const string name() override { return "get_metadata_log_status"; }
850};
851
852void RGWOp_MDLog_Status::execute()
853{
854 auto sync = store->get_meta_sync_manager();
855 if (sync == nullptr) {
856 ldout(s->cct, 1) << "no sync manager" << dendl;
857 http_ret = -ENOENT;
858 return;
859 }
860 http_ret = sync->read_sync_status(&status);
861}
862
863void RGWOp_MDLog_Status::send_response()
864{
865 set_req_state_err(s, http_ret);
866 dump_errno(s);
867 end_header(s);
868
869 if (http_ret >= 0) {
870 encode_json("status", status, s->formatter);
871 }
872 flusher.flush();
873}
874
b32b8144
FG
875// not in header to avoid pulling in rgw_data_sync.h
876class RGWOp_BILog_Status : public RGWRESTOp {
877 std::vector<rgw_bucket_shard_sync_info> status;
878public:
879 int check_caps(RGWUserCaps& caps) override {
880 return caps.check_cap("bilog", RGW_CAP_READ);
881 }
882 int verify_permission() override {
883 return check_caps(s->user->caps);
884 }
885 void execute() override;
886 void send_response() override;
887 const string name() override { return "get_bucket_index_log_status"; }
888};
889
890void RGWOp_BILog_Status::execute()
891{
892 const auto source_zone = s->info.args.get("source-zone");
893 const auto key = s->info.args.get("bucket");
894 if (key.empty()) {
895 ldout(s->cct, 4) << "no 'bucket' provided" << dendl;
896 http_ret = -EINVAL;
897 return;
898 }
899
900 rgw_bucket bucket;
901 int shard_id{-1}; // unused
902 http_ret = rgw_bucket_parse_bucket_key(s->cct, key, &bucket, &shard_id);
903 if (http_ret < 0) {
904 ldout(s->cct, 4) << "no 'bucket' provided" << dendl;
905 http_ret = -EINVAL;
906 return;
907 }
908
28e407b8
AA
909 // read the bucket instance info for num_shards
910 RGWObjectCtx ctx(store);
911 RGWBucketInfo info;
912 http_ret = store->get_bucket_instance_info(ctx, bucket, info, nullptr, nullptr);
913 if (http_ret < 0) {
914 ldout(s->cct, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl;
915 return;
916 }
917 http_ret = rgw_bucket_sync_status(store, source_zone, info, &status);
b32b8144
FG
918}
919
920void RGWOp_BILog_Status::send_response()
921{
922 set_req_state_err(s, http_ret);
923 dump_errno(s);
924 end_header(s);
925
926 if (http_ret >= 0) {
927 encode_json("status", status, s->formatter);
928 }
929 flusher.flush();
930}
931
7c673cae
FG
932// not in header to avoid pulling in rgw_data_sync.h
933class RGWOp_DATALog_Status : public RGWRESTOp {
934 rgw_data_sync_status status;
935public:
936 int check_caps(RGWUserCaps& caps) override {
937 return caps.check_cap("datalog", RGW_CAP_READ);
938 }
939 int verify_permission() override {
940 return check_caps(s->user->caps);
941 }
942 void execute() override ;
943 void send_response() override;
944 const string name() override { return "get_data_changes_log_status"; }
945};
946
947void RGWOp_DATALog_Status::execute()
948{
949 const auto source_zone = s->info.args.get("source-zone");
950 auto sync = store->get_data_sync_manager(source_zone);
951 if (sync == nullptr) {
952 ldout(s->cct, 1) << "no sync manager for source-zone " << source_zone << dendl;
953 http_ret = -ENOENT;
954 return;
955 }
956 http_ret = sync->read_sync_status(&status);
957}
958
959void RGWOp_DATALog_Status::send_response()
960{
961 set_req_state_err(s, http_ret);
962 dump_errno(s);
963 end_header(s);
964
965 if (http_ret >= 0) {
966 encode_json("status", status, s->formatter);
967 }
968 flusher.flush();
969}
970
971
972RGWOp *RGWHandler_Log::op_get() {
973 bool exists;
974 string type = s->info.args.get("type", &exists);
975
976 if (!exists) {
977 return NULL;
978 }
979
980 if (type.compare("metadata") == 0) {
981 if (s->info.args.exists("id")) {
982 if (s->info.args.exists("info")) {
983 return new RGWOp_MDLog_ShardInfo;
984 } else {
985 return new RGWOp_MDLog_List;
986 }
987 } else if (s->info.args.exists("status")) {
988 return new RGWOp_MDLog_Status;
989 } else {
990 return new RGWOp_MDLog_Info;
991 }
992 } else if (type.compare("bucket-index") == 0) {
993 if (s->info.args.exists("info")) {
994 return new RGWOp_BILog_Info;
b32b8144
FG
995 } else if (s->info.args.exists("status")) {
996 return new RGWOp_BILog_Status;
7c673cae
FG
997 } else {
998 return new RGWOp_BILog_List;
999 }
1000 } else if (type.compare("data") == 0) {
1001 if (s->info.args.exists("id")) {
1002 if (s->info.args.exists("info")) {
1003 return new RGWOp_DATALog_ShardInfo;
1004 } else {
1005 return new RGWOp_DATALog_List;
1006 }
1007 } else if (s->info.args.exists("status")) {
1008 return new RGWOp_DATALog_Status;
1009 } else {
1010 return new RGWOp_DATALog_Info;
1011 }
1012 }
1013 return NULL;
1014}
1015
1016RGWOp *RGWHandler_Log::op_delete() {
1017 bool exists;
1018 string type = s->info.args.get("type", &exists);
1019
1020 if (!exists) {
1021 return NULL;
1022 }
1023
1024 if (type.compare("metadata") == 0)
1025 return new RGWOp_MDLog_Delete;
1026 else if (type.compare("bucket-index") == 0)
1027 return new RGWOp_BILog_Delete;
1028 else if (type.compare("data") == 0)
1029 return new RGWOp_DATALog_Delete;
1030 return NULL;
1031}
1032
1033RGWOp *RGWHandler_Log::op_post() {
1034 bool exists;
1035 string type = s->info.args.get("type", &exists);
1036
1037 if (!exists) {
1038 return NULL;
1039 }
1040
1041 if (type.compare("metadata") == 0) {
1042 if (s->info.args.exists("lock"))
1043 return new RGWOp_MDLog_Lock;
1044 else if (s->info.args.exists("unlock"))
1045 return new RGWOp_MDLog_Unlock;
1046 else if (s->info.args.exists("notify"))
1047 return new RGWOp_MDLog_Notify;
1048 } else if (type.compare("data") == 0) {
1049 if (s->info.args.exists("lock"))
1050 return new RGWOp_DATALog_Lock;
1051 else if (s->info.args.exists("unlock"))
1052 return new RGWOp_DATALog_Unlock;
1053 else if (s->info.args.exists("notify"))
1054 return new RGWOp_DATALog_Notify;
1055 }
1056 return NULL;
1057}
1058