]>
git.proxmox.com Git - mirror_frr.git/blob - lib/mgmt_msg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
3 * March 6 2023, Christian Hopps <chopps@labn.net>
5 * Copyright (C) 2021 Vmware, Inc.
6 * Pushpasis Sarkar <spushpasis@vmware.com>
7 * Copyright (c) 2023, LabN Consulting, L.L.C.
17 #define MGMT_MSG_DBG(dbgtag, fmt, ...) \
20 zlog_debug("%s: %s: " fmt, dbgtag, __func__, \
24 #define MGMT_MSG_ERR(ms, fmt, ...) \
25 zlog_err("%s: %s: " fmt, ms->idtag, __func__, ##__VA_ARGS__)
28 * Read data from a socket into streams containing 1 or more full msgs headed by
29 * mgmt_msg_hdr which contain API messages (currently protobuf).
32 * ms: mgmt_msg_state for this process.
33 * fd: socket/file to read data from.
34 * debug: true to enable debug logging.
37 * MPP_DISCONNECT - socket should be closed and connect retried.
38 * MSV_SCHED_STREAM - this call should be rescheduled to run.
39 * MPP_SCHED_BOTH - this call and the procmsg buf should be scheduled to
42 enum mgmt_msg_rsched
mgmt_msg_read(struct mgmt_msg_state
*ms
, int fd
,
45 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
46 size_t avail
= STREAM_WRITEABLE(ms
->ins
);
47 struct mgmt_msg_hdr
*mhdr
= NULL
;
52 assert(ms
&& fd
!= -1);
55 * Read as much as we can into the stream.
57 while (avail
> sizeof(struct mgmt_msg_hdr
)) {
58 n
= stream_read_try(ms
->ins
, fd
, avail
);
59 MGMT_MSG_DBG(dbgtag
, "got %zd bytes", n
);
61 /* -2 is normal nothing read, and to retry */
66 MGMT_MSG_ERR(ms
, "got EOF/disconnect");
69 "got error while reading: '%s'",
70 safe_strerror(errno
));
71 return MSR_DISCONNECT
;
78 * Check if we have read a complete messages or not.
80 assert(stream_get_getp(ms
->ins
) == 0);
81 left
= stream_get_endp(ms
->ins
);
82 while (left
> (long)sizeof(struct mgmt_msg_hdr
)) {
83 mhdr
= (struct mgmt_msg_hdr
*)(STREAM_DATA(ms
->ins
) + total
);
84 if (mhdr
->marker
!= MGMT_MSG_MARKER
) {
85 MGMT_MSG_DBG(dbgtag
, "recv corrupt buffer, disconnect");
86 return MSR_DISCONNECT
;
88 if ((ssize_t
)mhdr
->len
> left
)
91 MGMT_MSG_DBG(dbgtag
, "read full message len %u", mhdr
->len
);
98 return MSR_SCHED_STREAM
;
101 * We have read at least one message into the stream, queue it up.
103 mhdr
= (struct mgmt_msg_hdr
*)(STREAM_DATA(ms
->ins
) + total
);
104 stream_set_endp(ms
->ins
, total
);
105 stream_fifo_push(&ms
->inq
, ms
->ins
);
106 ms
->ins
= stream_new(ms
->max_msg_sz
);
108 stream_put(ms
->ins
, mhdr
, left
);
109 stream_set_endp(ms
->ins
, left
);
112 return MSR_SCHED_BOTH
;
116 * Process streams containing whole messages that have been pushed onto the
117 * FIFO. This should be called from an event/timer handler and should be
121 * ms: mgmt_msg_state for this process.
122 * handle_mgs: function to call for each received message.
123 * user: opaque value passed through to handle_msg.
124 * debug: true to enable debug logging.
127 * true if more to process (so reschedule) else false
129 bool mgmt_msg_procbufs(struct mgmt_msg_state
*ms
,
130 void (*handle_msg
)(void *user
, uint8_t *msg
,
132 void *user
, bool debug
)
134 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
135 struct mgmt_msg_hdr
*mhdr
;
140 MGMT_MSG_DBG(dbgtag
, "Have %zu streams to process", ms
->inq
.count
);
143 while (nproc
< ms
->max_read_buf
) {
144 work
= stream_fifo_pop(&ms
->inq
);
148 data
= STREAM_DATA(work
);
149 left
= stream_get_endp(work
);
150 MGMT_MSG_DBG(dbgtag
, "Processing stream of len %zu", left
);
152 for (; left
> sizeof(struct mgmt_msg_hdr
);
153 left
-= mhdr
->len
, data
+= mhdr
->len
) {
154 mhdr
= (struct mgmt_msg_hdr
*)data
;
156 assert(mhdr
->marker
== MGMT_MSG_MARKER
);
157 assert(left
>= mhdr
->len
);
159 handle_msg(user
, (uint8_t *)(mhdr
+ 1),
160 mhdr
->len
- sizeof(struct mgmt_msg_hdr
));
166 stream_free(work
); /* Free it up */
168 stream_reset(work
); /* Reset stream for next read */
171 /* return true if should reschedule b/c more to process. */
172 return stream_fifo_head(&ms
->inq
) != NULL
;
176 * Write data from a onto the socket, using streams that have been queued for
177 * sending by mgmt_msg_send_msg. This function should be reschedulable.
180 * ms: mgmt_msg_state for this process.
181 * fd: socket/file to read data from.
182 * debug: true to enable debug logging.
185 * MSW_SCHED_NONE - do not reschedule anything.
186 * MSW_SCHED_STREAM - this call should be rescheduled to run again.
187 * MSW_SCHED_WRITES_OFF - writes should be disabled with a timer to
188 * re-enable them a short time later
189 * MSW_DISCONNECT - socket should be closed and reconnect retried.
192 enum mgmt_msg_wsched
mgmt_msg_write(struct mgmt_msg_state
*ms
, int fd
,
195 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
203 "found unqueued stream with %zu bytes, queueing",
204 stream_get_endp(ms
->outs
));
205 stream_fifo_push(&ms
->outq
, ms
->outs
);
209 for (s
= stream_fifo_head(&ms
->outq
); s
&& nproc
< ms
->max_write_buf
;
210 s
= stream_fifo_head(&ms
->outq
)) {
211 left
= STREAM_READABLE(s
);
214 n
= stream_flush(s
, fd
);
218 "connection closed while writing");
219 else if (ERRNO_IO_RETRY(errno
)) {
222 "retry error while writing %zd bytes: %s (%d)",
223 left
, safe_strerror(errno
), errno
);
224 return MSW_SCHED_STREAM
;
228 "error while writing %zd bytes: %s (%d)",
229 left
, safe_strerror(errno
), errno
);
231 n
= mgmt_msg_reset_writes(ms
);
232 MGMT_MSG_DBG(dbgtag
, "drop and freed %zd streams", n
);
234 return MSW_DISCONNECT
;
239 MGMT_MSG_DBG(dbgtag
, "short stream write %zd of %zd", n
,
241 stream_forward_getp(s
, n
);
242 return MSW_SCHED_STREAM
;
245 stream_free(stream_fifo_pop(&ms
->outq
));
246 MGMT_MSG_DBG(dbgtag
, "wrote stream of %zd bytes", n
);
252 "reached %zu buffer writes, pausing with %zu streams left",
253 ms
->max_write_buf
, ms
->outq
.count
);
254 return MSW_SCHED_WRITES_OFF
;
256 MGMT_MSG_DBG(dbgtag
, "flushed all streams from output q");
257 return MSW_SCHED_NONE
;
262 * Send a message by enqueueing it to be written over the socket by
266 * ms: mgmt_msg_state for this process.
267 * fd: socket/file to read data from.
268 * debug: true to enable debug logging.
271 * 0 on success, otherwise -1 on failure. The only failure mode is if a
272 * the message exceeds the maximum message size configured on init.
274 int mgmt_msg_send_msg(struct mgmt_msg_state
*ms
, void *msg
, size_t len
,
275 mgmt_msg_packf packf
, bool debug
)
277 const char *dbgtag
= debug
? ms
->idtag
: NULL
;
278 struct mgmt_msg_hdr
*mhdr
;
282 size_t mlen
= len
+ sizeof(*mhdr
);
284 if (mlen
> ms
->max_msg_sz
) {
285 MGMT_MSG_ERR(ms
, "Message %zu > max size %zu, dropping", mlen
,
291 MGMT_MSG_DBG(dbgtag
, "creating new stream for msg len %zu",
293 ms
->outs
= stream_new(ms
->max_msg_sz
);
294 } else if (STREAM_WRITEABLE(ms
->outs
) < mlen
) {
297 "enq existing stream len %zu and creating new stream for msg len %zu",
298 STREAM_WRITEABLE(ms
->outs
), mlen
);
299 stream_fifo_push(&ms
->outq
, ms
->outs
);
300 ms
->outs
= stream_new(ms
->max_msg_sz
);
304 "using existing stream with avail %zu for msg len %zu",
305 STREAM_WRITEABLE(ms
->outs
), mlen
);
309 /* We have a stream with space, pack the message into it. */
310 mhdr
= (struct mgmt_msg_hdr
*)(STREAM_DATA(s
) + s
->endp
);
311 mhdr
->marker
= MGMT_MSG_MARKER
;
313 stream_forward_endp(s
, sizeof(*mhdr
));
314 endp
= stream_get_endp(s
);
315 dstbuf
= STREAM_DATA(s
) + endp
;
316 n
= packf(msg
, dstbuf
);
317 stream_set_endp(s
, endp
+ n
);
324 * Create and open a unix domain stream socket on the given path
325 * setting non-blocking and send and receive buffer sizes.
328 * path: path of unix domain socket to connect to.
329 * sendbuf: size of socket send buffer.
330 * recvbuf: size of socket receive buffer.
331 * dbgtag: if non-NULL enable log debug, and use this tag.
334 * socket fd or -1 on error.
336 int mgmt_msg_connect(const char *path
, size_t sendbuf
, size_t recvbuf
,
340 struct sockaddr_un addr
;
342 MGMT_MSG_DBG(dbgtag
, "connecting to server on %s", path
);
343 sock
= socket(AF_UNIX
, SOCK_STREAM
, 0);
345 MGMT_MSG_DBG(dbgtag
, "socket failed: %s", safe_strerror(errno
));
349 memset(&addr
, 0, sizeof(struct sockaddr_un
));
350 addr
.sun_family
= AF_UNIX
;
351 strlcpy(addr
.sun_path
, path
, sizeof(addr
.sun_path
));
352 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
353 len
= addr
.sun_len
= SUN_LEN(&addr
);
355 len
= sizeof(addr
.sun_family
) + strlen(addr
.sun_path
);
356 #endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
357 ret
= connect(sock
, (struct sockaddr
*)&addr
, len
);
359 MGMT_MSG_DBG(dbgtag
, "failed to connect on %s: %s", path
,
360 safe_strerror(errno
));
365 MGMT_MSG_DBG(dbgtag
, "connected to server on %s", path
);
366 set_nonblocking(sock
);
367 setsockopt_so_sendbuf(sock
, sendbuf
);
368 setsockopt_so_recvbuf(sock
, recvbuf
);
373 * Reset the sending queue, by dequeueing all streams and freeing them. Return
374 * the number of streams freed.
377 * ms: mgmt_msg_state for this process.
380 * Number of streams that were freed.
383 size_t mgmt_msg_reset_writes(struct mgmt_msg_state
*ms
)
388 for (s
= stream_fifo_pop(&ms
->outq
); s
;
389 s
= stream_fifo_pop(&ms
->outq
), nproc
++)
395 void mgmt_msg_init(struct mgmt_msg_state
*ms
, size_t max_read_buf
,
396 size_t max_write_buf
, size_t max_msg_sz
, const char *idtag
)
398 memset(ms
, 0, sizeof(*ms
));
399 ms
->ins
= stream_new(max_msg_sz
);
400 stream_fifo_init(&ms
->inq
);
401 stream_fifo_init(&ms
->outq
);
402 ms
->max_read_buf
= max_write_buf
;
403 ms
->max_write_buf
= max_read_buf
;
404 ms
->max_msg_sz
= max_msg_sz
;
405 ms
->idtag
= strdup(idtag
);
408 void mgmt_msg_destroy(struct mgmt_msg_state
*ms
)
410 mgmt_msg_reset_writes(ms
);
412 stream_free(ms
->ins
);