#include "crimson/common/log.h"
#include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Socket.h"
#include "crimson/net/SocketConnection.h"
#include "msg/Message.h"
namespace crimson::net {
Protocol::Protocol(proto_t type,
- Dispatcher& dispatcher,
+ ChainedDispatchers& dispatchers,
SocketConnection& conn)
: proto_type(type),
- dispatcher(dispatcher),
+ dispatchers(dispatchers),
conn(conn),
auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}
{}
Protocol::~Protocol()
{
- ceph_assert(pending_dispatch.is_closed());
+ ceph_assert(gate.is_closed());
assert(!exit_open);
}
-bool Protocol::is_connected() const
-{
- return write_state == write_state_t::open;
-}
-
-seastar::future<> Protocol::close()
+void Protocol::close(bool dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
{
if (closed) {
// already closing
- assert(close_ready.valid());
- return close_ready.get_future();
+ return;
}
- // unregister_conn() drops a reference, so hold another until completion
- auto cleanup = [conn_ref = conn.shared_from_this(), this] {
- logger().debug("{} closed!", conn);
- };
+ bool is_replace = f_accept_new ? true : false;
+ logger().info("{} closing: reset {}, replace {}", conn,
+ dispatch_reset ? "yes" : "no",
+ is_replace ? "yes" : "no");
+ // atomic operations
+ closed = true;
trigger_close();
-
- // close_ready become valid only after state is state_t::closing
- assert(!close_ready.valid());
-
+ if (f_accept_new) {
+ (*f_accept_new)();
+ }
if (socket) {
socket->shutdown();
- close_ready = pending_dispatch.close().finally([this] {
- return socket->close();
- }).finally(std::move(cleanup));
- } else {
- close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
-
- closed = true;
set_write_state(write_state_t::drop);
+ assert(!gate.is_closed());
+ auto gate_closed = gate.close();
+
+ if (dispatch_reset) {
+ dispatchers.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
+ }
- return close_ready.get_future();
+ // asynchronous operations
+ assert(!close_ready.valid());
+ close_ready = std::move(gate_closed).then([this] {
+ if (socket) {
+ return socket->close();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ on_closed();
+#ifdef UNIT_TESTS_BUILT
+ is_closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(conn);
+ }
+#endif
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr);
+ ceph_abort();
+ });
}
seastar::future<> Protocol::send(MessageRef msg)
case write_state_t::open:
[[fallthrough]];
case write_state_t::delay:
- (void) seastar::with_gate(pending_dispatch, [this] {
- return do_write_dispatch_sweep(
- ).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
- conn, eptr);
- ceph_abort();
- });
+ assert(!gate.is_closed());
+ gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
+ return do_write_dispatch_sweep();
});
return;
case write_state_t::drop: