]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/xio/XioMsg.h
import ceph 14.2.5
[ceph.git] / ceph / src / msg / xio / XioMsg.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #ifndef XIO_MSG_H
17 #define XIO_MSG_H
18
#include <new>

#include <boost/intrusive/list.hpp>
#include "msg/SimplePolicyMessenger.h"
extern "C" {
#include "libxio.h"
}
#include "XioConnection.h"
#include "XioSubmit.h"
#include "msg/msg_types.h"
#include "XioPool.h"
28
29 namespace bi = boost::intrusive;
30
31 class XioMessenger;
32
33 class XioMsgCnt
34 {
35 public:
36 ceph_le32 msg_cnt;
37 buffer::list bl;
38 public:
39 explicit XioMsgCnt(buffer::ptr p)
40 {
41 bl.append(p);
42 buffer::list::iterator bl_iter = bl.begin();
43 decode(msg_cnt, bl_iter);
44 }
45 };
46
47 class XioMsgHdr
48 {
49 public:
50 char tag;
51 ceph_le32 msg_cnt;
52 ceph_le32 peer_type;
53 entity_addr_t addr; /* XXX hack! */
54 ceph_msg_header* hdr;
55 ceph_msg_footer* ftr;
56 uint64_t features;
57 buffer::list bl;
58 public:
59 XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr, uint64_t _features)
60 : tag(CEPH_MSGR_TAG_MSG), msg_cnt(init_le32(0)), hdr(&_hdr), ftr(&_ftr),
61 features(_features)
62 { }
63
64 XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p)
65 : hdr(&_hdr), ftr(&_ftr)
66 {
67 bl.append(p);
68 buffer::list::iterator bl_iter = bl.begin();
69 decode(bl_iter);
70 }
71
72 static size_t get_max_encoded_length();
73
74 const buffer::list& get_bl() { encode(bl); return bl; };
75
76 inline void encode_hdr(ceph::buffer::list& bl) const {
77 using ceph::encode;
78 encode(tag, bl);
79 encode(msg_cnt, bl);
80 encode(peer_type, bl);
81 encode(addr, bl, features);
82 encode(hdr->seq, bl);
83 encode(hdr->tid, bl);
84 encode(hdr->type, bl);
85 encode(hdr->priority, bl);
86 encode(hdr->version, bl);
87 encode(hdr->front_len, bl);
88 encode(hdr->middle_len, bl);
89 encode(hdr->data_len, bl);
90 encode(hdr->data_off, bl);
91 encode(hdr->src.type, bl);
92 encode(hdr->src.num, bl);
93 encode(hdr->compat_version, bl);
94 encode(hdr->crc, bl);
95 }
96
97 inline void encode_ftr(buffer::list& bl) const {
98 using ceph::encode;
99 encode(ftr->front_crc, bl);
100 encode(ftr->middle_crc, bl);
101 encode(ftr->data_crc, bl);
102 encode(ftr->sig, bl);
103 encode(ftr->flags, bl);
104 }
105
106 inline void encode(buffer::list& bl) const {
107 encode_hdr(bl);
108 encode_ftr(bl);
109 }
110
111 inline void decode_hdr(buffer::list::iterator& bl) {
112 using ceph::decode;
113 decode(tag, bl);
114 decode(msg_cnt, bl);
115 decode(peer_type, bl);
116 decode(addr, bl);
117 decode(hdr->seq, bl);
118 decode(hdr->tid, bl);
119 decode(hdr->type, bl);
120 decode(hdr->priority, bl);
121 decode(hdr->version, bl);
122 decode(hdr->front_len, bl);
123 decode(hdr->middle_len, bl);
124 decode(hdr->data_len, bl);
125 decode(hdr->data_off, bl);
126 decode(hdr->src.type, bl);
127 decode(hdr->src.num, bl);
128 decode(hdr->compat_version, bl);
129 decode(hdr->crc, bl);
130 }
131
132 inline void decode_ftr(buffer::list::iterator& bl) {
133 using ceph::decode;
134 decode(ftr->front_crc, bl);
135 decode(ftr->middle_crc, bl);
136 decode(ftr->data_crc, bl);
137 decode(ftr->sig, bl);
138 decode(ftr->flags, bl);
139 }
140
141 inline void decode(buffer::list::iterator& bl) {
142 decode_hdr(bl);
143 decode_ftr(bl);
144 }
145
146 virtual ~XioMsgHdr()
147 {}
148 };
149
/* Encoder glue for XioMsgHdr.  The macro is defined outside this file;
 * presumably it emits free encode()/decode() wrappers that forward to
 * the member functions -- confirm against its definition. */
