]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * Portions Copyright (C) 2013 CohortFS, LLC | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #ifndef XIO_MSG_H | |
17 | #define XIO_MSG_H | |
18 | ||
19 | #include <boost/intrusive/list.hpp> | |
20 | #include "msg/SimplePolicyMessenger.h" | |
21 | extern "C" { | |
22 | #include "libxio.h" | |
23 | } | |
24 | #include "XioConnection.h" | |
25 | #include "XioSubmit.h" | |
26 | #include "msg/msg_types.h" | |
27 | #include "XioPool.h" | |
28 | ||
29 | namespace bi = boost::intrusive; | |
30 | ||
31 | class XioMessenger; | |
32 | ||
33 | class XioMsgCnt | |
34 | { | |
35 | public: | |
eafe8130 | 36 | ceph_le32 msg_cnt; |
7c673cae FG |
37 | buffer::list bl; |
38 | public: | |
39 | explicit XioMsgCnt(buffer::ptr p) | |
40 | { | |
41 | bl.append(p); | |
42 | buffer::list::iterator bl_iter = bl.begin(); | |
11fdf7f2 | 43 | decode(msg_cnt, bl_iter); |
7c673cae FG |
44 | } |
45 | }; | |
46 | ||
47 | class XioMsgHdr | |
48 | { | |
49 | public: | |
50 | char tag; | |
eafe8130 TL |
51 | ceph_le32 msg_cnt; |
52 | ceph_le32 peer_type; | |
7c673cae FG |
53 | entity_addr_t addr; /* XXX hack! */ |
54 | ceph_msg_header* hdr; | |
55 | ceph_msg_footer* ftr; | |
56 | uint64_t features; | |
57 | buffer::list bl; | |
58 | public: | |
59 | XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr, uint64_t _features) | |
eafe8130 | 60 | : tag(CEPH_MSGR_TAG_MSG), msg_cnt(init_le32(0)), hdr(&_hdr), ftr(&_ftr), |
7c673cae FG |
61 | features(_features) |
62 | { } | |
63 | ||
64 | XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p) | |
65 | : hdr(&_hdr), ftr(&_ftr) | |
66 | { | |
67 | bl.append(p); | |
68 | buffer::list::iterator bl_iter = bl.begin(); | |
69 | decode(bl_iter); | |
70 | } | |
71 | ||
72 | static size_t get_max_encoded_length(); | |
73 | ||
74 | const buffer::list& get_bl() { encode(bl); return bl; }; | |
75 | ||
76 | inline void encode_hdr(ceph::buffer::list& bl) const { | |
11fdf7f2 TL |
77 | using ceph::encode; |
78 | encode(tag, bl); | |
79 | encode(msg_cnt, bl); | |
80 | encode(peer_type, bl); | |
81 | encode(addr, bl, features); | |
82 | encode(hdr->seq, bl); | |
83 | encode(hdr->tid, bl); | |
84 | encode(hdr->type, bl); | |
85 | encode(hdr->priority, bl); | |
86 | encode(hdr->version, bl); | |
87 | encode(hdr->front_len, bl); | |
88 | encode(hdr->middle_len, bl); | |
89 | encode(hdr->data_len, bl); | |
90 | encode(hdr->data_off, bl); | |
91 | encode(hdr->src.type, bl); | |
92 | encode(hdr->src.num, bl); | |
93 | encode(hdr->compat_version, bl); | |
94 | encode(hdr->crc, bl); | |
7c673cae FG |
95 | } |
96 | ||
97 | inline void encode_ftr(buffer::list& bl) const { | |
11fdf7f2 TL |
98 | using ceph::encode; |
99 | encode(ftr->front_crc, bl); | |
100 | encode(ftr->middle_crc, bl); | |
101 | encode(ftr->data_crc, bl); | |
102 | encode(ftr->sig, bl); | |
103 | encode(ftr->flags, bl); | |
7c673cae FG |
104 | } |
105 | ||
106 | inline void encode(buffer::list& bl) const { | |
107 | encode_hdr(bl); | |
108 | encode_ftr(bl); | |
109 | } | |
110 | ||
111 | inline void decode_hdr(buffer::list::iterator& bl) { | |
11fdf7f2 TL |
112 | using ceph::decode; |
113 | decode(tag, bl); | |
114 | decode(msg_cnt, bl); | |
115 | decode(peer_type, bl); | |
116 | decode(addr, bl); | |
117 | decode(hdr->seq, bl); | |
118 | decode(hdr->tid, bl); | |
119 | decode(hdr->type, bl); | |
120 | decode(hdr->priority, bl); | |
121 | decode(hdr->version, bl); | |
122 | decode(hdr->front_len, bl); | |
123 | decode(hdr->middle_len, bl); | |
124 | decode(hdr->data_len, bl); | |
125 | decode(hdr->data_off, bl); | |
126 | decode(hdr->src.type, bl); | |
127 | decode(hdr->src.num, bl); | |
128 | decode(hdr->compat_version, bl); | |
129 | decode(hdr->crc, bl); | |
7c673cae FG |
130 | } |
131 | ||
132 | inline void decode_ftr(buffer::list::iterator& bl) { | |
11fdf7f2 TL |
133 | using ceph::decode; |
134 | decode(ftr->front_crc, bl); | |
135 | decode(ftr->middle_crc, bl); | |
136 | decode(ftr->data_crc, bl); | |
137 | decode(ftr->sig, bl); | |
138 | decode(ftr->flags, bl); | |
7c673cae FG |
139 | } |
140 | ||
141 | inline void decode(buffer::list::iterator& bl) { | |
142 | decode_hdr(bl); | |
143 | decode_ftr(bl); | |
144 | } | |
145 | ||
146 | virtual ~XioMsgHdr() | |
147 | {} | |
148 | }; | |
149 | ||
// Generates the free-function encode()/decode() overloads for XioMsgHdr
// (Ceph encoding macro) so it can be used with the generic encode/decode API.
WRITE_CLASS_ENCODER(XioMsgHdr);

