]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_rest_log.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_rest_log.cc
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2 3
7c673cae
FG
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
11fdf7f2 15
7c673cae
FG
16#include "common/ceph_json.h"
17#include "common/strtol.h"
18#include "rgw_rest.h"
19#include "rgw_op.h"
20#include "rgw_rest_s3.h"
21#include "rgw_rest_log.h"
22#include "rgw_client_io.h"
23#include "rgw_sync.h"
24#include "rgw_data_sync.h"
31f18b77 25#include "rgw_common.h"
11fdf7f2 26#include "rgw_zone.h"
9f95a23c 27#include "rgw_mdlog.h"
11fdf7f2
TL
28
29#include "services/svc_zone.h"
9f95a23c
TL
30#include "services/svc_mdlog.h"
31#include "services/svc_bilog_rados.h"
11fdf7f2 32
7c673cae 33#include "common/errno.h"
11fdf7f2 34#include "include/ceph_assert.h"
7c673cae
FG
35
36#define dout_context g_ceph_context
37#define LOG_CLASS_LIST_MAX_ENTRIES (1000)
38#define dout_subsys ceph_subsys_rgw
39
f67539c2 40void RGWOp_MDLog_List::execute(optional_yield y) {
7c673cae
FG
41 string period = s->info.args.get("period");
42 string shard = s->info.args.get("id");
43 string max_entries_str = s->info.args.get("max-entries");
f67539c2 44 string marker = s->info.args.get("marker"),
7c673cae 45 err;
7c673cae
FG
46 void *handle;
47 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
48
f67539c2
TL
49 if (s->info.args.exists("start-time") ||
50 s->info.args.exists("end-time")) {
b3b6e05e 51 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
f67539c2 52 op_ret = -EINVAL;
7c673cae
FG
53 return;
54 }
55
f67539c2
TL
56 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
57 if (!err.empty()) {
b3b6e05e 58 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
f67539c2 59 op_ret = -EINVAL;
7c673cae
FG
60 return;
61 }
62
63 if (!max_entries_str.empty()) {
64 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
65 if (!err.empty()) {
b3b6e05e 66 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str << dendl;
f67539c2 67 op_ret = -EINVAL;
7c673cae
FG
68 return;
69 }
224ce89b
WB
70 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
71 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
72 }
f67539c2 73 }
7c673cae
FG
74
75 if (period.empty()) {
b3b6e05e 76 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
9f95a23c 77 period = store->svc()->zone->get_current_period_id();
7c673cae 78 if (period.empty()) {
b3b6e05e 79 ldpp_dout(this, 5) << "Missing period id" << dendl;
f67539c2 80 op_ret = -EINVAL;
7c673cae
FG
81 return;
82 }
83 }
84
9f95a23c 85 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
7c673cae 86
f67539c2 87 meta_log.init_list_entries(shard_id, {}, {}, marker, &handle);
7c673cae 88
b3b6e05e 89 op_ret = meta_log.list_entries(this, handle, max_entries, entries,
224ce89b 90 &last_marker, &truncated);
7c673cae
FG
91
92 meta_log.complete_list_entries(handle);
93}
94
95void RGWOp_MDLog_List::send_response() {
f67539c2 96 set_req_state_err(s, op_ret);
7c673cae
FG
97 dump_errno(s);
98 end_header(s);
99
f67539c2 100 if (op_ret < 0)
7c673cae
FG
101 return;
102
103 s->formatter->open_object_section("log_entries");
104 s->formatter->dump_string("marker", last_marker);
105 s->formatter->dump_bool("truncated", truncated);
106 {
107 s->formatter->open_array_section("entries");
108 for (list<cls_log_entry>::iterator iter = entries.begin();
109 iter != entries.end(); ++iter) {
110 cls_log_entry& entry = *iter;
9f95a23c 111 store->ctl()->meta.mgr->dump_log_entry(entry, s->formatter);
7c673cae
FG
112 flusher.flush();
113 }
114 s->formatter->close_section();
115 }
116 s->formatter->close_section();
117 flusher.flush();
118}
119
f67539c2 120void RGWOp_MDLog_Info::execute(optional_yield y) {
7c673cae 121 num_objects = s->cct->_conf->rgw_md_log_max_shards;
b3b6e05e 122 period = store->svc()->mdlog->read_oldest_log_period(y, s);
f67539c2 123 op_ret = period.get_error();
7c673cae
FG
124}
125
126void RGWOp_MDLog_Info::send_response() {
f67539c2 127 set_req_state_err(s, op_ret);
7c673cae
FG
128 dump_errno(s);
129 end_header(s);
130
131 s->formatter->open_object_section("mdlog");
132 s->formatter->dump_unsigned("num_objects", num_objects);
133 if (period) {
134 s->formatter->dump_string("period", period.get_period().get_id());
135 s->formatter->dump_unsigned("realm_epoch", period.get_epoch());
136 }
137 s->formatter->close_section();
138 flusher.flush();
139}
140
f67539c2 141void RGWOp_MDLog_ShardInfo::execute(optional_yield y) {
7c673cae
FG
142 string period = s->info.args.get("period");
143 string shard = s->info.args.get("id");
144 string err;
145
146 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
147 if (!err.empty()) {
b3b6e05e 148 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
f67539c2 149 op_ret = -EINVAL;
7c673cae
FG
150 return;
151 }
152
153 if (period.empty()) {
b3b6e05e 154 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
9f95a23c 155 period = store->svc()->zone->get_current_period_id();
7c673cae
FG
156
157 if (period.empty()) {
b3b6e05e 158 ldpp_dout(this, 5) << "Missing period id" << dendl;
f67539c2 159 op_ret = -EINVAL;
7c673cae
FG
160 return;
161 }
162 }
9f95a23c 163 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
7c673cae 164
b3b6e05e 165 op_ret = meta_log.get_info(this, shard_id, &info);
7c673cae
FG
166}
167
168void RGWOp_MDLog_ShardInfo::send_response() {
f67539c2 169 set_req_state_err(s, op_ret);
7c673cae
FG
170 dump_errno(s);
171 end_header(s);
172
173 encode_json("info", info, s->formatter);
174 flusher.flush();
175}
176
f67539c2
TL
177void RGWOp_MDLog_Delete::execute(optional_yield y) {
178 string marker = s->info.args.get("marker"),
7c673cae
FG
179 period = s->info.args.get("period"),
180 shard = s->info.args.get("id"),
181 err;
7c673cae
FG
182 unsigned shard_id;
183
7c673cae 184
f67539c2
TL
185 if (s->info.args.exists("start-time") ||
186 s->info.args.exists("end-time")) {
b3b6e05e 187 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
f67539c2 188 op_ret = -EINVAL;
7c673cae 189 }
f67539c2
TL
190
191 if (s->info.args.exists("start-marker")) {
b3b6e05e 192 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl;
f67539c2
TL
193 op_ret = -EINVAL;
194 }
195
196 if (s->info.args.exists("end-marker")) {
197 if (!s->info.args.exists("marker")) {
198 marker = s->info.args.get("end-marker");
199 } else {
b3b6e05e 200 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl;
f67539c2
TL
201 op_ret = -EINVAL;
202 }
7c673cae
FG
203 }
204
f67539c2
TL
205 op_ret = 0;
206
207 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
208 if (!err.empty()) {
b3b6e05e 209 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
f67539c2 210 op_ret = -EINVAL;
7c673cae
FG
211 return;
212 }
213
f67539c2
TL
214 if (marker.empty()) { /* bounding end */
215 op_ret = -EINVAL;
7c673cae
FG
216 return;
217 }
218
219 if (period.empty()) {
b3b6e05e 220 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
9f95a23c 221 period = store->svc()->zone->get_current_period_id();
7c673cae
FG
222
223 if (period.empty()) {
b3b6e05e 224 ldpp_dout(this, 5) << "Missing period id" << dendl;
f67539c2 225 op_ret = -EINVAL;
7c673cae
FG
226 return;
227 }
228 }
9f95a23c 229 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
7c673cae 230
b3b6e05e 231 op_ret = meta_log.trim(this, shard_id, {}, {}, {}, marker);
7c673cae
FG
232}
233
f67539c2 234void RGWOp_MDLog_Lock::execute(optional_yield y) {
7c673cae
FG
235 string period, shard_id_str, duration_str, locker_id, zone_id;
236 unsigned shard_id;
237
f67539c2 238 op_ret = 0;
7c673cae
FG
239
240 period = s->info.args.get("period");
241 shard_id_str = s->info.args.get("id");
242 duration_str = s->info.args.get("length");
243 locker_id = s->info.args.get("locker-id");
244 zone_id = s->info.args.get("zone-id");
245
246 if (period.empty()) {
b3b6e05e 247 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
9f95a23c 248 period = store->svc()->zone->get_current_period_id();
7c673cae
FG
249 }
250
251 if (period.empty() ||
252 shard_id_str.empty() ||
253 (duration_str.empty()) ||
254 locker_id.empty() ||
255 zone_id.empty()) {
b3b6e05e 256 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl;
f67539c2 257 op_ret = -EINVAL;
7c673cae
FG
258 return;
259 }
260
261 string err;
262 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
263 if (!err.empty()) {
b3b6e05e 264 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str << dendl;
f67539c2 265 op_ret = -EINVAL;
7c673cae
FG
266 return;
267 }
268
9f95a23c 269 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
7c673cae
FG
270 unsigned dur;
271 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
272 if (!err.empty() || dur <= 0) {
b3b6e05e 273 ldpp_dout(this, 5) << "invalid length param " << duration_str << dendl;
f67539c2 274 op_ret = -EINVAL;
7c673cae
FG
275 return;
276 }
b3b6e05e 277 op_ret = meta_log.lock_exclusive(s, shard_id, make_timespan(dur), zone_id,
7c673cae 278 locker_id);
f67539c2
TL
279 if (op_ret == -EBUSY)
280 op_ret = -ERR_LOCKED;
7c673cae
FG
281}
282
f67539c2 283void RGWOp_MDLog_Unlock::execute(optional_yield y) {
7c673cae
FG
284 string period, shard_id_str, locker_id, zone_id;
285 unsigned shard_id;
286
f67539c2 287 op_ret = 0;
7c673cae
FG
288
289 period = s->info.args.get("period");
290 shard_id_str = s->info.args.get("id");
291 locker_id = s->info.args.get("locker-id");
292 zone_id = s->info.args.get("zone-id");
293
294 if (period.empty()) {
b3b6e05e 295 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
9f95a23c 296 period = store->svc()->zone->get_current_period_id();
7c673cae
FG
297 }
298
299 if (period.empty() ||
300 shard_id_str.empty() ||
301 locker_id.empty() ||
302 zone_id.empty()) {
b3b6e05e 303 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl;
f67539c2 304 op_ret = -EINVAL;
7c673cae
FG
305 return;
306 }
307
308 string err;
309 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
310 if (!err.empty()) {
b3b6e05e 311 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str << dendl;
f67539c2 312 op_ret = -EINVAL;
7c673cae
FG
313 return;
314 }
315
9f95a23c 316 RGWMetadataLog meta_log{s->cct, store->svc()->zone, store->svc()->cls, period};
b3b6e05e 317 op_ret = meta_log.unlock(s, shard_id, zone_id, locker_id);
7c673cae
FG
318}
319
f67539c2 320void RGWOp_MDLog_Notify::execute(optional_yield y) {
7c673cae 321#define LARGE_ENOUGH_BUF (128 * 1024)
11fdf7f2
TL
322
323 int r = 0;
324 bufferlist data;
325 std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
7c673cae 326 if (r < 0) {
f67539c2 327 op_ret = r;
7c673cae
FG
328 return;
329 }
330
11fdf7f2 331 char* buf = data.c_str();
b3b6e05e 332 ldpp_dout(this, 20) << __func__ << "(): read data: " << buf << dendl;
7c673cae
FG
333
334 JSONParser p;
11fdf7f2 335 r = p.parse(buf, data.length());
7c673cae 336 if (r < 0) {
b3b6e05e 337 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl;
f67539c2 338 op_ret = r;
7c673cae
FG
339 return;
340 }
341
342 set<int> updated_shards;
343 try {
344 decode_json_obj(updated_shards, &p);
345 } catch (JSONDecoder::err& err) {
b3b6e05e 346 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
f67539c2 347 op_ret = -EINVAL;
7c673cae
FG
348 return;
349 }
350
11fdf7f2 351 if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
7c673cae 352 for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
b3b6e05e 353 ldpp_dout(this, 20) << __func__ << "(): updated shard=" << *iter << dendl;
7c673cae
FG
354 }
355 }
356
9f95a23c 357 store->getRados()->wakeup_meta_sync_shards(updated_shards);
7c673cae 358
f67539c2 359 op_ret = 0;
7c673cae
FG
360}
361
f67539c2 362void RGWOp_BILog_List::execute(optional_yield y) {
7c673cae
FG
363 string tenant_name = s->info.args.get("tenant"),
364 bucket_name = s->info.args.get("bucket"),
365 marker = s->info.args.get("marker"),
366 max_entries_str = s->info.args.get("max-entries"),
367 bucket_instance = s->info.args.get("bucket-instance");
368 RGWBucketInfo bucket_info;
369 unsigned max_entries;
370
7c673cae 371 if (bucket_name.empty() && bucket_instance.empty()) {
b3b6e05e 372 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
f67539c2 373 op_ret = -EINVAL;
7c673cae
FG
374 return;
375 }
376
377 int shard_id;
9f95a23c 378 string bn;
f67539c2
TL
379 op_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
380 if (op_ret < 0) {
7c673cae
FG
381 return;
382 }
383
384 if (!bucket_instance.empty()) {
9f95a23c 385 rgw_bucket b(rgw_bucket_key(tenant_name, bn, bucket_instance));
b3b6e05e 386 op_ret = store->getRados()->get_bucket_instance_info(*s->sysobj_ctx, b, bucket_info, NULL, NULL, s->yield, this);
f67539c2 387 if (op_ret < 0) {
b3b6e05e 388 ldpp_dout(this, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
7c673cae
FG
389 return;
390 }
391 } else { /* !bucket_name.empty() */
f67539c2
TL
392 op_ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name, bucket_info, NULL, s->yield, NULL);
393 if (op_ret < 0) {
b3b6e05e 394 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
7c673cae
FG
395 return;
396 }
397 }
398
399 bool truncated;
400 unsigned count = 0;
401 string err;
402
403 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
404 if (!err.empty())
405 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
406
407 send_response();
408 do {
409 list<rgw_bi_log_entry> entries;
b3b6e05e 410 int ret = store->svc()->bilog_rados->log_list(s, bucket_info, shard_id,
9f95a23c
TL
411 marker, max_entries - count,
412 entries, &truncated);
7c673cae 413 if (ret < 0) {
b3b6e05e 414 ldpp_dout(this, 5) << "ERROR: list_bi_log_entries()" << dendl;
7c673cae
FG
415 return;
416 }
417
418 count += entries.size();
419
420 send_response(entries, marker);
421 } while (truncated && count < max_entries);
422
423 send_response_end();
424}
425
426void RGWOp_BILog_List::send_response() {
427 if (sent_header)
428 return;
429
f67539c2 430 set_req_state_err(s, op_ret);
7c673cae
FG
431 dump_errno(s);
432 end_header(s);
433
434 sent_header = true;
435
f67539c2 436 if (op_ret < 0)
7c673cae
FG
437 return;
438
439 s->formatter->open_array_section("entries");
440}
441
442void RGWOp_BILog_List::send_response(list<rgw_bi_log_entry>& entries, string& marker)
443{
444 for (list<rgw_bi_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
445 rgw_bi_log_entry& entry = *iter;
446 encode_json("entry", entry, s->formatter);
447
448 marker = entry.id;
449 flusher.flush();
450 }
451}
452
453void RGWOp_BILog_List::send_response_end() {
454 s->formatter->close_section();
455 flusher.flush();
456}
457
f67539c2 458void RGWOp_BILog_Info::execute(optional_yield y) {
7c673cae
FG
459 string tenant_name = s->info.args.get("tenant"),
460 bucket_name = s->info.args.get("bucket"),
461 bucket_instance = s->info.args.get("bucket-instance");
462 RGWBucketInfo bucket_info;
463
7c673cae 464 if (bucket_name.empty() && bucket_instance.empty()) {
b3b6e05e 465 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
f67539c2 466 op_ret = -EINVAL;
7c673cae
FG
467 return;
468 }
469
470 int shard_id;
9f95a23c 471 string bn;
f67539c2
TL
472 op_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
473 if (op_ret < 0) {
7c673cae
FG
474 return;
475 }
476
477 if (!bucket_instance.empty()) {
9f95a23c 478 rgw_bucket b(rgw_bucket_key(tenant_name, bn, bucket_instance));
b3b6e05e 479 op_ret = store->getRados()->get_bucket_instance_info(*s->sysobj_ctx, b, bucket_info, NULL, NULL, s->yield, this);
f67539c2 480 if (op_ret < 0) {
b3b6e05e 481 ldpp_dout(this, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
7c673cae
FG
482 return;
483 }
484 } else { /* !bucket_name.empty() */
f67539c2
TL
485 op_ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name, bucket_info, NULL, s->yield, NULL);
486 if (op_ret < 0) {
b3b6e05e 487 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
7c673cae
FG
488 return;
489 }
490 }
491 map<RGWObjCategory, RGWStorageStats> stats;
b3b6e05e 492 int ret = store->getRados()->get_bucket_stats(s, bucket_info, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
7c673cae 493 if (ret < 0 && ret != -ENOENT) {
f67539c2 494 op_ret = ret;
7c673cae
FG
495 return;
496 }
497}
498
499void RGWOp_BILog_Info::send_response() {
f67539c2 500 set_req_state_err(s, op_ret);
7c673cae
FG
501 dump_errno(s);
502 end_header(s);
503
f67539c2 504 if (op_ret < 0)
7c673cae
FG
505 return;
506
507 s->formatter->open_object_section("info");
508 encode_json("bucket_ver", bucket_ver, s->formatter);
509 encode_json("master_ver", master_ver, s->formatter);
510 encode_json("max_marker", max_marker, s->formatter);
c07f9fc5 511 encode_json("syncstopped", syncstopped, s->formatter);
7c673cae
FG
512 s->formatter->close_section();
513
514 flusher.flush();
515}
516
f67539c2 517void RGWOp_BILog_Delete::execute(optional_yield y) {
7c673cae
FG
518 string tenant_name = s->info.args.get("tenant"),
519 bucket_name = s->info.args.get("bucket"),
520 start_marker = s->info.args.get("start-marker"),
521 end_marker = s->info.args.get("end-marker"),
522 bucket_instance = s->info.args.get("bucket-instance");
523
524 RGWBucketInfo bucket_info;
525
f67539c2 526 op_ret = 0;
7c673cae
FG
527 if ((bucket_name.empty() && bucket_instance.empty()) ||
528 end_marker.empty()) {
b3b6e05e 529 ldpp_dout(this, 5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl;
f67539c2 530 op_ret = -EINVAL;
7c673cae
FG
531 return;
532 }
533
534 int shard_id;
9f95a23c 535 string bn;
f67539c2
TL
536 op_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
537 if (op_ret < 0) {
7c673cae
FG
538 return;
539 }
540
541 if (!bucket_instance.empty()) {
9f95a23c 542 rgw_bucket b(rgw_bucket_key(tenant_name, bn, bucket_instance));
b3b6e05e 543 op_ret = store->getRados()->get_bucket_instance_info(*s->sysobj_ctx, b, bucket_info, NULL, NULL, s->yield, this);
f67539c2 544 if (op_ret < 0) {
b3b6e05e 545 ldpp_dout(this, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
7c673cae
FG
546 return;
547 }
548 } else { /* !bucket_name.empty() */
f67539c2
TL
549 op_ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name, bucket_info, NULL, s->yield, NULL);
550 if (op_ret < 0) {
b3b6e05e 551 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
7c673cae
FG
552 return;
553 }
554 }
b3b6e05e 555 op_ret = store->svc()->bilog_rados->log_trim(s, bucket_info, shard_id, start_marker, end_marker);
f67539c2 556 if (op_ret < 0) {
b3b6e05e 557 ldpp_dout(this, 5) << "ERROR: trim_bi_log_entries() " << dendl;
7c673cae
FG
558 }
559 return;
560}
561
f67539c2 562void RGWOp_DATALog_List::execute(optional_yield y) {
7c673cae
FG
563 string shard = s->info.args.get("id");
564
f67539c2 565 string max_entries_str = s->info.args.get("max-entries"),
7c673cae
FG
566 marker = s->info.args.get("marker"),
567 err;
7c673cae
FG
568 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
569
f67539c2
TL
570 if (s->info.args.exists("start-time") ||
571 s->info.args.exists("end-time")) {
b3b6e05e 572 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
f67539c2
TL
573 op_ret = -EINVAL;
574 }
575
7c673cae
FG
576 s->info.args.get_bool("extra-info", &extra_info, false);
577
578 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
579 if (!err.empty()) {
b3b6e05e 580 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
f67539c2 581 op_ret = -EINVAL;
7c673cae
FG
582 return;
583 }
584
585 if (!max_entries_str.empty()) {
586 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
587 if (!err.empty()) {
b3b6e05e 588 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str << dendl;
f67539c2 589 op_ret = -EINVAL;
7c673cae
FG
590 return;
591 }
224ce89b
WB
592 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
593 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
594 }
595 }
596
597 // Note that last_marker is updated to be the marker of the last
598 // entry listed
b3b6e05e 599 op_ret = store->svc()->datalog_rados->list_entries(this, shard_id,
f67539c2
TL
600 max_entries, entries,
601 marker, &last_marker,
602 &truncated);
7c673cae
FG
603}
604
605void RGWOp_DATALog_List::send_response() {
f67539c2 606 set_req_state_err(s, op_ret);
7c673cae
FG
607 dump_errno(s);
608 end_header(s);
609
f67539c2 610 if (op_ret < 0)
7c673cae
FG
611 return;
612
613 s->formatter->open_object_section("log_entries");
614 s->formatter->dump_string("marker", last_marker);
615 s->formatter->dump_bool("truncated", truncated);
616 {
617 s->formatter->open_array_section("entries");
f67539c2 618 for (const auto& entry : entries) {
7c673cae
FG
619 if (!extra_info) {
620 encode_json("entry", entry.entry, s->formatter);
621 } else {
622 encode_json("entry", entry, s->formatter);
623 }
624 flusher.flush();
625 }
626 s->formatter->close_section();
627 }
628 s->formatter->close_section();
629 flusher.flush();
630}
631
632
f67539c2 633void RGWOp_DATALog_Info::execute(optional_yield y) {
7c673cae 634 num_objects = s->cct->_conf->rgw_data_log_num_shards;
f67539c2 635 op_ret = 0;
7c673cae
FG
636}
637
638void RGWOp_DATALog_Info::send_response() {
f67539c2 639 set_req_state_err(s, op_ret);
7c673cae
FG
640 dump_errno(s);
641 end_header(s);
642
643 s->formatter->open_object_section("num_objects");
644 s->formatter->dump_unsigned("num_objects", num_objects);
645 s->formatter->close_section();
646 flusher.flush();
647}
648
f67539c2 649void RGWOp_DATALog_ShardInfo::execute(optional_yield y) {
7c673cae
FG
650 string shard = s->info.args.get("id");
651 string err;
652
653 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
654 if (!err.empty()) {
b3b6e05e 655 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
f67539c2 656 op_ret = -EINVAL;
7c673cae
FG
657 return;
658 }
659
b3b6e05e 660 op_ret = store->svc()->datalog_rados->get_info(this, shard_id, &info);
7c673cae
FG
661}
662
663void RGWOp_DATALog_ShardInfo::send_response() {
f67539c2 664 set_req_state_err(s, op_ret);
7c673cae
FG
665 dump_errno(s);
666 end_header(s);
667
668 encode_json("info", info, s->formatter);
669 flusher.flush();
670}
671
f67539c2 672void RGWOp_DATALog_Notify::execute(optional_yield y) {
7c673cae 673 string source_zone = s->info.args.get("source-zone");
7c673cae 674#define LARGE_ENOUGH_BUF (128 * 1024)
11fdf7f2
TL
675
676 int r = 0;
677 bufferlist data;
678 std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
7c673cae 679 if (r < 0) {
f67539c2 680 op_ret = r;
7c673cae
FG
681 return;
682 }
683
11fdf7f2 684 char* buf = data.c_str();
b3b6e05e 685 ldpp_dout(this, 20) << __func__ << "(): read data: " << buf << dendl;
7c673cae
FG
686
687 JSONParser p;
11fdf7f2 688 r = p.parse(buf, data.length());
7c673cae 689 if (r < 0) {
b3b6e05e 690 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl;
f67539c2 691 op_ret = r;
7c673cae
FG
692 return;
693 }
694
695 map<int, set<string> > updated_shards;
696 try {
697 decode_json_obj(updated_shards, &p);
698 } catch (JSONDecoder::err& err) {
b3b6e05e 699 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
f67539c2 700 op_ret = -EINVAL;
7c673cae
FG
701 return;
702 }
703
11fdf7f2 704 if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
7c673cae 705 for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
b3b6e05e 706 ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
7c673cae
FG
707 set<string>& keys = iter->second;
708 for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
b3b6e05e 709 ldpp_dout(this, 20) << __func__ << "(): modified key=" << *kiter << dendl;
7c673cae
FG
710 }
711 }
712 }
713
9f95a23c 714 store->getRados()->wakeup_data_sync_shards(source_zone, updated_shards);
7c673cae 715
f67539c2 716 op_ret = 0;
7c673cae
FG
717}
718
f67539c2
TL
719void RGWOp_DATALog_Delete::execute(optional_yield y) {
720 string marker = s->info.args.get("marker"),
7c673cae
FG
721 shard = s->info.args.get("id"),
722 err;
7c673cae
FG
723 unsigned shard_id;
724
f67539c2 725 op_ret = 0;
7c673cae 726
f67539c2
TL
727 if (s->info.args.exists("start-time") ||
728 s->info.args.exists("end-time")) {
b3b6e05e 729 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
f67539c2 730 op_ret = -EINVAL;
7c673cae 731 }
f67539c2
TL
732
733 if (s->info.args.exists("start-marker")) {
b3b6e05e 734 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl;
f67539c2 735 op_ret = -EINVAL;
7c673cae
FG
736 }
737
f67539c2
TL
738 if (s->info.args.exists("end-marker")) {
739 if (!s->info.args.exists("marker")) {
740 marker = s->info.args.get("end-marker");
741 } else {
b3b6e05e 742 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl;
f67539c2
TL
743 op_ret = -EINVAL;
744 }
7c673cae
FG
745 }
746
f67539c2
TL
747 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
748 if (!err.empty()) {
b3b6e05e 749 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
f67539c2
TL
750 op_ret = -EINVAL;
751 return;
752 }
753 if (marker.empty()) { /* bounding end */
754 op_ret = -EINVAL;
7c673cae
FG
755 return;
756 }
757
b3b6e05e 758 op_ret = store->svc()->datalog_rados->trim_entries(this, shard_id, marker);
7c673cae
FG
759}
760
761// not in header to avoid pulling in rgw_sync.h
762class RGWOp_MDLog_Status : public RGWRESTOp {
763 rgw_meta_sync_status status;
764public:
9f95a23c 765 int check_caps(const RGWUserCaps& caps) override {
7c673cae
FG
766 return caps.check_cap("mdlog", RGW_CAP_READ);
767 }
f67539c2 768 int verify_permission(optional_yield) override {
9f95a23c 769 return check_caps(s->user->get_caps());
7c673cae 770 }
f67539c2 771 void execute(optional_yield y) override;
7c673cae 772 void send_response() override;
11fdf7f2 773 const char* name() const override { return "get_metadata_log_status"; }
7c673cae
FG
774};
775
f67539c2 776void RGWOp_MDLog_Status::execute(optional_yield y)
7c673cae 777{
9f95a23c 778 auto sync = store->getRados()->get_meta_sync_manager();
7c673cae 779 if (sync == nullptr) {
b3b6e05e 780 ldpp_dout(this, 1) << "no sync manager" << dendl;
f67539c2 781 op_ret = -ENOENT;
7c673cae
FG
782 return;
783 }
b3b6e05e 784 op_ret = sync->read_sync_status(this, &status);
7c673cae
FG
785}
786
787void RGWOp_MDLog_Status::send_response()
788{
f67539c2 789 set_req_state_err(s, op_ret);
7c673cae
FG
790 dump_errno(s);
791 end_header(s);
792
f67539c2 793 if (op_ret >= 0) {
7c673cae
FG
794 encode_json("status", status, s->formatter);
795 }
796 flusher.flush();
797}
798
b32b8144
FG
799// not in header to avoid pulling in rgw_data_sync.h
800class RGWOp_BILog_Status : public RGWRESTOp {
801 std::vector<rgw_bucket_shard_sync_info> status;
802public:
9f95a23c 803 int check_caps(const RGWUserCaps& caps) override {
b32b8144
FG
804 return caps.check_cap("bilog", RGW_CAP_READ);
805 }
f67539c2 806 int verify_permission(optional_yield y) override {
9f95a23c 807 return check_caps(s->user->get_caps());
b32b8144 808 }
f67539c2 809 void execute(optional_yield y) override;
b32b8144 810 void send_response() override;
11fdf7f2 811 const char* name() const override { return "get_bucket_index_log_status"; }
b32b8144
FG
812};
813
f67539c2 814void RGWOp_BILog_Status::execute(optional_yield y)
b32b8144 815{
9f95a23c
TL
816 const auto options = s->info.args.get("options");
817 bool merge = (options == "merge");
b32b8144 818 const auto source_zone = s->info.args.get("source-zone");
9f95a23c
TL
819 const auto source_key = s->info.args.get("source-bucket");
820 auto key = s->info.args.get("bucket");
821 if (key.empty()) {
822 key = source_key;
823 }
b32b8144 824 if (key.empty()) {
b3b6e05e 825 ldpp_dout(this, 4) << "no 'bucket' provided" << dendl;
f67539c2 826 op_ret = -EINVAL;
b32b8144
FG
827 return;
828 }
829
830 rgw_bucket bucket;
831 int shard_id{-1}; // unused
f67539c2
TL
832 op_ret = rgw_bucket_parse_bucket_key(s->cct, key, &bucket, &shard_id);
833 if (op_ret < 0) {
b3b6e05e 834 ldpp_dout(this, 4) << "invalid 'bucket' provided" << dendl;
f67539c2 835 op_ret = -EINVAL;
b32b8144
FG
836 return;
837 }
838
28e407b8 839 // read the bucket instance info for num_shards
9f95a23c 840 auto ctx = store->svc()->sysobj->init_obj_ctx();
28e407b8 841 RGWBucketInfo info;
b3b6e05e 842 op_ret = store->getRados()->get_bucket_instance_info(ctx, bucket, info, nullptr, nullptr, s->yield, this);
f67539c2 843 if (op_ret < 0) {
b3b6e05e 844 ldpp_dout(this, 4) << "failed to read bucket info: " << cpp_strerror(op_ret) << dendl;
28e407b8
AA
845 return;
846 }
9f95a23c
TL
847
848 rgw_bucket source_bucket;
849
850 if (source_key.empty() ||
851 source_key == key) {
852 source_bucket = info.bucket;
853 } else {
f67539c2
TL
854 op_ret = rgw_bucket_parse_bucket_key(s->cct, source_key, &source_bucket, nullptr);
855 if (op_ret < 0) {
b3b6e05e 856 ldpp_dout(this, 4) << "invalid 'source-bucket' provided (key=" << source_key << ")" << dendl;
9f95a23c
TL
857 return;
858 }
859 }
860
861 const auto& local_zone_id = store->svc()->zone->zone_id();
862
863 if (!merge) {
864 rgw_sync_bucket_pipe pipe;
865 pipe.source.zone = source_zone;
866 pipe.source.bucket = source_bucket;
867 pipe.dest.zone = local_zone_id;
868 pipe.dest.bucket = info.bucket;
869
b3b6e05e 870 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe << dendl;
9f95a23c 871
f67539c2 872 op_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status);
9f95a23c 873
f67539c2 874 if (op_ret < 0) {
b3b6e05e 875 ldpp_dout(this, -1) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << op_ret << dendl;
9f95a23c
TL
876 }
877 return;
878 }
879
880 rgw_zone_id source_zone_id(source_zone);
881
882 RGWBucketSyncPolicyHandlerRef source_handler;
b3b6e05e 883 op_ret = store->ctl()->bucket->get_sync_policy_handler(source_zone_id, source_bucket, &source_handler, y, s);
f67539c2 884 if (op_ret < 0) {
b3b6e05e 885 ldpp_dout(this, -1) << "could not get bucket sync policy handler (r=" << op_ret << ")" << dendl;
9f95a23c
TL
886 return;
887 }
888
889 auto local_dests = source_handler->get_all_dests_in_zone(local_zone_id);
890
891 std::vector<rgw_bucket_shard_sync_info> current_status;
892 for (auto& entry : local_dests) {
893 auto pipe = entry.second;
894
b3b6e05e 895 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe << dendl;
9f95a23c
TL
896
897 RGWBucketInfo *pinfo = &info;
898 std::optional<RGWBucketInfo> opt_dest_info;
899
900 if (!pipe.dest.bucket) {
901 /* Uh oh, something went wrong */
b3b6e05e 902 ldpp_dout(this, 20) << "ERROR: RGWOp_BILog_Status::execute(optional_yield y): BUG: pipe.dest.bucket was not initialized" << pipe << dendl;
f67539c2 903 op_ret = -EIO;
9f95a23c
TL
904 return;
905 }
906
907 if (*pipe.dest.bucket != info.bucket) {
908 opt_dest_info.emplace();
909 pinfo = &(*opt_dest_info);
910
911 /* dest bucket might not have a bucket id */
f67539c2 912 op_ret = store->ctl()->bucket->read_bucket_info(*pipe.dest.bucket,
9f95a23c
TL
913 pinfo,
914 s->yield,
b3b6e05e 915 s,
9f95a23c
TL
916 RGWBucketCtl::BucketInstance::GetParams(),
917 nullptr);
f67539c2 918 if (op_ret < 0) {
b3b6e05e 919 ldpp_dout(this, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(op_ret) << dendl;
9f95a23c
TL
920 return;
921 }
922
923 pipe.dest.bucket = pinfo->bucket;
924 }
925
926 int r = rgw_bucket_sync_status(this, store, pipe, *pinfo, &info, &current_status);
927 if (r < 0) {
b3b6e05e 928 ldpp_dout(this, -1) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << r << dendl;
f67539c2 929 op_ret = r;
9f95a23c
TL
930 return;
931 }
932
933 if (status.empty()) {
934 status = std::move(current_status);
935 } else {
936 if (current_status.size() !=
937 status.size()) {
f67539c2 938 op_ret = -EINVAL;
b3b6e05e 939 ldpp_dout(this, -1) << "ERROR: different number of shards for sync status of buckets syncing from the same source: status.size()= " << status.size() << " current_status.size()=" << current_status.size() << dendl;
9f95a23c
TL
940 return;
941 }
942 auto m = status.begin();
943 for (auto& cur_shard_status : current_status) {
944 auto& result_shard_status = *m++;
945 // always take the first marker, or any later marker that's smaller
946 if (cur_shard_status.inc_marker.position < result_shard_status.inc_marker.position) {
947 result_shard_status = std::move(cur_shard_status);
948 }
949 }
950 }
951 }
b32b8144
FG
952}
953
954void RGWOp_BILog_Status::send_response()
955{
f67539c2 956 set_req_state_err(s, op_ret);
b32b8144
FG
957 dump_errno(s);
958 end_header(s);
959
f67539c2 960 if (op_ret >= 0) {
b32b8144
FG
961 encode_json("status", status, s->formatter);
962 }
963 flusher.flush();
964}
965
7c673cae
FG
966// not in header to avoid pulling in rgw_data_sync.h
967class RGWOp_DATALog_Status : public RGWRESTOp {
968 rgw_data_sync_status status;
969public:
9f95a23c 970 int check_caps(const RGWUserCaps& caps) override {
7c673cae
FG
971 return caps.check_cap("datalog", RGW_CAP_READ);
972 }
f67539c2 973 int verify_permission(optional_yield y) override {
9f95a23c 974 return check_caps(s->user->get_caps());
7c673cae 975 }
f67539c2 976 void execute(optional_yield y) override ;
7c673cae 977 void send_response() override;
11fdf7f2 978 const char* name() const override { return "get_data_changes_log_status"; }
7c673cae
FG
979};
980
f67539c2 981void RGWOp_DATALog_Status::execute(optional_yield y)
7c673cae
FG
982{
983 const auto source_zone = s->info.args.get("source-zone");
9f95a23c 984 auto sync = store->getRados()->get_data_sync_manager(source_zone);
7c673cae 985 if (sync == nullptr) {
b3b6e05e 986 ldpp_dout(this, 1) << "no sync manager for source-zone " << source_zone << dendl;
f67539c2 987 op_ret = -ENOENT;
7c673cae
FG
988 return;
989 }
b3b6e05e 990 op_ret = sync->read_sync_status(this, &status);
7c673cae
FG
991}
992
993void RGWOp_DATALog_Status::send_response()
994{
f67539c2 995 set_req_state_err(s, op_ret);
7c673cae
FG
996 dump_errno(s);
997 end_header(s);
998
f67539c2 999 if (op_ret >= 0) {
7c673cae
FG
1000 encode_json("status", status, s->formatter);
1001 }
1002 flusher.flush();
1003}
1004
1005
1006RGWOp *RGWHandler_Log::op_get() {
1007 bool exists;
1008 string type = s->info.args.get("type", &exists);
1009
1010 if (!exists) {
1011 return NULL;
1012 }
1013
1014 if (type.compare("metadata") == 0) {
1015 if (s->info.args.exists("id")) {
1016 if (s->info.args.exists("info")) {
1017 return new RGWOp_MDLog_ShardInfo;
1018 } else {
1019 return new RGWOp_MDLog_List;
1020 }
1021 } else if (s->info.args.exists("status")) {
1022 return new RGWOp_MDLog_Status;
1023 } else {
1024 return new RGWOp_MDLog_Info;
1025 }
1026 } else if (type.compare("bucket-index") == 0) {
1027 if (s->info.args.exists("info")) {
1028 return new RGWOp_BILog_Info;
b32b8144
FG
1029 } else if (s->info.args.exists("status")) {
1030 return new RGWOp_BILog_Status;
7c673cae
FG
1031 } else {
1032 return new RGWOp_BILog_List;
1033 }
1034 } else if (type.compare("data") == 0) {
1035 if (s->info.args.exists("id")) {
1036 if (s->info.args.exists("info")) {
1037 return new RGWOp_DATALog_ShardInfo;
1038 } else {
1039 return new RGWOp_DATALog_List;
1040 }
1041 } else if (s->info.args.exists("status")) {
1042 return new RGWOp_DATALog_Status;
1043 } else {
1044 return new RGWOp_DATALog_Info;
1045 }
1046 }
1047 return NULL;
1048}
1049
1050RGWOp *RGWHandler_Log::op_delete() {
1051 bool exists;
1052 string type = s->info.args.get("type", &exists);
1053
1054 if (!exists) {
1055 return NULL;
1056 }
1057
1058 if (type.compare("metadata") == 0)
1059 return new RGWOp_MDLog_Delete;
1060 else if (type.compare("bucket-index") == 0)
1061 return new RGWOp_BILog_Delete;
1062 else if (type.compare("data") == 0)
1063 return new RGWOp_DATALog_Delete;
1064 return NULL;
1065}
1066
1067RGWOp *RGWHandler_Log::op_post() {
1068 bool exists;
1069 string type = s->info.args.get("type", &exists);
1070
1071 if (!exists) {
1072 return NULL;
1073 }
1074
1075 if (type.compare("metadata") == 0) {
1076 if (s->info.args.exists("lock"))
1077 return new RGWOp_MDLog_Lock;
1078 else if (s->info.args.exists("unlock"))
1079 return new RGWOp_MDLog_Unlock;
1080 else if (s->info.args.exists("notify"))
1081 return new RGWOp_MDLog_Notify;
1082 } else if (type.compare("data") == 0) {
9f95a23c 1083 if (s->info.args.exists("notify"))
7c673cae
FG
1084 return new RGWOp_DATALog_Notify;
1085 }
1086 return NULL;
1087}
1088