#include <unordered_map>
#include <unordered_set>
#include <list>
+#include <variant>
+#include <boost/intrusive/list.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/seastar.hh>
#include <seastar/net/api.hh>
#include <seastar/util/backtrace.hh>
#include <seastar/util/log.hh>
+namespace bi = boost::intrusive;
+
namespace seastar {
namespace rpc {
size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
/// Configures isolation for a connection based on its isolation cookie. May throw,
/// in which case the connection will be terminated.
- std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
+ using syncronous_isolation_function = std::function<isolation_config (sstring isolation_cookie)>;
+ using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
+ using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
+ isolation_function_alternatives isolate_connection = default_isolate_connection;
};
struct client_options {
output_stream<char> _write_buf;
bool _error = false;
bool _connected = false;
+ std::optional<shared_promise<>> _negotiated = shared_promise<>();
promise<> _stopped;
stats _stats;
const logger& _logger;
// The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
// The type of the pointer is erased here, but the original type is Serializer
void* _serializer;
- struct outgoing_entry {
+ struct outgoing_entry : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
timer<rpc_clock_type> t;
snd_buf buf;
- std::optional<promise<>> p = promise<>();
+ promise<> done;
cancellable* pcancel = nullptr;
outgoing_entry(snd_buf b) : buf(std::move(b)) {}
- outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
- o.p = std::nullopt;
+
+ outgoing_entry(outgoing_entry&&) = delete;
+ outgoing_entry(const outgoing_entry&) = delete;
+
+ void uncancellable() {
+ t.cancel();
+ if (pcancel) {
+ pcancel->cancel_send = std::function<void()>();
+ }
}
+
~outgoing_entry() {
- if (p) {
- if (pcancel) {
- pcancel->cancel_send = std::function<void()>();
- pcancel->send_back_pointer = nullptr;
- }
- p->set_value();
+ if (pcancel) {
+ pcancel->cancel_send = std::function<void()>();
+ pcancel->send_back_pointer = nullptr;
}
}
+
+ using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
};
- friend outgoing_entry;
- std::list<outgoing_entry> _outgoing_queue;
- condition_variable _outgoing_queue_cond;
- future<> _send_loop_stopped = make_ready_future<>();
+ void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex = nullptr);
+ future<> _outgoing_queue_ready = _negotiated->get_shared_future();
+ outgoing_entry::container_t _outgoing_queue;
+ size_t _outgoing_queue_size = 0;
std::unique_ptr<compressor> _compressor;
+ bool _propagate_timeout = false;
bool _timeout_negotiated = false;
// stream related fields
bool _is_stream = false;
// if it is not ready it means the sink is been closed
future<bool> _sink_closed_future = make_ready_future<bool>(false);
- bool is_stream() {
+ size_t outgoing_queue_length() const noexcept {
+ return _outgoing_queue_size;
+ }
+
+ void set_negotiated() noexcept;
+
+ bool is_stream() const noexcept {
return _is_stream;
}
snd_buf compress(snd_buf buf);
future<> send_buffer(snd_buf buf);
- enum class outgoing_queue_type {
- request,
- response,
- stream = response
- };
-
- template<outgoing_queue_type QueueType> void send_loop();
- future<> stop_send_loop();
+ future<> send_entry(outgoing_entry& d);
+ future<> stop_send_loop(std::exception_ptr ex);
future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
- bool stream_check_twoway_closed() {
+ bool stream_check_twoway_closed() const noexcept {
return _sink_closed && _source_closed;
}
future<> stream_close();
// functions below are public because they are used by external heavily templated functions
// and I am not smart enough to know how to define them as friends
future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
- bool error() { return _error; }
+ bool error() const noexcept { return _error; }
void abort();
future<> stop() noexcept;
future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
}
return make_ready_future();
}
- bool sink_closed() {
+ bool sink_closed() const noexcept {
return _sink_closed;
}
future<> close_source() {
}
return make_ready_future();
}
- connection_id get_connection_id() const {
+ connection_id get_connection_id() const noexcept {
return _id;
}
xshard_connection_ptr get_stream(connection_id id) const;
void register_stream(connection_id id, xshard_connection_ptr c);
virtual socket_address peer_address() const = 0;
- const logger& get_logger() const {
+ const logger& get_logger() const noexcept {
return _logger;
}
friend class sink_impl;
template<typename Serializer, typename... In>
friend class source_impl;
+
+ void suspend_for_testing(promise<>& p) {
+ _outgoing_queue_ready.get();
+ auto dummy = std::make_unique<outgoing_entry>(snd_buf());
+ _outgoing_queue.push_back(*dummy);
+ _outgoing_queue_ready = dummy->done.get_future();
+ (void)p.get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
+ }
};
struct deferred_snd_buf {
std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
socket_address _server_addr, _local_addr;
client_options _options;
- std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
weak_ptr<client> _parent; // for stream clients
private:
read_response_frame(input_stream<char>& in);
future<std::tuple<int64_t, std::optional<rcv_buf>>>
read_response_frame_compressed(input_stream<char>& in);
- void send_loop() {
- if (is_stream()) {
- rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
- } else {
- rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>();
- }
- }
public:
/**
* Create client object which will attempt to connect to the remote address.
return _server_addr;
}
future<> await_connection() {
- if (!_client_negotiated) {
+ if (!_negotiated) {
return make_ready_future<>();
} else {
- return _client_negotiated->get_shared_future();
+ return _negotiated->get_shared_future();
}
}
template<typename Serializer, typename... Out>
xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
this->register_stream(c->get_connection_id(), s);
return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
+ }).handle_exception([c] (std::exception_ptr eptr) {
+ // If await_connection fails we need to stop the client
+ // before destroying it.
+ return c->stop().then([eptr, c] {
+ return make_exception_future<sink<Out...>>(eptr);
+ });
});
});
}
future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
read_request_frame_compressed(input_stream<char>& in);
future<feature_map> negotiate(feature_map requested);
- void send_loop() {
- if (is_stream()) {
- rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
- } else {
- rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
- }
- }
future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
public:
connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
const client_info& info() const { return _info; }
stats get_stats() const {
stats res = _stats;
- res.pending = _outgoing_queue.size();
+ res.pending = outgoing_queue_length();
return res;
}
rpc::server(&proto, addr, memory_limit) {}
server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
rpc::server(&proto, opts, addr, memory_limit) {}
- server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
+ server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options = server_options{}) :
rpc::server(&proto, std::move(socket), memory_limit) {}
server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
rpc::server(&proto, opts, std::move(socket), memory_limit) {}