#include "client.h"
+#include <seastar/core/sleep.hh>
+
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
WithStats& with_stats)
: msgr{msgr},
with_stats{with_stats},
- tick_timer{[this] {tick();}}
+ report_timer{[this] {report();}}
{}
seastar::future<> Client::start()
seastar::future<> Client::stop()
{
- return gate.close().then([this] {
- if (conn) {
- return conn->close();
- } else {
+ logger().info("{}", __func__);
+ report_timer.cancel();
+ auto fut = gate.close();
+ if (conn) {
+ conn->mark_down();
+ }
+ return fut;
+}
+
+std::optional<seastar::future<>>
+Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
+{
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
+ switch(m->get_type()) {
+ case MSG_MGR_MAP:
+ return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
+ case MSG_MGR_CONFIGURE:
+ return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
+ default:
+ dispatched = false;
return seastar::now();
}
});
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
-seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn,
- MessageRef m)
+void Client::ms_handle_connect(crimson::net::ConnectionRef c)
{
- switch(m->get_type()) {
- case MSG_MGR_MAP:
- return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
- case MSG_MGR_CONFIGURE:
- return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
- default:
- return seastar::now();
- }
+ gate.dispatch_in_background(__func__, *this, [this, c] {
+ if (conn == c) {
+ // ask for the mgrconfigure message
+ auto m = ceph::make_message<MMgrOpen>();
+ m->daemon_name = local_conf()->name.get_id();
+ return conn->send(std::move(m));
+ } else {
+ return seastar::now();
+ }
+ });
}
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c)
+void Client::ms_handle_reset(crimson::net::ConnectionRef c, bool /* is_replace */)
{
- if (conn == c) {
- conn = nullptr;
- tick_timer.cancel();
- }
- return seastar::now();
+ gate.dispatch_in_background(__func__, *this, [this, c] {
+ if (conn == c) {
+ report_timer.cancel();
+ return reconnect();
+ } else {
+ return seastar::now();
+ }
+ });
}
seastar::future<> Client::reconnect()
{
if (conn) {
- // crimson::net::Protocol::close() is able to close() in background
- (void)conn->close();
+ conn->mark_down();
+ conn = {};
}
if (!mgrmap.get_available()) {
logger().warn("No active mgr available yet");
return seastar::now();
}
- auto peer = mgrmap.get_active_addrs().front();
- conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MGR);
- // ask for the mgrconfigure message
- auto m = ceph::make_message<MMgrOpen>();
- m->daemon_name = local_conf()->name.get_id();
- return conn->send(std::move(m));
+ auto retry_interval = std::chrono::duration<double>(
+ local_conf().get_val<double>("mgr_connect_retry_interval"));
+ auto a_while = std::chrono::duration_cast<seastar::steady_clock_type::duration>(
+ retry_interval);
+ return seastar::sleep(a_while).then([this] {
+ auto peer = mgrmap.get_active_addrs().pick_addr(msgr.get_myaddr().get_type());
+ if (peer == entity_addr_t{}) {
+ // crimson msgr only uses the first bound addr
+ logger().error("mgr.{} does not have an addr compatible with me",
+ mgrmap.get_active_name());
+ return;
+ }
+ conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MGR);
+ });
}
-seastar::future<> Client::handle_mgr_map(crimson::net::Connection*,
+seastar::future<> Client::handle_mgr_map(crimson::net::ConnectionRef,
Ref<MMgrMap> m)
{
mgrmap = m->get_map();
}
}
-seastar::future<> Client::handle_mgr_conf(crimson::net::Connection* conn,
+seastar::future<> Client::handle_mgr_conf(crimson::net::ConnectionRef,
Ref<MMgrConfigure> m)
{
logger().info("{} {}", __func__, *m);
- auto tick_period = std::chrono::seconds{m->stats_period};
- if (tick_period.count()) {
- if (tick_timer.armed()) {
- tick_timer.rearm(tick_timer.get_timeout(), tick_period);
+ auto report_period = std::chrono::seconds{m->stats_period};
+ if (report_period.count()) {
+ if (report_timer.armed()) {
+ report_timer.rearm(report_timer.get_timeout(), report_period);
} else {
- tick_timer.arm_periodic(tick_period);
+ report_timer.arm_periodic(report_period);
}
} else {
- tick_timer.cancel();
+ report_timer.cancel();
}
return seastar::now();
}
-void Client::tick()
+void Client::report()
{
- (void) seastar::with_gate(gate, [this] {
- if (conn) {
- auto pg_stats = with_stats.get_stats();
- return conn->send(std::move(pg_stats));
- } else {
- return reconnect();
- }
+ gate.dispatch_in_background(__func__, *this, [this] {
+ assert(conn);
+ auto pg_stats = with_stats.get_stats();
+ return conn->send(std::move(pg_stats));
});
}
+void Client::print(std::ostream& out) const
+{
+ out << "mgrc ";
+}
+
}