]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_pubsub_push.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
index efc1823e748488bc225b224aa70e764a8063e4c6..0c13886e01b31b8bf8f39dac66d3048f3da1c949 100644 (file)
@@ -7,6 +7,7 @@
 #include <algorithm>
 #include "include/buffer_fwd.h"
 #include "common/Formatter.h"
+#include "common/async/completion.h"
 #include "rgw_common.h"
 #include "rgw_data_sync.h"
 #include "rgw_pubsub.h"
 
 using namespace rgw;
 
-std::string json_format_pubsub_event(const rgw_pubsub_event& event) {
+template<typename EventType>
+std::string json_format_pubsub_event(const EventType& event) {
   std::stringstream ss;
   JSONFormatter f(false);
-  encode_json("event", event, &f);
+  encode_json(EventType::json_type_single, event, &f);
   f.flush(ss);
   return ss.str();
 }
@@ -88,41 +90,57 @@ private:
 
 public:
   RGWPubSubHTTPEndpoint(const std::string& _endpoint, 
-      const RGWHTTPArgs& args) :
-    endpoint(_endpoint) {
-      bool exists;
-
-      str_ack_level = args.get("http-ack-level", &exists);
-      if (!exists || str_ack_level == "any") {
-        // "any" is default
-        ack_level = ACK_LEVEL_ANY;
-      } else if (str_ack_level == "non-error") {
-        ack_level = ACK_LEVEL_NON_ERROR;
-      } else {
-        ack_level = std::atoi(str_ack_level.c_str());
-        if (ack_level < 100 || ack_level >= 600) {
-          throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level);
-        }
-      }
+    const RGWHTTPArgs& args) : endpoint(_endpoint) {
+    bool exists;
 
-      auto str_verify_ssl = args.get("verify-ssl", &exists);
-      boost::algorithm::to_lower(str_verify_ssl);
-      // verify server certificate by default
-      if (!exists || str_verify_ssl == "true") {
-        verify_ssl = true;
-      } else if (str_verify_ssl == "false") {
-        verify_ssl = false;
-      } else {
-          throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl);
+    str_ack_level = args.get("http-ack-level", &exists);
+    if (!exists || str_ack_level == "any") {
+      // "any" is default
+      ack_level = ACK_LEVEL_ANY;
+    } else if (str_ack_level == "non-error") {
+      ack_level = ACK_LEVEL_NON_ERROR;
+    } else {
+      ack_level = std::atoi(str_ack_level.c_str());
+      if (ack_level < 100 || ack_level >= 600) {
+        throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
       }
     }
 
+    auto str_verify_ssl = args.get("verify-ssl", &exists);
+    boost::algorithm::to_lower(str_verify_ssl);
+    // verify server certificate by default
+    if (!exists || str_verify_ssl == "true") {
+      verify_ssl = true;
+    } else if (str_verify_ssl == "false") {
+      verify_ssl = false;
+    } else {
+        throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
+    }
+  }
+
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
     return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
   }
 
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+    return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
+  }
+
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+    bufferlist read_bl;
+    RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
+    const auto post_data = json_format_pubsub_event(record);
+    request.set_post_data(post_data);
+    request.set_send_length(post_data.length());
+    if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+    const auto rc = RGWHTTP::process(&request, y);
+    if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+    // TODO: use read_bl to process return code and handle according to ack level
+    return rc;
+  }
+
   std::string to_str() const override {
-    std::string str("HTTP Endpoint");
+    std::string str("HTTP/S Endpoint");
     str += "\nURI: " + endpoint;
     str += "\nAck Level: " + str_ack_level;
     str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
@@ -133,42 +151,43 @@ public:
 
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
-  private:
-    enum ack_level_t {
-      ACK_LEVEL_NONE,
-      ACK_LEVEL_BROKER,
-      ACK_LEVEL_ROUTEABLE
-    };
-    const std::string endpoint;
-    const std::string topic;
-    amqp::connection_ptr_t conn;
-    ack_level_t ack_level;
-    std::string str_ack_level;
-
-    static std::string get_exchange(const RGWHTTPArgs& args) {
-      bool exists;
-      const auto exchange = args.get("amqp-exchange", &exists);
-      if (!exists) {
-        throw configuration_error("AMQP: missing amqp-exchange");
-      }
-      return exchange;
+private:
+  enum ack_level_t {
+    ACK_LEVEL_NONE,
+    ACK_LEVEL_BROKER,
+    ACK_LEVEL_ROUTEABLE
+  };
+  CephContext* const cct;
+  const std::string endpoint;
+  const std::string topic;
+  const std::string exchange;
+  amqp::connection_ptr_t conn;
+  ack_level_t ack_level;
+  std::string str_ack_level;
+
+  static std::string get_exchange(const RGWHTTPArgs& args) {
+    bool exists;
+    const auto exchange = args.get("amqp-exchange", &exists);
+    if (!exists) {
+      throw configuration_error("AMQP: missing amqp-exchange");
     }
+    return exchange;
+  }
 
   // NoAckPublishCR implements async amqp publishing via coroutine
   // This coroutine ends when it send the message and does not wait for an ack
   class NoAckPublishCR : public RGWCoroutine {
   private:
-    RGWDataSyncEnv* const sync_env;
     const std::string topic;
     amqp::connection_ptr_t conn;
     const std::string message;
 
   public:
-    NoAckPublishCR(RGWDataSyncEnv* _sync_env,
+    NoAckPublishCR(CephContext* cct,
               const std::string& _topic,
               amqp::connection_ptr_t& _conn,
               const std::string& _message) :
-      RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      RGWCoroutine(cct),
       topic(_topic), conn(_conn), message(_message) {}
 
     // send message to endpoint, without waiting for reply
@@ -189,19 +208,18 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
   // note that it does not wait for an ack fron the end client
   class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
   private:
-    RGWDataSyncEnv* const sync_env;
     const std::string topic;
     amqp::connection_ptr_t conn;
     const std::string message;
     const ack_level_t ack_level; // TODO not used for now
 
   public:
-    AckPublishCR(RGWDataSyncEnv* _sync_env,
+    AckPublishCR(CephContext* cct,
               const std::string& _topic,
               amqp::connection_ptr_t& _conn,
               const std::string& _message,
               ack_level_t _ack_level) :
-      RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      RGWCoroutine(cct),
       topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
 
     // send message to endpoint, waiting for reply
@@ -247,86 +265,200 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
     }
   };
 
+public:
+  RGWPubSubAMQPEndpoint(const std::string& _endpoint,
+      const std::string& _topic,
+      const RGWHTTPArgs& args,
+      CephContext* _cct) : 
+        cct(_cct),
+        endpoint(_endpoint), 
+        topic(_topic),
+        exchange(get_exchange(args)),
+        conn(amqp::connect(endpoint, exchange)) {
+    if (!conn) { 
+      throw configuration_error("AMQP: failed to create connection to: " + endpoint);
+    }
+    bool exists;
+    // get ack level
+    str_ack_level = args.get("amqp-ack-level", &exists);
+    if (!exists || str_ack_level == "broker") {
+      // "broker" is default
+      ack_level = ACK_LEVEL_BROKER;
+    } else if (str_ack_level == "none") {
+      ack_level = ACK_LEVEL_NONE;
+    } else if (str_ack_level == "routable") {
+      ack_level = ACK_LEVEL_ROUTEABLE;
+    } else {
+      throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
+    }
+  }
+
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+    ceph_assert(conn);
+    if (ack_level == ACK_LEVEL_NONE) {
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
+    } else {
+      // TODO: currently broker and routable are the same - this will require different flags
+      // but the same mechanism
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
+    }
+  }
+  
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+    ceph_assert(conn);
+    if (ack_level == ACK_LEVEL_NONE) {
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+    } else {
+      // TODO: currently broker and routable are the same - this will require different flags
+      // but the same mechanism
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
+    }
+  }
+
+  // this allows waiting untill "finish()" is called from a different thread
+  // waiting could be blocking the waiting thread or yielding, depending
+  // with compilation flag support and whether the optional_yield is set
+  class Waiter {
+    using Signature = void(boost::system::error_code);
+    using Completion = ceph::async::Completion<Signature>;
+    std::unique_ptr<Completion> completion = nullptr;
+    int ret;
+
+    mutable std::atomic<bool> done = false;
+    mutable std::mutex lock;
+    mutable std::condition_variable cond;
+
+    template <typename ExecutionContext, typename CompletionToken>
+    auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+      boost::asio::async_completion<CompletionToken, Signature> init(token);
+      auto& handler = init.completion_handler;
+      {
+        std::unique_lock l{lock};
+        completion = Completion::create(ctx.get_executor(), std::move(handler));
+      }
+      return init.result.get();
+    }
+
   public:
-    RGWPubSubAMQPEndpoint(const std::string& _endpoint,
-        const std::string& _topic,
-        const RGWHTTPArgs& args) : 
-          endpoint(_endpoint), 
-          topic(_topic), 
-          conn(amqp::connect(endpoint, get_exchange(args))) {
-      bool exists;
-      // get ack level
-      str_ack_level = args.get("amqp-ack-level", &exists);
-      if (!exists || str_ack_level == "broker") {
-        // "broker" is default
-        ack_level = ACK_LEVEL_BROKER;
-      } else if (str_ack_level == "none") {
-        ack_level = ACK_LEVEL_NONE;
-      } else if (str_ack_level == "routable") {
-        ack_level = ACK_LEVEL_ROUTEABLE;
-      } else {
-        throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
+    int wait(optional_yield y) {
+      if (done) {
+        return ret;
+      }
+#ifdef HAVE_BOOST_CONTEXT
+      if (y) {
+        auto& io_ctx = y.get_io_context();
+        auto& yield_ctx = y.get_yield_context();
+        boost::system::error_code ec;
+        async_wait(io_ctx, yield_ctx[ec]);
+        return -ec.value();
       }
+#endif
+      std::unique_lock l(lock);
+      cond.wait(l, [this]{return (done==true);});
+      return ret;
     }
 
-    RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
-      if (ack_level == ACK_LEVEL_NONE) {
-        return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
+    void finish(int r) {
+      std::unique_lock l{lock};
+      ret = r;
+      done = true;
+      if (completion) {
+        boost::system::error_code ec(-ret, boost::system::system_category());
+        Completion::post(std::move(completion), ec);
       } else {
-        // TODO: currently broker and routable are the same - this will require different flags
-        // but the same mechanism
-        return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
+        cond.notify_all();
       }
     }
+  };
 
-    std::string to_str() const override {
-      std::string str("AMQP(0.9.1) Endpoint");
-      str += "\nURI: " + endpoint;
-      str += "\nTopic: " + topic;
-      str += "\nAck Level: " + str_ack_level;
-      return str;
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+    ceph_assert(conn);
+    if (ack_level == ACK_LEVEL_NONE) {
+      return amqp::publish(conn, topic, json_format_pubsub_event(record));
+    } else {
+      // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
+      // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
+      auto w = std::unique_ptr<Waiter>(new Waiter);
+      const auto rc = amqp::publish_with_confirm(conn, 
+        topic,
+        json_format_pubsub_event(record),
+        std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+      if (rc < 0) {
+        // failed to publish, does not wait for reply
+        return rc;
+      }
+      return w->wait(y);
     }
+  }
+
+  std::string to_str() const override {
+    std::string str("AMQP(0.9.1) Endpoint");
+    str += "\nURI: " + endpoint;
+    str += "\nTopic: " + topic;
+    str += "\nExchange: " + exchange;
+    str += "\nAck Level: " + str_ack_level;
+    return str;
+  }
 };
 
 static const std::string AMQP_0_9_1("0-9-1");
 static const std::string AMQP_1_0("1-0");
+static const std::string AMQP_SCHEMA("amqp");
 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
 
-RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, 
-    const std::string& topic, 
-    const RGWHTTPArgs& args) {
-  //fetch the schema from the endpoint
+static const std::string WEBHOOK_SCHEMA("webhook");
+static const std::string UNKNOWN_SCHEMA("unknown");
+static const std::string NO_SCHEMA("");
+
+const std::string& get_schema(const std::string& endpoint) {
+  if (endpoint.empty()) {
+    return NO_SCHEMA; 
+  }
   const auto pos = endpoint.find(':');
   if (pos == std::string::npos) {
-    throw configuration_error("malformed endpoint " + endpoint);
-    return nullptr;
+    return UNKNOWN_SCHEMA;
   }
   const auto& schema = endpoint.substr(0,pos);
   if (schema == "http" || schema == "https") {
-    return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
+    return WEBHOOK_SCHEMA;
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
   } else if (schema == "amqp") {
+    return AMQP_SCHEMA;
+#endif
+  }
+  return UNKNOWN_SCHEMA;
+}
+
+RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, 
+    const std::string& topic, 
+    const RGWHTTPArgs& args,
+    CephContext* cct) {
+  const auto& schema = get_schema(endpoint);
+  if (schema == WEBHOOK_SCHEMA) {
+    return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  } else if (schema == AMQP_SCHEMA) {
     bool exists;
     std::string version = args.get("amqp-version", &exists);
     if (!exists) {
       version = AMQP_0_9_1;
     }
     if (version == AMQP_0_9_1) {
-      return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args));
+      return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
     } else if (version == AMQP_1_0) {
-      throw configuration_error("amqp v1.0 not supported");
+      throw configuration_error("AMQP: v1.0 not supported");
       return nullptr;
     } else {
-      throw configuration_error("unknown amqp version " + version);
+      throw configuration_error("AMQP: unknown version: " + version);
       return nullptr;
     }
   } else if (schema == "amqps") {
-    throw configuration_error("amqps not supported");
+    throw configuration_error("AMQP: ssl not supported");
     return nullptr;
 #endif
   }
 
-  throw configuration_error("unknown schema " + schema);
+  throw configuration_error("unknown schema in: " + endpoint);
   return nullptr;
 }