]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/xio/XioMsg.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
19 #include <boost/intrusive/list.hpp>
20 #include "msg/SimplePolicyMessenger.h"
24 #include "XioConnection.h"
25 #include "XioSubmit.h"
26 #include "msg/msg_types.h"
29 namespace bi
= boost::intrusive
;
39 explicit XioMsgCnt(buffer::ptr p
)
42 buffer::list::iterator bl_iter
= bl
.begin();
43 ::decode(msg_cnt
, bl_iter
);
53 entity_addr_t addr
; /* XXX hack! */
59 XioMsgHdr(ceph_msg_header
& _hdr
, ceph_msg_footer
& _ftr
, uint64_t _features
)
60 : tag(CEPH_MSGR_TAG_MSG
), msg_cnt(0), hdr(&_hdr
), ftr(&_ftr
),
64 XioMsgHdr(ceph_msg_header
& _hdr
, ceph_msg_footer
&_ftr
, buffer::ptr p
)
65 : hdr(&_hdr
), ftr(&_ftr
)
68 buffer::list::iterator bl_iter
= bl
.begin();
/* Declaration only -- the definition is not part of this listing.
 * Static member function taking no arguments and returning a size_t.
 * NOTE(review): going by the name, this is presumably an upper bound on the
 * encoded header size -- confirm against the definition. */
72 static size_t get_max_encoded_length();
/* Runs encode() on bl, then returns bl by const reference.
 * bl is neither a parameter nor a local here; its declaration is outside
 * the visible lines.
 * NOTE(review): encode(bl) is executed on every call. If encode() appends to
 * its argument (its body is not visible here), calling get_bl() more than
 * once would grow bl each time -- confirm. */
