]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_http_client.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_http_client.h
index 04ffb80c90e6afd13ba4989d13a014c73c62860d..04d8506e3903d7aa0432048a883550abfe7bb465 100644 (file)
 using param_pair_t = pair<string, string>;
 using param_vec_t = vector<param_pair_t>;
 
+void rgw_http_client_init(CephContext *cct);
+void rgw_http_client_cleanup();
+
 struct rgw_http_req_data;
+class RGWHTTPManager;
+
+class RGWIOIDProvider
+{
+  std::atomic<int64_t> max = {0};
+
+public:
+  RGWIOIDProvider() {}
+  int64_t get_next() {
+    return ++max;
+  }
+};
+
+struct rgw_io_id {
+  int64_t id{0};
+  int channels{0};
+
+  rgw_io_id() {}
+  rgw_io_id(int64_t _id, int _channels) : id(_id), channels(_channels) {}
+
+  bool intersects(const rgw_io_id& rhs) {
+    return (id == rhs.id && ((channels | rhs.channels) != 0));
+  }
+
+  bool operator<(const rgw_io_id& rhs) const {
+    if (id < rhs.id) {
+      return true;
+    }
+    return (id == rhs.id &&
+            channels < rhs.channels);
+  }
+};
+
+class RGWIOProvider
+{
+  int64_t id{-1};
 
-class RGWHTTPClient
+public:
+  RGWIOProvider() {}
+  virtual ~RGWIOProvider() = default; 
+
+  void assign_io(RGWIOIDProvider& io_id_provider, int io_type = -1);
+  rgw_io_id get_io_id(int io_type) {
+    return rgw_io_id{id, io_type};
+  }
+
+  virtual void set_io_user_info(void *_user_info) = 0;
+  virtual void *get_io_user_info() = 0;
+};
+
+class RGWHTTPClient : public RGWIOProvider
 {
   friend class RGWHTTPManager;
 
   bufferlist send_bl;
   bufferlist::iterator send_iter;
-  size_t send_len;
   bool has_send_len;
   long http_status;
+  bool send_data_hint{false};
+  size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
+                                   due to being paused */
 
-  rgw_http_req_data *req_data;
+  void *user_info{nullptr};
 
-  void *user_info;
+  rgw_http_req_data *req_data;
 
-  string last_method;
-  string last_url;
   bool verify_ssl; // Do not validate self signed certificates, default to false
 
   std::atomic<unsigned> stopped { 0 };
 
+
 protected:
   CephContext *cct;
+
+  string method;
+  string url;
+
+  size_t send_len{0};
+
   param_vec_t headers;
 
-  int init_request(const char *method,
-                   const char *url,
-                   rgw_http_req_data *req_data,
-                   bool send_data_hint = false);
+  RGWHTTPManager *get_manager();
+
+  int init_request(rgw_http_req_data *req_data);
 
   virtual int receive_header(void *ptr, size_t len) {
     return 0;
   }
-  virtual int receive_data(void *ptr, size_t len) {
+  virtual int receive_data(void *ptr, size_t len, bool *pause) {
     return 0;
   }
-  virtual int send_data(void *ptr, size_t len) {
+
+  virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
     return 0;
   }
 
   /* Callbacks for libcurl. */
-  static size_t simple_receive_http_header(void *ptr,
-                                           size_t size,
-                                           size_t nmemb,
-                                           void *_info);
   static size_t receive_http_header(void *ptr,
                                     size_t size,
                                     size_t nmemb,
                                     void *_info);
 
-  static size_t simple_receive_http_data(void *ptr,
-                                         size_t size,
-                                         size_t nmemb,
-                                         void *_info);
   static size_t receive_http_data(void *ptr,
                                   size_t size,
                                   size_t nmemb,
                                   void *_info);
 
-  static size_t simple_send_http_data(void *ptr,
-                                      size_t size,
-                                      size_t nmemb,
-                                      void *_info);
   static size_t send_http_data(void *ptr,
                                size_t size,
                                size_t nmemb,
                                void *_info);
+
+  Mutex& get_req_lock();
+
+  /* needs to be called under req_lock() */
+  void _set_write_paused(bool pause);
+  void _set_read_paused(bool pause);
 public:
   static const long HTTP_STATUS_NOSTATUS     = 0;
   static const long HTTP_STATUS_UNAUTHORIZED = 401;
   static const long HTTP_STATUS_NOTFOUND     = 404;
 
+  static constexpr int HTTPCLIENT_IO_READ    = 0x1;
+  static constexpr int HTTPCLIENT_IO_WRITE   = 0x2;
+  static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
+
   virtual ~RGWHTTPClient();
-  explicit RGWHTTPClient(CephContext *cct)
-    : send_len(0),
-      has_send_len(false),
+  explicit RGWHTTPClient(CephContext *cct,
+                         const string& _method,
+                         const string& _url)
+    : has_send_len(false),
       http_status(HTTP_STATUS_NOSTATUS),
       req_data(nullptr),
-      user_info(nullptr),
       verify_ssl(cct->_conf->rgw_verify_ssl),
-      cct(cct) {
-  }
-
-  void set_user_info(void *info) {
-    user_info = info;
-  }
-
-  void *get_user_info() {
-    return user_info;
+      cct(cct),
+      method(_method),
+      url(_url) {
   }
 
   void append_header(const string& name, const string& val) {
@@ -115,6 +166,9 @@ public:
     has_send_len = true;
   }
 
+  void set_send_data_hint(bool hint) {
+    send_data_hint = hint;
+  }
 
   long get_http_status() const {
     return http_status;
@@ -124,15 +178,33 @@ public:
     verify_ssl = flag;
   }
 
-  int process(const char *method, const char *url);
-  int process(const char *url) { return process("GET", url); }
+  int process();
 
   int wait();
+  void cancel();
+  bool is_done();
+
   rgw_http_req_data *get_req_data() { return req_data; }
 
   string to_str();
 
   int get_req_retcode();
+
+  void set_url(const string& _url) {
+    url = _url;
+  }
+
+  void set_method(const string& _method) {
+    method = _method;
+  }
+
+  void set_io_user_info(void *_user_info) override {
+    user_info = _user_info;
+  }
+
+  void *get_io_user_info() override {
+    return user_info;
+  }
 };
 
 
@@ -143,8 +215,10 @@ public:
   typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
 
   RGWHTTPHeadersCollector(CephContext * const cct,
-                          const header_spec_t relevant_headers)
-    : RGWHTTPClient(cct),
+                          const string& method,
+                          const string& url,
+                          const header_spec_t &relevant_headers)
+    : RGWHTTPClient(cct, method, url),
       relevant_headers(relevant_headers) {
   }
 
@@ -160,14 +234,6 @@ public:
 protected:
   int receive_header(void *ptr, size_t len) override;
 
-  int receive_data(void *ptr, size_t len) override {
-    return 0;
-  }
-
-  int send_data(void *ptr, size_t len) override {
-    return 0;
-  }
-
 private:
   const std::set<header_name_t, ltstr_nocase> relevant_headers;
   std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
@@ -181,18 +247,22 @@ class RGWHTTPTransceiver : public RGWHTTPHeadersCollector {
 
 public:
   RGWHTTPTransceiver(CephContext * const cct,
+                     const string& method,
+                     const string& url,
                      bufferlist * const read_bl,
                      const header_spec_t intercept_headers = {})
-    : RGWHTTPHeadersCollector(cct, intercept_headers),
+    : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
       read_bl(read_bl),
       post_data_index(0) {
   }
 
   RGWHTTPTransceiver(CephContext * const cct,
+                     const string& method,
+                     const string& url,
                      bufferlist * const read_bl,
                      const bool verify_ssl,
                      const header_spec_t intercept_headers = {})
-    : RGWHTTPHeadersCollector(cct, intercept_headers),
+    : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
       read_bl(read_bl),
       post_data_index(0) {
     set_verify_ssl(verify_ssl);
@@ -203,9 +273,9 @@ public:
   }
 
 protected:
-  int send_data(void* ptr, size_t len) override;
+  int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
 
-  int receive_data(void *ptr, size_t len) override {
+  int receive_data(void *ptr, size_t len, bool *pause) override {
     read_bl->append((char *)ptr, len);
     return 0;
   }
@@ -216,17 +286,32 @@ typedef RGWHTTPTransceiver RGWPostHTTPData;
 
 class RGWCompletionManager;
 
+enum RGWHTTPRequestSetState {
+  SET_NOP = 0,
+  SET_WRITE_PAUSED = 1,
+  SET_WRITE_RESUME = 2,
+  SET_READ_PAUSED  = 3,
+  SET_READ_RESUME  = 4,
+};
+
 class RGWHTTPManager {
+  struct set_state {
+    rgw_http_req_data *req;
+    int bitmask;
+
+    set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
+  };
   CephContext *cct;
   RGWCompletionManager *completion_mgr;
   void *multi_handle;
-  bool is_threaded;
+  bool is_started;
   std::atomic<unsigned> going_down { 0 };
   std::atomic<unsigned> is_stopped { 0 };
 
   RWLock reqs_lock;
   map<uint64_t, rgw_http_req_data *> reqs;
   list<rgw_http_req_data *> unregistered_reqs;
+  list<set_state> reqs_change_state;
   map<uint64_t, rgw_http_req_data *> complete_reqs;
   int64_t num_reqs;
   int64_t max_threaded_req;
@@ -235,11 +320,12 @@ class RGWHTTPManager {
   void register_request(rgw_http_req_data *req_data);
   void complete_request(rgw_http_req_data *req_data);
   void _complete_request(rgw_http_req_data *req_data);
-  void unregister_request(rgw_http_req_data *req_data);
+  bool unregister_request(rgw_http_req_data *req_data);
   void _unlink_request(rgw_http_req_data *req_data);
   void unlink_request(rgw_http_req_data *req_data);
   void finish_request(rgw_http_req_data *req_data, int r);
   void _finish_request(rgw_http_req_data *req_data, int r);
+  void _set_req_state(set_state& ss);
   int link_request(rgw_http_req_data *req_data);
 
   void manage_pending_requests();
@@ -248,7 +334,7 @@ class RGWHTTPManager {
     RGWHTTPManager *manager;
 
   public:
-    ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
+    explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
     void *entry() override;
   };
 
@@ -262,17 +348,18 @@ public:
   RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
   ~RGWHTTPManager();
 
-  int set_threaded();
+  int start();
   void stop();
 
-  int add_request(RGWHTTPClient *client, const char *method, const char *url,
-                  bool send_data_hint = false);
+  int add_request(RGWHTTPClient *client);
   int remove_request(RGWHTTPClient *client);
-
-  /* only for non threaded case */
-  int process_requests(bool wait_for_data, bool *done);
-
-  int complete_requests();
+  int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
 };
 
+class RGWHTTP
+{
+public:
+  static int send(RGWHTTPClient *req);
+  static int process(RGWHTTPClient *req);
+};
 #endif