1 // SPDX-License-Identifier: GPL-2.0-or-later
3 * March 6 2023, Christian Hopps <chopps@labn.net>
5 * Copyright (C) 2021 Vmware, Inc.
6 * Pushpasis Sarkar <spushpasis@vmware.com>
7 * Copyright (c) 2023, LabN Consulting, L.L.C.
/*
 * Logging helpers used throughout this file.
 * MGMT_MSG_DBG(dbgtag, fmt, ...): debug log prefixed with dbgtag and the
 *   calling function's name. Callers compute the tag as
 *   `debug ? idtag : NULL`; a NULL tag means debug logging is off (see the
 *   dbgtag description of mgmt_msg_connect()).
 * MGMT_MSG_ERR(ms, fmt, ...): error log prefixed with (ms)->idtag and the
 *   calling function's name; always emitted.
 */
18 #define MGMT_MSG_DBG(dbgtag, fmt, ...) \
21 zlog_debug("%s: %s: " fmt, dbgtag, __func__, \
25 #define MGMT_MSG_ERR(ms, fmt, ...) \
26 zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__)
/*
 * Memory type for the heap-allocated server-side msg_conn objects created by
 * msg_server_conn_create() and freed by msg_server_conn_delete().
 */
28 DEFINE_MTYPE(LIB
, MSG_CONN
, "msg connection state");
31 * Read data from a socket into streams containing 1 or more full msgs headed by
32 * mgmt_msg_hdr which contain API messages (currently protobuf).
35 * ms: mgmt_msg_state for this process.
36 * fd: socket/file to read data from.
37 * debug: true to enable debug logging.
40 * MSR_DISCONNECT - socket should be closed and connect retried.
41 * MSR_SCHED_STREAM - this call should be rescheduled to run.
42 * MSR_SCHED_BOTH - this call and the procmsg buf should be scheduled to
/*
 * mgmt_msg_read: pull available bytes from fd into ms->ins, then scan the
 * buffered data for complete mgmt_msg_hdr-framed messages. When at least one
 * message is complete, ms->ins (truncated to the complete messages) is pushed
 * onto ms->inq and a new ms->ins is allocated to hold the trailing bytes of
 * any partial message.
 *
 * NOTE(review): every new ms->ins is only ms->max_msg_sz bytes, and reading
 * stops once no more than a header's worth of room is left. A peer that
 * advertises a message longer than max_msg_sz can therefore never complete
 * it, and the connection stalls. Consider rejecting such headers, or sizing
 * the new stream from mhdr->len.
 * NOTE(review): the scan loop requires strictly more than a header's worth of
 * data, so a header-only (empty payload) message at the very end of the
 * buffer is not counted until more bytes arrive. Confirm that empty payloads
 * cannot occur, or use >=.
 */
