out_q[m->get_priority()].emplace_back(std::move(bl), m);
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
- if (can_write != WriteStatus::REPLACING) {
+ if (can_write != WriteStatus::REPLACING && !write_in_progress) {
+ write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
} else if (r > 0)
break;
} while (can_write == WriteStatus::CANWRITE);
+ write_in_progress = false;
connection->write_lock.unlock();
// if r > 0 mean data still lefted, so no need _try_send.
return;
}
} else {
+ write_in_progress = false;
connection->write_lock.unlock();
connection->lock.lock();
connection->write_lock.lock();
}
}
- message->set_connection(connection);
-
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (message->get_type() == CEPH_MSG_OSD_OP ||
message->get_type() == CEPH_MSG_OSD_OPREPLY) {
}
void ProtocolV1::requeue_sent() {
+ write_in_progress = false;
if (sent.empty()) {
return;
}
}
}
out_q.clear();
+ write_in_progress = false;
}
void ProtocolV1::reset_recv_state() {
<< __func__ << " stop myself to swap existing" << dendl;
exproto->can_write = WriteStatus::REPLACING;
exproto->replacing = true;
+ exproto->write_in_progress = false;
existing->state_offset = 0;
// avoid previous thread modify event
exproto->state = NONE;