#include <boost/range/numeric.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <seastar/net/packet-data-source.hh>
+#include <seastar/core/print.hh>
namespace seastar {
}
};
+template <typename Serializer, typename Output, typename... T>
+inline void do_marshall(Serializer& serializer, Output& out, const T&... args);
+
template <typename Serializer, typename Output>
struct marshall_one {
template <typename T> struct helper {
put_connection_id(arg.get_id(), out);
}
};
+ template <typename... T> struct helper<tuple<T...>> {
+ static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) {
+ auto do_do_marshall = [&serializer, &out] (const auto&... args) {
+ do_marshall(serializer, out, args...);
+ };
+ apply(do_do_marshall, arg);
+ }
+ };
};
template <typename Serializer, typename Output, typename... T>
return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
}
};
+ template <typename... T> struct helper<tuple<T...>> {
+ static tuple<T...> doit(connection& c, Input& in) {
+ return do_unmarshall<Serializer, Input, T...>(c, in);
+ }
+ };
};
template <typename Serializer, typename Input, typename T0, typename... Trest>
return make_ready_future<>();
}
+// Convert a relative timeout (a duration) to an absolute one (time_point).
+// Do the calculation safely so that a very large duration will be capped by
+// time_point::max, instead of wrapping around to ancient history.
+inline rpc_clock_type::time_point
+relative_timeout_to_absolute(rpc_clock_type::duration relative) {
+ rpc_clock_type::time_point now = rpc_clock_type::now();
+ return now + std::min(relative, rpc_clock_type::time_point::max() - now);
+}
+
// Returns lambda that can be used to send rpc messages.
// The lambda gets client connection and rpc parameters as arguments, marshalls them sends
// to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
return send(dst, timeout, nullptr, args...);
}
auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
- return send(dst, rpc_clock_type::now() + timeout, nullptr, args...);
+ return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...);
}
auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
return send(dst, {}, &cancel, args...);
if (memory_consumed > client->max_request_size()) {
auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
client->get_logger()(client->peer_address(), err);
- with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
+ // FIXME: future is discarded
+ (void)with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout);
});
return make_ready_future();
// note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (auto permit) mutable {
try {
- with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
- auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
- return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
- return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).then([permit = std::move(permit)] {});
- });
+ // FIXME: future is discarded
+ (void)with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
+ try {
+ auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
+ return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
+ return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
+ client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr));
+ });
+ });
+ } catch (...) {
+ client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", std::current_exception()));
+ return make_ready_future();
+ }
});
} catch (gate_closed_exception&) {/* ignore */ }
});
f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
}
- return std::move(f);
+ return f;
};
}
return register_handler(t, scheduling_group(), std::forward<Func>(func));
}
+template<typename Serializer, typename MsgType>
+future<> protocol<Serializer, MsgType>::unregister_handler(MsgType t) {
+ auto it = _handlers.find(t);
+ if (it != _handlers.end()) {
+ return it->second.use_gate.close().finally([this, t] {
+ _handlers.erase(t);
+ });
+ }
+ return make_ready_future<>();
+}
+
+template<typename Serializer, typename MsgType>
+bool protocol<Serializer, MsgType>::has_handler(uint64_t msg_id) {
+ auto it = _handlers.find(MsgType(msg_id));
+ if (it == _handlers.end()) {
+ return false;
+ }
+ return !it->second.use_gate.is_closed();
+}
+
+template<typename Serializer, typename MsgType>
+rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
+ rpc_handler* h = nullptr;
+ auto it = _handlers.find(MsgType(msg_id));
+ if (it != _handlers.end()) {
+ try {
+ it->second.use_gate.enter();
+ h = &it->second;
+ } catch (gate_closed_exception&) {
+ // unregistered, just ignore
+ }
+ }
+ return h;
+}
+
+template<typename Serializer, typename MsgType>
+void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) {
+ h->use_gate.leave();
+}
+
template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
template<typename Serializer, typename... Out>
if (this->_ex) {
return make_exception_future(this->_ex);
}
- smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data)] () mutable {
+ // FIXME: future is discarded
+ (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data)] () mutable {
connection* con = this->_con->get();
if (con->error()) {
return make_exception_future(closed_error());
});
}
+template<typename Serializer, typename... Out>
+future<> sink_impl<Serializer, Out...>::flush() {
+ // wait until everything is sent out before returning.
+ return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
+ if (this->_ex) {
+ return make_exception_future(this->_ex);
+ }
+ return make_ready_future();
+ });
+}
+
template<typename Serializer, typename... Out>
future<> sink_impl<Serializer, Out...>::close() {
return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
// refill buffers from remote cpu
return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
connection* con = this->_con->get();
+ if (con->_source_closed) {
+ return make_exception_future<>(stream_closed());
+ }
return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
if (f.failed()) {
return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){