45 enum mgmt_msg_rsched
mgmt_msg_read(struct mgmt_msg_state
*ms
, int fd
,
48 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
49 size_t avail
= STREAM_WRITEABLE(ms
->ins
);
50 struct mgmt_msg_hdr
*mhdr
= NULL
;
55 assert(ms
&& fd
!= -1);
58 * Read as much as we can into the stream.
/* Keep reading while the input stream has room for more than a header. */
60 while (avail
> sizeof(struct mgmt_msg_hdr
)) {
61 n
= stream_read_try(ms
->ins
, fd
, avail
);
62 MGMT_MSG_DBG(dbgtag
, "got %zd bytes", n
);
64 /* -2 is normal nothing read, and to retry */
69 MGMT_MSG_ERR(ms
, "got EOF/disconnect");
72 "got error while reading: '%s'",
73 safe_strerror(errno
));
74 return MSR_DISCONNECT
;
81 * Check if we have read a complete messages or not.
/* Framing is parsed from offset 0 of ms->ins, so nothing may have been consumed yet. */
83 assert(stream_get_getp(ms
->ins
) == 0);
84 left
= stream_get_endp(ms
->ins
);
/*
 * Walk consecutive headers. A header without the expected marker means the
 * byte stream is out of sync, which is treated as fatal (MSR_DISCONNECT).
 */
85 while (left
> (long)sizeof(struct mgmt_msg_hdr
)) {
86 mhdr
= (struct mgmt_msg_hdr
*)(STREAM_DATA(ms
->ins
) + total
);
87 if (!MGMT_MSG_IS_MARKER(mhdr
->marker
)) {
88 MGMT_MSG_DBG(dbgtag
, "recv corrupt buffer, disconnect");
89 return MSR_DISCONNECT
;
/* A message longer than the bytes still buffered is not complete yet. */
91 if ((ssize_t
)mhdr
->len
> left
)
94 MGMT_MSG_DBG(dbgtag
, "read full message len %u", mhdr
->len
);
/* No complete message yet: only the read needs to be rescheduled. */
101 return MSR_SCHED_STREAM
;
104 * We have read at least one message into the stream, queue it up.
/*
 * The complete messages occupy the first `total` bytes. Truncate the stream
 * to them and queue it on ms->inq. Any partial message starts at offset
 * `total`; its bytes are copied into a fresh max_msg_sz input stream.
 */
106 mhdr
= (struct mgmt_msg_hdr
*)(STREAM_DATA(ms
->ins
) + total
);
107 stream_set_endp(ms
->ins
, total
);
108 stream_fifo_push(&ms
->inq
, ms
->ins
);
109 ms
->ins
= stream_new(ms
->max_msg_sz
);
111 stream_put(ms
->ins
, mhdr
, left
);
112 stream_set_endp(ms
->ins
, left
);
115 return MSR_SCHED_BOTH
;
119 * Process streams containing whole messages that have been pushed onto the
120 * FIFO. This should be called from an event/timer handler and should be
124 * ms: mgmt_msg_state for this process.
125 * handle_msg: function to call for each received message.
126 * user: opaque value passed through to handle_msg.
127 * debug: true to enable debug logging.
130 * true if more to process (so reschedule) else false
/*
 * mgmt_msg_procbufs: pop queued input streams off ms->inq, bounded by
 * ms->max_read_buf per call. Every framed message in each stream is passed to
 * handle_msg() as (header version, pointer just past the header, payload
 * length = mhdr->len minus the header size, user).
 * Returns true when streams are still queued, so the caller should
 * reschedule.
 *
 * NOTE(review): the per-stream loop also uses a strict `>` against the header
 * size. A header-only (empty payload) message that ends a queued stream would
 * therefore be skipped. Confirm that empty payloads cannot occur, or use >=.
 */
132 bool mgmt_msg_procbufs(struct mgmt_msg_state
*ms
,
133 void (*handle_msg
)(uint8_t version
, uint8_t *msg
,
134 size_t msglen
, void *user
),
135 void *user
, bool debug
)
137 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
138 struct mgmt_msg_hdr
*mhdr
;
143 MGMT_MSG_DBG(dbgtag
, "Have %zu streams to process", ms
->inq
.count
);
/* Bound the work done per call; remaining streams are left for the next call. */
146 while (nproc
< ms
->max_read_buf
) {
147 work
= stream_fifo_pop(&ms
->inq
);
151 data
= STREAM_DATA(work
);
152 left
= stream_get_endp(work
);
153 MGMT_MSG_DBG(dbgtag
, "Processing stream of len %zu", left
);
/* mgmt_msg_read() only queues complete messages, so walk them header by header. */
155 for (; left
> sizeof(struct mgmt_msg_hdr
);
156 left
-= mhdr
->len
, data
+= mhdr
->len
) {
157 mhdr
= (struct mgmt_msg_hdr
*)data
;
/* Framing was validated before queuing; a failure here is a bug, not bad input. */
159 assert(MGMT_MSG_IS_MARKER(mhdr
->marker
));
160 assert(left
>= mhdr
->len
);
162 handle_msg(MGMT_MSG_MARKER_VERSION(mhdr
->marker
),
163 (uint8_t *)(mhdr
+ 1),
164 mhdr
->len
- sizeof(struct mgmt_msg_hdr
),
171 stream_free(work
); /* Free it up */
173 stream_reset(work
); /* Reset stream for next read */
176 /* return true if should reschedule b/c more to process. */
177 return stream_fifo_head(&ms
->inq
) != NULL
;
181 * Write data onto the socket, using streams that have been queued for
182 * sending by mgmt_msg_send_msg. This function should be reschedulable.
185 * ms: mgmt_msg_state for this process.
186 * fd: socket/file to write data to.
187 * debug: true to enable debug logging.
190 * MSW_SCHED_NONE - do not reschedule anything.
191 * MSW_SCHED_STREAM - this call should be rescheduled to run again.
192 * MSW_SCHED_WRITES_OFF - writes should be disabled with a timer to
193 * re-enable them a short time later
194 * MSW_DISCONNECT - socket should be closed and reconnect retried.
/*
 * mgmt_msg_write: move a pending ms->outs stream onto ms->outq, then flush
 * streams from the head of ms->outq to fd, bounded by ms->max_write_buf per
 * call.
 * Returns:
 *   MSW_SCHED_STREAM - call again: retryable errno, short write, or the
 *                      per-call limit was reached.
 *   MSW_DISCONNECT   - fatal error; all queued output has been dropped.
 *   MSW_SCHED_NONE   - the queue was flushed completely.
 */
197 enum mgmt_msg_wsched
mgmt_msg_write(struct mgmt_msg_state
*ms
, int fd
,
200 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
208 "found unqueued stream with %zu bytes, queueing",
209 stream_get_endp(ms
->outs
));
210 stream_fifo_push(&ms
->outq
, ms
->outs
);
/*
 * Always work on the head of the queue. A stream is popped and freed only
 * after it has been written completely, so a short write resumes at the same
 * stream on the next call.
 */
214 for (s
= stream_fifo_head(&ms
->outq
); s
&& nproc
< ms
->max_write_buf
;
215 s
= stream_fifo_head(&ms
->outq
)) {
216 left
= STREAM_READABLE(s
);
219 n
= stream_flush(s
, fd
);
223 "connection closed while writing");
/* A retryable errno means the socket cannot take more now: try again later. */
224 else if (ERRNO_IO_RETRY(errno
)) {
227 "retry error while writing %zd bytes: %s (%d)",
228 left
, safe_strerror(errno
), errno
);
229 return MSW_SCHED_STREAM
;
233 "error while writing %zd bytes: %s (%d)",
234 left
, safe_strerror(errno
), errno
);
/* Any other error is fatal: drop everything still queued and ask to disconnect. */
236 n
= mgmt_msg_reset_writes(ms
);
237 MGMT_MSG_DBG(dbgtag
, "drop and freed %zd streams", n
);
239 return MSW_DISCONNECT
;
/* Partial write: skip the bytes already sent and wait to be called again. */
244 MGMT_MSG_DBG(dbgtag
, "short stream write %zd of %zd", n
,
246 stream_forward_getp(s
, n
);
247 return MSW_SCHED_STREAM
;
/* Stream fully written: discard it. */
250 stream_free(stream_fifo_pop(&ms
->outq
));
251 MGMT_MSG_DBG(dbgtag
, "wrote stream of %zd bytes", n
);
257 "reached %zu buffer writes, pausing with %zu streams left",
258 ms
->max_write_buf
, ms
->outq
.count
);
259 return MSW_SCHED_STREAM
;
261 MGMT_MSG_DBG(dbgtag
, "flushed all streams from output q");
262 return MSW_SCHED_NONE
;
267 * Send a message by enqueueing it to be written over the socket by
271 * ms: mgmt_msg_state for this process.
272 * version: version of this message, will be given to receiving side.
273 * msg: the message to be sent.
274 * len: the length of the message.
275 * packf: a function to pack the message.
276 * debug: true to enable debug logging.
279 * 0 on success, otherwise -1 on failure. The only failure mode is if
280 * the message exceeds the maximum message size configured on init.
/*
 * mgmt_msg_send_msg: append one framed message (mgmt_msg_hdr + payload) to
 * the current output stream ms->outs. When that stream lacks room for the
 * framed size mlen, it is queued on ms->outq and a new max_msg_sz stream is
 * started. Nothing is written to the socket here; mgmt_msg_write() does that.
 */
282 int mgmt_msg_send_msg(struct mgmt_msg_state
*ms
, uint8_t version
, void *msg
,
283 size_t len
, size_t (*packf
)(void *msg
, void *buf
),
286 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
287 struct mgmt_msg_hdr
*mhdr
;
/* Full on-the-wire size of the message: header plus payload. */
291 size_t mlen
= len
+ sizeof(*mhdr
);
293 if (mlen
> ms
->max_msg_sz
) {
294 MGMT_MSG_ERR(ms
, "Message %zu > max size %zu, dropping", mlen
,
300 MGMT_MSG_DBG(dbgtag
, "creating new stream for msg len %zu",
302 ms
->outs
= stream_new(ms
->max_msg_sz
);
/* Not enough room left in the current stream: queue it and start a new one. */
303 } else if (STREAM_WRITEABLE(ms
->outs
) < mlen
) {
306 "enq existing stream len %zu and creating new stream for msg len %zu",
307 STREAM_WRITEABLE(ms
->outs
), mlen
);
308 stream_fifo_push(&ms
->outq
, ms
->outs
);
309 ms
->outs
= stream_new(ms
->max_msg_sz
);
313 "using existing stream with avail %zu for msg len %zu",
314 STREAM_WRITEABLE(ms
->outs
), mlen
);
318 /* We have a stream with space, pack the message into it. */
/*
 * The header is built in place at the stream's current end, and the payload
 * is packed right after it.
 * NOTE(review): mgmt_msg_read()/mgmt_msg_procbufs() split messages using
 * mhdr->len, so it must be set to mlen (header + payload). Confirm that it is
 * assigned alongside the marker.
 */
319 mhdr
= (struct mgmt_msg_hdr
*)(STREAM_DATA(s
) + s
->endp
);
320 mhdr
->marker
= MGMT_MSG_MARKER(version
);
322 stream_forward_endp(s
, sizeof(*mhdr
));
323 endp
= stream_get_endp(s
);
324 dstbuf
= STREAM_DATA(s
) + endp
;
/*
 * packf serializes msg directly into the stream and returns the number of
 * bytes it wrote. Otherwise (presumably when packf is NULL) len raw bytes are
 * copied.
 * NOTE(review): the size check above uses only len, so packf must never
 * produce more than len bytes.
 */
326 n
= packf(msg
, dstbuf
);
328 memcpy(dstbuf
, msg
, len
);
331 stream_set_endp(s
, endp
+ n
);
338 * Create and open a unix domain stream socket on the given path
339 * setting non-blocking and send and receive buffer sizes.
342 * path: path of unix domain socket to connect to.
343 * sendbuf: size of socket send buffer.
344 * recvbuf: size of socket receive buffer.
345 * dbgtag: if non-NULL enable log debug, and use this tag.
348 * socket fd or -1 on error.
/*
 * mgmt_msg_connect: connect an AF_UNIX stream socket to path. On success the
 * socket is made non-blocking and its send/receive buffer sizes are set.
 */
350 int mgmt_msg_connect(const char *path
, size_t sendbuf
, size_t recvbuf
,
354 struct sockaddr_un addr
;
356 MGMT_MSG_DBG(dbgtag
, "connecting to server on %s", path
);
357 sock
= socket(AF_UNIX
, SOCK_STREAM
, 0);
359 MGMT_MSG_DBG(dbgtag
, "socket failed: %s", safe_strerror(errno
));
363 memset(&addr
, 0, sizeof(struct sockaddr_un
));
364 addr
.sun_family
= AF_UNIX
;
/*
 * NOTE(review): strlcpy() silently truncates a path longer than sun_path,
 * which would then connect to a different name. Consider rejecting such paths.
 */
365 strlcpy(addr
.sun_path
, path
, sizeof(addr
.sun_path
));
/*
 * Where sockaddr_un has a sun_len member it must be filled in. Otherwise the
 * address length is the family field plus the path length.
 */
366 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
367 len
= addr
.sun_len
= SUN_LEN(&addr
);
369 len
= sizeof(addr
.sun_family
) + strlen(addr
.sun_path
);
370 #endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
371 ret
= connect(sock
, (struct sockaddr
*)&addr
, len
);
373 MGMT_MSG_DBG(dbgtag
, "failed to connect on %s: %s", path
,
374 safe_strerror(errno
));
/* Socket options are only applied once the connection has succeeded. */
379 MGMT_MSG_DBG(dbgtag
, "connected to server on %s", path
);
380 set_nonblocking(sock
);
381 setsockopt_so_sendbuf(sock
, sendbuf
);
382 setsockopt_so_recvbuf(sock
, recvbuf
);
387 * Reset the sending queue, by dequeueing all streams and freeing them. Return
388 * the number of streams freed.
391 * ms: mgmt_msg_state for this process.
394 * Number of streams that were freed.
/*
 * mgmt_msg_reset_writes: pop every stream off ms->outq until it is empty,
 * counting each pop in nproc. Per the function description, the popped
 * streams are freed and the count is returned.
 * NOTE(review): ms->outs (a stream not yet pushed onto outq) is not handled by
 * this loop.
 */
397 size_t mgmt_msg_reset_writes(struct mgmt_msg_state
*ms
)
402 for (s
= stream_fifo_pop(&ms
->outq
); s
;
403 s
= stream_fifo_pop(&ms
->outq
), nproc
++)
410 void mgmt_msg_init(struct mgmt_msg_state
*ms
, size_t max_read_buf
,
411 size_t max_write_buf
, size_t max_msg_sz
, const char *idtag
)
413 memset(ms
, 0, sizeof(*ms
));
414 ms
->ins
= stream_new(max_msg_sz
);
415 stream_fifo_init(&ms
->inq
);
416 stream_fifo_init(&ms
->outq
);
417 ms
->max_read_buf
= max_write_buf
;
418 ms
->max_write_buf
= max_read_buf
;
419 ms
->max_msg_sz
= max_msg_sz
;
420 ms
->idtag
= strdup(idtag
);
/*
 * mgmt_msg_destroy: release the message state's streams. All queued output
 * streams are discarded via mgmt_msg_reset_writes(), and the input stream
 * ms->ins is freed.
 * NOTE(review): mgmt_msg_init() strdup()s ms->idtag, and ms->outs may hold an
 * unqueued stream. Confirm that both are released here as well.
 */
423 void mgmt_msg_destroy(struct mgmt_msg_state
*ms
)
425 mgmt_msg_reset_writes(ms
);
427 stream_free(ms
->ins
);
/* Delay, in milliseconds, before retrying a failed client connection. */
#define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250
/* Socket send/receive buffer sizes requested for msg connections (64 KiB). */
#define MSG_CONN_SEND_BUF_SIZE (64u * 1024u)
#define MSG_CONN_RECV_BUF_SIZE (64u * 1024u)
/*
 * Forward declarations for the scheduling helpers. The event handlers below
 * reschedule themselves (e.g. msg_conn_write() -> msg_conn_sched_write()), and
 * msg_conn_disconnect() may schedule a client reconnect, so these are
 * referenced before their definitions.
 */
439 static void msg_client_sched_connect(struct msg_client
*client
,
442 static void msg_conn_sched_proc_msgs(struct msg_conn
*conn
);
443 static void msg_conn_sched_read(struct msg_conn
*conn
);
444 static void msg_conn_sched_write(struct msg_conn
*conn
);
/*
 * Write-ready event handler. It flushes queued output, rearms itself while
 * output remains (MSW_SCHED_STREAM), and drops the connection on a fatal
 * error. A reconnect is requested only when conn->is_client is set.
 * NOTE(review): MSW_SCHED_WRITES_OFF is documented as a possible result of
 * mgmt_msg_write() but is not handled here, so it would trip the assert.
 * Confirm that mgmt_msg_write() never returns it.
 */
446 static void msg_conn_write(struct event
*thread
)
448 struct msg_conn
*conn
= EVENT_ARG(thread
);
449 enum mgmt_msg_wsched rv
;
451 rv
= mgmt_msg_write(&conn
->mstate
, conn
->fd
, conn
->debug
);
452 if (rv
== MSW_SCHED_STREAM
)
453 msg_conn_sched_write(conn
);
454 else if (rv
== MSW_DISCONNECT
)
455 msg_conn_disconnect(conn
, conn
->is_client
);
457 assert(rv
== MSW_SCHED_NONE
);
/*
 * Read-ready event handler.
 * - Pulls available bytes into the connection's message state.
 * - On MSR_DISCONNECT, drops the connection; a reconnect is requested only for
 *   client connections.
 * - When complete messages were queued (MSR_SCHED_BOTH), schedules message
 *   processing.
 * - Rearms the read event.
 */
460 static void msg_conn_read(struct event
*thread
)
462 struct msg_conn
*conn
= EVENT_ARG(thread
);
463 enum mgmt_msg_rsched rv
;
465 rv
= mgmt_msg_read(&conn
->mstate
, conn
->fd
, conn
->debug
);
466 if (rv
== MSR_DISCONNECT
) {
467 msg_conn_disconnect(conn
, conn
->is_client
);
470 if (rv
== MSR_SCHED_BOTH
)
471 msg_conn_sched_proc_msgs(conn
);
472 msg_conn_sched_read(conn
);
/*
 * Event handler that dispatches queued complete messages to conn->handle_msg.
 * It reschedules itself while mgmt_msg_procbufs() reports more queued input.
 * conn->handle_msg takes a struct msg_conn * as its last argument (see
 * msg_client_init()/msg_conn_accept_init()). Here it is cast to
 * mgmt_msg_procbufs()' generic `void *user` callback type.
 * NOTE(review): calling a function through a pointer cast to an incompatible
 * type is undefined behavior in ISO C, even though it works on common ABIs. A
 * small adapter taking `void *user` would avoid it.
 */
475 /* collapse this into mgmt_msg_procbufs */
476 static void msg_conn_proc_msgs(struct event
*thread
)
478 struct msg_conn
*conn
= EVENT_ARG(thread
);
480 if (mgmt_msg_procbufs(&conn
->mstate
,
481 (void (*)(uint8_t, uint8_t *, size_t,
482 void *))conn
->handle_msg
,
484 /* there's more, schedule handling more */
485 msg_conn_sched_proc_msgs(conn
);
/* Arm a read event on conn->fd in conn->loop that runs msg_conn_read(conn). */
488 static void msg_conn_sched_read(struct msg_conn
*conn
)
490 event_add_read(conn
->loop
, msg_conn_read
, conn
, conn
->fd
,
/* Arm a write event on conn->fd in conn->loop that runs msg_conn_write(conn). */
494 static void msg_conn_sched_write(struct msg_conn
*conn
)
496 event_add_write(conn
->loop
, msg_conn_write
, conn
, conn
->fd
,
/*
 * Schedule msg_conn_proc_msgs(conn) with event_add_event(). The event is not
 * tied to fd readiness.
 */
500 static void msg_conn_sched_proc_msgs(struct msg_conn
*conn
)
502 event_add_event(conn
->loop
, msg_conn_proc_msgs
, conn
, 0,
/*
 * Drop a connection. Called on read/write failure, or when a client's
 * notify_connect callback rejects a new connection.
 * - The `fd != -1` branch handles a still-open socket.
 * - The owner's notify_disconnect callback, if set, is invoked; its return
 *   value is ignored.
 * - For reconnect requests (presumably guarded by `reconnect`), a new
 *   connection attempt is scheduled MSG_CONN_DEFAULT_CONN_RETRY_MSEC ms later.
 */
507 void msg_conn_disconnect(struct msg_conn
*conn
, bool reconnect
)
510 if (conn
->fd
!= -1) {
514 /* Notify client through registered callback (if any) */
515 if (conn
->notify_disconnect
)
516 (void)(*conn
->notify_disconnect
)(conn
);
/*
 * Reconnecting only makes sense for a client connection. container_of()
 * recovers the struct msg_client that embeds this msg_conn.
 */
520 assert(conn
->is_client
);
521 msg_client_sched_connect(
522 container_of(conn
, struct msg_client
, conn
),
523 MSG_CONN_DEFAULT_CONN_RETRY_MSEC
);
/*
 * Queue a message on a connection and arm a write event to flush it.
 * Sending on a closed connection (fd == -1) is logged as an error.
 * Note that mlen here is the payload length. It is passed as `len` to
 * mgmt_msg_send_msg(), which adds the header size itself; see that function
 * for the packf contract and the size limit.
 */
527 int msg_conn_send_msg(struct msg_conn
*conn
, uint8_t version
, void *msg
,
528 size_t mlen
, size_t (*packf
)(void *, void *))
530 if (conn
->fd
== -1) {
531 MGMT_MSG_ERR(&conn
->mstate
,
532 "can't send message on closed connection");
536 int rv
= mgmt_msg_send_msg(&conn
->mstate
, version
, msg
, mlen
, packf
,
539 msg_conn_sched_write(conn
);
/*
 * Release a connection's resources. Pending read, write and
 * message-processing events are cancelled before the message state (and its
 * streams) is destroyed, so none of those handlers can run against freed
 * state.
 */
544 void msg_conn_cleanup(struct msg_conn
*conn
)
546 struct mgmt_msg_state
*ms
= &conn
->mstate
;
548 if (conn
->fd
!= -1) {
553 EVENT_OFF(conn
->read_ev
);
554 EVENT_OFF(conn
->write_ev
);
555 EVENT_OFF(conn
->proc_msg_ev
);
557 mgmt_msg_destroy(ms
);
/* Defined below; msg_client_connect_timer() needs it first. */
static void msg_client_connect(struct msg_client *client);

/*
 * Event callback for a (re)connection attempt. The event argument is the
 * struct msg_client passed by msg_client_sched_connect(), whether the attempt
 * was scheduled as a timer or as an immediate event.
 */
static void msg_client_connect_timer(struct event *thread)
{
	struct msg_client *client = EVENT_ARG(thread);

	msg_client_connect(client);
}
/*
 * Schedule msg_client_connect_timer(client) in one of two ways:
 * - as a timer firing after msec milliseconds, or
 * - as an immediate event (presumably when msec is 0; msg_client_init()
 *   passes 0 for the first attempt).
 * Both paths store the event in client->conn_retry_tmr, which
 * msg_client_cleanup() cancels.
 */
571 static void msg_client_sched_connect(struct msg_client
*client
,
574 struct msg_conn
*conn
= &client
->conn
;
575 const char *dbgtag
= conn
->debug
? conn
->mstate
.idtag
: NULL
;
577 MGMT_MSG_DBG(dbgtag
, "connection retry in %lu msec", msec
);
579 event_add_timer_msec(conn
->loop
, msg_client_connect_timer
,
580 client
, msec
, &client
->conn_retry_tmr
);
582 event_add_event(conn
->loop
, msg_client_connect_timer
, client
, 0,
583 &client
->conn_retry_tmr
);
587 /* Connect and start reading from the socket */
/*
 * Try to connect to client->sopath.
 * - If mgmt_msg_connect() fails (it returns -1 on error), retry after
 *   MSG_CONN_DEFAULT_CONN_RETRY_MSEC.
 * - On success, the owner's notify_connect callback may reject the
 *   connection. A non-zero return disconnects it and asks for a reconnect.
 * - Otherwise, start reading.
 */
588 static void msg_client_connect(struct msg_client
*client
)
590 struct msg_conn
*conn
= &client
->conn
;
591 const char *dbgtag
= conn
->debug
? conn
->mstate
.idtag
: NULL
;
593 conn
->fd
= mgmt_msg_connect(client
->sopath
, MSG_CONN_SEND_BUF_SIZE
,
594 MSG_CONN_RECV_BUF_SIZE
, dbgtag
);
597 /* retry the connection */
598 msg_client_sched_connect(client
,
599 MSG_CONN_DEFAULT_CONN_RETRY_MSEC
);
600 else if (client
->notify_connect
&& client
->notify_connect(client
))
601 /* client connect notify failed */
602 msg_conn_disconnect(conn
, true);
605 msg_conn_sched_read(conn
);
/*
 * Initialize a client connection object and start connecting.
 * - Zeroes *client.
 * - Records the callbacks. notify_connect may return non-zero to reject a new
 *   connection; notify_disconnect is called when the connection is dropped;
 *   handle_msg receives each message.
 * - Stores a strdup()ed copy of the socket path and marks the connection as a
 *   client.
 * - Initializes the message state with the given limits.
 * - Schedules the first connection attempt immediately (msec = 0).
 */
608 void msg_client_init(struct msg_client
*client
, struct event_loop
*tm
,
610 int (*notify_connect
)(struct msg_client
*client
),
611 int (*notify_disconnect
)(struct msg_conn
*client
),
612 void (*handle_msg
)(uint8_t version
, uint8_t *data
,
613 size_t len
, struct msg_conn
*client
),
614 size_t max_read_buf
, size_t max_write_buf
,
615 size_t max_msg_sz
, const char *idtag
, bool debug
)
617 struct msg_conn
*conn
= &client
->conn
;
618 memset(client
, 0, sizeof(*client
));
622 conn
->handle_msg
= handle_msg
;
623 conn
->notify_disconnect
= notify_disconnect
;
624 conn
->is_client
= true;
626 client
->sopath
= strdup(sopath
);
627 client
->notify_connect
= notify_connect
;
629 mgmt_msg_init(&conn
->mstate
, max_read_buf
, max_write_buf
, max_msg_sz
,
632 /* XXX maybe just have client kick this off */
633 /* Start trying to connect to server */
634 msg_client_sched_connect(client
, 0);
637 void msg_client_cleanup(struct msg_client
*client
)
639 assert(client
->conn
.is_client
);
641 EVENT_OFF(client
->conn_retry_tmr
);
642 free(client
->sopath
);
644 msg_conn_cleanup(&client
->conn
);
649 * Server-side connections
/*
 * Listen-socket read handler.
 * - Rearms itself on server->fd.
 * - Accepts one pending connection.
 * - Hands the new fd and peer address to server->create(); the returned
 *   msg_conn is not used here.
 */
652 static void msg_server_accept(struct event
*event
)
654 struct msg_server
*server
= EVENT_ARG(event
);
661 /* Re-arm the accept event so further connections keep being accepted. */
662 event_add_read(server
->loop
, msg_server_accept
, server
, server
->fd
,
665 memset(&su
, 0, sizeof(union sockunion
));
667 /* sockunion_accept() is family-agnostic; this listener is AF_UNIX (see msg_server_init()). */
668 fd
= sockunion_accept(server
->fd
, &su
);
670 zlog_err("Failed to accept %s client connection: %s",
671 server
->idtag
, safe_strerror(errno
));
677 DEBUGD(server
->debug
, "Accepted new %s connection", server
->idtag
);
679 server
->create(fd
, &su
);
/*
 * Create an AF_UNIX stream server socket at sopath, bind it, and listen with
 * a backlog of MGMTD_MAX_CONN. Then record the server parameters (sopath and
 * idtag are strdup()ed) and arm msg_server_accept() on the listening fd.
 * TODO confirm the return convention on failure paths.
 */
682 int msg_server_init(struct msg_server
*server
, const char *sopath
,
683 struct event_loop
*loop
,
684 struct msg_conn
*(*create
)(int fd
, union sockunion
*su
),
685 const char *idtag
, struct debug
*debug
)
689 struct sockaddr_un addr
;
692 memset(server
, 0, sizeof(*server
));
695 sock
= socket(AF_UNIX
, SOCK_STREAM
, PF_UNSPEC
);
/*
 * NOTE(review): server->idtag is still NULL here. The struct was zeroed above,
 * and idtag is only copied in after listen(). This message and the bind/listen
 * error messages below therefore log a NULL tag; they should use the `idtag`
 * parameter instead.
 */
697 zlog_err("Failed to create %s server socket: %s", server
->idtag
,
698 safe_strerror(errno
));
/*
 * NOTE(review): the trailing comma (comma operator) joins this assignment with
 * the strlcpy() below into one expression statement. It is harmless but
 * presumably meant to be ';'.
 */
702 addr
.sun_family
= AF_UNIX
,
703 strlcpy(addr
.sun_path
, sopath
, sizeof(addr
.sun_path
));
/* Remove a stale socket file left behind by a previous run so bind() can succeed. */
704 unlink(addr
.sun_path
);
/* umask 0077 gives the socket file created by bind() owner-only permissions. */
705 old_mask
= umask(0077);
706 ret
= bind(sock
, (struct sockaddr
*)&addr
, sizeof(addr
));
708 zlog_err("Failed to bind %s server socket to '%s': %s",
709 server
->idtag
, addr
.sun_path
, safe_strerror(errno
));
715 ret
= listen(sock
, MGMTD_MAX_CONN
);
717 zlog_err("Failed to listen on %s server socket: %s",
718 server
->idtag
, safe_strerror(errno
));
/* Server parameters are recorded only once the socket is listening. */
724 server
->sopath
= strdup(sopath
);
725 server
->idtag
= strdup(idtag
);
726 server
->create
= create
;
727 server
->debug
= debug
;
729 event_add_read(server
->loop
, msg_server_accept
, server
, server
->fd
,
733 DEBUGD(debug
, "Started %s server, listening on %s", idtag
, sopath
);
/*
 * Shut down a server:
 * - cancel the accept event,
 * - free the strdup()ed socket path and tag,
 * - zero the whole struct.
 * The casts to (char *) suggest sopath/idtag are declared const in struct
 * msg_server; confirm against the header.
 */
743 void msg_server_cleanup(struct msg_server
*server
)
745 DEBUGD(server
->debug
, "Closing %s server", server
->idtag
);
747 if (server
->listen_ev
)
748 EVENT_OFF(server
->listen_ev
);
751 free((char *)server
->sopath
);
752 free((char *)server
->idtag
);
754 memset(server
, 0, sizeof(*server
));
759 * Initialize and start reading from the accepted socket
761 * notify_disconnect - called (if non-NULL) when the connection is disconnected
/*
 * Set up a server-side (is_client == false) connection on an accepted fd.
 * - Records the callbacks.
 * - Initializes the message state with the given limits and idtag.
 * - Arms reading.
 * - Makes the socket non-blocking and sets its send/receive buffer sizes.
 * NOTE(review): the read event is armed before set_nonblocking(). This is
 * presumably harmless because handlers run from the event loop after this
 * returns, but configuring the socket first would be more robust.
 */
763 void msg_conn_accept_init(struct msg_conn
*conn
, struct event_loop
*tm
, int fd
,
764 int (*notify_disconnect
)(struct msg_conn
*conn
),
765 void (*handle_msg
)(uint8_t version
, uint8_t *data
,
766 size_t len
, struct msg_conn
*conn
),
767 size_t max_read
, size_t max_write
, size_t max_size
,
772 conn
->notify_disconnect
= notify_disconnect
;
773 conn
->handle_msg
= handle_msg
;
774 conn
->is_client
= false;
776 mgmt_msg_init(&conn
->mstate
, max_read
, max_write
, max_size
, idtag
);
779 msg_conn_sched_read(conn
);
781 /* Make socket non-blocking. */
782 set_nonblocking(conn
->fd
);
783 setsockopt_so_sendbuf(conn
->fd
, MSG_CONN_SEND_BUF_SIZE
);
784 setsockopt_so_recvbuf(conn
->fd
, MSG_CONN_RECV_BUF_SIZE
);
/*
 * Allocate (MTYPE_MSG_CONN) and zero a new server-side connection, then
 * initialize it on the accepted fd via msg_conn_accept_init().
 * Release it with msg_server_conn_delete().
 * NOTE(review): `user` is not passed on in this call; confirm whether it is
 * meant to be stored.
 */
788 msg_server_conn_create(struct event_loop
*tm
, int fd
,
789 int (*notify_disconnect
)(struct msg_conn
*conn
),
790 void (*handle_msg
)(uint8_t version
, uint8_t *data
,
791 size_t len
, struct msg_conn
*conn
),
792 size_t max_read
, size_t max_write
, size_t max_size
,
793 void *user
, const char *idtag
)
795 struct msg_conn
*conn
= XMALLOC(MTYPE_MSG_CONN
, sizeof(*conn
));
796 memset(conn
, 0, sizeof(*conn
));
797 msg_conn_accept_init(conn
, tm
, fd
, notify_disconnect
, handle_msg
,
798 max_read
, max_write
, max_size
, idtag
);
/*
 * Clean up a connection allocated by msg_server_conn_create() and free it,
 * using the same memory type (MTYPE_MSG_CONN).
 */
803 void msg_server_conn_delete(struct msg_conn
*conn
)
807 msg_conn_cleanup(conn
);
808 XFREE(MTYPE_MSG_CONN
, conn
);