// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "XioMsg.h"
#include "XioConnection.h"
#include "XioMessenger.h"
#include "messages/MDataPing.h"
#include "msg/msg_types.h"
#include "auth/none/AuthNoneProtocol.h" // XXX

#include "include/ceph_assert.h"
#include "common/dout.h"

extern struct xio_mempool *xio_msgr_mpool;
extern struct xio_mempool *xio_msgr_noreg_mpool;

#define dout_subsys ceph_subsys_xio

void print_xio_msg_hdr(CephContext *cct, const char *tag,
                       const XioMsgHdr &hdr, const struct xio_msg *msg)
{
  if (msg) {
    ldout(cct,4) << tag <<
      " xio msg:" <<
      " sn: " << msg->sn <<
      " timestamp: " << msg->timestamp <<
      dendl;
  }

  ldout(cct,4) << tag <<
    " ceph header: " <<
    " seq: " << hdr.hdr->seq <<
    " tid: " << hdr.hdr->tid <<
    " type: " << hdr.hdr->type <<
    " prio: " << hdr.hdr->priority <<
    " name type: " << (int) hdr.hdr->src.type <<
    " name num: " << (int) hdr.hdr->src.num <<
    " version: " << hdr.hdr->version <<
    " compat_version: " << hdr.hdr->compat_version <<
    " front_len: " << hdr.hdr->front_len <<
    " middle_len: " << hdr.hdr->middle_len <<
    " data_len: " << hdr.hdr->data_len <<
    " xio header: " <<
    " msg_cnt: " << hdr.msg_cnt <<
    dendl;

  ldout(cct,4) << tag <<
    " ceph footer: " <<
    " front_crc: " << hdr.ftr->front_crc <<
    " middle_crc: " << hdr.ftr->middle_crc <<
    " data_crc: " << hdr.ftr->data_crc <<
    " sig: " << hdr.ftr->sig <<
    " flags: " << (uint32_t) hdr.ftr->flags <<
    dendl;
}

void print_ceph_msg(CephContext *cct, const char *tag, Message *m)
{
  if (m->get_magic() & (MSG_MAGIC_XIO | MSG_MAGIC_TRACE_DTOR)) {
    ceph_msg_header& header = m->get_header();
    ldout(cct,4) << tag << " header version " << header.version <<
      " compat version " << header.compat_version <<
      dendl;
  }
}

#undef dout_prefix
#define dout_prefix conn_prefix(_dout)
ostream& XioConnection::conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << get_messenger()->get_myinst().addr << " >> " << peer_addr
                << " peer=" << peer.name.type_str()
                << " conn=" << conn << " sess=" << session << " ";
}

XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
                             const entity_inst_t& _peer) :
  Connection(m->cct, m),
  xio_conn_type(_type),
  portal(m->get_portal()),
  connected(false),
  peer(_peer),
  session(NULL),
  conn(NULL),
  magic(m->get_magic()),
  scount(0),
  send_ctr(0),
  in_seq(),
  cstate(this)
{
  set_peer_type(peer.name.type());
  set_peer_addr(peer.addr);

  Messenger::Policy policy;
  int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;
  int xopt;

  policy = m->get_policy(peer_type);

  if (policy.throttler_messages) {
    max_msgs = policy.throttler_messages->get_max();
    ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
  }

  xopt = m->cct->_conf->xio_queue_depth;
  if (max_msgs > xopt)
    xopt = max_msgs;

  /* set high mark for send; reserve 20% for credits */
  q_high_mark = xopt * 4 / 5;
  q_low_mark = q_high_mark / 2;
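  /* for example, with an effective depth xopt of 256, q_high_mark = 204
   * and q_low_mark = 102: the high mark is presumably checked on the send
   * path in XioMessenger, and on_ow_msg_send_complete() below returns the
   * connection to UP once in-flight sends fall to the low mark */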

  /* set send & receive msgs queue depth */
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
              &xopt, sizeof(xopt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
              &xopt, sizeof(xopt));

  if (policy.throttler_bytes) {
    max_bytes = policy.throttler_bytes->get_max();
    ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
  }

  bytes_opt = (2 << 28); /* default: 512 MB */
  if (max_bytes > bytes_opt)
    bytes_opt = max_bytes;

  /* set send & receive total bytes throttle */
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
              &bytes_opt, sizeof(bytes_opt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
              &bytes_opt, sizeof(bytes_opt));

  ldout(m->cct,4) << "throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;

  /* XXXX fake features, aieee! */
  set_features(XIO_ALL_FEATURES);
}

int XioConnection::send_message(Message *m)
{
  XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
  return ms->_send_message(m, this);
}

void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp)
{
  /* If con is not in READY state, we need to queue the request */
  if (cstate.session_state.read() != XioConnection::UP) {
    std::lock_guard<ceph::util::spinlock> lg(sp);
    if (cstate.session_state.read() != XioConnection::UP) {
      if (ack) {
        outgoing.ack = true;
        outgoing.ack_time = *tp;
      }
      else {
        outgoing.keepalive = true;
      }
      return;
    }
  }

  send_keepalive_or_ack_internal(ack, tp);
}

void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
{
  XioCommand *xcmd = pool_alloc_xio_command(this);
  if (! xcmd) {
    /* could happen if Accelio has been shut down */
    return;
  }

  struct ceph_timespec ts;
  if (ack) {
    ceph_assert(tp);
    tp->encode_timeval(&ts);
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
    xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
  } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
    utime_t t = ceph_clock_now();
    t.encode_timeval(&ts);
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2);
    xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
  } else {
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE);
  }
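
  /* wire layout produced above (a summary of the appends, not a spec):
   *   KEEPALIVE      -> [1-byte tag]
   *   KEEPALIVE2     -> [1-byte tag][ceph_timespec]
   *   KEEPALIVE2_ACK -> [1-byte tag][ceph_timespec echoing the peer's stamp]
   * the single-buffer assertion below holds because these few small
   * appends fit in one bufferlist segment */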

  const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
  ceph_assert(header.size() == 1); /* accelio header must be without scatter gather */
  list<bufferptr>::const_iterator pb = header.begin();
  ceph_assert(pb->length() < XioMsgHdr::get_max_encoded_length());

  struct xio_msg * msg = xcmd->get_xio_msg();
  msg->out.header.iov_base = (char*) pb->c_str();
  msg->out.header.iov_len = pb->length();

  ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base)
    << " len " << msg->out.header.iov_len << dendl;

  portal->enqueue(this, xcmd);
}


int XioConnection::passive_setup()
{
  /* XXX passive setup is a placeholder for (potentially active-side
     initiated) feature and auth* negotiation */
  static bufferlist authorizer_reply; /* static because fake */
  static CryptoKey session_key; /* ditto */
  bool authorizer_valid;

  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());

  // fake an auth buffer
  EntityName name;
  name.set_type(peer.name.type());

  AuthNoneAuthorizer auth;
  auth.build_authorizer(name, peer.name.num());

  /* XXX fake authorizer! */
  msgr->ms_deliver_verify_authorizer(
    this, peer_type, CEPH_AUTH_NONE,
    auth.bl,
    0,
    authorizer_reply,
    authorizer_valid,
    session_key);

  /* notify hook */
  msgr->ms_deliver_handle_accept(this);
  msgr->ms_deliver_handle_fast_accept(this);

  /* try to insert in conns_entity_map */
  msgr->try_insert(this);
  return (0);
}
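
/* dispatch hooks are carved out of an Accelio mempool rather than the
 * heap: xpool_alloc() hands back raw registered memory and placement new
 * constructs the XioDispatchHook in it (the matching destruct/xpool_free
 * is assumed to live in the hook's release path) */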
static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
  XioConnection *xcon, Message *m, XioInSeq& msg_seq)
{
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool,
                      sizeof(XioDispatchHook), &mp_mem);
  if (!!e)
    return NULL;
  XioDispatchHook *xhook = static_cast<XioDispatchHook*>(mp_mem.addr);
  new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem);
  return xhook;
}

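/* a logical Ceph message can arrive as a chain of xio_msgs: the first
 * carries a header announcing the total count (XioMsgCnt) and the rest
 * carry payload-only scatter/gather entries; handle_data_msg() accumulates
 * the chain in in_seq and only decodes once the full count has arrived */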
int XioConnection::handle_data_msg(struct xio_session *session,
                                   struct xio_msg *msg,
                                   int more_in_batch,
                                   void *cb_user_context)
{
  struct xio_msg *tmsg = msg;

  /* XXX Accelio guarantees message ordering at
   * xio_session */

  if (! in_seq.p()) {
    if (!tmsg->in.header.iov_len) {
      ldout(msgr->cct,0) << __func__ << " empty header: packet out of sequence?" << dendl;
      xio_release_msg(msg);
      return 0;
    }
    const size_t sizeof_tag = 1;
    XioMsgCnt msg_cnt(
      buffer::create_static(tmsg->in.header.iov_len-sizeof_tag,
                            ((char*) tmsg->in.header.iov_base)+sizeof_tag));
    ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg
      << " msg_cnt " << msg_cnt.msg_cnt
      << " iov_base " << tmsg->in.header.iov_base
      << " iov_len " << (int) tmsg->in.header.iov_len
      << " nents " << tmsg->in.pdata_iov.nents
      << " sn " << tmsg->sn << dendl;
    ceph_assert(session == this->session);
    in_seq.set_count(msg_cnt.msg_cnt);
  } else {
    /* XXX major sequence error */
    ceph_assert(! tmsg->in.header.iov_len);
  }

  in_seq.append(msg);
  if (in_seq.count() > 0) {
    return 0;
  }
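
  /* in_seq.count() reached zero: every xio_msg in the chain is buffered,
   * so the full front/middle/data payload can now be reassembled and the
   * Ceph message decoded in one pass below */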

  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
  XioDispatchHook *m_hook =
    pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
  XioInSeq& msg_seq = m_hook->msg_seq;
  in_seq.clear();

  ceph_msg_header header;
  ceph_msg_footer footer;
  buffer::list payload, middle, data;

  const utime_t recv_stamp = ceph_clock_now();

  ldout(msgr->cct,4) << __func__ << " " << "msg_seq.size()=" << msg_seq.size() <<
    dendl;

  struct xio_msg* msg_iter = msg_seq.begin();
  tmsg = msg_iter;
  XioMsgHdr hdr(header, footer,
                buffer::create_static(tmsg->in.header.iov_len,
                                      (char*) tmsg->in.header.iov_base));

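  /* message type 43 is presumably CEPH_MSG_OSD_OPREPLY (an assumption
   * from the numeric constant; only that type is traced here) */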
  if (magic & (MSG_MAGIC_TRACE_XCON)) {
    if (hdr.hdr->type == 43) {
      print_xio_msg_hdr(msgr->cct, "on_msg", hdr, NULL);
    }
  }

  unsigned int ix, blen, iov_len;
  struct xio_iovec_ex *msg_iov, *iovs;
  uint32_t take_len, left_len = 0;
  char *left_base = NULL;

  ix = 0;
  blen = header.front_len;

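  /* walk the scatter/gather entries, mapping slices into the front
   * bufferlist; when a segment boundary lands mid-iovec, the remainder is
   * carried in (left_base, left_len) so the next segment (middle/data)
   * starts from the leftover bytes instead of a fresh iovec */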
  while (blen && (msg_iter != msg_seq.end())) {
    tmsg = msg_iter;
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      msg_iov = &iovs[ix];

      /* XXX need to detect any buffer which needs to be
       * split due to coalescing of a segment (front, middle,
       * data) boundary */

      take_len = std::min<size_t>(blen, msg_iov->iov_len);
      payload.append(
        buffer::create_msg(
          take_len, (char*) msg_iov->iov_base, m_hook));
      blen -= take_len;
      if (! blen) {
        left_len = msg_iov->iov_len - take_len;
        if (left_len) {
          left_base = ((char*) msg_iov->iov_base) + take_len;
        }
      }
    }
    /* XXX as above, if a buffer is split, then we needed to track
     * the new start (carry) and not advance */
    if (ix == iov_len) {
      msg_seq.next(&msg_iter);
      ix = 0;
    }
  }

  if (magic & (MSG_MAGIC_TRACE_XCON)) {
    if (hdr.hdr->type == 43) {
      ldout(msgr->cct,4) << "front (payload) dump:";
      payload.hexdump( *_dout );
      *_dout << dendl;
    }
  }

  blen = header.middle_len;

  if (blen && left_len) {
    middle.append(
      buffer::create_msg(left_len, left_base, m_hook));
    left_len = 0;
  }

  while (blen && (msg_iter != msg_seq.end())) {
    tmsg = msg_iter;
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      msg_iov = &iovs[ix];
      take_len = std::min<size_t>(blen, msg_iov->iov_len);
      middle.append(
        buffer::create_msg(
          take_len, (char*) msg_iov->iov_base, m_hook));
      blen -= take_len;
      if (! blen) {
        left_len = msg_iov->iov_len - take_len;
        if (left_len) {
          left_base = ((char*) msg_iov->iov_base) + take_len;
        }
      }
    }
    if (ix == iov_len) {
      msg_seq.next(&msg_iter);
      ix = 0;
    }
  }

  blen = header.data_len;

  if (blen && left_len) {
    data.append(
      buffer::create_msg(left_len, left_base, m_hook));
    left_len = 0;
  }

  while (blen && (msg_iter != msg_seq.end())) {
    tmsg = msg_iter;
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      msg_iov = &iovs[ix];
      data.append(
        buffer::create_msg(
          msg_iov->iov_len, (char*) msg_iov->iov_base, m_hook));
      blen -= msg_iov->iov_len;
    }
    if (ix == iov_len) {
      msg_seq.next(&msg_iter);
      ix = 0;
    }
  }

  /* update connection timestamp */
  recv = tmsg->timestamp;

  Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer,
                              payload, middle, data, this);

  if (m) {
    /* completion */
    m->set_connection(this);

    /* reply hook */
    m_hook->set_message(m);
    m->set_completion_hook(m_hook);

    /* trace flag */
    m->set_magic(magic);

    /* update timestamps */
    m->set_recv_stamp(recv_stamp);
    m->set_recv_complete_stamp(ceph_clock_now());
    m->set_seq(header.seq);

    /* MP-SAFE */
    state.set_in_seq(header.seq);

    /* XXXX validate peer type */
    if (peer_type != (int) hdr.peer_type) { /* XXX isn't peer_type -1? */
      peer_type = hdr.peer_type;
      peer_addr = hdr.addr;
      peer.addr = peer_addr;
      peer.name = entity_name_t(hdr.hdr->src);
      if (xio_conn_type == XioConnection::PASSIVE) {
        /* XXX kick off feature/authn/authz negotiation
         * nb: very possibly the active side should initiate this, but
         * for now, call a passive hook so OSD and friends can create
         * sessions without actually negotiating
         */
        passive_setup();
      }
    }

    if (magic & (MSG_MAGIC_TRACE_XCON)) {
      ldout(msgr->cct,4) << "decode m is " << m->get_type() << dendl;
    }

    /* dispatch it */
    msgr->ds_dispatch(m);
  } else {
    /* responds for undecoded messages and frees hook */
    ldout(msgr->cct,4) << "decode m failed" << dendl;
    m_hook->on_err_finalize(this);
  }

  return 0;
}

int XioConnection::on_msg(struct xio_session *session,
                          struct xio_msg *msg,
                          int more_in_batch,
                          void *cb_user_context)
{
  char tag = CEPH_MSGR_TAG_MSG;
  if (msg->in.header.iov_len)
    tag = *(char*)msg->in.header.iov_base;

  ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
    << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;

  // header_len_without_tag is only meaningful when a tag is present
  size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag);
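
  /* note: when iov_len is 0 this subtraction wraps (size_t), but the
   * value is only read in the KEEPALIVE2/KEEPALIVE2_ACK branches below,
   * which a zero-length header cannot reach (tag defaults to TAG_MSG) */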

  switch(tag) {
  case CEPH_MSGR_TAG_MSG:
    ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
    return handle_data_msg(session, msg, more_in_batch, cb_user_context);

  case CEPH_MSGR_TAG_KEEPALIVE:
    ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
    set_last_keepalive(ceph_clock_now());
    break;

  case CEPH_MSGR_TAG_KEEPALIVE2:
    if (header_len_without_tag < sizeof(ceph_timespec)) {
      lderr(msgr->cct) << __func__ << " too little data for KEEPALIVE2: got " << header_len_without_tag <<
        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
    }
    else {
      ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
      utime_t kp_t = utime_t(*t);
      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp " << kp_t << dendl;
      send_keepalive_or_ack(true, &kp_t);
      set_last_keepalive(ceph_clock_now());
    }

    break;

  case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
    if (header_len_without_tag < sizeof(ceph_timespec)) {
      lderr(msgr->cct) << __func__ << " too little data for KEEPALIVE2_ACK: got " << header_len_without_tag <<
        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
    }
    else {
      ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
      utime_t kp_t(*t);
      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp " << kp_t << dendl;
      set_last_keepalive_ack(kp_t);
    }
    break;

  default:
    lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
    ceph_assert(! "unsupported message tag");
  }

  xio_release_msg(msg);
  return 0;
}


int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
                                           struct xio_msg *req,
                                           void *conn_user_context)
{
  /* requester send complete (one-way) */
  uint64_t rc = ++scount;

  XioSend* xsend = static_cast<XioSend*>(req->user_context);
  if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
    if (unlikely((rc % 1000000) == 0)) {
      std::cout << "xio finished " << rc << " " << time(0) << std::endl;
    }
  } /* trace ctr */

  ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
    " msg: " << req << " sn: " << req->sn << dendl;

  XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend);
  if (xmsg) {
    ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
      " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
      " seq: " << xmsg->m->get_seq() << dendl;
  }

  --send_ctr; /* atomic, because portal thread */

  /* unblock flow-controlled connections, avoid oscillation */
  if (unlikely(cstate.session_state.read() ==
               XioConnection::FLOW_CONTROLLED)) {
    if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
        (1 /* XXX memory <= memory low-water mark */)) {
      cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
      ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon
        << " up_ready from flow_controlled" << dendl;
    }
  }

  xsend->put();

  return 0;
} /* on_msg_delivered */

void XioConnection::msg_send_fail(XioSend *xsend, int code)
{
  ldout(msgr->cct,2) << "xio_send_msg FAILED xcon: " << this <<
    " msg: " << xsend->get_xio_msg() << " code=" << code <<
    " (" << xio_strerror(code) << ")" << dendl;
  /* return refs taken for each xio_msg */
  xsend->put_msg_refs();
} /* msg_send_fail */

void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
{
  ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this <<
    " msg: " << msg << " code=" << code <<
    " (" << xio_strerror(code) << ")" << dendl;
} /* msg_release_fail */

int XioConnection::flush_out_queues(uint32_t flags) {
  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  if (outgoing.keepalive) {
    outgoing.keepalive = false;
    send_keepalive_or_ack_internal();
  }

  if (outgoing.ack) {
    outgoing.ack = false;
    send_keepalive_or_ack_internal(true, &outgoing.ack_time);
  }

  // send deferred 1 (direct backpressure)
  if (outgoing.requeue.size() > 0)
    portal->requeue(this, outgoing.requeue);

  // send deferred 2 (sent while deferred)
  int ix, q_size = outgoing.mqueue.size();
  for (ix = 0; ix < q_size; ++ix) {
    Message::Queue::iterator q_iter = outgoing.mqueue.begin();
    Message* m = &(*q_iter);
    outgoing.mqueue.erase(q_iter);
    msgr->_send_message_impl(m, this);
  }
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();
  return 0;
}

int XioConnection::discard_out_queues(uint32_t flags)
{
  Message::Queue disc_q;
  XioSubmit::Queue deferred_q;

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  /* the two send queues contain different objects:
   * - anything on the mqueue is a Message
   * - anything on the requeue is an XioSend
   */
  Message::Queue::const_iterator i1 = disc_q.end();
  disc_q.splice(i1, outgoing.mqueue);

  XioSubmit::Queue::const_iterator i2 = deferred_q.end();
  deferred_q.splice(i2, outgoing.requeue);

  outgoing.keepalive = outgoing.ack = false;

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  // mqueue
  while (!disc_q.empty()) {
    Message::Queue::iterator q_iter = disc_q.begin();
    Message* m = &(*q_iter);
    disc_q.erase(q_iter);
    m->put();
  }

  // requeue
  while (!deferred_q.empty()) {
    XioSubmit::Queue::iterator q_iter = deferred_q.begin();
    XioSubmit* xs = &(*q_iter);
    XioSend* xsend;
    switch (xs->type) {
    case XioSubmit::OUTGOING_MSG:
      xsend = static_cast<XioSend*>(xs);
      deferred_q.erase(q_iter);
      // release once for each chained xio_msg
      xsend->put(xsend->get_msg_count());
      break;
    case XioSubmit::INCOMING_MSG_RELEASE:
      deferred_q.erase(q_iter);
      portal->release_xio_msg(static_cast<XioCompletion*>(xs));
      break;
    default:
      ldout(msgr->cct,0) << __func__ << ": Unknown Msg type " << xs->type << dendl;
      break;
    }
  }

  return 0;
}

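/* lock-ordering note: the messenger-wide conns_sp appears to rank before
 * the per-connection sp, so a caller already holding sp drops it first and
 * re-acquires it after conns_sp (a convention inferred from the locking
 * pattern below, not stated elsewhere in this file) */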
int XioConnection::adjust_clru(uint32_t flags)
{
  if (flags & CState::OP_FLAG_LOCKED)
    sp.unlock();

  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
  msgr->conns_sp.lock();
  sp.lock();

  if (cstate.flags & CState::FLAG_MAPPED) {
    XioConnection::ConnList::iterator citer =
      XioConnection::ConnList::s_iterator_to(*this);
    msgr->conns_list.erase(citer);
    msgr->conns_list.push_front(*this); // LRU
  }

  msgr->conns_sp.unlock();

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  return 0;
}

int XioConnection::on_msg_error(struct xio_session *session,
                                enum xio_status error,
                                struct xio_msg *msg,
                                void *conn_user_context)
{
  XioSend *xsend = static_cast<XioSend*>(msg->user_context);
  if (xsend)
    xsend->put();

  --send_ctr; /* atomic, because portal thread */
  return 0;
} /* on_msg_error */

void XioConnection::mark_down()
{
  _mark_down(XioConnection::CState::OP_FLAG_NONE);
}

int XioConnection::_mark_down(uint32_t flags)
{
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  // per the interface comment, we only stage a remote reset if the
  // current policy requires it
  if (cstate.policy.resetcheck)
    cstate.flags |= CState::FLAG_RESET;

  disconnect();

  /* XXX this will almost certainly be called again from
   * on_disconnect_event() */
  discard_out_queues(flags|CState::OP_FLAG_LOCKED);

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  return 0;
}

void XioConnection::mark_disposable()
{
  _mark_disposable(XioConnection::CState::OP_FLAG_NONE);
}

int XioConnection::_mark_disposable(uint32_t flags)
{
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  cstate.policy.lossy = true;

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  return 0;
}

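/* CState tracks two axes (as used below): session_state is transport
 * liveness (UP / FLOW_CONTROLLED / DISCONNECTED), while startup_state is
 * negotiation progress (IDLE / READY / FAIL) */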
int XioConnection::CState::state_up_ready(uint32_t flags)
{
  if (! (flags & CState::OP_FLAG_LOCKED))
    xcon->sp.lock();

  xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);

  session_state = session_states::UP;
  startup_state = session_startup_states::READY;

  if (! (flags & CState::OP_FLAG_LOCKED))
    xcon->sp.unlock();

  return (0);
}

int XioConnection::CState::state_discon()
{
  session_state = session_states::DISCONNECTED;
  startup_state = session_startup_states::IDLE;

  return 0;
}

int XioConnection::CState::state_flow_controlled(uint32_t flags)
{
  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.lock();

  session_state = session_states::FLOW_CONTROLLED;

  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.unlock();

  return (0);
}

int XioConnection::CState::state_fail(Message* m, uint32_t flags)
{
  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.lock();

  // advance to state FAIL, drop queued msgs, adjust LRU
  session_state = session_states::DISCONNECTED;
  startup_state = session_startup_states::FAIL;

  xcon->discard_out_queues(flags|OP_FLAG_LOCKED);
  xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);

  xcon->disconnect();

  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.unlock();

  // notify ULP
  XioMessenger* msgr = static_cast<XioMessenger*>(xcon->get_messenger());
  msgr->ms_deliver_handle_reset(xcon);
  m->put();

  return 0;
}


int XioLoopbackConnection::send_message(Message *m)
{
  XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
  m->set_connection(this);
  m->set_seq(next_seq());
  m->set_src(ms->get_myinst().name);
  ms->ds_dispatch(m);
  return 0;
}

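/* loopback: there is no remote peer, so a keepalive is satisfied locally
 * by stamping both the keepalive and its ack with the current time */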
void XioLoopbackConnection::send_keepalive()
{
  utime_t t = ceph_clock_now();
  set_last_keepalive(t);
  set_last_keepalive_ack(t);
}