]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_rest_log.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_rest_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "common/ceph_json.h"
17 #include "common/strtol.h"
18 #include "rgw_rest.h"
19 #include "rgw_op.h"
20 #include "rgw_rest_s3.h"
21 #include "rgw_rest_log.h"
22 #include "rgw_client_io.h"
23 #include "rgw_sync.h"
24 #include "rgw_data_sync.h"
25 #include "rgw_common.h"
26 #include "rgw_zone.h"
27 #include "rgw_mdlog.h"
28 #include "rgw_datalog_notify.h"
29 #include "rgw_trim_bilog.h"
30
31 #include "services/svc_zone.h"
32 #include "services/svc_mdlog.h"
33 #include "services/svc_bilog_rados.h"
34
35 #include "common/errno.h"
36 #include "include/ceph_assert.h"
37
38 #define dout_context g_ceph_context
39 #define LOG_CLASS_LIST_MAX_ENTRIES (1000)
40 #define dout_subsys ceph_subsys_rgw
41
42 using namespace std;
43
44 void RGWOp_MDLog_List::execute(optional_yield y) {
45 string period = s->info.args.get("period");
46 string shard = s->info.args.get("id");
47 string max_entries_str = s->info.args.get("max-entries");
48 string marker = s->info.args.get("marker"),
49 err;
50 void *handle;
51 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
52
53 if (s->info.args.exists("start-time") ||
54 s->info.args.exists("end-time")) {
55 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
56 op_ret = -EINVAL;
57 return;
58 }
59
60 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
61 if (!err.empty()) {
62 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
63 op_ret = -EINVAL;
64 return;
65 }
66
67 if (!max_entries_str.empty()) {
68 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
69 if (!err.empty()) {
70 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str << dendl;
71 op_ret = -EINVAL;
72 return;
73 }
74 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
75 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
76 }
77 }
78
79 if (period.empty()) {
80 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
81 period = driver->get_zone()->get_current_period_id();
82 if (period.empty()) {
83 ldpp_dout(this, 5) << "Missing period id" << dendl;
84 op_ret = -EINVAL;
85 return;
86 }
87 }
88
89 RGWMetadataLog meta_log{s->cct, static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone, static_cast<rgw::sal::RadosStore*>(driver)->svc()->cls, period};
90
91 meta_log.init_list_entries(shard_id, {}, {}, marker, &handle);
92
93 op_ret = meta_log.list_entries(this, handle, max_entries, entries,
94 &last_marker, &truncated);
95
96 meta_log.complete_list_entries(handle);
97 }
98
99 void RGWOp_MDLog_List::send_response() {
100 set_req_state_err(s, op_ret);
101 dump_errno(s);
102 end_header(s);
103
104 if (op_ret < 0)
105 return;
106
107 s->formatter->open_object_section("log_entries");
108 s->formatter->dump_string("marker", last_marker);
109 s->formatter->dump_bool("truncated", truncated);
110 {
111 s->formatter->open_array_section("entries");
112 for (list<cls_log_entry>::iterator iter = entries.begin();
113 iter != entries.end(); ++iter) {
114 cls_log_entry& entry = *iter;
115 static_cast<rgw::sal::RadosStore*>(driver)->ctl()->meta.mgr->dump_log_entry(entry, s->formatter);
116 flusher.flush();
117 }
118 s->formatter->close_section();
119 }
120 s->formatter->close_section();
121 flusher.flush();
122 }
123
124 void RGWOp_MDLog_Info::execute(optional_yield y) {
125 num_objects = s->cct->_conf->rgw_md_log_max_shards;
126 period = static_cast<rgw::sal::RadosStore*>(driver)->svc()->mdlog->read_oldest_log_period(y, s);
127 op_ret = period.get_error();
128 }
129
130 void RGWOp_MDLog_Info::send_response() {
131 set_req_state_err(s, op_ret);
132 dump_errno(s);
133 end_header(s);
134
135 s->formatter->open_object_section("mdlog");
136 s->formatter->dump_unsigned("num_objects", num_objects);
137 if (period) {
138 s->formatter->dump_string("period", period.get_period().get_id());
139 s->formatter->dump_unsigned("realm_epoch", period.get_epoch());
140 }
141 s->formatter->close_section();
142 flusher.flush();
143 }
144
145 void RGWOp_MDLog_ShardInfo::execute(optional_yield y) {
146 string period = s->info.args.get("period");
147 string shard = s->info.args.get("id");
148 string err;
149
150 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
151 if (!err.empty()) {
152 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
153 op_ret = -EINVAL;
154 return;
155 }
156
157 if (period.empty()) {
158 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
159 period = driver->get_zone()->get_current_period_id();
160
161 if (period.empty()) {
162 ldpp_dout(this, 5) << "Missing period id" << dendl;
163 op_ret = -EINVAL;
164 return;
165 }
166 }
167 RGWMetadataLog meta_log{s->cct, static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone, static_cast<rgw::sal::RadosStore*>(driver)->svc()->cls, period};
168
169 op_ret = meta_log.get_info(this, shard_id, &info);
170 }
171
172 void RGWOp_MDLog_ShardInfo::send_response() {
173 set_req_state_err(s, op_ret);
174 dump_errno(s);
175 end_header(s);
176
177 encode_json("info", info, s->formatter);
178 flusher.flush();
179 }
180
181 void RGWOp_MDLog_Delete::execute(optional_yield y) {
182 string marker = s->info.args.get("marker"),
183 period = s->info.args.get("period"),
184 shard = s->info.args.get("id"),
185 err;
186 unsigned shard_id;
187
188
189 if (s->info.args.exists("start-time") ||
190 s->info.args.exists("end-time")) {
191 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
192 op_ret = -EINVAL;
193 }
194
195 if (s->info.args.exists("start-marker")) {
196 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl;
197 op_ret = -EINVAL;
198 }
199
200 if (s->info.args.exists("end-marker")) {
201 if (!s->info.args.exists("marker")) {
202 marker = s->info.args.get("end-marker");
203 } else {
204 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl;
205 op_ret = -EINVAL;
206 }
207 }
208
209 op_ret = 0;
210
211 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
212 if (!err.empty()) {
213 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
214 op_ret = -EINVAL;
215 return;
216 }
217
218 if (marker.empty()) { /* bounding end */
219 op_ret = -EINVAL;
220 return;
221 }
222
223 if (period.empty()) {
224 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
225 period = driver->get_zone()->get_current_period_id();
226
227 if (period.empty()) {
228 ldpp_dout(this, 5) << "Missing period id" << dendl;
229 op_ret = -EINVAL;
230 return;
231 }
232 }
233 RGWMetadataLog meta_log{s->cct, static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone, static_cast<rgw::sal::RadosStore*>(driver)->svc()->cls, period};
234
235 op_ret = meta_log.trim(this, shard_id, {}, {}, {}, marker);
236 }
237
238 void RGWOp_MDLog_Lock::execute(optional_yield y) {
239 string period, shard_id_str, duration_str, locker_id, zone_id;
240 unsigned shard_id;
241
242 op_ret = 0;
243
244 period = s->info.args.get("period");
245 shard_id_str = s->info.args.get("id");
246 duration_str = s->info.args.get("length");
247 locker_id = s->info.args.get("locker-id");
248 zone_id = s->info.args.get("zone-id");
249
250 if (period.empty()) {
251 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
252 period = driver->get_zone()->get_current_period_id();
253 }
254
255 if (period.empty() ||
256 shard_id_str.empty() ||
257 (duration_str.empty()) ||
258 locker_id.empty() ||
259 zone_id.empty()) {
260 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl;
261 op_ret = -EINVAL;
262 return;
263 }
264
265 string err;
266 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
267 if (!err.empty()) {
268 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str << dendl;
269 op_ret = -EINVAL;
270 return;
271 }
272
273 RGWMetadataLog meta_log{s->cct, static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone, static_cast<rgw::sal::RadosStore*>(driver)->svc()->cls, period};
274 unsigned dur;
275 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
276 if (!err.empty() || dur <= 0) {
277 ldpp_dout(this, 5) << "invalid length param " << duration_str << dendl;
278 op_ret = -EINVAL;
279 return;
280 }
281 op_ret = meta_log.lock_exclusive(s, shard_id, make_timespan(dur), zone_id,
282 locker_id);
283 if (op_ret == -EBUSY)
284 op_ret = -ERR_LOCKED;
285 }
286
287 void RGWOp_MDLog_Unlock::execute(optional_yield y) {
288 string period, shard_id_str, locker_id, zone_id;
289 unsigned shard_id;
290
291 op_ret = 0;
292
293 period = s->info.args.get("period");
294 shard_id_str = s->info.args.get("id");
295 locker_id = s->info.args.get("locker-id");
296 zone_id = s->info.args.get("zone-id");
297
298 if (period.empty()) {
299 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl;
300 period = driver->get_zone()->get_current_period_id();
301 }
302
303 if (period.empty() ||
304 shard_id_str.empty() ||
305 locker_id.empty() ||
306 zone_id.empty()) {
307 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl;
308 op_ret = -EINVAL;
309 return;
310 }
311
312 string err;
313 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
314 if (!err.empty()) {
315 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str << dendl;
316 op_ret = -EINVAL;
317 return;
318 }
319
320 RGWMetadataLog meta_log{s->cct, static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone, static_cast<rgw::sal::RadosStore*>(driver)->svc()->cls, period};
321 op_ret = meta_log.unlock(s, shard_id, zone_id, locker_id);
322 }
323
324 void RGWOp_MDLog_Notify::execute(optional_yield y) {
325 #define LARGE_ENOUGH_BUF (128 * 1024)
326
327 int r = 0;
328 bufferlist data;
329 std::tie(r, data) = read_all_input(s, LARGE_ENOUGH_BUF);
330 if (r < 0) {
331 op_ret = r;
332 return;
333 }
334
335 char* buf = data.c_str();
336 ldpp_dout(this, 20) << __func__ << "(): read data: " << buf << dendl;
337
338 JSONParser p;
339 r = p.parse(buf, data.length());
340 if (r < 0) {
341 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl;
342 op_ret = r;
343 return;
344 }
345
346 set<int> updated_shards;
347 try {
348 decode_json_obj(updated_shards, &p);
349 } catch (JSONDecoder::err& err) {
350 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
351 op_ret = -EINVAL;
352 return;
353 }
354
355 if (driver->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
356 for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
357 ldpp_dout(this, 20) << __func__ << "(): updated shard=" << *iter << dendl;
358 }
359 }
360
361 driver->wakeup_meta_sync_shards(updated_shards);
362
363 op_ret = 0;
364 }
365
366 void RGWOp_BILog_List::execute(optional_yield y) {
367 bool gen_specified = false;
368 string tenant_name = s->info.args.get("tenant"),
369 bucket_name = s->info.args.get("bucket"),
370 marker = s->info.args.get("marker"),
371 max_entries_str = s->info.args.get("max-entries"),
372 bucket_instance = s->info.args.get("bucket-instance"),
373 gen_str = s->info.args.get("generation", &gen_specified),
374 format_version_str = s->info.args.get("format-ver");
375 std::unique_ptr<rgw::sal::Bucket> bucket;
376 rgw_bucket b(rgw_bucket_key(tenant_name, bucket_name));
377
378 unsigned max_entries;
379
380 if (bucket_name.empty() && bucket_instance.empty()) {
381 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
382 op_ret = -EINVAL;
383 return;
384 }
385
386 string err;
387 std::optional<uint64_t> gen;
388 if (gen_specified) {
389 gen = strict_strtoll(gen_str.c_str(), 10, &err);
390 if (!err.empty()) {
391 ldpp_dout(s, 5) << "Error parsing generation param " << gen_str << dendl;
392 op_ret = -EINVAL;
393 return;
394 }
395 }
396
397 if (!format_version_str.empty()) {
398 format_ver = strict_strtoll(format_version_str.c_str(), 10, &err);
399 if (!err.empty()) {
400 ldpp_dout(s, 5) << "Failed to parse format-ver param: " << format_ver << dendl;
401 op_ret = -EINVAL;
402 return;
403 }
404 }
405
406 int shard_id;
407 string bn;
408 op_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
409 if (op_ret < 0) {
410 return;
411 }
412
413 if (!bucket_instance.empty()) {
414 b.name = bn;
415 b.bucket_id = bucket_instance;
416 }
417 op_ret = driver->get_bucket(s, nullptr, b, &bucket, y);
418 if (op_ret < 0) {
419 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
420 return;
421 }
422
423 const auto& logs = bucket->get_info().layout.logs;
424 if (logs.empty()) {
425 ldpp_dout(s, 5) << "ERROR: bucket=" << bucket_name << " has no log layouts" << dendl;
426 op_ret = -ENOENT;
427 return;
428 }
429
430 auto log = std::prev(logs.end());
431 if (gen) {
432 log = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(*gen));
433 if (log == logs.end()) {
434 ldpp_dout(s, 5) << "ERROR: no log layout with gen=" << *gen << dendl;
435 op_ret = -ENOENT;
436 return;
437 }
438 }
439 if (auto next = std::next(log); next != logs.end()) {
440 next_log_layout = *next; // get the next log after the current latest
441 }
442 auto& log_layout = *log; // current log layout for log listing
443
444 unsigned count = 0;
445
446
447 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
448 if (!err.empty())
449 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
450
451 send_response();
452 do {
453 list<rgw_bi_log_entry> entries;
454 int ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->bilog_rados->log_list(s, bucket->get_info(), log_layout, shard_id,
455 marker, max_entries - count,
456 entries, &truncated);
457 if (ret < 0) {
458 ldpp_dout(this, 5) << "ERROR: list_bi_log_entries()" << dendl;
459 return;
460 }
461
462 count += entries.size();
463
464 send_response(entries, marker);
465 } while (truncated && count < max_entries);
466
467 send_response_end();
468 }
469
470 void RGWOp_BILog_List::send_response() {
471 if (sent_header)
472 return;
473
474 set_req_state_err(s, op_ret);
475 dump_errno(s);
476 end_header(s);
477
478 sent_header = true;
479
480 if (op_ret < 0)
481 return;
482
483 if (format_ver >= 2) {
484 s->formatter->open_object_section("result");
485 }
486
487 s->formatter->open_array_section("entries");
488 }
489
490 void RGWOp_BILog_List::send_response(list<rgw_bi_log_entry>& entries, string& marker)
491 {
492 for (list<rgw_bi_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
493 rgw_bi_log_entry& entry = *iter;
494 encode_json("entry", entry, s->formatter);
495
496 marker = entry.id;
497 flusher.flush();
498 }
499 }
500
501 void RGWOp_BILog_List::send_response_end() {
502 s->formatter->close_section();
503
504 if (format_ver >= 2) {
505 encode_json("truncated", truncated, s->formatter);
506
507 if (next_log_layout) {
508 s->formatter->open_object_section("next_log");
509 encode_json("generation", next_log_layout->gen, s->formatter);
510 encode_json("num_shards", rgw::num_shards(next_log_layout->layout.in_index.layout), s->formatter);
511 s->formatter->close_section(); // next_log
512 }
513
514 s->formatter->close_section(); // result
515 }
516
517 flusher.flush();
518 }
519
520 void RGWOp_BILog_Info::execute(optional_yield y) {
521 string tenant_name = s->info.args.get("tenant"),
522 bucket_name = s->info.args.get("bucket"),
523 bucket_instance = s->info.args.get("bucket-instance");
524 std::unique_ptr<rgw::sal::Bucket> bucket;
525 rgw_bucket b(rgw_bucket_key(tenant_name, bucket_name));
526
527 if (bucket_name.empty() && bucket_instance.empty()) {
528 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
529 op_ret = -EINVAL;
530 return;
531 }
532
533 int shard_id;
534 string bn;
535 op_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
536 if (op_ret < 0) {
537 return;
538 }
539
540 if (!bucket_instance.empty()) {
541 b.name = bn;
542 b.bucket_id = bucket_instance;
543 }
544 op_ret = driver->get_bucket(s, nullptr, b, &bucket, y);
545 if (op_ret < 0) {
546 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
547 return;
548 }
549
550 const auto& logs = bucket->get_info().layout.logs;
551 if (logs.empty()) {
552 ldpp_dout(s, 5) << "ERROR: bucket=" << bucket_name << " has no log layouts" << dendl;
553 op_ret = -ENOENT;
554 return;
555 }
556
557 map<RGWObjCategory, RGWStorageStats> stats;
558 const auto& index = log_to_index_layout(logs.back());
559
560 int ret = bucket->read_stats(s, index, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
561 if (ret < 0 && ret != -ENOENT) {
562 op_ret = ret;
563 return;
564 }
565
566 oldest_gen = logs.front().gen;
567 latest_gen = logs.back().gen;
568
569 for (auto& log : logs) {
570 uint32_t num_shards = rgw::num_shards(log.layout.in_index.layout);
571 generations.push_back({log.gen, num_shards});
572 }
573 }
574
575 void RGWOp_BILog_Info::send_response() {
576 set_req_state_err(s, op_ret);
577 dump_errno(s);
578 end_header(s);
579
580 if (op_ret < 0)
581 return;
582
583 s->formatter->open_object_section("info");
584 encode_json("bucket_ver", bucket_ver, s->formatter);
585 encode_json("master_ver", master_ver, s->formatter);
586 encode_json("max_marker", max_marker, s->formatter);
587 encode_json("syncstopped", syncstopped, s->formatter);
588 encode_json("oldest_gen", oldest_gen, s->formatter);
589 encode_json("latest_gen", latest_gen, s->formatter);
590 encode_json("generations", generations, s->formatter);
591 s->formatter->close_section();
592
593 flusher.flush();
594 }
595
596 void RGWOp_BILog_Delete::execute(optional_yield y) {
597 bool gen_specified = false;
598 string tenant_name = s->info.args.get("tenant"),
599 bucket_name = s->info.args.get("bucket"),
600 start_marker = s->info.args.get("start-marker"),
601 end_marker = s->info.args.get("end-marker"),
602 bucket_instance = s->info.args.get("bucket-instance"),
603 gen_str = s->info.args.get("generation", &gen_specified);
604
605 std::unique_ptr<rgw::sal::Bucket> bucket;
606 rgw_bucket b(rgw_bucket_key(tenant_name, bucket_name));
607
608 op_ret = 0;
609 if ((bucket_name.empty() && bucket_instance.empty()) ||
610 end_marker.empty()) {
611 ldpp_dout(this, 5) << "ERROR: one of bucket or bucket instance, and also end-marker is mandatory" << dendl;
612 op_ret = -EINVAL;
613 return;
614 }
615
616 string err;
617 uint64_t gen = 0;
618 if (gen_specified) {
619 gen = strict_strtoll(gen_str.c_str(), 10, &err);
620 if (!err.empty()) {
621 ldpp_dout(s, 5) << "Error parsing generation param " << gen_str << dendl;
622 op_ret = -EINVAL;
623 return;
624 }
625 }
626
627 int shard_id;
628 string bn;
629 op_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
630 if (op_ret < 0) {
631 return;
632 }
633
634 if (!bucket_instance.empty()) {
635 b.name = bn;
636 b.bucket_id = bucket_instance;
637 }
638 op_ret = driver->get_bucket(s, nullptr, b, &bucket, y);
639 if (op_ret < 0) {
640 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name << dendl;
641 return;
642 }
643
644 op_ret = bilog_trim(this, static_cast<rgw::sal::RadosStore*>(driver),
645 bucket->get_info(), gen, shard_id,
646 start_marker, end_marker);
647 if (op_ret < 0) {
648 ldpp_dout(s, 5) << "bilog_trim failed with op_ret=" << op_ret << dendl;
649 }
650
651 return;
652 }
653
654 void RGWOp_DATALog_List::execute(optional_yield y) {
655 string shard = s->info.args.get("id");
656
657 string max_entries_str = s->info.args.get("max-entries"),
658 marker = s->info.args.get("marker"),
659 err;
660 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
661
662 if (s->info.args.exists("start-time") ||
663 s->info.args.exists("end-time")) {
664 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
665 op_ret = -EINVAL;
666 }
667
668 s->info.args.get_bool("extra-info", &extra_info, false);
669
670 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
671 if (!err.empty()) {
672 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
673 op_ret = -EINVAL;
674 return;
675 }
676
677 if (!max_entries_str.empty()) {
678 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
679 if (!err.empty()) {
680 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str << dendl;
681 op_ret = -EINVAL;
682 return;
683 }
684 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
685 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
686 }
687 }
688
689 // Note that last_marker is updated to be the marker of the last
690 // entry listed
691 op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
692 datalog_rados->list_entries(this, shard_id, max_entries, entries,
693 marker, &last_marker, &truncated, y);
694 }
695
696 void RGWOp_DATALog_List::send_response() {
697 set_req_state_err(s, op_ret);
698 dump_errno(s);
699 end_header(s);
700
701 if (op_ret < 0)
702 return;
703
704 s->formatter->open_object_section("log_entries");
705 s->formatter->dump_string("marker", last_marker);
706 s->formatter->dump_bool("truncated", truncated);
707 {
708 s->formatter->open_array_section("entries");
709 for (const auto& entry : entries) {
710 if (!extra_info) {
711 encode_json("entry", entry.entry, s->formatter);
712 } else {
713 encode_json("entry", entry, s->formatter);
714 }
715 flusher.flush();
716 }
717 s->formatter->close_section();
718 }
719 s->formatter->close_section();
720 flusher.flush();
721 }
722
723
724 void RGWOp_DATALog_Info::execute(optional_yield y) {
725 num_objects = s->cct->_conf->rgw_data_log_num_shards;
726 op_ret = 0;
727 }
728
729 void RGWOp_DATALog_Info::send_response() {
730 set_req_state_err(s, op_ret);
731 dump_errno(s);
732 end_header(s);
733
734 s->formatter->open_object_section("num_objects");
735 s->formatter->dump_unsigned("num_objects", num_objects);
736 s->formatter->close_section();
737 flusher.flush();
738 }
739
740 void RGWOp_DATALog_ShardInfo::execute(optional_yield y) {
741 string shard = s->info.args.get("id");
742 string err;
743
744 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
745 if (!err.empty()) {
746 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
747 op_ret = -EINVAL;
748 return;
749 }
750
751 op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
752 datalog_rados->get_info(this, shard_id, &info, y);
753 }
754
755 void RGWOp_DATALog_ShardInfo::send_response() {
756 set_req_state_err(s, op_ret);
757 dump_errno(s);
758 end_header(s);
759
760 encode_json("info", info, s->formatter);
761 flusher.flush();
762 }
763
764 void RGWOp_DATALog_Notify::execute(optional_yield y) {
765 string source_zone = s->info.args.get("source-zone");
766 #define LARGE_ENOUGH_BUF (128 * 1024)
767
768 int r = 0;
769 bufferlist data;
770 std::tie(r, data) = read_all_input(s, LARGE_ENOUGH_BUF);
771 if (r < 0) {
772 op_ret = r;
773 return;
774 }
775
776 char* buf = data.c_str();
777 ldpp_dout(this, 20) << __func__ << "(): read data: " << buf << dendl;
778
779 JSONParser p;
780 r = p.parse(buf, data.length());
781 if (r < 0) {
782 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl;
783 op_ret = r;
784 return;
785 }
786
787 bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> updated_shards;
788 try {
789 auto decoder = rgw_data_notify_v1_decoder{updated_shards};
790 decode_json_obj(decoder, &p);
791 } catch (JSONDecoder::err& err) {
792 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
793 op_ret = -EINVAL;
794 return;
795 }
796
797 if (driver->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
798 for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
799 ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
800 bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
801 for (const auto& [key, gen] : entries) {
802 ldpp_dout(this, 20) << __func__ << "(): modified key=" << key
803 << " of gen=" << gen << dendl;
804 }
805 }
806 }
807
808 driver->wakeup_data_sync_shards(this, source_zone, updated_shards);
809
810 op_ret = 0;
811 }
812
813 void RGWOp_DATALog_Notify2::execute(optional_yield y) {
814 string source_zone = s->info.args.get("source-zone");
815 #define LARGE_ENOUGH_BUF (128 * 1024)
816
817 int r = 0;
818 bufferlist data;
819 std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
820 if (r < 0) {
821 op_ret = r;
822 return;
823 }
824
825 char* buf = data.c_str();
826 ldout(s->cct, 20) << __func__ << "(): read data: " << buf << dendl;
827
828 JSONParser p;
829 r = p.parse(buf, data.length());
830 if (r < 0) {
831 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
832 op_ret = r;
833 return;
834 }
835
836 bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> > updated_shards;
837 try {
838 decode_json_obj(updated_shards, &p);
839 } catch (JSONDecoder::err& err) {
840 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
841 op_ret = -EINVAL;
842 return;
843 }
844
845 if (driver->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
846 for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter =
847 updated_shards.begin(); iter != updated_shards.end(); ++iter) {
848 ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
849 bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
850 for (const auto& [key, gen] : entries) {
851 ldpp_dout(this, 20) << __func__ << "(): modified key=" << key <<
852 " of generation=" << gen << dendl;
853 }
854 }
855 }
856
857 driver->wakeup_data_sync_shards(this, source_zone, updated_shards);
858
859 op_ret = 0;
860 }
861
862 void RGWOp_DATALog_Delete::execute(optional_yield y) {
863 string marker = s->info.args.get("marker"),
864 shard = s->info.args.get("id"),
865 err;
866 unsigned shard_id;
867
868 op_ret = 0;
869
870 if (s->info.args.exists("start-time") ||
871 s->info.args.exists("end-time")) {
872 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl;
873 op_ret = -EINVAL;
874 }
875
876 if (s->info.args.exists("start-marker")) {
877 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl;
878 op_ret = -EINVAL;
879 }
880
881 if (s->info.args.exists("end-marker")) {
882 if (!s->info.args.exists("marker")) {
883 marker = s->info.args.get("end-marker");
884 } else {
885 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl;
886 op_ret = -EINVAL;
887 }
888 }
889
890 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
891 if (!err.empty()) {
892 ldpp_dout(this, 5) << "Error parsing shard_id " << shard << dendl;
893 op_ret = -EINVAL;
894 return;
895 }
896 if (marker.empty()) { /* bounding end */
897 op_ret = -EINVAL;
898 return;
899 }
900
901 op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
902 datalog_rados->trim_entries(this, shard_id, marker, y);
903 }
904
905 // not in header to avoid pulling in rgw_sync.h
906 class RGWOp_MDLog_Status : public RGWRESTOp {
907 rgw_meta_sync_status status;
908 public:
909 int check_caps(const RGWUserCaps& caps) override {
910 return caps.check_cap("mdlog", RGW_CAP_READ);
911 }
912 int verify_permission(optional_yield) override {
913 return check_caps(s->user->get_caps());
914 }
915 void execute(optional_yield y) override;
916 void send_response() override;
917 const char* name() const override { return "get_metadata_log_status"; }
918 };
919
920 void RGWOp_MDLog_Status::execute(optional_yield y)
921 {
922 auto sync = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_meta_sync_manager();
923 if (sync == nullptr) {
924 ldpp_dout(this, 1) << "no sync manager" << dendl;
925 op_ret = -ENOENT;
926 return;
927 }
928 op_ret = sync->read_sync_status(this, &status);
929 }
930
931 void RGWOp_MDLog_Status::send_response()
932 {
933 set_req_state_err(s, op_ret);
934 dump_errno(s);
935 end_header(s);
936
937 if (op_ret >= 0) {
938 encode_json("status", status, s->formatter);
939 }
940 flusher.flush();
941 }
942
943 // not in header to avoid pulling in rgw_data_sync.h
944 class RGWOp_BILog_Status : public RGWRESTOp {
945 bilog_status_v2 status;
946 int version = 1;
947 public:
948 int check_caps(const RGWUserCaps& caps) override {
949 return caps.check_cap("bilog", RGW_CAP_READ);
950 }
951 int verify_permission(optional_yield y) override {
952 return check_caps(s->user->get_caps());
953 }
954 void execute(optional_yield y) override;
955 void send_response() override;
956 const char* name() const override { return "get_bucket_index_log_status"; }
957 };
958
959 void RGWOp_BILog_Status::execute(optional_yield y)
960 {
961 const auto options = s->info.args.get("options");
962 bool merge = (options == "merge");
963 const auto source_zone = s->info.args.get("source-zone");
964 const auto source_key = s->info.args.get("source-bucket");
965 auto key = s->info.args.get("bucket");
966 op_ret = s->info.args.get_int("version", &version, 1);
967
968 if (key.empty()) {
969 key = source_key;
970 }
971 if (key.empty()) {
972 ldpp_dout(this, 4) << "no 'bucket' provided" << dendl;
973 op_ret = -EINVAL;
974 return;
975 }
976
977 rgw_bucket b;
978 int shard_id{-1}; // unused
979 op_ret = rgw_bucket_parse_bucket_key(s->cct, key, &b, &shard_id);
980 if (op_ret < 0) {
981 ldpp_dout(this, 4) << "invalid 'bucket' provided" << dendl;
982 op_ret = -EINVAL;
983 return;
984 }
985
986 // read the bucket instance info for num_shards
987 std::unique_ptr<rgw::sal::Bucket> bucket;
988 op_ret = driver->get_bucket(s, nullptr, b, &bucket, y);
989 if (op_ret < 0) {
990 ldpp_dout(this, 4) << "failed to read bucket info: " << cpp_strerror(op_ret) << dendl;
991 return;
992 }
993
994 rgw_bucket source_bucket;
995
996 if (source_key.empty() ||
997 source_key == key) {
998 source_bucket = bucket->get_key();
999 } else {
1000 op_ret = rgw_bucket_parse_bucket_key(s->cct, source_key, &source_bucket, nullptr);
1001 if (op_ret < 0) {
1002 ldpp_dout(this, 4) << "invalid 'source-bucket' provided (key=" << source_key << ")" << dendl;
1003 return;
1004 }
1005 }
1006
1007 const auto& local_zone_id = driver->get_zone()->get_id();
1008
1009 if (!merge) {
1010 rgw_sync_bucket_pipe pipe;
1011 pipe.source.zone = source_zone;
1012 pipe.source.bucket = source_bucket;
1013 pipe.dest.zone = local_zone_id;
1014 pipe.dest.bucket = bucket->get_key();
1015
1016 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe << dendl;
1017
1018 op_ret = rgw_read_bucket_full_sync_status(
1019 this,
1020 static_cast<rgw::sal::RadosStore*>(driver),
1021 pipe,
1022 &status.sync_status,
1023 s->yield);
1024 if (op_ret < 0) {
1025 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_full_sync_status() on pipe=" << pipe << " returned ret=" << op_ret << dendl;
1026 return;
1027 }
1028 status.inc_status.resize(status.sync_status.shards_done_with_gen.size());
1029
1030 op_ret = rgw_read_bucket_inc_sync_status(
1031 this,
1032 static_cast<rgw::sal::RadosStore*>(driver),
1033 pipe,
1034 status.sync_status.incremental_gen,
1035 &status.inc_status);
1036 if (op_ret < 0) {
1037 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_inc_sync_status() on pipe=" << pipe << " returned ret=" << op_ret << dendl;
1038 }
1039 return;
1040 }
1041
1042 rgw_zone_id source_zone_id(source_zone);
1043
1044 RGWBucketSyncPolicyHandlerRef source_handler;
1045 op_ret = driver->get_sync_policy_handler(s, source_zone_id, source_bucket, &source_handler, y);
1046 if (op_ret < 0) {
1047 ldpp_dout(this, -1) << "could not get bucket sync policy handler (r=" << op_ret << ")" << dendl;
1048 return;
1049 }
1050
1051 auto local_dests = source_handler->get_all_dests_in_zone(local_zone_id);
1052
1053 std::vector<rgw_bucket_shard_sync_info> current_status;
1054 for (auto& entry : local_dests) {
1055 auto pipe = entry.second;
1056
1057 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe << dendl;
1058
1059 RGWBucketInfo *pinfo = &bucket->get_info();
1060 std::optional<RGWBucketInfo> opt_dest_info;
1061
1062 if (!pipe.dest.bucket) {
1063 /* Uh oh, something went wrong */
1064 ldpp_dout(this, 20) << "ERROR: RGWOp_BILog_Status::execute(optional_yield y): BUG: pipe.dest.bucket was not initialized" << pipe << dendl;
1065 op_ret = -EIO;
1066 return;
1067 }
1068
1069 if (*pipe.dest.bucket != pinfo->bucket) {
1070 opt_dest_info.emplace();
1071 std::unique_ptr<rgw::sal::Bucket> dest_bucket;
1072 op_ret = driver->get_bucket(s, nullptr, *pipe.dest.bucket, &dest_bucket, y);
1073 if (op_ret < 0) {
1074 ldpp_dout(this, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(op_ret) << dendl;
1075 return;
1076 }
1077
1078 *opt_dest_info = dest_bucket->get_info();
1079 pinfo = &(*opt_dest_info);
1080 pipe.dest.bucket = pinfo->bucket;
1081 }
1082
1083 op_ret = rgw_read_bucket_full_sync_status(
1084 this,
1085 static_cast<rgw::sal::RadosStore*>(driver),
1086 pipe,
1087 &status.sync_status,
1088 s->yield);
1089 if (op_ret < 0) {
1090 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_full_sync_status() on pipe=" << pipe << " returned ret=" << op_ret << dendl;
1091 return;
1092 }
1093
1094 current_status.resize(status.sync_status.shards_done_with_gen.size());
1095 int r = rgw_read_bucket_inc_sync_status(this, static_cast<rgw::sal::RadosStore*>(driver),
1096 pipe, status.sync_status.incremental_gen, &current_status);
1097 if (r < 0) {
1098 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_inc_sync_status() on pipe=" << pipe << " returned ret=" << r << dendl;
1099 op_ret = r;
1100 return;
1101 }
1102
1103 if (status.inc_status.empty()) {
1104 status.inc_status = std::move(current_status);
1105 } else {
1106 if (current_status.size() != status.inc_status.size()) {
1107 op_ret = -EINVAL;
1108 ldpp_dout(this, -1) << "ERROR: different number of shards for sync status of buckets "
1109 "syncing from the same source: status.size()= "
1110 << status.inc_status.size()
1111 << " current_status.size()="
1112 << current_status.size() << dendl;
1113 return;
1114 }
1115 auto m = status.inc_status.begin();
1116 for (auto& cur_shard_status : current_status) {
1117 auto& result_shard_status = *m++;
1118 // always take the first marker, or any later marker that's smaller
1119 if (cur_shard_status.inc_marker.position < result_shard_status.inc_marker.position) {
1120 result_shard_status = std::move(cur_shard_status);
1121 }
1122 }
1123 }
1124 }
1125 }
1126
1127 void RGWOp_BILog_Status::send_response()
1128 {
1129 set_req_state_err(s, op_ret);
1130 dump_errno(s);
1131 end_header(s);
1132
1133 if (op_ret >= 0) {
1134 if (version < 2) {
1135 encode_json("status", status.inc_status, s->formatter);
1136 } else {
1137 encode_json("status", status, s->formatter);
1138 }
1139 }
1140 flusher.flush();
1141 }
1142
1143 // not in header to avoid pulling in rgw_data_sync.h
1144 class RGWOp_DATALog_Status : public RGWRESTOp {
1145 rgw_data_sync_status status;
1146 public:
1147 int check_caps(const RGWUserCaps& caps) override {
1148 return caps.check_cap("datalog", RGW_CAP_READ);
1149 }
1150 int verify_permission(optional_yield y) override {
1151 return check_caps(s->user->get_caps());
1152 }
1153 void execute(optional_yield y) override ;
1154 void send_response() override;
1155 const char* name() const override { return "get_data_changes_log_status"; }
1156 };
1157
1158 void RGWOp_DATALog_Status::execute(optional_yield y)
1159 {
1160 const auto source_zone = s->info.args.get("source-zone");
1161 auto sync = driver->get_data_sync_manager(source_zone);
1162 if (sync == nullptr) {
1163 ldpp_dout(this, 1) << "no sync manager for source-zone " << source_zone << dendl;
1164 op_ret = -ENOENT;
1165 return;
1166 }
1167 op_ret = sync->read_sync_status(this, &status);
1168 }
1169
1170 void RGWOp_DATALog_Status::send_response()
1171 {
1172 set_req_state_err(s, op_ret);
1173 dump_errno(s);
1174 end_header(s);
1175
1176 if (op_ret >= 0) {
1177 encode_json("status", status, s->formatter);
1178 }
1179 flusher.flush();
1180 }
1181
1182
1183 RGWOp *RGWHandler_Log::op_get() {
1184 bool exists;
1185 string type = s->info.args.get("type", &exists);
1186
1187 if (!exists) {
1188 return NULL;
1189 }
1190
1191 if (type.compare("metadata") == 0) {
1192 if (s->info.args.exists("id")) {
1193 if (s->info.args.exists("info")) {
1194 return new RGWOp_MDLog_ShardInfo;
1195 } else {
1196 return new RGWOp_MDLog_List;
1197 }
1198 } else if (s->info.args.exists("status")) {
1199 return new RGWOp_MDLog_Status;
1200 } else {
1201 return new RGWOp_MDLog_Info;
1202 }
1203 } else if (type.compare("bucket-index") == 0) {
1204 if (s->info.args.exists("info")) {
1205 return new RGWOp_BILog_Info;
1206 } else if (s->info.args.exists("status")) {
1207 return new RGWOp_BILog_Status;
1208 } else {
1209 return new RGWOp_BILog_List;
1210 }
1211 } else if (type.compare("data") == 0) {
1212 if (s->info.args.exists("id")) {
1213 if (s->info.args.exists("info")) {
1214 return new RGWOp_DATALog_ShardInfo;
1215 } else {
1216 return new RGWOp_DATALog_List;
1217 }
1218 } else if (s->info.args.exists("status")) {
1219 return new RGWOp_DATALog_Status;
1220 } else {
1221 return new RGWOp_DATALog_Info;
1222 }
1223 }
1224 return NULL;
1225 }
1226
1227 RGWOp *RGWHandler_Log::op_delete() {
1228 bool exists;
1229 string type = s->info.args.get("type", &exists);
1230
1231 if (!exists) {
1232 return NULL;
1233 }
1234
1235 if (type.compare("metadata") == 0)
1236 return new RGWOp_MDLog_Delete;
1237 else if (type.compare("bucket-index") == 0)
1238 return new RGWOp_BILog_Delete;
1239 else if (type.compare("data") == 0)
1240 return new RGWOp_DATALog_Delete;
1241 return NULL;
1242 }
1243
1244 RGWOp *RGWHandler_Log::op_post() {
1245 bool exists;
1246 string type = s->info.args.get("type", &exists);
1247
1248 if (!exists) {
1249 return NULL;
1250 }
1251
1252 if (type.compare("metadata") == 0) {
1253 if (s->info.args.exists("lock"))
1254 return new RGWOp_MDLog_Lock;
1255 else if (s->info.args.exists("unlock"))
1256 return new RGWOp_MDLog_Unlock;
1257 else if (s->info.args.exists("notify"))
1258 return new RGWOp_MDLog_Notify;
1259 } else if (type.compare("data") == 0) {
1260 if (s->info.args.exists("notify")) {
1261 return new RGWOp_DATALog_Notify;
1262 } else if (s->info.args.exists("notify2")) {
1263 return new RGWOp_DATALog_Notify2;
1264 }
1265 }
1266 return NULL;
1267 }
1268