std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef conn, MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
- void ms_handle_connect(crimson::net::ConnectionRef conn) override;
- void ms_handle_accept(crimson::net::ConnectionRef conn) override;
+ void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override;
+ void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace) override;
void print(std::ostream&) const;
private:
void connected() {
set_connected();
}
- void accepted(crimson::net::ConnectionRef);
- void replaced();
- void reset();
+ bool accepted(crimson::net::ConnectionRef, bool is_replace);
+ void reset(bool is_replace=false);
seastar::future<> send(MessageURef msg);
void validate();
// retry connection if still pending
private:
void set_connected();
+ void set_unconnected();
void connect();
const osd_id_t peer;
crimson::net::ConnectionRef conn;
bool is_connected = false;
- friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
- if (c.type == type_t::front) {
- return os << "con_front(osd." << c.peer << ")";
- } else {
- return os << "con_back(osd." << c.peer << ")";
- }
- }
+ friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
+ if (c.type == type_t::front) {
+ return os << "con_front(osd." << c.peer << ")";
+ } else {
+ return os << "con_back(osd." << c.peer << ")";
+ }
+ }
};
-#if FMT_VERSION >= 90000
-template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
-#endif
-
/*
* Track the ping history and ping reply (the pong) from the same session, clean up
* history once hb_front or hb_back loses connection and restart the session once
void set_epoch_added(epoch_t epoch_) { epoch = epoch_; }
epoch_t get_epoch_added() const { return epoch; }
- void set_last_epoch_sent(epoch_t epoch_) { last_sent_epoch = epoch_; }
- epoch_t get_last_epoch_sent() const { return last_sent_epoch; }
+ void set_projected_epoch(epoch_t epoch_) { projected_epoch = epoch_; }
+ epoch_t get_projected_epoch() const { return projected_epoch; }
bool is_started() const { return connected; }
bool pinged() const {
clock::time_point last_rx_back;
// most recent epoch we wanted this peer
epoch_t epoch; // rename me to epoch_added
- // last epoch sent
- epoch_t last_sent_epoch = 0;
+ // epoch we expect peer to be at once our sent incrementals are processed
+ epoch_t projected_epoch = 0;
struct reply_t {
clock::time_point deadline;
void set_epoch_added(epoch_t epoch) { session.set_epoch_added(epoch); }
epoch_t get_epoch_added() const { return session.get_epoch_added(); }
- void set_last_epoch_sent(epoch_t epoch) { session.set_last_epoch_sent(epoch); }
- epoch_t get_last_epoch_sent() const { return session.get_last_epoch_sent(); }
+ void set_projected_epoch(epoch_t epoch) { session.set_projected_epoch(epoch); }
+ epoch_t get_projected_epoch() const { return session.get_projected_epoch(); }
// if failure, return time_point since last active
// else, return clock::zero()
void send_heartbeat(
clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
- void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
- for_each_conn([&] (auto& _conn) {
- if (_conn.matches(conn)) {
- if (is_replace) {
- _conn.replaced();
- } else {
- _conn.reset();
- }
- }
- });
- }
- void handle_connect(crimson::net::ConnectionRef conn) {
- for_each_conn([&] (auto& _conn) {
- if (_conn.matches(conn)) {
- _conn.connected();
- }
- });
- }
- void handle_accept(crimson::net::ConnectionRef conn) {
- for_each_conn([&] (auto& _conn) {
- _conn.accepted(conn);
- });
- }
+
+ void handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
+
+ void handle_connect(crimson::net::ConnectionRef conn);
+
+ void handle_accept(crimson::net::ConnectionRef conn, bool is_replace);
private:
entity_addr_t get_peer_addr(type_t type) override;
bool pending_send = false;
Connection con_front;
Connection con_back;
+
+ friend std::ostream& operator<<(std::ostream& os, const Peer& p) {
+ return os << "peer(osd." << p.peer << ")";
+ }
};
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<Heartbeat> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<Heartbeat::Peer> : fmt::ostream_formatter {};
#endif