74 const buffer::list
& get_bl() { encode(bl
); return bl
; };
76 inline void encode_hdr(ceph::buffer::list
& bl
) const {
78 ::encode(msg_cnt
, bl
);
79 ::encode(peer_type
, bl
);
80 ::encode(addr
, bl
, features
);
81 ::encode(hdr
->seq
, bl
);
82 ::encode(hdr
->tid
, bl
);
83 ::encode(hdr
->type
, bl
);
84 ::encode(hdr
->priority
, bl
);
85 ::encode(hdr
->version
, bl
);
86 ::encode(hdr
->front_len
, bl
);
87 ::encode(hdr
->middle_len
, bl
);
88 ::encode(hdr
->data_len
, bl
);
89 ::encode(hdr
->data_off
, bl
);
90 ::encode(hdr
->src
.type
, bl
);
91 ::encode(hdr
->src
.num
, bl
);
92 ::encode(hdr
->compat_version
, bl
);
93 ::encode(hdr
->crc
, bl
);
96 inline void encode_ftr(buffer::list
& bl
) const {
97 ::encode(ftr
->front_crc
, bl
);
98 ::encode(ftr
->middle_crc
, bl
);
99 ::encode(ftr
->data_crc
, bl
);
100 ::encode(ftr
->sig
, bl
);
101 ::encode(ftr
->flags
, bl
);
104 inline void encode(buffer::list
& bl
) const {
109 inline void decode_hdr(buffer::list::iterator
& bl
) {
111 ::decode(msg_cnt
, bl
);
112 ::decode(peer_type
, bl
);
114 ::decode(hdr
->seq
, bl
);
115 ::decode(hdr
->tid
, bl
);
116 ::decode(hdr
->type
, bl
);
117 ::decode(hdr
->priority
, bl
);
118 ::decode(hdr
->version
, bl
);
119 ::decode(hdr
->front_len
, bl
);
120 ::decode(hdr
->middle_len
, bl
);
121 ::decode(hdr
->data_len
, bl
);
122 ::decode(hdr
->data_off
, bl
);
123 ::decode(hdr
->src
.type
, bl
);
124 ::decode(hdr
->src
.num
, bl
);
125 ::decode(hdr
->compat_version
, bl
);
126 ::decode(hdr
->crc
, bl
);
129 inline void decode_ftr(buffer::list::iterator
& bl
) {
130 ::decode(ftr
->front_crc
, bl
);
131 ::decode(ftr
->middle_crc
, bl
);
132 ::decode(ftr
->data_crc
, bl
);
133 ::decode(ftr
->sig
, bl
);
134 ::decode(ftr
->flags
, bl
);
137 inline void decode(buffer::list::iterator
& bl
) {
146 WRITE_CLASS_ENCODER(XioMsgHdr
);
/* Pointer to an xio_mempool, declared extern here (no definition in this
 * listing). Later in this listing it is passed as the initializer for
 * rsp_pool in the XioDispatchHook constructor. */
148 extern struct xio_mempool
*xio_msgr_noreg_mpool
;
/* Length of the iovs[] array declared just below. The xio_msg_ex constructor
 * also assigns this value to msg.in.pdata_iov.max_nents and
 * msg.out.pdata_iov.max_nents. */
150 #define XIO_MSGR_IOVLEN 16
155 struct xio_iovec_ex iovs
[XIO_MSGR_IOVLEN
];
157 explicit xio_msg_ex(void* user_context
) {
158 // go in structure order
159 msg
.in
.header
.iov_len
= 0;
160 msg
.in
.header
.iov_base
= NULL
; /* XXX Accelio requires this currently */
162 msg
.in
.sgl_type
= XIO_SGL_TYPE_IOV_PTR
;
163 msg
.in
.pdata_iov
.max_nents
= XIO_MSGR_IOVLEN
;
164 msg
.in
.pdata_iov
.nents
= 0;
165 msg
.in
.pdata_iov
.sglist
= iovs
;
167 // minimal zero "out" side
168 msg
.out
.header
.iov_len
= 0;
169 msg
.out
.header
.iov_base
= NULL
; /* XXX Accelio requires this currently,
171 // out (some members adjusted later)
172 msg
.out
.sgl_type
= XIO_SGL_TYPE_IOV_PTR
;
173 msg
.out
.pdata_iov
.max_nents
= XIO_MSGR_IOVLEN
;
174 msg
.out
.pdata_iov
.nents
= 0;
175 msg
.out
.pdata_iov
.sglist
= iovs
;
177 // minimal initialize an "out" msg
179 msg
.type
= XIO_MSG_TYPE_ONE_WAY
;
180 // for now, we DO NEED receipts for every msg
182 msg
.user_context
= user_context
;
184 // minimal zero "in" side
188 class XioSend
: public XioSubmit
/* Virtual debug-print hook taking a CephContext pointer and a tag string.
 * This version has an empty body and does nothing. A later declaration in
 * this listing with the same signature is marked `override`. */
191 virtual void print_debug(CephContext
*cct
, const char *tag
) const {};
/* Const overload: returns the address of req_0.msg as a pointer to a const
 * xio_msg. */
192 const struct xio_msg
* get_xio_msg() const {return &req_0
.msg
;}
/* Non-const overload: returns the same address (&req_0.msg) as a mutable
 * pointer. */
193 struct xio_msg
* get_xio_msg() {return &req_0
.msg
;}
/* Virtual; this version always returns 1. A later declaration in this
 * listing with the same signature is marked `override`. */
194 virtual size_t get_msg_count() const {return 1;}
196 XioSend(XioConnection
*_xcon
, struct xio_reg_mem
& _mp
, int _ex_cnt
=0) :
197 XioSubmit(XioSubmit::OUTGOING_MSG
, _xcon
),
198 req_0(this), mp_this(_mp
), nrefs(_ex_cnt
+1)
/* Increments nrefs by one and returns this, so the call can be used inline
 * where an XioSend* is needed. */
204 XioSend
* get() { nrefs
++; return this; };
207 int refs
= nrefs
-= n
;
209 struct xio_reg_mem
*mp
= &this->mp_this
;
211 xpool_free(sizeof(XioSend
), mp
);
219 void put_msg_refs() {
220 put(get_msg_count());
230 struct xio_reg_mem mp_this
;
231 std::atomic
<unsigned> nrefs
= { 0 };
234 class XioCommand
: public XioSend
237 XioCommand(XioConnection
*_xcon
, struct xio_reg_mem
& _mp
):XioSend(_xcon
, _mp
) {
/* Returns a non-const reference to bl, so the caller can modify it in place.
 * bl's declaration is outside the visible lines. */
240 buffer::list
& get_bl_ref() { return bl
; };
246 struct XioMsg
: public XioSend
254 XioMsg(Message
*_m
, XioConnection
*_xcon
, struct xio_reg_mem
& _mp
,
255 int _ex_cnt
, uint64_t _features
) :
256 XioSend(_xcon
, _mp
, _ex_cnt
),
257 m(_m
), hdr(m
->get_header(), m
->get_footer(), _features
),
260 const entity_inst_t
&inst
= xcon
->get_messenger()->get_myinst();
261 hdr
.peer_type
= inst
.name
.type();
262 hdr
.addr
= xcon
->get_messenger()->get_myaddr();
263 hdr
.hdr
->src
.type
= inst
.name
.type();
264 hdr
.hdr
->src
.num
= inst
.name
.num();
265 hdr
.msg_cnt
= _ex_cnt
+1;
267 if (unlikely(_ex_cnt
> 0)) {
268 alloc_trailers(_ex_cnt
);
272 void print_debug(CephContext
*cct
, const char *tag
) const override
;
273 size_t get_msg_count() const override
{
277 void alloc_trailers(int cnt
) {
278 req_arr
= static_cast<xio_msg_ex
*>(malloc(cnt
* sizeof(xio_msg_ex
)));
279 for (int ix
= 0; ix
< cnt
; ++ix
) {
280 xio_msg_ex
* xreq
= &(req_arr
[ix
]);
281 new (xreq
) xio_msg_ex(this);
/* Returns m, the Message pointer that the XioMsg constructor stores from its
 * _m argument. No reference counting is done here. */
285 Message
*get_message() { return m
; }
289 if (unlikely(!!req_arr
)) {
290 for (unsigned int ix
= 0; ix
< get_msg_count()-1; ++ix
) {
291 xio_msg_ex
* xreq
= &(req_arr
[ix
]);
297 /* testing only! server's ready, resubmit request (not reached on
298 * PASSIVE/server side) */
299 if (unlikely(m
->get_magic() & MSG_MAGIC_REDUPE
)) {
300 if (likely(xcon
->is_connected())) {
301 xcon
->send_message(m
);
307 /* the normal case: done with message */
313 class XioDispatchHook
: public Message::CompletionHook
319 std::atomic
<unsigned> nrefs
{ 1 };
321 friend class XioConnection
;
322 friend class XioMessenger
;
324 struct xio_reg_mem mp_this
;
326 XioDispatchHook(XioConnection
*_xcon
, Message
*_m
, XioInSeq
& _msg_seq
,
327 struct xio_reg_mem
& _mp
) :
331 rsp_pool(xio_msgr_noreg_mpool
),
335 ++xcon
->n_reqs
; // atomicity by portal thread
339 virtual void finish(int r
) {
343 virtual void complete(int r
) {
349 XioDispatchHook
* get() {
350 nrefs
++; return this;
353 void put(int n
= 1) {
354 int refs
= nrefs
-= n
;
356 /* in Marcus' new system, refs reaches 0 twice: once in
357 * Message lifecycle, and again after xio_release_msg.
359 if (!cl_flag
&& release_msgs())
361 struct xio_reg_mem
*mp
= &this->mp_this
;
362 this->~XioDispatchHook();
363 xpool_free(sizeof(XioDispatchHook
), mp
);
/* Returns a non-const reference to msg_seq. */
367 XioInSeq
& get_seq() { return msg_seq
; }
/* Returns a non-const reference to rsp_pool, which the constructor
 * initializes from xio_msgr_noreg_mpool. */
369 XioPool
& get_pool() { return rsp_pool
; }
371 void on_err_finalize(XioConnection
*xcon
) {
372 /* can't decode message; even with one-way must free
373 * xio_msg structures, and then xiopool
379 --xcon
->n_reqs
; // atomicity by portal thread
385 /* A sender-side CompletionHook that relies on the on_msg_delivered
386 * to complete a pending mark down. */
387 class XioMarkDownHook
: public Message::CompletionHook
393 struct xio_reg_mem mp_this
;
396 XioConnection
* _xcon
, Message
*_m
, struct xio_reg_mem
& _mp
) :
397 CompletionHook(_m
), xcon(_xcon
->get()), mp_this(_mp
)
/* Virtual with an empty body: the argument r is ignored and nothing is
 * done. */
400 virtual void claim(int r
) {}
402 virtual void finish(int r
) {
404 struct xio_reg_mem
*mp
= &this->mp_this
;
405 this->~XioMarkDownHook();
406 xio_mempool_free(mp
);
409 virtual void complete(int r
) {
410 xcon
->_mark_down(XioConnection::CState::OP_FLAG_NONE
);
415 struct XioCompletion
: public XioSubmit
417 XioDispatchHook
*xhook
;
419 XioCompletion(XioConnection
*_xcon
, XioDispatchHook
*_xhook
)
420 : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE
, _xcon
/* not xcon! */),
421 xhook(_xhook
->get()) {
426 struct xio_msg
* dequeue() {
427 return xhook
->get_seq().dequeue();
/* Returns xhook, the pointer the XioCompletion constructor obtained from
 * _xhook->get(). No additional reference is taken here. */
430 XioDispatchHook
* get_xhook() { return xhook
; }
/* Declaration only -- the definition is not part of this listing.
 * Takes a CephContext, a tag string, an XioMsgHdr by const reference and a
 * pointer to a const xio_msg; returns nothing.
 * NOTE(review): presumably a debug dump of the header -- confirm against the
 * definition. */
438 void print_xio_msg_hdr(CephContext
*cct
, const char *tag
,
439 const XioMsgHdr
&hdr
, const struct xio_msg
*msg
);
/* Declaration only -- the definition is not part of this listing.
 * Takes a CephContext, a tag string and a Message pointer; returns nothing.
 * NOTE(review): presumably a debug dump of the message -- confirm against
 * the definition. */
440 void print_ceph_msg(CephContext
*cct
, const char *tag
, Message
*m
);
442 #endif /* XIO_MSG_H */