]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_admin.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / rgw_admin.cc
index e6880c7778e65325898ee303eba14b8a7a6bda28..73b0736b18feca10abef6072ffe57d800eac04f7 100644 (file)
@@ -6,6 +6,7 @@
 #include <sstream>
 #include <string>
 
+#include <boost/asio.hpp>
 #include <boost/optional.hpp>
 
 extern "C" {
@@ -155,7 +156,9 @@ void usage()
   cout << "  bucket unlink              unlink bucket from specified user\n";
   cout << "  bucket stats               returns bucket statistics\n";
   cout << "  bucket rm                  remove bucket\n";
-  cout << "  bucket check               check bucket index\n";
+  cout << "  bucket check               check bucket index by verifying size and object count stats\n";
+  cout << "  bucket check olh           check for olh index entries and objects that are pending removal\n";
+  cout << "  bucket check unlinked      check for object versions that are not visible in a bucket listing \n";
   cout << "  bucket chown               link bucket to specified user and update its object ACLs\n";
   cout << "  bucket reshard             reshard bucket\n";
   cout << "  bucket rewrite             rewrite all objects in the specified bucket\n";
@@ -318,6 +321,9 @@ void usage()
   cout << "  script-package add         add a lua package to the scripts allowlist\n";
   cout << "  script-package rm          remove a lua package from the scripts allowlist\n";
   cout << "  script-package list        get the lua packages allowlist\n";
+  cout << "  notification list          list bucket notifications configuration\n";
+  cout << "  notification get           get a bucket notifications configuration\n";
+  cout << "  notification rm            remove a bucket notifications configuration\n";
   cout << "options:\n";
   cout << "   --tenant=<tenant>         tenant name\n";
   cout << "   --user_ns=<namespace>     namespace of user (oidc in case of users authenticated with oidc provider)\n";
@@ -483,10 +489,15 @@ void usage()
   cout << "   --totp-pin                the valid value of a TOTP token at a certain time\n";
   cout << "\nBucket notifications options:\n";
   cout << "   --topic                   bucket notifications topic name\n";
+  cout << "   --notification-id         bucket notifications id\n";
   cout << "\nScript options:\n";
   cout << "   --context                 context in which the script runs. one of: "+LUA_CONTEXT_LIST+"\n";
   cout << "   --package                 name of the lua package that should be added/removed to/from the allowlist\n";
   cout << "   --allow-compilation       package is allowed to compile C code as part of its installation\n";
+  cout << "\nBucket check olh/unlinked options:\n";
+  cout << "   --min-age-hours           minimum age of unlinked objects to consider for bucket check unlinked (default: 1)\n";
+  cout << "   --dump-keys               when specified, all keys identified as problematic are printed to stdout\n";
+  cout << "   --hide-progress           when specified, per-shard progress details are not printed to stderr\n";
   cout << "\nradoslist options:\n";
   cout << "   --rgw-obj-fs              the field separator that will separate the rados\n";
   cout << "                             object name from the rgw object name;\n";
@@ -655,6 +666,8 @@ enum class OPT {
   BUCKET_LAYOUT,
   BUCKET_STATS,
   BUCKET_CHECK,
+  BUCKET_CHECK_OLH,
+  BUCKET_CHECK_UNLINKED,
   BUCKET_SYNC_CHECKPOINT,
   BUCKET_SYNC_INFO,
   BUCKET_SYNC_STATUS,
@@ -670,6 +683,7 @@ enum class OPT {
   BUCKET_RADOS_LIST,
   BUCKET_SHARD_OBJECTS,
   BUCKET_OBJECT_SHARD,
+  BUCKET_RESYNC_ENCRYPTED_MULTIPART,
   POLICY,
   POOL_ADD,
   POOL_RM,
@@ -830,9 +844,12 @@ enum class OPT {
   MFA_RESYNC,
   RESHARD_STALE_INSTANCES_LIST,
   RESHARD_STALE_INSTANCES_DELETE,
-  PUBSUB_TOPICS_LIST,
+  PUBSUB_TOPIC_LIST,
   PUBSUB_TOPIC_GET,
   PUBSUB_TOPIC_RM,
+  PUBSUB_NOTIFICATION_LIST,
+  PUBSUB_NOTIFICATION_GET,
+  PUBSUB_NOTIFICATION_RM,
   SCRIPT_PUT,
   SCRIPT_GET,
   SCRIPT_RM,
@@ -869,6 +886,8 @@ static SimpleCmd::Commands all_cmds = {
   { "bucket layout", OPT::BUCKET_LAYOUT },
   { "bucket stats", OPT::BUCKET_STATS },
   { "bucket check", OPT::BUCKET_CHECK },
+  { "bucket check olh", OPT::BUCKET_CHECK_OLH },
+  { "bucket check unlinked", OPT::BUCKET_CHECK_UNLINKED },
   { "bucket sync checkpoint", OPT::BUCKET_SYNC_CHECKPOINT },
   { "bucket sync info", OPT::BUCKET_SYNC_INFO },
   { "bucket sync status", OPT::BUCKET_SYNC_STATUS },
@@ -886,6 +905,7 @@ static SimpleCmd::Commands all_cmds = {
   { "bucket shard objects", OPT::BUCKET_SHARD_OBJECTS },
   { "bucket shard object", OPT::BUCKET_SHARD_OBJECTS },
   { "bucket object shard", OPT::BUCKET_OBJECT_SHARD },
+  { "bucket resync encrypted multipart", OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART },
   { "policy", OPT::POLICY },
   { "pool add", OPT::POOL_ADD },
   { "pool rm", OPT::POOL_RM },
@@ -1061,9 +1081,12 @@ static SimpleCmd::Commands all_cmds = {
   { "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
   { "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
   { "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
-  { "topic list", OPT::PUBSUB_TOPICS_LIST },
+  { "topic list", OPT::PUBSUB_TOPIC_LIST },
   { "topic get", OPT::PUBSUB_TOPIC_GET },
   { "topic rm", OPT::PUBSUB_TOPIC_RM },
+  { "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
+  { "notification get", OPT::PUBSUB_NOTIFICATION_GET },
+  { "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
   { "script put", OPT::SCRIPT_PUT },
   { "script get", OPT::SCRIPT_GET },
   { "script rm", OPT::SCRIPT_RM },
@@ -2268,7 +2291,7 @@ static void get_data_sync_status(const rgw_zone_id& source_zone, list<string>& s
     return;
   }
 
-  if (!static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->zone_syncs_from(static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), *sz)) {
+  if (!static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->zone_syncs_from(*sz)) {
     push_ss(ss, status, tab) << string("not syncing from zone");
     flush_ss(ss, status);
     return;
@@ -3455,6 +3478,9 @@ int main(int argc, const char **argv)
   std::optional<int> bucket_index_max_shards;
 
   int max_concurrent_ios = 32;
+  ceph::timespan min_age = std::chrono::hours(1);
+  bool hide_progress = false;
+  bool dump_keys = false;
   uint64_t orphan_stale_secs = (24 * 3600);
   int detail = false;
 
@@ -3488,6 +3514,7 @@ int main(int argc, const char **argv)
   int trim_delay_ms = 0;
 
   string topic_name;
+  string notification_id;
   string sub_name;
   string event_id;
 
@@ -3718,6 +3745,8 @@ int main(int argc, const char **argv)
         cerr << "ERROR: failed to parse max concurrent ios: " << err << std::endl;
         return EINVAL;
       }
+    } else if (ceph_argparse_witharg(args, i, &val, "--min-age-hours", (char*)NULL)) {
+      min_age = std::chrono::hours(atoi(val.c_str()));
     } else if (ceph_argparse_witharg(args, i, &val, "--orphan-stale-secs", (char*)NULL)) {
       orphan_stale_secs = (uint64_t)strict_strtoll(val.c_str(), 10, &err);
       if (!err.empty()) {
@@ -3800,6 +3829,10 @@ int main(int argc, const char **argv)
      // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &inconsistent_index, NULL, "--inconsistent-index", (char*)NULL)) {
      // do nothing
+    } else if (ceph_argparse_flag(args, i, "--hide-progress", (char*)NULL)) {
+      hide_progress = true;
+    } else if (ceph_argparse_flag(args, i, "--dump-keys", (char*)NULL)) {
+      dump_keys = true;
     } else if (ceph_argparse_binary_flag(args, i, &placement_inline_data, NULL, "--placement-inline-data", (char*)NULL)) {
       placement_inline_data_specified = true;
      // do nothing
@@ -3963,6 +3996,8 @@ int main(int argc, const char **argv)
       trim_delay_ms = atoi(val.c_str());
     } else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) {
       topic_name = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--notification-id", (char*)NULL)) {
+      notification_id = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--subscription", (char*)NULL)) {
       sub_name = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
@@ -4209,8 +4244,10 @@ int main(int argc, const char **argv)
                         OPT::ROLE_POLICY_GET,
                         OPT::RESHARD_LIST,
                         OPT::RESHARD_STATUS,
-                        OPT::PUBSUB_TOPICS_LIST,
+                        OPT::PUBSUB_TOPIC_LIST,
+       OPT::PUBSUB_NOTIFICATION_LIST,
                         OPT::PUBSUB_TOPIC_GET,
+       OPT::PUBSUB_NOTIFICATION_GET,
                         OPT::SCRIPT_GET,
     };
 
@@ -4290,9 +4327,12 @@ int main(int argc, const char **argv)
                           && opt_cmd != OPT::RESHARD_ADD
                           && opt_cmd != OPT::RESHARD_CANCEL
                           && opt_cmd != OPT::RESHARD_STATUS
-                          && opt_cmd != OPT::PUBSUB_TOPICS_LIST
+                          && opt_cmd != OPT::PUBSUB_TOPIC_LIST
+                          && opt_cmd != OPT::PUBSUB_NOTIFICATION_LIST
                           && opt_cmd != OPT::PUBSUB_TOPIC_GET
-                          && opt_cmd != OPT::PUBSUB_TOPIC_RM) {
+                          && opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
+                          && opt_cmd != OPT::PUBSUB_TOPIC_RM
+                          && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) {
         cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
         return EINVAL;
       }
@@ -6443,6 +6483,9 @@ int main(int argc, const char **argv)
   bucket_op.set_delete_children(delete_child_objects);
   bucket_op.set_fix_index(fix);
   bucket_op.set_max_aio(max_concurrent_ios);
+  bucket_op.set_min_age(min_age);
+  bucket_op.set_dump_keys(dump_keys);
+  bucket_op.set_hide_progress(hide_progress);
 
   // required to gather errors from operations
   std::string err_msg;
@@ -7177,6 +7220,47 @@ int main(int argc, const char **argv)
     formatter->flush(cout);
   }
 
+  if (opt_cmd == OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART) {
+    // repair logic for replication of encrypted multipart uploads:
+    // https://tracker.ceph.com/issues/46062
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+    int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      return -ret;
+    }
+
+    auto rados_driver = dynamic_cast<rgw::sal::RadosStore*>(driver);
+    if (!rados_driver) {
+      cerr << "ERROR: this command can only work when the cluster "
+          "has a RADOS backing store." << std::endl;
+      return EPERM;
+    }
+
+    // fail if recovery wouldn't generate replication log entries
+    if (!rados_driver->svc()->zone->need_to_log_data() && !yes_i_really_mean_it) {
+      cerr << "This command is only necessary for replicated buckets." << std::endl;
+      cerr << "do you really mean it? (requires --yes-i-really-mean-it)" << std::endl;
+      return EPERM;
+    }
+
+    formatter->open_object_section("modified");
+    encode_json("bucket", bucket->get_name(), formatter.get());
+    encode_json("bucket_id", bucket->get_bucket_id(), formatter.get());
+
+    ret = rados_driver->getRados()->bucket_resync_encrypted_multipart(
+        dpp(), null_yield, rados_driver, bucket->get_info(),
+        marker, stream_flusher);
+    if (ret < 0) {
+      return -ret;
+    }
+    formatter->close_section();
+    formatter->flush(cout);
+    return 0;
+  }
+
   if (opt_cmd == OPT::BUCKET_CHOWN) {
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket name not specified" << std::endl;
@@ -8313,6 +8397,28 @@ next:
     }
   }
 
+  if (opt_cmd == OPT::BUCKET_CHECK_OLH) {
+    rgw::sal::RadosStore* store = dynamic_cast<rgw::sal::RadosStore*>(driver);
+    if (!store) {
+      cerr <<
+             "WARNING: this command is only relevant when the cluster has a RADOS backing store." <<
+             std::endl;
+      return 0;
+    }
+    RGWBucketAdminOp::check_index_olh(store, bucket_op, stream_flusher, dpp());
+  }
+
+  if (opt_cmd == OPT::BUCKET_CHECK_UNLINKED) {
+    rgw::sal::RadosStore* store = dynamic_cast<rgw::sal::RadosStore*>(driver);
+    if (!store) {
+      cerr <<
+             "WARNING: this command is only relevant when the cluster has a RADOS backing store." <<
+             std::endl;
+      return 0;
+    }
+    RGWBucketAdminOp::check_index_unlinked(store, bucket_op, stream_flusher, dpp());
+  }
+
   if (opt_cmd == OPT::BUCKET_RM) {
     if (!inconsistent_index) {
       RGWBucketAdminOp::remove_bucket(driver, bucket_op, null_yield, dpp(), bypass_gc, true);
@@ -9474,7 +9580,7 @@ next:
 
   if (opt_cmd == OPT::SYNC_GROUP_CREATE ||
       opt_cmd == OPT::SYNC_GROUP_MODIFY) {
-    CHECK_TRUE(require_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
     CHECK_TRUE(require_opt(opt_status), "ERROR: --status is not specified (options: forbidden, allowed, enabled)", EINVAL);
 
     SyncPolicyContext sync_policy_ctx(cfgstore.get(), opt_bucket);
@@ -9534,7 +9640,7 @@ next:
   }
 
   if (opt_cmd == OPT::SYNC_GROUP_REMOVE) {
-    CHECK_TRUE(require_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
 
     SyncPolicyContext sync_policy_ctx(cfgstore.get(), opt_bucket);
     ret = sync_policy_ctx.init(zonegroup_id, zonegroup_name);
@@ -9559,8 +9665,8 @@ next:
   }
 
   if (opt_cmd == OPT::SYNC_GROUP_FLOW_CREATE) {
-    CHECK_TRUE(require_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
-    CHECK_TRUE(require_opt(opt_flow_id), "ERROR: --flow-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_flow_id), "ERROR: --flow-id not specified", EINVAL);
     CHECK_TRUE(require_opt(opt_flow_type),
                            "ERROR: --flow-type not specified (options: symmetrical, directional)", EINVAL);
     CHECK_TRUE((symmetrical_flow_opt(*opt_flow_type) ||
@@ -9610,8 +9716,8 @@ next:
   }
 
   if (opt_cmd == OPT::SYNC_GROUP_FLOW_REMOVE) {
-    CHECK_TRUE(require_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
-    CHECK_TRUE(require_opt(opt_flow_id), "ERROR: --flow-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_flow_id), "ERROR: --flow-id not specified", EINVAL);
     CHECK_TRUE(require_opt(opt_flow_type),
                            "ERROR: --flow-type not specified (options: symmetrical, directional)", EINVAL);
     CHECK_TRUE((symmetrical_flow_opt(*opt_flow_type) ||
@@ -9652,8 +9758,8 @@ next:
 
   if (opt_cmd == OPT::SYNC_GROUP_PIPE_CREATE ||
       opt_cmd == OPT::SYNC_GROUP_PIPE_MODIFY) {
-    CHECK_TRUE(require_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
-    CHECK_TRUE(require_opt(opt_pipe_id), "ERROR: --pipe-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_pipe_id), "ERROR: --pipe-id not specified", EINVAL);
     if (opt_cmd == OPT::SYNC_GROUP_PIPE_CREATE) {
       CHECK_TRUE(require_non_empty_opt(opt_source_zone_ids), "ERROR: --source-zones not provided or is empty; should be list of zones or '*'", EINVAL);
       CHECK_TRUE(require_non_empty_opt(opt_dest_zone_ids), "ERROR: --dest-zones not provided or is empty; should be list of zones or '*'", EINVAL);
@@ -9738,8 +9844,8 @@ next:
   }
 
   if (opt_cmd == OPT::SYNC_GROUP_PIPE_REMOVE) {
-    CHECK_TRUE(require_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
-    CHECK_TRUE(require_opt(opt_pipe_id), "ERROR: --pipe-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_group_id), "ERROR: --group-id not specified", EINVAL);
+    CHECK_TRUE(require_non_empty_opt(opt_pipe_id), "ERROR: --pipe-id not specified", EINVAL);
 
     SyncPolicyContext sync_policy_ctx(cfgstore.get(), opt_bucket);
     ret = sync_policy_ctx.init(zonegroup_id, zonegroup_name);
@@ -10407,34 +10513,41 @@ next:
    }
  }
 
-  if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
+  if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+      return EINVAL;
+    }
 
     RGWPubSub ps(driver, tenant);
 
-    if (!bucket_name.empty()) {
-      rgw_pubsub_bucket_topics result;
-      int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
-      if (ret < 0) {
-        cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
-        return -ret;
-      }
+    rgw_pubsub_bucket_topics result;
+    int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
 
-      const RGWPubSub::Bucket b(ps, bucket.get());
-      ret = b.get_topics(dpp(), result, null_yield);
-      if (ret < 0 && ret != -ENOENT) {
-        cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
-        return -ret;
-      }
-      encode_json("result", result, formatter.get());
-    } else {
-      rgw_pubsub_topics result;
-      int ret = ps.get_topics(dpp(), result, null_yield);
-      if (ret < 0 && ret != -ENOENT) {
-        cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
-        return -ret;
-      }
-      encode_json("result", result, formatter.get());
+    const RGWPubSub::Bucket b(ps, bucket.get());
+    ret = b.get_topics(dpp(), result, null_yield);
+    if (ret < 0 && ret != -ENOENT) {
+      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
     }
+    encode_json("result", result, formatter.get());
+    formatter->flush(cout);
+  }
+
+  if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
+    RGWPubSub ps(driver, tenant);
+
+    rgw_pubsub_topics result;
+    int ret = ps.get_topics(dpp(), result, null_yield);
+    if (ret < 0 && ret != -ENOENT) {
+      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    encode_json("result", result, formatter.get());
     formatter->flush(cout);
   }
 
@@ -10456,12 +10569,54 @@ next:
     formatter->flush(cout);
   }
 
+  if (opt_cmd == OPT::PUBSUB_NOTIFICATION_GET) {
+    if (notification_id.empty()) {
+      cerr << "ERROR: notification-id was not provided (via --notification-id)" << std::endl;
+      return EINVAL;
+    }
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+      return EINVAL;
+    }
+
+    int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    RGWPubSub ps(driver, tenant);
+
+    rgw_pubsub_bucket_topics bucket_topics;
+    const RGWPubSub::Bucket b(ps, bucket.get());
+    ret = b.get_topics(dpp(), bucket_topics, null_yield);
+    if (ret < 0 && ret != -ENOENT) {
+      cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    rgw_pubsub_topic_filter bucket_topic;
+    ret = b.get_notification_by_id(dpp(), notification_id, bucket_topic, null_yield);
+    if (ret < 0) {
+      cerr << "ERROR: could not get notification: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    encode_json("notification", bucket_topic, formatter.get());
+    formatter->flush(cout);
+  }
+
   if (opt_cmd == OPT::PUBSUB_TOPIC_RM) {
     if (topic_name.empty()) {
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
 
+    ret = rgw::notify::remove_persistent_topic(dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
+    if (ret < 0) {
+      cerr << "ERROR: could not remove persistent topic: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
     RGWPubSub ps(driver, tenant);
 
     ret = ps.remove_topic(dpp(), topic_name, null_yield);
@@ -10471,6 +10626,36 @@ next:
     }
   }
 
+  if (opt_cmd == OPT::PUBSUB_NOTIFICATION_RM) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+      return EINVAL;
+    }
+
+    int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    RGWPubSub ps(driver, tenant);
+
+    rgw_pubsub_bucket_topics bucket_topics;
+    const RGWPubSub::Bucket b(ps, bucket.get());
+    ret = b.get_topics(dpp(), bucket_topics, null_yield);
+    if (ret < 0 && ret != -ENOENT) {
+      cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    rgw_pubsub_topic_filter bucket_topic;
+    if(notification_id.empty()) {
+      ret = b.remove_notifications(dpp(), null_yield);
+    } else {
+      ret = b.remove_notification_by_id(dpp(), notification_id, null_yield);
+    }
+  }
+
   if (opt_cmd == OPT::SCRIPT_PUT) {
     if (!str_script_ctx) {
       cerr << "ERROR: context was not provided (via --context)" << std::endl;