class socket_blocker {
std::optional<seastar::abort_source> p_blocked;
std::optional<seastar::abort_source> p_unblocked;
+ const seastar::shard_id primary_sid;
public:
+ socket_blocker() : primary_sid{seastar::this_shard_id()} {}
+
seastar::future<> wait_blocked() {
+ ceph_assert(seastar::this_shard_id() == primary_sid);
ceph_assert(!p_blocked);
if (p_unblocked) {
return seastar::make_ready_future<>();
} else {
p_blocked = seastar::abort_source();
- return seastar::sleep_abortable(std::chrono::seconds(10),
- *p_blocked).then([] {
+ return seastar::sleep_abortable(
+ std::chrono::seconds(10), *p_blocked
+ ).then([] {
throw std::runtime_error(
"Timeout (10s) in socket_blocker::wait_blocked()");
}).handle_exception_type([] (const seastar::sleep_aborted& e) {
}
seastar::future<> block() {
- if (p_blocked) {
- p_blocked->request_abort();
- p_blocked = std::nullopt;
- }
- ceph_assert(!p_unblocked);
- p_unblocked = seastar::abort_source();
- return seastar::sleep_abortable(std::chrono::seconds(10),
- *p_unblocked).then([] {
- ceph_abort("Timeout (10s) in socket_blocker::block()");
- }).handle_exception_type([] (const seastar::sleep_aborted& e) {
- // wait done!
+ return seastar::smp::submit_to(primary_sid, [this] {
+ if (p_blocked) {
+ p_blocked->request_abort();
+ p_blocked = std::nullopt;
+ }
+ ceph_assert(!p_unblocked);
+ p_unblocked = seastar::abort_source();
+ return seastar::sleep_abortable(
+ std::chrono::seconds(10), *p_unblocked
+ ).then([] {
+ ceph_abort("Timeout (10s) in socket_blocker::block()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
});
}
void unblock() {
+ ceph_assert(seastar::this_shard_id() == primary_sid);
ceph_assert(!p_blocked);
ceph_assert(p_unblocked);
p_unblocked->request_abort();
struct Interceptor {
socket_blocker blocker;
virtual ~Interceptor() {}
- virtual void register_conn(SocketConnection& conn) = 0;
- virtual void register_conn_ready(SocketConnection& conn) = 0;
- virtual void register_conn_closed(SocketConnection& conn) = 0;
- virtual void register_conn_replaced(SocketConnection& conn) = 0;
- virtual bp_action_t intercept(SocketConnection& conn, Breakpoint bp) = 0;
+ virtual void register_conn(ConnectionRef) = 0;
+ virtual void register_conn_ready(ConnectionRef) = 0;
+ virtual void register_conn_closed(ConnectionRef) = 0;
+ virtual void register_conn_replaced(ConnectionRef) = 0;
+
+ virtual seastar::future<bp_action_t>
+ intercept(Connection&, std::vector<Breakpoint> bp) = 0;
};
} // namespace crimson::net