// Mempool used by the Xio messenger; defined in another translation unit.
// NOTE(review): the "noreg" name suggests non-registered memory — confirm.
extern struct xio_mempool *xio_msgr_noreg_mpool;

// Number of xio_iovec_ex slots embedded in each xio_msg_ex; also used as
// the max_nents of its scatter-gather lists.
#define XIO_MSGR_IOVLEN 16
155 | ||
156 | struct xio_msg_ex | |
157 | { | |
158 | struct xio_msg msg; | |
159 | struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN]; | |
160 | ||
161 | explicit xio_msg_ex(void* user_context) { | |
162 | // go in structure order | |
163 | msg.in.header.iov_len = 0; | |
164 | msg.in.header.iov_base = NULL; /* XXX Accelio requires this currently */ | |
165 | ||
166 | msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR; | |
167 | msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN; | |
168 | msg.in.pdata_iov.nents = 0; | |
169 | msg.in.pdata_iov.sglist = iovs; | |
170 | ||
171 | // minimal zero "out" side | |
172 | msg.out.header.iov_len = 0; | |
173 | msg.out.header.iov_base = NULL; /* XXX Accelio requires this currently, | |
174 | * against spec */ | |
175 | // out (some members adjusted later) | |
176 | msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR; | |
177 | msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN; | |
178 | msg.out.pdata_iov.nents = 0; | |
179 | msg.out.pdata_iov.sglist = iovs; | |
180 | ||
181 | // minimal initialize an "out" msg | |
182 | msg.request = NULL; | |
183 | msg.type = XIO_MSG_TYPE_ONE_WAY; | |
184 | // for now, we DO NEED receipts for every msg | |
185 | msg.flags = 0; | |
186 | msg.user_context = user_context; | |
187 | msg.next = NULL; | |
188 | // minimal zero "in" side | |
189 | } | |
190 | }; | |
191 | ||
192 | class XioSend : public XioSubmit | |
193 | { | |
194 | public: | |
195 | virtual void print_debug(CephContext *cct, const char *tag) const {}; | |
196 | const struct xio_msg * get_xio_msg() const {return &req_0.msg;} | |
197 | struct xio_msg * get_xio_msg() {return &req_0.msg;} | |
198 | virtual size_t get_msg_count() const {return 1;} | |
199 | ||
200 | XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) : | |
201 | XioSubmit(XioSubmit::OUTGOING_MSG, _xcon), | |
202 | req_0(this), mp_this(_mp), nrefs(_ex_cnt+1) | |
203 | { | |
204 | xpool_inc_msgcnt(); | |
205 | xcon->get(); | |
206 | } | |
207 | ||
31f18b77 | 208 | XioSend* get() { nrefs++; return this; }; |
7c673cae FG |
209 | |
210 | void put(int n) { | |
31f18b77 | 211 | int refs = nrefs -= n; |
7c673cae FG |
212 | if (refs == 0) { |
213 | struct xio_reg_mem *mp = &this->mp_this; | |
214 | this->~XioSend(); | |
215 | xpool_free(sizeof(XioSend), mp); | |
216 | } | |
217 | } | |
218 | ||
219 | void put() { | |
220 | put(1); | |
221 | } | |
222 | ||
223 | void put_msg_refs() { | |
224 | put(get_msg_count()); | |
225 | } | |
226 | ||
227 | virtual ~XioSend() { | |
228 | xpool_dec_msgcnt(); | |
229 | xcon->put(); | |
230 | } | |
231 | ||
232 | private: | |
233 | xio_msg_ex req_0; | |
234 | struct xio_reg_mem mp_this; | |
31f18b77 | 235 | std::atomic<unsigned> nrefs = { 0 }; |
7c673cae FG |
236 | }; |
237 | ||
238 | class XioCommand : public XioSend | |
239 | { | |
240 | public: | |
241 | XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp):XioSend(_xcon, _mp) { | |
242 | } | |
243 | ||
244 | buffer::list& get_bl_ref() { return bl; }; | |
245 | ||
246 | private: | |
247 | buffer::list bl; | |
248 | }; | |
249 | ||
250 | struct XioMsg : public XioSend | |
251 | { | |
252 | public: | |
253 | Message* m; | |
254 | XioMsgHdr hdr; | |
255 | xio_msg_ex* req_arr; | |
256 | ||
257 | public: | |
258 | XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp, | |
259 | int _ex_cnt, uint64_t _features) : | |
260 | XioSend(_xcon, _mp, _ex_cnt), | |
261 | m(_m), hdr(m->get_header(), m->get_footer(), _features), | |
262 | req_arr(NULL) | |
263 | { | |
264 | const entity_inst_t &inst = xcon->get_messenger()->get_myinst(); | |
265 | hdr.peer_type = inst.name.type(); | |
11fdf7f2 | 266 | hdr.addr = xcon->get_messenger()->get_myaddr_legacy(); |
7c673cae FG |
267 | hdr.hdr->src.type = inst.name.type(); |
268 | hdr.hdr->src.num = inst.name.num(); | |
269 | hdr.msg_cnt = _ex_cnt+1; | |
270 | ||
271 | if (unlikely(_ex_cnt > 0)) { | |
272 | alloc_trailers(_ex_cnt); | |
273 | } | |
274 | } | |
275 | ||
276 | void print_debug(CephContext *cct, const char *tag) const override; | |
277 | size_t get_msg_count() const override { | |
278 | return hdr.msg_cnt; | |
279 | } | |
280 | ||
281 | void alloc_trailers(int cnt) { | |
282 | req_arr = static_cast<xio_msg_ex*>(malloc(cnt * sizeof(xio_msg_ex))); | |
283 | for (int ix = 0; ix < cnt; ++ix) { | |
284 | xio_msg_ex* xreq = &(req_arr[ix]); | |
285 | new (xreq) xio_msg_ex(this); | |
286 | } | |
287 | } | |
288 | ||
289 | Message *get_message() { return m; } | |
290 | ||
291 | ~XioMsg() | |
292 | { | |
293 | if (unlikely(!!req_arr)) { | |
294 | for (unsigned int ix = 0; ix < get_msg_count()-1; ++ix) { | |
295 | xio_msg_ex* xreq = &(req_arr[ix]); | |
296 | xreq->~xio_msg_ex(); | |
297 | } | |
298 | free(req_arr); | |
299 | } | |
300 | ||
301 | /* testing only! server's ready, resubmit request (not reached on | |
302 | * PASSIVE/server side) */ | |
303 | if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) { | |
304 | if (likely(xcon->is_connected())) { | |
305 | xcon->send_message(m); | |
306 | } else { | |
307 | /* dispose it */ | |
308 | m->put(); | |
309 | } | |
310 | } else { | |
311 | /* the normal case: done with message */ | |
312 | m->put(); | |
313 | } | |
314 | } | |
315 | }; | |
316 | ||
317 | class XioDispatchHook : public Message::CompletionHook | |
318 | { | |
319 | private: | |
320 | XioConnection *xcon; | |
321 | XioInSeq msg_seq; | |
322 | XioPool rsp_pool; | |
31f18b77 | 323 | std::atomic<unsigned> nrefs { 1 }; |
7c673cae FG |
324 | bool cl_flag; |
325 | friend class XioConnection; | |
326 | friend class XioMessenger; | |
327 | public: | |
328 | struct xio_reg_mem mp_this; | |
329 | ||
330 | XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq, | |
331 | struct xio_reg_mem& _mp) : | |
332 | CompletionHook(_m), | |
333 | xcon(_xcon->get()), | |
334 | msg_seq(_msg_seq), | |
335 | rsp_pool(xio_msgr_noreg_mpool), | |
7c673cae FG |
336 | cl_flag(false), |
337 | mp_this(_mp) | |
338 | { | |
339 | ++xcon->n_reqs; // atomicity by portal thread | |
340 | xpool_inc_hookcnt(); | |
341 | } | |
342 | ||
343 | virtual void finish(int r) { | |
344 | this->put(); | |
345 | } | |
346 | ||
347 | virtual void complete(int r) { | |
348 | finish(r); | |
349 | } | |
350 | ||
351 | int release_msgs(); | |
352 | ||
353 | XioDispatchHook* get() { | |
31f18b77 | 354 | nrefs++; return this; |
7c673cae FG |
355 | } |
356 | ||
357 | void put(int n = 1) { | |
31f18b77 | 358 | int refs = nrefs -= n; |
7c673cae FG |
359 | if (refs == 0) { |
360 | /* in Marcus' new system, refs reaches 0 twice: once in | |
361 | * Message lifecycle, and again after xio_release_msg. | |
362 | */ | |
363 | if (!cl_flag && release_msgs()) | |
364 | return; | |
365 | struct xio_reg_mem *mp = &this->mp_this; | |
366 | this->~XioDispatchHook(); | |
367 | xpool_free(sizeof(XioDispatchHook), mp); | |
368 | } | |
369 | } | |
370 | ||
371 | XioInSeq& get_seq() { return msg_seq; } | |
372 | ||
373 | XioPool& get_pool() { return rsp_pool; } | |
374 | ||
375 | void on_err_finalize(XioConnection *xcon) { | |
376 | /* can't decode message; even with one-way must free | |
377 | * xio_msg structures, and then xiopool | |
378 | */ | |
379 | this->finish(-1); | |
380 | } | |
381 | ||
382 | ~XioDispatchHook() { | |
383 | --xcon->n_reqs; // atomicity by portal thread | |
384 | xpool_dec_hookcnt(); | |
385 | xcon->put(); | |
386 | } | |
387 | }; | |
388 | ||
389 | /* A sender-side CompletionHook that relies on the on_msg_delivered | |
390 | * to complete a pending mark down. */ | |
391 | class XioMarkDownHook : public Message::CompletionHook | |
392 | { | |
393 | private: | |
394 | XioConnection* xcon; | |
395 | ||
396 | public: | |
397 | struct xio_reg_mem mp_this; | |
398 | ||
399 | XioMarkDownHook( | |
400 | XioConnection* _xcon, Message *_m, struct xio_reg_mem& _mp) : | |
401 | CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp) | |
402 | { } | |
403 | ||
404 | virtual void claim(int r) {} | |
405 | ||
406 | virtual void finish(int r) { | |
407 | xcon->put(); | |
408 | struct xio_reg_mem *mp = &this->mp_this; | |
409 | this->~XioMarkDownHook(); | |
410 | xio_mempool_free(mp); | |
411 | } | |
412 | ||
413 | virtual void complete(int r) { | |
414 | xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE); | |
415 | finish(r); | |
416 | } | |
417 | }; | |
418 | ||
419 | struct XioCompletion : public XioSubmit | |
420 | { | |
421 | XioDispatchHook *xhook; | |
422 | public: | |
423 | XioCompletion(XioConnection *_xcon, XioDispatchHook *_xhook) | |
424 | : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */), | |
425 | xhook(_xhook->get()) { | |
426 | // submit queue ref | |
427 | xcon->get(); | |
428 | }; | |
429 | ||
430 | struct xio_msg* dequeue() { | |
431 | return xhook->get_seq().dequeue(); | |
432 | } | |
433 | ||
434 | XioDispatchHook* get_xhook() { return xhook; } | |
435 | ||
436 | void finalize() { | |
437 | xcon->put(); | |
438 | xhook->put(); | |
439 | } | |
440 | }; | |
441 | ||
/* Debug-print helpers; defined in another translation unit.
 * NOTE(review): judging only by names and parameters, these emit an
 * XioMsgHdr (with its xio_msg) and a Ceph Message under a caller-supplied
 * tag via `cct` — confirm against the implementation. */
void print_xio_msg_hdr(CephContext *cct, const char *tag,
                       const XioMsgHdr &hdr, const struct xio_msg *msg);
void print_ceph_msg(CephContext *cct, const char *tag, Message *m);
445 | ||
446 | #endif /* XIO_MSG_H */ |