#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
ltt_recv_stamp = ceph_clock_now();
#endif
+ recv_stamp = ceph_clock_now();
ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl;
ceph_msg_header header;
ceph_msg_header_old oldheader;
front.clear();
middle.clear();
data.clear();
- recv_stamp = ceph_clock_now();
current_header = header;
state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
break;
<< " not " << peer_addr
<< " - presumably this is the same node!" << dendl;
} else {
- ldout(async_msgr->cct, 0) << __func__ << " connect claims to be "
- << paddr << " not " << peer_addr << " - wrong node!" << dendl;
- goto fail;
+ ldout(async_msgr->cct, 10) << __func__ << " connect claims to be "
+ << paddr << " not " << peer_addr << dendl;
+ goto fail;
}
}
ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
lock.unlock();
async_msgr->learned_addr(peer_addr_for_me);
- if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ if (async_msgr->cct->_conf->ms_inject_internal_delays
+ && async_msgr->cct->_conf->ms_inject_socket_failures) {
if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
ldout(msgr->cct, 10) << __func__ << " sleep for "
<< async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
case STATE_CONNECTING_SEND_CONNECT_MSG:
{
- if (!got_bad_auth) {
- delete authorizer;
+ if (!authorizer) {
authorizer = async_msgr->get_authorizer(peer_type, false);
}
bufferlist bl;
}
authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
- bufferlist::iterator iter = authorizer_reply.begin();
+
+ if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
+ ldout(async_msgr->cct,10) << __func__ << " connect got auth challenge" << dendl;
+ authorizer->add_challenge(async_msgr->cct, authorizer_reply);
+ state = STATE_CONNECTING_SEND_CONNECT_MSG;
+ break;
+ }
+
+ auto iter = authorizer_reply.begin();
if (authorizer && !authorizer->verify_reply(iter)) {
ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
goto fail;
// require signatures for cephx?
if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
if (peer_type == CEPH_ENTITY_TYPE_OSD ||
- peer_type == CEPH_ENTITY_TYPE_MDS) {
+ peer_type == CEPH_ENTITY_TYPE_MDS ||
+ peer_type == CEPH_ENTITY_TYPE_MGR) {
if (async_msgr->cct->_conf->cephx_require_signatures ||
async_msgr->cct->_conf->cephx_cluster_require_signatures) {
ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
policy.features_required |= CEPH_FEATURE_MSG_AUTH;
}
+ if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
+ async_msgr->cct->_conf->cephx_cluster_require_version >= 2) {
+ ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for cluster" << dendl;
+ policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
+ }
} else {
if (async_msgr->cct->_conf->cephx_require_signatures ||
async_msgr->cct->_conf->cephx_service_require_signatures) {
ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl;
policy.features_required |= CEPH_FEATURE_MSG_AUTH;
}
+ if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
+ async_msgr->cct->_conf->cephx_service_require_version >= 2) {
+ ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for service" << dendl;
+ policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
+ }
}
}
+
uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
if (feat_missing) {
ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
lock.unlock();
bool authorizer_valid;
- if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
- authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
+ bool need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2);
+ bool had_challenge = (bool)authorizer_challenge;
+ if (!async_msgr->verify_authorizer(
+ this, peer_type, connect.authorizer_protocol, authorizer_bl,
+ authorizer_reply, authorizer_valid, session_key,
+ need_challenge ? &authorizer_challenge : nullptr) ||
+ !authorizer_valid) {
lock.lock();
- ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
+ char tag;
+ if (need_challenge && !had_challenge && authorizer_challenge) {
+ ldout(async_msgr->cct,0) << __func__ << ": challenging authorizer"
+ << dendl;
+ assert(authorizer_reply.length());
+ tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
+ } else {
+ ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
+ tag = CEPH_MSGR_TAG_BADAUTHORIZER;
+ }
session_security.reset();
- return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply);
+ return _reply_accept(tag, connect, reply, authorizer_reply);
}
// We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR
// there shouldn't exist any buffer
assert(recv_start == recv_end);
+ existing->authorizer_challenge.reset();
+
auto deactivate_existing = std::bind(
[existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
}
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
- assert(state == STATE_CLOSED);
+ assert(state == STATE_CLOSED || state == STATE_NONE);
goto fail_registered;
}
<< " off " << header.data_off << dendl;
if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
- std::list<buffer::ptr>::const_iterator pb;
- for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) {
- outcoming_bl.append((char*)pb->c_str(), pb->length());
+ for (const auto &pb : bl.buffers()) {
+ outcoming_bl.append((char*)pb.c_str(), pb.length());
}
} else {
outcoming_bl.claim_append(bl);
}
m->trace.event("async writing message");
- logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len);
ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
<< " " << m << dendl;
+ ssize_t total_send_size = outcoming_bl.length();
ssize_t rc = _try_send(more);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(rc) << dendl;
} else if (rc == 0) {
+ logger->inc(l_msgr_send_bytes, total_send_size - original_bl_len);
ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
} else {
+ logger->inc(l_msgr_send_bytes, total_send_size - outcoming_bl.length());
ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
}
if (m->get_type() == CEPH_MSG_OSD_OP)