SocketConnection::SocketConnection(SocketMessenger& messenger,
ChainedDispatchers& dispatchers)
- : core(messenger.shard_id()),
- messenger(messenger)
+ : msgr_sid{messenger.get_shard_id()}, messenger(messenger)
{
auto ret = create_handlers(dispatchers, *this);
io_handler = std::move(ret.io_handler);
#ifdef UNIT_TESTS_BUILT
if (messenger.interceptor) {
interceptor = messenger.interceptor;
- interceptor->register_conn(*this);
+ interceptor->register_conn(this->get_local_shared_foreign_from_this());
}
#endif
}
bool SocketConnection::is_connected() const
{
- assert(seastar::this_shard_id() == shard_id());
return io_handler->is_connected();
}
#ifdef UNIT_TESTS_BUILT
-bool SocketConnection::is_closed() const
+bool SocketConnection::is_protocol_ready() const
{
- assert(seastar::this_shard_id() == shard_id());
+ assert(seastar::this_shard_id() == msgr_sid);
+ return protocol->is_ready();
+}
+
+bool SocketConnection::is_protocol_standby() const {
+ assert(seastar::this_shard_id() == msgr_sid);
+ return protocol->is_standby();
+}
+
+bool SocketConnection::is_protocol_closed() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
return protocol->is_closed();
}
-bool SocketConnection::is_closed_clean() const
+bool SocketConnection::is_protocol_closed_clean() const
{
- assert(seastar::this_shard_id() == shard_id());
+ assert(seastar::this_shard_id() == msgr_sid);
return protocol->is_closed_clean();
}
#endif
bool SocketConnection::peer_wins() const
{
+ assert(seastar::this_shard_id() == msgr_sid);
return (messenger.get_myaddr() > peer_addr || policy.server);
}
-seastar::future<> SocketConnection::send(MessageURef msg)
+seastar::future<> SocketConnection::send(MessageURef _msg)
{
- return seastar::smp::submit_to(
- shard_id(),
- [this, msg=std::move(msg)]() mutable {
- return io_handler->send(std::move(msg));
- });
+ // may be invoked from any core
+ MessageFRef msg = seastar::make_foreign(std::move(_msg));
+ return io_handler->send(std::move(msg));
}
seastar::future<> SocketConnection::send_keepalive()
{
- return seastar::smp::submit_to(
- shard_id(),
- [this] {
- return io_handler->send_keepalive();
- });
+ // may be invoked from any core
+ return io_handler->send_keepalive();
}
SocketConnection::clock_t::time_point
void SocketConnection::mark_down()
{
- assert(seastar::this_shard_id() == shard_id());
io_handler->mark_down();
}
SocketConnection::start_connect(const entity_addr_t& _peer_addr,
const entity_name_t& _peer_name)
{
+ assert(seastar::this_shard_id() == msgr_sid);
protocol->start_connect(_peer_addr, _peer_name);
}
void
-SocketConnection::start_accept(SocketRef&& sock,
+SocketConnection::start_accept(SocketFRef&& sock,
const entity_addr_t& _peer_addr)
{
+ assert(seastar::this_shard_id() == msgr_sid);
protocol->start_accept(std::move(sock), _peer_addr);
}
seastar::future<>
SocketConnection::close_clean_yielded()
{
+ assert(seastar::this_shard_id() == msgr_sid);
return protocol->close_clean_yielded();
}
-seastar::shard_id SocketConnection::shard_id() const {
- return core;
-}
-
seastar::socket_address SocketConnection::get_local_address() const {
+ assert(seastar::this_shard_id() == msgr_sid);
return socket->get_local_address();
}
ConnectionRef
SocketConnection::get_local_shared_foreign_from_this()
{
- assert(seastar::this_shard_id() == shard_id());
+ assert(seastar::this_shard_id() == msgr_sid);
return make_local_shared_foreign(
seastar::make_foreign(shared_from_this()));
}
+SocketMessenger &
+SocketConnection::get_messenger() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return messenger;
+}
+
+seastar::shard_id
+SocketConnection::get_messenger_shard_id() const
+{
+ return msgr_sid;
+}
+
+void SocketConnection::set_peer_type(entity_type_t peer_type) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_type == 0 &&
+ peer_name.type() != 0));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_type != 0 &&
+ peer_name.type() != 0 &&
+ peer_type != peer_name.type()));
+ peer_name._type = peer_type;
+}
+
+void SocketConnection::set_peer_id(int64_t peer_id) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_id == entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_id != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ peer_id != peer_name.num()));
+ peer_name._num = peer_id;
+}
+
+void SocketConnection::set_features(uint64_t f) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ features = f;
+}
+
+void SocketConnection::set_socket(Socket *s) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ socket = s;
+}
+
void SocketConnection::print(ostream& out) const {
- out << (void*)this << " ";
- messenger.print(out);
- if (!socket) {
- out << " >> " << get_peer_name() << " " << peer_addr;
- } else if (socket->get_side() == Socket::side_t::acceptor) {
- out << " >> " << get_peer_name() << " " << peer_addr
- << "@" << socket->get_ephemeral_port();
- } else { // socket->get_side() == Socket::side_t::connector
- out << "@" << socket->get_ephemeral_port()
- << " >> " << get_peer_name() << " " << peer_addr;
- }
+ out << (void*)this << " ";
+ messenger.print(out);
+ if (seastar::this_shard_id() != msgr_sid) {
+ out << " >> " << get_peer_name() << " " << peer_addr;
+ } else if (!socket) {
+ out << " >> " << get_peer_name() << " " << peer_addr;
+ } else if (socket->get_side() == Socket::side_t::acceptor) {
+ out << " >> " << get_peer_name() << " " << peer_addr
+ << "@" << socket->get_ephemeral_port();
+ } else { // socket->get_side() == Socket::side_t::connector
+ out << "@" << socket->get_ephemeral_port()
+ << " >> " << get_peer_name() << " " << peer_addr;
+ }
}
} // namespace crimson::net