Worker *w)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
- out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
+ state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
dispatch_queue(q), can_write(WriteStatus::NOWRITE),
- open_write(false), keepalive(false), recv_buf(NULL),
+ keepalive(false), recv_buf(NULL),
recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0),
last_active(ceph::coarse_mono_clock::now()),
}
}
+ assert(center->in_thread());
ssize_t r = cs.send(outcoming_bl, more);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
<< " remaining bytes " << outcoming_bl.length() << dendl;
if (!open_write && is_queued()) {
- if (center->in_thread()) {
- center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
- open_write = true;
- } else {
- center->dispatch_event_external(write_handler);
- }
+ center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
+ open_write = true;
}
if (open_write && !is_queued()) {
- if (center->in_thread()) {
- center->delete_file_event(cs.fd(), EVENT_WRITABLE);
- open_write = false;
- } else {
- center->dispatch_event_external(write_handler);
- }
+ center->delete_file_event(cs.fd(), EVENT_WRITABLE);
+ open_write = false;
if (state_after_send != STATE_NONE)
center->dispatch_event_external(read_handler);
}
bool need_dispatch_writer = false;
std::lock_guard<std::mutex> l(lock);
last_active = ceph::coarse_mono_clock::now();
+ auto recv_start_time = ceph::mono_clock::now();
do {
ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
prev_state = state;
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
ltt_recv_stamp = ceph_clock_now();
#endif
+ recv_stamp = ceph_clock_now();
ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl;
ceph_msg_header header;
ceph_msg_header_old oldheader;
front.clear();
middle.clear();
data.clear();
- recv_stamp = ceph_clock_now();
current_header = header;
state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
break;
// side queueing because messages can't be renumbered, but the (kernel) client will
// occasionally pull a message out of the sent queue to send elsewhere. in that case
// it doesn't matter if we "got" it or not.
- uint64_t cur_seq = in_seq.read();
+ uint64_t cur_seq = in_seq;
if (message->get_seq() <= cur_seq) {
ldout(async_msgr->cct,0) << __func__ << " got old message "
<< message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
#endif
// note last received message.
- in_seq.set(message->get_seq());
+ in_seq = message->get_seq();
ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
<< message->get_seq() << " " << message
<< " " << *message << dendl;
if (!policy.lossy) {
- ack_left.inc();
+ ack_left++;
need_dispatch_writer = true;
}
state = STATE_OPEN;
logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
async_msgr->ms_fast_preprocess(message);
+ auto fast_dispatch_time = ceph::mono_clock::now();
+ logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time);
if (delay_state) {
utime_t release = message->get_recv_stamp();
double delay_period = 0;
} else if (async_msgr->ms_can_fast_dispatch(message)) {
lock.unlock();
dispatch_queue->fast_dispatch(message);
+ recv_start_time = ceph::mono_clock::now();
+ logger->tinc(l_msgr_running_fast_dispatch_time,
+ recv_start_time - fast_dispatch_time);
lock.lock();
} else {
dispatch_queue->enqueue(message, message->get_priority(), conn_id);
if (need_dispatch_writer && is_connected())
center->dispatch_event_external(write_handler);
+
+ logger->tinc(l_msgr_running_recv_time, ceph::mono_clock::now() - recv_start_time);
return;
fail:
<< " not " << peer_addr
<< " - presumably this is the same node!" << dendl;
} else {
- ldout(async_msgr->cct, 0) << __func__ << " connect claims to be "
- << paddr << " not " << peer_addr << " - wrong node!" << dendl;
- goto fail;
+ ldout(async_msgr->cct, 10) << __func__ << " connect claims to be "
+ << paddr << " not " << peer_addr << dendl;
+ goto fail;
}
}
ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
lock.unlock();
async_msgr->learned_addr(peer_addr_for_me);
- if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ if (async_msgr->cct->_conf->ms_inject_internal_delays
+ && async_msgr->cct->_conf->ms_inject_socket_failures) {
if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
ldout(msgr->cct, 10) << __func__ << " sleep for "
<< async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
case STATE_CONNECTING_SEND_CONNECT_MSG:
{
- if (!got_bad_auth) {
- delete authorizer;
+ if (!authorizer) {
authorizer = async_msgr->get_authorizer(peer_type, false);
}
bufferlist bl;
}
authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
- bufferlist::iterator iter = authorizer_reply.begin();
+
+ if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
+ ldout(async_msgr->cct,10) << __func__ << " connect got auth challenge" << dendl;
+ authorizer->add_challenge(async_msgr->cct, authorizer_reply);
+ state = STATE_CONNECTING_SEND_CONNECT_MSG;
+ break;
+ }
+
+ auto iter = authorizer_reply.begin();
if (authorizer && !authorizer->verify_reply(iter)) {
ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
goto fail;
newly_acked_seq = *((uint64_t*)state_buffer);
ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
- << " vs out_seq " << out_seq.read() << dendl;
+ << " vs out_seq " << out_seq << dendl;
discard_requeued_up_to(newly_acked_seq);
//while (newly_acked_seq > out_seq.read()) {
// Message *m = _get_next_outgoing(NULL);
//}
bufferlist bl;
- uint64_t s = in_seq.read();
+ uint64_t s = in_seq;
bl.append((char*)&s, sizeof(s));
r = try_send(bl);
if (r == 0) {
if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
was_session_reset();
+ // see was_session_reset
+ outcoming_bl.clear();
state = STATE_CONNECTING_SEND_CONNECT_MSG;
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
// require signatures for cephx?
if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
if (peer_type == CEPH_ENTITY_TYPE_OSD ||
- peer_type == CEPH_ENTITY_TYPE_MDS) {
+ peer_type == CEPH_ENTITY_TYPE_MDS ||
+ peer_type == CEPH_ENTITY_TYPE_MGR) {
if (async_msgr->cct->_conf->cephx_require_signatures ||
async_msgr->cct->_conf->cephx_cluster_require_signatures) {
ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
policy.features_required |= CEPH_FEATURE_MSG_AUTH;
}
+ if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
+ async_msgr->cct->_conf->cephx_cluster_require_version >= 2) {
+ ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for cluster" << dendl;
+ policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
+ }
} else {
if (async_msgr->cct->_conf->cephx_require_signatures ||
async_msgr->cct->_conf->cephx_service_require_signatures) {
ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl;
policy.features_required |= CEPH_FEATURE_MSG_AUTH;
}
+ if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
+ async_msgr->cct->_conf->cephx_service_require_version >= 2) {
+ ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for service" << dendl;
+ policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
+ }
}
}
+
uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
if (feat_missing) {
ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
lock.unlock();
bool authorizer_valid;
- if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
- authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
+ bool need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2);
+ bool had_challenge = (bool)authorizer_challenge;
+ if (!async_msgr->verify_authorizer(
+ this, peer_type, connect.authorizer_protocol, authorizer_bl,
+ authorizer_reply, authorizer_valid, session_key,
+ need_challenge ? &authorizer_challenge : nullptr) ||
+ !authorizer_valid) {
lock.lock();
- ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
+ char tag;
+ if (need_challenge && !had_challenge && authorizer_challenge) {
+ ldout(async_msgr->cct,0) << __func__ << ": challenging authorizer"
+ << dendl;
+ assert(authorizer_reply.length());
+ tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
+ } else {
+ ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
+ tag = CEPH_MSGR_TAG_BADAUTHORIZER;
+ }
session_security.reset();
- return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply);
+ return _reply_accept(tag, connect, reply, authorizer_reply);
}
// We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR
// connection's lock
existing->lock.lock(); // skip lockdep check (we are locking a second AsyncConnection here)
- if (existing->replacing || existing->state == STATE_CLOSED) {
- ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
+ if (existing->state == STATE_CLOSED) {
+ ldout(async_msgr->cct, 1) << __func__ << " existing already closed." << dendl;
+ existing->lock.unlock();
+ existing = NULL;
+ goto open;
+ }
+
+ if (existing->replacing) {
+ ldout(async_msgr->cct, 1) << __func__ << " existing racing replace happened while replacing."
<< " existing_state=" << get_state_name(existing->state) << dendl;
reply.global_seq = existing->peer_global_seq;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
- // Clean up output buffer
- existing->outcoming_bl.clear();
if (existing->delay_state) {
existing->delay_state->flush();
assert(!delay_state);
}
- existing->requeue_sent();
existing->reset_recv_state();
auto temp_cs = std::move(cs);
dispatch_queue->queue_reset(this);
ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
existing->can_write = WriteStatus::REPLACING;
- existing->open_write = false;
existing->replacing = true;
existing->state_offset = 0;
// avoid previous thread modify event
// there shouldn't exist any buffer
assert(recv_start == recv_end);
+ existing->authorizer_challenge.reset();
+
auto deactivate_existing = std::bind(
[existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
{
std::lock_guard<std::mutex> l(existing->lock);
+ existing->write_lock.lock();
+ existing->requeue_sent();
+ existing->outcoming_bl.clear();
+ existing->open_write = false;
+ existing->write_lock.unlock();
if (existing->state == STATE_NONE) {
existing->shutdown_socket();
existing->cs = std::move(cs);
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
- << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl;
+ << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
int next_state;
next_state = STATE_ACCEPTING_READY;
discard_requeued_up_to(0);
is_reset_from_peer = false;
- in_seq.set(0);
+ in_seq = 0;
}
// send READY reply
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
if (reply.tag == CEPH_MSGR_TAG_SEQ) {
- uint64_t s = in_seq.read();
+ uint64_t s = in_seq;
reply_bl.append((char*)&s, sizeof(s));
}
}
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
- assert(state == STATE_CLOSED);
+ assert(state == STATE_CLOSED || state == STATE_NONE);
goto fail_registered;
}
ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
<< f << " != " << get_features() << dendl;
}
- if (!is_queued() && can_write == WriteStatus::CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) {
- if (!bl.length())
- prepare_send_message(get_features(), m, bl);
- logger->inc(l_msgr_send_messages_inline);
- if (write_message(m, bl, false) < 0) {
- ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
- // we want to handle fault within internal thread
- center->dispatch_event_external(write_handler);
- }
- } else if (can_write == WriteStatus::CLOSED) {
+ if (can_write == WriteStatus::CLOSED) {
ldout(async_msgr->cct, 10) << __func__ << " connection closed."
<< " Drop message " << m << dendl;
m->put();
ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
<< " (" << m->get_seq() << ")" << dendl;
rq.push_front(make_pair(bufferlist(), m));
- out_seq.dec();
+ out_seq--;
}
}
<< " <= " << seq << ", discarding" << dendl;
p.second->put();
rq.pop_front();
- out_seq.inc();
+ out_seq++;
}
if (rq.empty())
out_q.erase(CEPH_MSG_PRIO_HIGHEST);
/*
* Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue
- * Must hold pipe_lock prior to calling.
+ * Must hold write_lock prior to calling.
*/
void AsyncConnection::discard_out_queue()
{
r->second->put();
}
out_q.clear();
- outcoming_bl.clear();
}
int AsyncConnection::randomize_out_seq()
int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
rand_seq &= SEQ_MASK;
lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
- out_seq.set(rand_seq);
+ out_seq = rand_seq;
return seq_error;
} else {
// previously, seq #'s always started at 0.
- out_seq.set(0);
+ out_seq = 0;
return 0;
}
}
outcoming_bl.clear();
if (!once_ready && !is_queued() &&
state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
- ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
+ ldout(async_msgr->cct, 10) << __func__ << " with nothing to send and in the half "
<< " accept state just closed" << dendl;
write_lock.unlock();
_stop();
}
reset_recv_state();
if (policy.standby && !is_queued() && state != STATE_WAIT) {
- ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
write_lock.unlock();
return;
delay_state->discard();
dispatch_queue->discard_queue(conn_id);
discard_out_queue();
+ // note: outcoming_bl also needs to be cleared at this point, but
+ // was_session_reset may be called from another thread, so the caller must clear it itself.
+ // outcoming_bl.clear();
dispatch_queue->queue_remote_reset(this);
if (randomize_out_seq()) {
- ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
+ ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
}
- in_seq.set(0);
+ in_seq = 0;
connect_seq = 0;
// it's safe to directly set 0, double locked
- ack_left.set(0);
+ ack_left = 0;
once_ready = false;
can_write = WriteStatus::NOWRITE;
}
ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
{
FUNCTRACE();
- assert(can_write == WriteStatus::CANWRITE);
- m->set_seq(out_seq.inc());
-
- if (!policy.lossy) {
- // put on sent list
- sent.push_back(m);
- m->get();
- }
+ assert(center->in_thread());
+ m->set_seq(++out_seq);
if (msgr->crcflags & MSG_CRC_HEADER)
m->calc_header_crc();
<< " off " << header.data_off << dendl;
if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
- std::list<buffer::ptr>::const_iterator pb;
- for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) {
- outcoming_bl.append((char*)pb->c_str(), pb->length());
+ for (const auto &pb : bl.buffers()) {
+ outcoming_bl.append((char*)pb.c_str(), pb.length());
}
} else {
outcoming_bl.claim_append(bl);
}
m->trace.event("async writing message");
- logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len);
ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
<< " " << m << dendl;
+ ssize_t total_send_size = outcoming_bl.length();
ssize_t rc = _try_send(more);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(rc) << dendl;
} else if (rc == 0) {
+ logger->inc(l_msgr_send_bytes, total_send_size - original_bl_len);
ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
} else {
+ logger->inc(l_msgr_send_bytes, total_send_size - outcoming_bl.length());
ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
}
if (m->get_type() == CEPH_MSG_OSD_OP)
keepalive = false;
}
- while (1) {
+ auto start = ceph::mono_clock::now();
+ bool more;
+ do {
bufferlist data;
Message *m = _get_next_outgoing(&data);
if (!m)
break;
+ if (!policy.lossy) {
+ // put on sent list
+ sent.push_back(m);
+ m->get();
+ }
+ more = _has_next_outgoing();
+ write_lock.unlock();
+
// send_message or requeue messages may not encode message
if (!data.length())
prepare_send_message(get_features(), m, data);
- r = write_message(m, data, _has_next_outgoing());
+ r = write_message(m, data, more);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
- write_lock.unlock();
goto fail;
- } else if (r > 0) {
- break;
}
- }
+ write_lock.lock();
+ if (r > 0)
+ break;
+ } while (can_write == WriteStatus::CANWRITE);
+ write_lock.unlock();
- uint64_t left = ack_left.read();
+ uint64_t left = ack_left;
if (left) {
ceph_le64 s;
- s = in_seq.read();
+ s = in_seq;
outcoming_bl.append(CEPH_MSGR_TAG_ACK);
outcoming_bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
- ack_left.sub(left);
- left = ack_left.read();
+ ack_left -= left;
+ left = ack_left;
r = _try_send(left);
} else if (is_queued()) {
r = _try_send();
}
- write_lock.unlock();
+ logger->tinc(l_msgr_running_send_time, ceph::mono_clock::now() - start);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
auto now = ceph::coarse_mono_clock::now();
ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
<< " last_active" << last_active << dendl;
- assert(last_tick_id == id);
std::lock_guard<std::mutex> l(lock);
last_tick_id = 0;
auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();