WRITE_CLASS_ENCODER(XioMsgHdr);

/* Mempool defined in another translation unit.  Within this header it
 * is what XioDispatchHook::rsp_pool is constructed from. */
extern struct xio_mempool *xio_msgr_noreg_mpool;

/* Number of xio_iovec_ex slots embedded in each xio_msg_ex; also the
 * max_nents value written into its scatter/gather lists. */
#define XIO_MSGR_IOVLEN 16
155
156 struct xio_msg_ex
157 {
158 struct xio_msg msg;
159 struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN];
160
161 explicit xio_msg_ex(void* user_context) {
162 // go in structure order
163 msg.in.header.iov_len = 0;
164 msg.in.header.iov_base = NULL; /* XXX Accelio requires this currently */
165
166 msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR;
167 msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
168 msg.in.pdata_iov.nents = 0;
169 msg.in.pdata_iov.sglist = iovs;
170
171 // minimal zero "out" side
172 msg.out.header.iov_len = 0;
173 msg.out.header.iov_base = NULL; /* XXX Accelio requires this currently,
174 * against spec */
175 // out (some members adjusted later)
176 msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR;
177 msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
178 msg.out.pdata_iov.nents = 0;
179 msg.out.pdata_iov.sglist = iovs;
180
181 // minimal initialize an "out" msg
182 msg.request = NULL;
183 msg.type = XIO_MSG_TYPE_ONE_WAY;
184 // for now, we DO NEED receipts for every msg
185 msg.flags = 0;
186 msg.user_context = user_context;
187 msg.next = NULL;
188 // minimal zero "in" side
189 }
190 };
191
192 class XioSend : public XioSubmit
193 {
194 public:
195 virtual void print_debug(CephContext *cct, const char *tag) const {};
196 const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
197 struct xio_msg * get_xio_msg() {return &req_0.msg;}
198 virtual size_t get_msg_count() const {return 1;}
199
200 XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) :
201 XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
202 req_0(this), mp_this(_mp), nrefs(_ex_cnt+1)
203 {
204 xpool_inc_msgcnt();
205 xcon->get();
206 }
207
208 XioSend* get() { nrefs++; return this; };
209
210 void put(int n) {
211 int refs = nrefs -= n;
212 if (refs == 0) {
213 struct xio_reg_mem *mp = &this->mp_this;
214 this->~XioSend();
215 xpool_free(sizeof(XioSend), mp);
216 }
217 }
218
219 void put() {
220 put(1);
221 }
222
223 void put_msg_refs() {
224 put(get_msg_count());
225 }
226
227 virtual ~XioSend() {
228 xpool_dec_msgcnt();
229 xcon->put();
230 }
231
232 private:
233 xio_msg_ex req_0;
234 struct xio_reg_mem mp_this;
235 std::atomic<unsigned> nrefs = { 0 };
236 };
237
238 class XioCommand : public XioSend
239 {
240 public:
241 XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp):XioSend(_xcon, _mp) {
242 }
243
244 buffer::list& get_bl_ref() { return bl; };
245
246 private:
247 buffer::list bl;
248 };
249
250 struct XioMsg : public XioSend
251 {
252 public:
253 Message* m;
254 XioMsgHdr hdr;
255 xio_msg_ex* req_arr;
256
257 public:
258 XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp,
259 int _ex_cnt, uint64_t _features) :
260 XioSend(_xcon, _mp, _ex_cnt),
261 m(_m), hdr(m->get_header(), m->get_footer(), _features),
262 req_arr(NULL)
263 {
264 const entity_inst_t &inst = xcon->get_messenger()->get_myinst();
265 hdr.peer_type = inst.name.type();
266 hdr.addr = xcon->get_messenger()->get_myaddr_legacy();
267 hdr.hdr->src.type = inst.name.type();
268 hdr.hdr->src.num = inst.name.num();
269 hdr.msg_cnt = _ex_cnt+1;
270
271 if (unlikely(_ex_cnt > 0)) {
272 alloc_trailers(_ex_cnt);
273 }
274 }
275
276 void print_debug(CephContext *cct, const char *tag) const override;
277 size_t get_msg_count() const override {
278 return hdr.msg_cnt;
279 }
280
281 void alloc_trailers(int cnt) {
282 req_arr = static_cast<xio_msg_ex*>(malloc(cnt * sizeof(xio_msg_ex)));
283 for (int ix = 0; ix < cnt; ++ix) {
284 xio_msg_ex* xreq = &(req_arr[ix]);
285 new (xreq) xio_msg_ex(this);
286 }
287 }
288
289 Message *get_message() { return m; }
290
291 ~XioMsg()
292 {
293 if (unlikely(!!req_arr)) {
294 for (unsigned int ix = 0; ix < get_msg_count()-1; ++ix) {
295 xio_msg_ex* xreq = &(req_arr[ix]);
296 xreq->~xio_msg_ex();
297 }
298 free(req_arr);
299 }
300
301 /* testing only! server's ready, resubmit request (not reached on
302 * PASSIVE/server side) */
303 if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) {
304 if (likely(xcon->is_connected())) {
305 xcon->send_message(m);
306 } else {
307 /* dispose it */
308 m->put();
309 }
310 } else {
311 /* the normal case: done with message */
312 m->put();
313 }
314 }
315 };
316
317 class XioDispatchHook : public Message::CompletionHook
318 {
319 private:
320 XioConnection *xcon;
321 XioInSeq msg_seq;
322 XioPool rsp_pool;
323 std::atomic<unsigned> nrefs { 1 };
324 bool cl_flag;
325 friend class XioConnection;
326 friend class XioMessenger;
327 public:
328 struct xio_reg_mem mp_this;
329
330 XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
331 struct xio_reg_mem& _mp) :
332 CompletionHook(_m),
333 xcon(_xcon->get()),
334 msg_seq(_msg_seq),
335 rsp_pool(xio_msgr_noreg_mpool),
336 cl_flag(false),
337 mp_this(_mp)
338 {
339 ++xcon->n_reqs; // atomicity by portal thread
340 xpool_inc_hookcnt();
341 }
342
343 virtual void finish(int r) {
344 this->put();
345 }
346
347 virtual void complete(int r) {
348 finish(r);
349 }
350
351 int release_msgs();
352
353 XioDispatchHook* get() {
354 nrefs++; return this;
355 }
356
357 void put(int n = 1) {
358 int refs = nrefs -= n;
359 if (refs == 0) {
360 /* in Marcus' new system, refs reaches 0 twice: once in
361 * Message lifecycle, and again after xio_release_msg.
362 */
363 if (!cl_flag && release_msgs())
364 return;
365 struct xio_reg_mem *mp = &this->mp_this;
366 this->~XioDispatchHook();
367 xpool_free(sizeof(XioDispatchHook), mp);
368 }
369 }
370
371 XioInSeq& get_seq() { return msg_seq; }
372
373 XioPool& get_pool() { return rsp_pool; }
374
375 void on_err_finalize(XioConnection *xcon) {
376 /* can't decode message; even with one-way must free
377 * xio_msg structures, and then xiopool
378 */
379 this->finish(-1);
380 }
381
382 ~XioDispatchHook() {
383 --xcon->n_reqs; // atomicity by portal thread
384 xpool_dec_hookcnt();
385 xcon->put();
386 }
387 };
388
389 /* A sender-side CompletionHook that relies on the on_msg_delivered
390 * to complete a pending mark down. */
391 class XioMarkDownHook : public Message::CompletionHook
392 {
393 private:
394 XioConnection* xcon;
395
396 public:
397 struct xio_reg_mem mp_this;
398
399 XioMarkDownHook(
400 XioConnection* _xcon, Message *_m, struct xio_reg_mem& _mp) :
401 CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp)
402 { }
403
404 virtual void claim(int r) {}
405
406 virtual void finish(int r) {
407 xcon->put();
408 struct xio_reg_mem *mp = &this->mp_this;
409 this->~XioMarkDownHook();
410 xio_mempool_free(mp);
411 }
412
413 virtual void complete(int r) {
414 xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
415 finish(r);
416 }
417 };
418
419 struct XioCompletion : public XioSubmit
420 {
421 XioDispatchHook *xhook;
422 public:
423 XioCompletion(XioConnection *_xcon, XioDispatchHook *_xhook)
424 : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
425 xhook(_xhook->get()) {
426 // submit queue ref
427 xcon->get();
428 };
429
430 struct xio_msg* dequeue() {
431 return xhook->get_seq().dequeue();
432 }
433
434 XioDispatchHook* get_xhook() { return xhook; }
435
436 void finalize() {
437 xcon->put();
438 xhook->put();
439 }
440 };
441
/* Debug printers; both are defined outside this header.  Judging by
 * the names they dump an XioMsgHdr (with its xio_msg) and a Message
 * respectively, labelled with tag -- confirm against the definitions. */
void print_xio_msg_hdr(CephContext *cct, const char *tag,
		       const XioMsgHdr &hdr, const struct xio_msg *msg);
void print_ceph_msg(CephContext *cct, const char *tag, Message *m);
445
446 #endif /* XIO_MSG_H */