]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_log.cc
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_rest_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "common/ceph_json.h"
17 #include "common/strtol.h"
18 #include "rgw_rest.h"
19 #include "rgw_op.h"
20 #include "rgw_rest_s3.h"
21 #include "rgw_rest_log.h"
22 #include "rgw_client_io.h"
23 #include "rgw_sync.h"
24 #include "rgw_data_sync.h"
25 #include "rgw_common.h"
26 #include "rgw_zone.h"
27 #include "rgw_mdlog.h"
28
29 #include "services/svc_zone.h"
30 #include "services/svc_mdlog.h"
31 #include "services/svc_bilog_rados.h"
32 #include "services/svc_datalog_rados.h"
33
34 #include "common/errno.h"
35 #include "include/ceph_assert.h"
36
37 #define dout_context g_ceph_context
38 #define LOG_CLASS_LIST_MAX_ENTRIES (1000)
39 #define dout_subsys ceph_subsys_rgw
40
41 static int parse_date_str(string& in, real_time& out) {
42 uint64_t epoch = 0;
43 uint64_t nsec = 0;
44
45 if (!in.empty()) {
46 if (utime_t::parse_date(in, &epoch, &nsec) < 0) {
47 dout(5) << "Error parsing date " << in << dendl;
48 return -EINVAL;
49 }
50 }
51 out = utime_t(epoch, nsec).to_real_time();
52 return 0;
53 }
54
55 void RGWOp_MDLog_List::execute() {
56 string period = s->info.args.get("period");
57 string shard = s->info.args.get("id");
58 string max_entries_str = s->info.args.get("max-entries");
59 string st = s->info.args.get("start-time"),
60 et = s->info.args.get("end-time"),
61 marker = s->info.args.get("marker"),
62 err;
63 real_time ut_st,
64 ut_et;
65 void *handle;
66 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
67
68 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
69 if (!err.empty()) {
70 dout(5) << "Error parsing shard_id " << shard << dendl;
71 http_ret = -EINVAL;
72 return;
73 }
74
75 if (parse_date_str(st, ut_st) < 0) {
76 http_ret = -EINVAL;
77 return;
78 }
79
80 if (parse_date_str(et, ut_et) < 0) {
81 http_ret = -EINVAL;
82 return;
83 }
84
85 if (!max_entries_str.empty()) {
86 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
87 if (!err.empty()) {
88 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
89 http_ret = -EINVAL;
90 return;
91 }
92 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
93 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
94 }
95 }
96
97 if (period.empty()) {
98 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
99 period = store->svc()->zone->get_current_period_id();
100 if (period.empty()) {
101 ldout(s->cct, 5) << "Missing period id" << dendl;
102 http_ret = -EINVAL;
103 return;
104 }
105 }
106
107 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
108
109 meta_log.init_list_entries(shard_id, ut_st, ut_et, marker, &handle);
110
111 http_ret = meta_log.list_entries(handle, max_entries, entries,
112 &last_marker, &truncated);
113
114 meta_log.complete_list_entries(handle);
115 }
116
117 void RGWOp_MDLog_List::send_response() {
118 set_req_state_err(s, http_ret);
119 dump_errno(s);
120 end_header(s);
121
122 if (http_ret < 0)
123 return;
124
125 s->formatter->open_object_section("log_entries");
126 s->formatter->dump_string("marker", last_marker);
127 s->formatter->dump_bool("truncated", truncated);
128 {
129 s->formatter->open_array_section("entries");
130 for (list<cls_log_entry>::iterator iter = entries.begin();
131 iter != entries.end(); ++iter) {
132 cls_log_entry& entry = *iter;
133 store->ctl()->meta.mgr->dump_log_entry(entry, s->formatter);
134 flusher.flush();
135 }
136 s->formatter->close_section();
137 }
138 s->formatter->close_section();
139 flusher.flush();
140 }
141
142 void RGWOp_MDLog_Info::execute() {
143 num_objects = s->cct->_conf->rgw_md_log_max_shards;
144 period = store->svc()->mdlog->read_oldest_log_period();
145 http_ret = period.get_error();
146 }
147
148 void RGWOp_MDLog_Info::send_response() {
149 set_req_state_err(s, http_ret);
150 dump_errno(s);
151 end_header(s);
152
153 s->formatter->open_object_section("mdlog");
154 s->formatter->dump_unsigned("num_objects", num_objects);
155 if (period) {
156 s->formatter->dump_string("period", period.get_period().get_id());
157 s->formatter->dump_unsigned("realm_epoch", period.get_epoch());
158 }
159 s->formatter->close_section();
160 flusher.flush();
161 }
162
163 void RGWOp_MDLog_ShardInfo::execute() {
164 string period = s->info.args.get("period");
165 string shard = s->info.args.get("id");
166 string err;
167
168 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
169 if (!err.empty()) {
170 dout(5) << "Error parsing shard_id " << shard << dendl;
171 http_ret = -EINVAL;
172 return;
173 }
174
175 if (period.empty()) {
176 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
177 period = store->svc()->zone->get_current_period_id();
178
179 if (period.empty()) {
180 ldout(s->cct, 5) << "Missing period id" << dendl;
181 http_ret = -EINVAL;
182 return;
183 }
184 }
185 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
186
187 http_ret = meta_log.get_info(shard_id, &info);
188 }
189
190 void RGWOp_MDLog_ShardInfo::send_response() {
191 set_req_state_err(s, http_ret);
192 dump_errno(s);
193 end_header(s);
194
195 encode_json("info", info, s->formatter);
196 flusher.flush();
197 }
198
199 void RGWOp_MDLog_Delete::execute() {
200 string st = s->info.args.get("start-time"),
201 et = s->info.args.get("end-time"),
202 start_marker = s->info.args.get("start-marker"),
203 end_marker = s->info.args.get("end-marker"),
204 period = s->info.args.get("period"),
205 shard = s->info.args.get("id"),
206 err;
207 real_time ut_st,
208 ut_et;
209 unsigned shard_id;
210
211 http_ret = 0;
212
213 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
214 if (!err.empty()) {
215 dout(5) << "Error parsing shard_id " << shard << dendl;
216 http_ret = -EINVAL;
217 return;
218 }
219 if (et.empty() && end_marker.empty()) { /* bounding end */
220 http_ret = -EINVAL;
221 return;
222 }
223
224 if (parse_date_str(st, ut_st) < 0) {
225 http_ret = -EINVAL;
226 return;
227 }
228
229 if (parse_date_str(et, ut_et) < 0) {
230 http_ret = -EINVAL;
231 return;
232 }
233
234 if (period.empty()) {
235 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
236 period = store->svc()->zone->get_current_period_id();
237
238 if (period.empty()) {
239 ldout(s->cct, 5) << "Missing period id" << dendl;
240 http_ret = -EINVAL;
241 return;
242 }
243 }
244 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
245
246 http_ret = meta_log.trim(shard_id, ut_st, ut_et, start_marker, end_marker);
247 }
248
249 void RGWOp_MDLog_Lock::execute() {
250 string period, shard_id_str, duration_str, locker_id, zone_id;
251 unsigned shard_id;
252
253 http_ret = 0;
254
255 period = s->info.args.get("period");
256 shard_id_str = s->info.args.get("id");
257 duration_str = s->info.args.get("length");
258 locker_id = s->info.args.get("locker-id");
259 zone_id = s->info.args.get("zone-id");
260
261 if (period.empty()) {
262 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
263 period = store->svc()->zone->get_current_period_id();
264 }
265
266 if (period.empty() ||
267 shard_id_str.empty() ||
268 (duration_str.empty()) ||
269 locker_id.empty() ||
270 zone_id.empty()) {
271 dout(5) << "Error invalid parameter list" << dendl;
272 http_ret = -EINVAL;
273 return;
274 }
275
276 string err;
277 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
278 if (!err.empty()) {
279 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
280 http_ret = -EINVAL;
281 return;
282 }
283
284 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
285 unsigned dur;
286 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
287 if (!err.empty() || dur <= 0) {
288 dout(5) << "invalid length param " << duration_str << dendl;
289 http_ret = -EINVAL;
290 return;
291 }
292 http_ret = meta_log.lock_exclusive(shard_id, make_timespan(dur), zone_id,
293 locker_id);
294 if (http_ret == -EBUSY)
295 http_ret = -ERR_LOCKED;
296 }
297
298 void RGWOp_MDLog_Unlock::execute() {
299 string period, shard_id_str, locker_id, zone_id;
300 unsigned shard_id;
301
302 http_ret = 0;
303
304 period = s->info.args.get("period");
305 shard_id_str = s->info.args.get("id");
306 locker_id = s->info.args.get("locker-id");
307 zone_id = s->info.args.get("zone-id");
308
309 if (period.empty()) {
310 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
311 period = store->svc()->zone->get_current_period_id();
312 }
313
314 if (period.empty() ||
315 shard_id_str.empty() ||
316 locker_id.empty() ||
317 zone_id.empty()) {
318 dout(5) << "Error invalid parameter list" << dendl;
319 http_ret = -EINVAL;
320 return;
321 }
322
323 string err;
324 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
325 if (!err.empty()) {
326 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
327 http_ret = -EINVAL;
328 return;
329 }
330
331 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
332 http_ret = meta_log.unlock(shard_id, zone_id, locker_id);
333 }
334
335 void RGWOp_MDLog_Notify::execute() {
336 #define LARGE_ENOUGH_BUF (128 * 1024)
337
338 int r = 0;
339 bufferlist data;
340 std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
341 if (r < 0) {
342 http_ret = r;
343 return;
344 }
345
346 char* buf = data.c_str();
347 ldout(s->cct, 20) << __func__ << "(): read data: " << buf << dendl;
348
349 JSONParser p;
350 r = p.parse(buf, data.length());
351 if (r < 0) {
352 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
353 http_ret = r;
354 return;
355 }
356
357 set<int> updated_shards;
358 try {
359 decode_json_obj(updated_shards, &p);
360 } catch (JSONDecoder::err& err) {
361 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
362 http_ret = -EINVAL;
363 return;
364 }
365
366 if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
367 for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
368 ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl;
369 }
370 }
371
372 store->getRados()->wakeup_meta_sync_shards(updated_shards);
373
374 http_ret = 0;
375 }
376
377 void RGWOp_BILog_List::execute() {
378 string tenant_name = s->info.args.get("tenant"),
379 bucket_name = s->info.args.get("bucket"),
380 marker = s->info.args.get("marker"),
381 max_entries_str = s->info.args.get("max-entries"),
382 bucket_instance = s->info.args.get("bucket-instance");
383 RGWBucketInfo bucket_info;
384 unsigned max_entries;
385
386 if (bucket_name.empty() && bucket_instance.empty()) {
387 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
388 http_ret = -EINVAL;
389 return;
390 }
391
392 int shard_id;
393 string bn;
394 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
395 if (http_ret < 0) {
396 return;
397 }
398
399 if (!bucket_instance.empty()) {
400 rgw_bucket b(rgw_bucket_key(tenant_name, bn, bucket_instance));
401 http_ret = store->getRados()->get_bucket_instance_info(*s->sysobj_ctx, b, bucket_info, NULL, NULL, s->yield);
402 if (http_ret < 0) {
403 ldpp_dout(s, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
404 return;
405 }
406 } else { /* !bucket_name.empty() */
407 http_ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name, bucket_info, NULL, s->yield, NULL);
408 if (http_ret < 0) {
409 ldpp_dout(s, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
410 return;
411 }
412 }
413
414 bool truncated;
415 unsigned count = 0;
416 string err;
417
418 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
419 if (!err.empty())
420 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
421
422 send_response();
423 do {
424 list<rgw_bi_log_entry> entries;
425 int ret = store->svc()->bilog_rados->log_list(bucket_info, shard_id,
426 marker, max_entries - count,
427 entries, &truncated);
428 if (ret < 0) {
429 ldpp_dout(s, 5) << "ERROR: list_bi_log_entries()" << dendl;
430 return;
431 }
432
433 count += entries.size();
434
435 send_response(entries, marker);
436 } while (truncated && count < max_entries);
437
438 send_response_end();
439 }
440
441 void RGWOp_BILog_List::send_response() {
442 if (sent_header)
443 return;
444
445 set_req_state_err(s, http_ret);
446 dump_errno(s);
447 end_header(s);
448
449 sent_header = true;
450
451 if (http_ret < 0)
452 return;
453
454 s->formatter->open_array_section("entries");
455 }
456
457 void RGWOp_BILog_List::send_response(list<rgw_bi_log_entry>& entries, string& marker)
458 {
459 for (list<rgw_bi_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
460 rgw_bi_log_entry& entry = *iter;
461 encode_json("entry", entry, s->formatter);
462
463 marker = entry.id;
464 flusher.flush();
465 }
466 }
467
468 void RGWOp_BILog_List::send_response_end() {
469 s->formatter->close_section();
470 flusher.flush();
471 }
472
473 void RGWOp_BILog_Info::execute() {
474 string tenant_name = s->info.args.get("tenant"),
475 bucket_name = s->info.args.get("bucket"),
476 bucket_instance = s->info.args.get("bucket-instance");
477 RGWBucketInfo bucket_info;
478
479 if (bucket_name.empty() && bucket_instance.empty()) {
480 ldpp_dout(s, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
481 http_ret = -EINVAL;
482 return;
483 }
484
485 int shard_id;
486 string bn;
487 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
488 if (http_ret < 0) {
489 return;
490 }
491
492 if (!bucket_instance.empty()) {
493 rgw_bucket b(rgw_bucket_key(tenant_name, bn, bucket_instance));
494 http_ret = store->getRados()->get_bucket_instance_info(*s->sysobj_ctx, b, bucket_info, NULL, NULL, s->yield);
495 if (http_ret < 0) {
496 ldpp_dout(s, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
497 return;
498 }
499 } else { /* !bucket_name.empty() */
500 http_ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name, bucket_info, NULL, s->yield, NULL);
501 if (http_ret < 0) {
502 ldpp_dout(s, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
503 return;
504 }
505 }
506 map<RGWObjCategory, RGWStorageStats> stats;
507 int ret = store->getRados()->get_bucket_stats(bucket_info, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
508 if (ret < 0 && ret != -ENOENT) {
509 http_ret = ret;
510 return;
511 }
512 }
513
514 void RGWOp_BILog_Info::send_response() {
515 set_req_state_err(s, http_ret);
516 dump_errno(s);
517 end_header(s);
518
519 if (http_ret < 0)
520 return;
521
522 s->formatter->open_object_section("info");
523 encode_json("bucket_ver", bucket_ver, s->formatter);
524 encode_json("master_ver", master_ver, s->formatter);
525 encode_json("max_marker", max_marker, s->formatter);
526 encode_json("syncstopped", syncstopped, s->formatter);
527 s->formatter->close_section();
528
529 flusher.flush();
530 }
531
532 void RGWOp_BILog_Delete::execute() {
533 string tenant_name = s->info.args.get("tenant"),
534 bucket_name = s->info.args.get("bucket"),
535 start_marker = s->info.args.get("start-marker"),
536 end_marker = s->info.args.get("end-marker"),
537 bucket_instance = s->info.args.get("bucket-instance");
538
539 RGWBucketInfo bucket_info;
540
541 http_ret = 0;
542 if ((bucket_name.empty() && bucket_instance.empty()) ||
543 end_marker.empty()) {
544 ldpp_dout(s, 5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl;
545 http_ret = -EINVAL;
546 return;
547 }
548
549 int shard_id;
550 string bn;
551 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
552 if (http_ret < 0) {
553 return;
554 }
555
556 if (!bucket_instance.empty()) {
557 rgw_bucket b(rgw_bucket_key(tenant_name, bn, bucket_instance));
558 http_ret = store->getRados()->get_bucket_instance_info(*s->sysobj_ctx, b, bucket_info, NULL, NULL, s->yield);
559 if (http_ret < 0) {
560 ldpp_dout(s, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
561 return;
562 }
563 } else { /* !bucket_name.empty() */
564 http_ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name, bucket_info, NULL, s->yield, NULL);
565 if (http_ret < 0) {
566 ldpp_dout(s, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
567 return;
568 }
569 }
570 http_ret = store->svc()->bilog_rados->log_trim(bucket_info, shard_id, start_marker, end_marker);
571 if (http_ret < 0) {
572 ldpp_dout(s, 5) << "ERROR: trim_bi_log_entries() " << dendl;
573 }
574 return;
575 }
576
577 void RGWOp_DATALog_List::execute() {
578 string shard = s->info.args.get("id");
579
580 string st = s->info.args.get("start-time"),
581 et = s->info.args.get("end-time"),
582 max_entries_str = s->info.args.get("max-entries"),
583 marker = s->info.args.get("marker"),
584 err;
585 real_time ut_st,
586 ut_et;
587 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
588
589 s->info.args.get_bool("extra-info", &extra_info, false);
590
591 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
592 if (!err.empty()) {
593 dout(5) << "Error parsing shard_id " << shard << dendl;
594 http_ret = -EINVAL;
595 return;
596 }
597
598 if (parse_date_str(st, ut_st) < 0) {
599 http_ret = -EINVAL;
600 return;
601 }
602
603 if (parse_date_str(et, ut_et) < 0) {
604 http_ret = -EINVAL;
605 return;
606 }
607
608 if (!max_entries_str.empty()) {
609 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
610 if (!err.empty()) {
611 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
612 http_ret = -EINVAL;
613 return;
614 }
615 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
616 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
617 }
618 }
619
620 // Note that last_marker is updated to be the marker of the last
621 // entry listed
622 http_ret = store->svc()->datalog_rados->list_entries(shard_id, ut_st, ut_et,
623 max_entries, entries, marker,
624 &last_marker, &truncated);
625 }
626
627 void RGWOp_DATALog_List::send_response() {
628 set_req_state_err(s, http_ret);
629 dump_errno(s);
630 end_header(s);
631
632 if (http_ret < 0)
633 return;
634
635 s->formatter->open_object_section("log_entries");
636 s->formatter->dump_string("marker", last_marker);
637 s->formatter->dump_bool("truncated", truncated);
638 {
639 s->formatter->open_array_section("entries");
640 for (list<rgw_data_change_log_entry>::iterator iter = entries.begin();
641 iter != entries.end(); ++iter) {
642 rgw_data_change_log_entry& entry = *iter;
643 if (!extra_info) {
644 encode_json("entry", entry.entry, s->formatter);
645 } else {
646 encode_json("entry", entry, s->formatter);
647 }
648 flusher.flush();
649 }
650 s->formatter->close_section();
651 }
652 s->formatter->close_section();
653 flusher.flush();
654 }
655
656
657 void RGWOp_DATALog_Info::execute() {
658 num_objects = s->cct->_conf->rgw_data_log_num_shards;
659 http_ret = 0;
660 }
661
662 void RGWOp_DATALog_Info::send_response() {
663 set_req_state_err(s, http_ret);
664 dump_errno(s);
665 end_header(s);
666
667 s->formatter->open_object_section("num_objects");
668 s->formatter->dump_unsigned("num_objects", num_objects);
669 s->formatter->close_section();
670 flusher.flush();
671 }
672
673 void RGWOp_DATALog_ShardInfo::execute() {
674 string shard = s->info.args.get("id");
675 string err;
676
677 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
678 if (!err.empty()) {
679 dout(5) << "Error parsing shard_id " << shard << dendl;
680 http_ret = -EINVAL;
681 return;
682 }
683
684 http_ret = store->svc()->datalog_rados->get_info(shard_id, &info);
685 }
686
687 void RGWOp_DATALog_ShardInfo::send_response() {
688 set_req_state_err(s, http_ret);
689 dump_errno(s);
690 end_header(s);
691
692 encode_json("info", info, s->formatter);
693 flusher.flush();
694 }
695
696 void RGWOp_DATALog_Notify::execute() {
697 string source_zone = s->info.args.get("source-zone");
698 #define LARGE_ENOUGH_BUF (128 * 1024)
699
700 int r = 0;
701 bufferlist data;
702 std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
703 if (r < 0) {
704 http_ret = r;
705 return;
706 }
707
708 char* buf = data.c_str();
709 ldout(s->cct, 20) << __func__ << "(): read data: " << buf << dendl;
710
711 JSONParser p;
712 r = p.parse(buf, data.length());
713 if (r < 0) {
714 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
715 http_ret = r;
716 return;
717 }
718
719 map<int, set<string> > updated_shards;
720 try {
721 decode_json_obj(updated_shards, &p);
722 } catch (JSONDecoder::err& err) {
723 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
724 http_ret = -EINVAL;
725 return;
726 }
727
728 if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
729 for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
730 ldout(s->cct, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
731 set<string>& keys = iter->second;
732 for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
733 ldout(s->cct, 20) << __func__ << "(): modified key=" << *kiter << dendl;
734 }
735 }
736 }
737
738 store->getRados()->wakeup_data_sync_shards(source_zone, updated_shards);
739
740 http_ret = 0;
741 }
742
743 void RGWOp_DATALog_Delete::execute() {
744 string st = s->info.args.get("start-time"),
745 et = s->info.args.get("end-time"),
746 start_marker = s->info.args.get("start-marker"),
747 end_marker = s->info.args.get("end-marker"),
748 shard = s->info.args.get("id"),
749 err;
750 real_time ut_st,
751 ut_et;
752 unsigned shard_id;
753
754 http_ret = 0;
755
756 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
757 if (!err.empty()) {
758 dout(5) << "Error parsing shard_id " << shard << dendl;
759 http_ret = -EINVAL;
760 return;
761 }
762 if (et.empty() && end_marker.empty()) { /* bounding end */
763 http_ret = -EINVAL;
764 return;
765 }
766
767 if (parse_date_str(st, ut_st) < 0) {
768 http_ret = -EINVAL;
769 return;
770 }
771
772 if (parse_date_str(et, ut_et) < 0) {
773 http_ret = -EINVAL;
774 return;
775 }
776
777 http_ret = store->svc()->datalog_rados->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
778 }
779
780 // not in header to avoid pulling in rgw_sync.h
781 class RGWOp_MDLog_Status : public RGWRESTOp {
782 rgw_meta_sync_status status;
783 public:
784 int check_caps(const RGWUserCaps& caps) override {
785 return caps.check_cap("mdlog", RGW_CAP_READ);
786 }
787 int verify_permission() override {
788 return check_caps(s->user->get_caps());
789 }
790 void execute() override;
791 void send_response() override;
792 const char* name() const override { return "get_metadata_log_status"; }
793 };
794
795 void RGWOp_MDLog_Status::execute()
796 {
797 auto sync = store->getRados()->get_meta_sync_manager();
798 if (sync == nullptr) {
799 ldout(s->cct, 1) << "no sync manager" << dendl;
800 http_ret = -ENOENT;
801 return;
802 }
803 http_ret = sync->read_sync_status(&status);
804 }
805
806 void RGWOp_MDLog_Status::send_response()
807 {
808 set_req_state_err(s, http_ret);
809 dump_errno(s);
810 end_header(s);
811
812 if (http_ret >= 0) {
813 encode_json("status", status, s->formatter);
814 }
815 flusher.flush();
816 }
817
818 // not in header to avoid pulling in rgw_data_sync.h
819 class RGWOp_BILog_Status : public RGWRESTOp {
820 std::vector<rgw_bucket_shard_sync_info> status;
821 public:
822 int check_caps(const RGWUserCaps& caps) override {
823 return caps.check_cap("bilog", RGW_CAP_READ);
824 }
825 int verify_permission() override {
826 return check_caps(s->user->get_caps());
827 }
828 void execute() override;
829 void send_response() override;
830 const char* name() const override { return "get_bucket_index_log_status"; }
831 };
832
833 void RGWOp_BILog_Status::execute()
834 {
835 const auto options = s->info.args.get("options");
836 bool merge = (options == "merge");
837 const auto source_zone = s->info.args.get("source-zone");
838 const auto source_key = s->info.args.get("source-bucket");
839 auto key = s->info.args.get("bucket");
840 if (key.empty()) {
841 key = source_key;
842 }
843 if (key.empty()) {
844 ldpp_dout(s, 4) << "no 'bucket' provided" << dendl;
845 http_ret = -EINVAL;
846 return;
847 }
848
849 rgw_bucket bucket;
850 int shard_id{-1}; // unused
851 http_ret = rgw_bucket_parse_bucket_key(s->cct, key, &bucket, &shard_id);
852 if (http_ret < 0) {
853 ldpp_dout(s, 4) << "invalid 'bucket' provided" << dendl;
854 http_ret = -EINVAL;
855 return;
856 }
857
858 // read the bucket instance info for num_shards
859 auto ctx = store->svc()->sysobj->init_obj_ctx();
860 RGWBucketInfo info;
861 http_ret = store->getRados()->get_bucket_instance_info(ctx, bucket, info, nullptr, nullptr, s->yield);
862 if (http_ret < 0) {
863 ldpp_dout(s, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl;
864 return;
865 }
866
867 rgw_bucket source_bucket;
868
869 if (source_key.empty() ||
870 source_key == key) {
871 source_bucket = info.bucket;
872 } else {
873 http_ret = rgw_bucket_parse_bucket_key(s->cct, source_key, &source_bucket, nullptr);
874 if (http_ret < 0) {
875 ldpp_dout(s, 4) << "invalid 'source-bucket' provided (key=" << source_key << ")" << dendl;
876 return;
877 }
878 }
879
880 const auto& local_zone_id = store->svc()->zone->zone_id();
881
882 if (!merge) {
883 rgw_sync_bucket_pipe pipe;
884 pipe.source.zone = source_zone;
885 pipe.source.bucket = source_bucket;
886 pipe.dest.zone = local_zone_id;
887 pipe.dest.bucket = info.bucket;
888
889 ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl;
890
891 http_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status);
892
893 if (http_ret < 0) {
894 lderr(s->cct) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << http_ret << dendl;
895 }
896 return;
897 }
898
899 rgw_zone_id source_zone_id(source_zone);
900
901 RGWBucketSyncPolicyHandlerRef source_handler;
902 http_ret = store->ctl()->bucket->get_sync_policy_handler(source_zone_id, source_bucket, &source_handler, null_yield);
903 if (http_ret < 0) {
904 lderr(s->cct) << "could not get bucket sync policy handler (r=" << http_ret << ")" << dendl;
905 return;
906 }
907
908 auto local_dests = source_handler->get_all_dests_in_zone(local_zone_id);
909
910 std::vector<rgw_bucket_shard_sync_info> current_status;
911 for (auto& entry : local_dests) {
912 auto pipe = entry.second;
913
914 ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl;
915
916 RGWBucketInfo *pinfo = &info;
917 std::optional<RGWBucketInfo> opt_dest_info;
918
919 if (!pipe.dest.bucket) {
920 /* Uh oh, something went wrong */
921 ldout(s->cct, 20) << "ERROR: RGWOp_BILog_Status::execute(): BUG: pipe.dest.bucket was not initialized" << pipe << dendl;
922 http_ret = -EIO;
923 return;
924 }
925
926 if (*pipe.dest.bucket != info.bucket) {
927 opt_dest_info.emplace();
928 pinfo = &(*opt_dest_info);
929
930 /* dest bucket might not have a bucket id */
931 http_ret = store->ctl()->bucket->read_bucket_info(*pipe.dest.bucket,
932 pinfo,
933 s->yield,
934 RGWBucketCtl::BucketInstance::GetParams(),
935 nullptr);
936 if (http_ret < 0) {
937 ldpp_dout(s, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(http_ret) << dendl;
938 return;
939 }
940
941 pipe.dest.bucket = pinfo->bucket;
942 }
943
944 int r = rgw_bucket_sync_status(this, store, pipe, *pinfo, &info, &current_status);
945 if (r < 0) {
946 lderr(s->cct) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << r << dendl;
947 http_ret = r;
948 return;
949 }
950
951 if (status.empty()) {
952 status = std::move(current_status);
953 } else {
954 if (current_status.size() !=
955 status.size()) {
956 http_ret = -EINVAL;
957 lderr(s->cct) << "ERROR: different number of shards for sync status of buckets syncing from the same source: status.size()= " << status.size() << " current_status.size()=" << current_status.size() << dendl;
958 return;
959 }
960 auto m = status.begin();
961 for (auto& cur_shard_status : current_status) {
962 auto& result_shard_status = *m++;
963 // always take the first marker, or any later marker that's smaller
964 if (cur_shard_status.inc_marker.position < result_shard_status.inc_marker.position) {
965 result_shard_status = std::move(cur_shard_status);
966 }
967 }
968 }
969 }
970 }
971
972 void RGWOp_BILog_Status::send_response()
973 {
974 set_req_state_err(s, http_ret);
975 dump_errno(s);
976 end_header(s);
977
978 if (http_ret >= 0) {
979 encode_json("status", status, s->formatter);
980 }
981 flusher.flush();
982 }
983
984 // not in header to avoid pulling in rgw_data_sync.h
985 class RGWOp_DATALog_Status : public RGWRESTOp {
986 rgw_data_sync_status status;
987 public:
988 int check_caps(const RGWUserCaps& caps) override {
989 return caps.check_cap("datalog", RGW_CAP_READ);
990 }
991 int verify_permission() override {
992 return check_caps(s->user->get_caps());
993 }
994 void execute() override ;
995 void send_response() override;
996 const char* name() const override { return "get_data_changes_log_status"; }
997 };
998
999 void RGWOp_DATALog_Status::execute()
1000 {
1001 const auto source_zone = s->info.args.get("source-zone");
1002 auto sync = store->getRados()->get_data_sync_manager(source_zone);
1003 if (sync == nullptr) {
1004 ldout(s->cct, 1) << "no sync manager for source-zone " << source_zone << dendl;
1005 http_ret = -ENOENT;
1006 return;
1007 }
1008 http_ret = sync->read_sync_status(&status);
1009 }
1010
1011 void RGWOp_DATALog_Status::send_response()
1012 {
1013 set_req_state_err(s, http_ret);
1014 dump_errno(s);
1015 end_header(s);
1016
1017 if (http_ret >= 0) {
1018 encode_json("status", status, s->formatter);
1019 }
1020 flusher.flush();
1021 }
1022
1023
1024 RGWOp *RGWHandler_Log::op_get() {
1025 bool exists;
1026 string type = s->info.args.get("type", &exists);
1027
1028 if (!exists) {
1029 return NULL;
1030 }
1031
1032 if (type.compare("metadata") == 0) {
1033 if (s->info.args.exists("id")) {
1034 if (s->info.args.exists("info")) {
1035 return new RGWOp_MDLog_ShardInfo;
1036 } else {
1037 return new RGWOp_MDLog_List;
1038 }
1039 } else if (s->info.args.exists("status")) {
1040 return new RGWOp_MDLog_Status;
1041 } else {
1042 return new RGWOp_MDLog_Info;
1043 }
1044 } else if (type.compare("bucket-index") == 0) {
1045 if (s->info.args.exists("info")) {
1046 return new RGWOp_BILog_Info;
1047 } else if (s->info.args.exists("status")) {
1048 return new RGWOp_BILog_Status;
1049 } else {
1050 return new RGWOp_BILog_List;
1051 }
1052 } else if (type.compare("data") == 0) {
1053 if (s->info.args.exists("id")) {
1054 if (s->info.args.exists("info")) {
1055 return new RGWOp_DATALog_ShardInfo;
1056 } else {
1057 return new RGWOp_DATALog_List;
1058 }
1059 } else if (s->info.args.exists("status")) {
1060 return new RGWOp_DATALog_Status;
1061 } else {
1062 return new RGWOp_DATALog_Info;
1063 }
1064 }
1065 return NULL;
1066 }
1067
1068 RGWOp *RGWHandler_Log::op_delete() {
1069 bool exists;
1070 string type = s->info.args.get("type", &exists);
1071
1072 if (!exists) {
1073 return NULL;
1074 }
1075
1076 if (type.compare("metadata") == 0)
1077 return new RGWOp_MDLog_Delete;
1078 else if (type.compare("bucket-index") == 0)
1079 return new RGWOp_BILog_Delete;
1080 else if (type.compare("data") == 0)
1081 return new RGWOp_DATALog_Delete;
1082 return NULL;
1083 }
1084
1085 RGWOp *RGWHandler_Log::op_post() {
1086 bool exists;
1087 string type = s->info.args.get("type", &exists);
1088
1089 if (!exists) {
1090 return NULL;
1091 }
1092
1093 if (type.compare("metadata") == 0) {
1094 if (s->info.args.exists("lock"))
1095 return new RGWOp_MDLog_Lock;
1096 else if (s->info.args.exists("unlock"))
1097 return new RGWOp_MDLog_Unlock;
1098 else if (s->info.args.exists("notify"))
1099 return new RGWOp_MDLog_Notify;
1100 } else if (type.compare("data") == 0) {
1101 if (s->info.args.exists("notify"))
1102 return new RGWOp_DATALog_Notify;
1103 }
1104 return NULL;
1105 }
1106