]> git.proxmox.com Git - mirror_frr.git/blob - lib/mgmt_msg.c
lib: new message library for mgmtd client and adapters
[mirror_frr.git] / lib / mgmt_msg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * March 6 2023, Christian Hopps <chopps@labn.net>
4 *
5 * Copyright (C) 2021 Vmware, Inc.
6 * Pushpasis Sarkar <spushpasis@vmware.com>
7 * Copyright (c) 2023, LabN Consulting, L.L.C.
8 */
9 #include <zebra.h>
10 #include "network.h"
11 #include "sockopt.h"
12 #include "stream.h"
13 #include "thread.h"
14 #include "mgmt_msg.h"
15
16
/*
 * Emit a debug log line prefixed with the given tag and the name of the
 * calling function. A NULL `dbgtag` disables the call entirely, which is how
 * callers in this file switch debug logging on and off.
 */
#define MGMT_MSG_DBG(dbgtag, fmt, ...)                                         \
	do {                                                                   \
		if ((dbgtag) != NULL)                                          \
			zlog_debug("%s: %s: " fmt, (dbgtag), __func__,         \
				   ##__VA_ARGS__);                             \
	} while (0)
23
/*
 * Log an error prefixed with the idtag of `ms` and the name of the calling
 * function. `ms` must be a pointer to an object with an `idtag` member
 * (struct mgmt_msg_state in this file); it is parenthesized so that
 * expressions such as `&state` expand correctly.
 */
#define MGMT_MSG_ERR(ms, fmt, ...)                                             \
	zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__)
26
27 /**
28 * Read data from a socket into streams containing 1 or more full msgs headed by
29 * mgmt_msg_hdr which contain API messages (currently protobuf).
30 *
31 * Args:
32 * ms: mgmt_msg_state for this process.
33 * fd: socket/file to read data from.
34 * debug: true to enable debug logging.
35 *
36 * Returns:
37 * MPP_DISCONNECT - socket should be closed and connect retried.
38 * MSV_SCHED_STREAM - this call should be rescheduled to run.
39 * MPP_SCHED_BOTH - this call and the procmsg buf should be scheduled to
40 *run.
41 */
42 enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
43 bool debug)
44 {
45 const char *dbgtag = debug ? ms->idtag : NULL;
46 size_t avail = STREAM_WRITEABLE(ms->ins);
47 struct mgmt_msg_hdr *mhdr = NULL;
48 size_t total = 0;
49 size_t mcount = 0;
50 ssize_t n, left;
51
52 assert(ms && fd != -1);
53
54 /*
55 * Read as much as we can into the stream.
56 */
57 while (avail > sizeof(struct mgmt_msg_hdr)) {
58 n = stream_read_try(ms->ins, fd, avail);
59 MGMT_MSG_DBG(dbgtag, "got %ld bytes", n);
60
61 /* -2 is normal nothing read, and to retry */
62 if (n == -2)
63 break;
64 if (n <= 0) {
65 if (n == 0)
66 MGMT_MSG_ERR(ms, "got EOF/disconnect");
67 else
68 MGMT_MSG_ERR(ms,
69 "got error while reading: '%s'",
70 safe_strerror(errno));
71 return MSR_DISCONNECT;
72 }
73 ms->nrxb += n;
74 avail -= n;
75 }
76
77 /*
78 * Check if we have read a complete messages or not.
79 */
80 assert(stream_get_getp(ms->ins) == 0);
81 left = stream_get_endp(ms->ins);
82 while (left > (long)sizeof(struct mgmt_msg_hdr)) {
83 mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
84 if (mhdr->marker != MGMT_MSG_MARKER) {
85 MGMT_MSG_DBG(dbgtag, "recv corrupt buffer, disconnect");
86 return MSR_DISCONNECT;
87 }
88 if (mhdr->len > left)
89 break;
90
91 MGMT_MSG_DBG(dbgtag, "read full message len %u", mhdr->len);
92 total += mhdr->len;
93 left -= mhdr->len;
94 mcount++;
95 }
96
97 if (!mcount)
98 return MSR_SCHED_STREAM;
99
100 /*
101 * We have read at least one message into the stream, queue it up.
102 */
103 mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
104 stream_set_endp(ms->ins, total);
105 stream_fifo_push(&ms->inq, ms->ins);
106 ms->ins = stream_new(ms->max_msg_sz);
107 if (left) {
108 stream_put(ms->ins, mhdr, left);
109 stream_set_endp(ms->ins, left);
110 }
111
112 return MSR_SCHED_BOTH;
113 }
114
115 /**
116 * Process streams containing whole messages that have been pushed onto the
117 * FIFO. This should be called from an event/timer handler and should be
118 * reschedulable.
119 *
120 * Args:
121 * ms: mgmt_msg_state for this process.
122 * handle_mgs: function to call for each received message.
123 * user: opaque value passed through to handle_msg.
124 * debug: true to enable debug logging.
125 *
126 * Returns:
127 * true if more to process (so reschedule) else false
128 */
129 bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
130 void (*handle_msg)(void *user, uint8_t *msg,
131 size_t msglen),
132 void *user, bool debug)
133 {
134 const char *dbgtag = debug ? ms->idtag : NULL;
135 struct mgmt_msg_hdr *mhdr;
136 struct stream *work;
137 uint8_t *data;
138 size_t left, nproc;
139
140 MGMT_MSG_DBG(dbgtag, "Have %zu streams to process", ms->inq.count);
141
142 nproc = 0;
143 while (nproc < ms->max_read_buf) {
144 work = stream_fifo_pop(&ms->inq);
145 if (!work)
146 break;
147
148 data = STREAM_DATA(work);
149 left = stream_get_endp(work);
150 MGMT_MSG_DBG(dbgtag, "Processing stream of len %zu", left);
151
152 for (; left > sizeof(struct mgmt_msg_hdr);
153 left -= mhdr->len, data += mhdr->len) {
154 mhdr = (struct mgmt_msg_hdr *)data;
155
156 assert(mhdr->marker == MGMT_MSG_MARKER);
157 assert(left >= mhdr->len);
158
159 handle_msg(user, (uint8_t *)(mhdr + 1),
160 mhdr->len - sizeof(struct mgmt_msg_hdr));
161 ms->nrxm++;
162 nproc++;
163 }
164
165 if (work != ms->ins)
166 stream_free(work); /* Free it up */
167 else
168 stream_reset(work); /* Reset stream for next read */
169 }
170
171 /* return true if should reschedule b/c more to process. */
172 return stream_fifo_head(&ms->inq) != NULL;
173 }
174
175 /**
176 * Write data from a onto the socket, using streams that have been queued for
177 * sending by mgmt_msg_send_msg. This function should be reschedulable.
178 *
179 * Args:
180 * ms: mgmt_msg_state for this process.
181 * fd: socket/file to read data from.
182 * debug: true to enable debug logging.
183 *
184 * Returns:
185 * MSW_SCHED_NONE - do not reschedule anything.
186 * MSW_SCHED_STREAM - this call should be rescheduled to run again.
187 * MSW_SCHED_WRITES_OFF - writes should be disabled with a timer to
188 * re-enable them a short time later
189 * MSW_DISCONNECT - socket should be closed and reconnect retried.
190 *run.
191 */
192 enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,
193 bool debug)
194 {
195 const char *dbgtag = debug ? ms->idtag : NULL;
196 struct stream *s;
197 size_t nproc = 0;
198 ssize_t left;
199 ssize_t n;
200
201 if (ms->outs) {
202 MGMT_MSG_DBG(dbgtag,
203 "found unqueued stream with %zu bytes, queueing",
204 stream_get_endp(ms->outs));
205 stream_fifo_push(&ms->outq, ms->outs);
206 ms->outs = NULL;
207 }
208
209 for (s = stream_fifo_head(&ms->outq); s && nproc < ms->max_write_buf;
210 s = stream_fifo_head(&ms->outq)) {
211 left = STREAM_READABLE(s);
212 assert(left);
213
214 n = stream_flush(s, fd);
215 if (n <= 0) {
216 if (n == 0)
217 MGMT_MSG_ERR(ms,
218 "connection closed while writing");
219 else if (ERRNO_IO_RETRY(errno)) {
220 MGMT_MSG_DBG(
221 dbgtag,
222 "retry error while writing %zd bytes: %s (%d)",
223 left, safe_strerror(errno), errno);
224 return MSW_SCHED_STREAM;
225 } else
226 MGMT_MSG_ERR(
227 ms,
228 "error while writing %zd bytes: %s (%d)",
229 left, safe_strerror(errno), errno);
230
231 n = mgmt_msg_reset_writes(ms);
232 MGMT_MSG_DBG(dbgtag, "drop and freed %zd streams", n);
233
234 return MSW_DISCONNECT;
235 }
236
237 ms->ntxb += n;
238 if (n != left) {
239 MGMT_MSG_DBG(dbgtag, "short stream write %zd of %zd", n,
240 left);
241 stream_forward_getp(s, n);
242 return MSW_SCHED_STREAM;
243 }
244
245 stream_free(stream_fifo_pop(&ms->outq));
246 MGMT_MSG_DBG(dbgtag, "wrote stream of %zd bytes", n);
247 nproc++;
248 }
249 if (s) {
250 MGMT_MSG_DBG(
251 dbgtag,
252 "reached %zu buffer writes, pausing with %zu streams left",
253 ms->max_write_buf, ms->outq.count);
254 return MSW_SCHED_WRITES_OFF;
255 }
256 MGMT_MSG_DBG(dbgtag, "flushed all streams from output q");
257 return MSW_SCHED_NONE;
258 }
259
260
261 /**
262 * Send a message by enqueueing it to be written over the socket by
263 * mgmt_msg_write.
264 *
265 * Args:
266 * ms: mgmt_msg_state for this process.
267 * fd: socket/file to read data from.
268 * debug: true to enable debug logging.
269 *
270 * Returns:
271 * 0 on success, otherwise -1 on failure. The only failure mode is if a
272 * the message exceeds the maximum message size configured on init.
273 */
274 int mgmt_msg_send_msg(struct mgmt_msg_state *ms, void *msg, size_t len,
275 mgmt_msg_packf packf, bool debug)
276 {
277 const char *dbgtag = debug ? ms->idtag : NULL;
278 struct mgmt_msg_hdr *mhdr;
279 struct stream *s;
280 uint8_t *dstbuf;
281 size_t endp, n;
282 size_t mlen = len + sizeof(*mhdr);
283
284 if (mlen > ms->max_msg_sz) {
285 MGMT_MSG_ERR(ms, "Message %zu > max size %zu, dropping", mlen,
286 ms->max_msg_sz);
287 return -1;
288 }
289
290 if (!ms->outs) {
291 MGMT_MSG_DBG(dbgtag, "creating new stream for msg len %zu",
292 len);
293 ms->outs = stream_new(ms->max_msg_sz);
294 } else if (STREAM_WRITEABLE(ms->outs) < mlen) {
295 MGMT_MSG_DBG(
296 dbgtag,
297 "enq existing stream len %zu and creating new stream for msg len %zu",
298 STREAM_WRITEABLE(ms->outs), mlen);
299 stream_fifo_push(&ms->outq, ms->outs);
300 ms->outs = stream_new(ms->max_msg_sz);
301 } else {
302 MGMT_MSG_DBG(
303 dbgtag,
304 "using existing stream with avail %zu for msg len %zu",
305 STREAM_WRITEABLE(ms->outs), mlen);
306 }
307 s = ms->outs;
308
309 /* We have a stream with space, pack the message into it. */
310 mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(s) + s->endp);
311 mhdr->marker = MGMT_MSG_MARKER;
312 mhdr->len = mlen;
313 stream_forward_endp(s, sizeof(*mhdr));
314 endp = stream_get_endp(s);
315 dstbuf = STREAM_DATA(s) + endp;
316 n = packf(msg, dstbuf);
317 stream_set_endp(s, endp + n);
318 ms->ntxm++;
319
320 return 0;
321 }
322
/**
 * Create and open a unix domain stream socket on the given path
 * setting non-blocking and send and receive buffer sizes.
 *
 * The connect itself is done before the socket is made non-blocking. A path
 * that does not fit in sockaddr_un.sun_path is rejected rather than being
 * silently truncated to a different path.
 *
 * Args:
 *	path: path of unix domain socket to connect to.
 *	sendbuf: size of socket send buffer.
 *	recvbuf: size of socket receive buffer.
 *	dbgtag: if non-NULL enable log debug, and use this tag.
 *
 * Returns:
 *	socket fd or -1 on error.
 */
int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,
		     const char *dbgtag)
{
	int ret, sock, len;
	struct sockaddr_un addr;

	MGMT_MSG_DBG(dbgtag, "connecting to server on %s", path);

	/* Build the address first so a bad path needs no socket cleanup. */
	memset(&addr, 0, sizeof(struct sockaddr_un));
	addr.sun_family = AF_UNIX;
	if (strlcpy(addr.sun_path, path, sizeof(addr.sun_path)) >=
	    sizeof(addr.sun_path)) {
		MGMT_MSG_DBG(dbgtag, "socket path too long: %s", path);
		return -1;
	}
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	len = addr.sun_len = SUN_LEN(&addr);
#else
	len = sizeof(addr.sun_family) + strlen(addr.sun_path);
#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */

	sock = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock < 0) {
		MGMT_MSG_DBG(dbgtag, "socket failed: %s", safe_strerror(errno));
		return -1;
	}

	ret = connect(sock, (struct sockaddr *)&addr, len);
	if (ret < 0) {
		MGMT_MSG_DBG(dbgtag, "failed to connect on %s: %s", path,
			     safe_strerror(errno));
		close(sock);
		return -1;
	}

	MGMT_MSG_DBG(dbgtag, "connected to server on %s", path);
	set_nonblocking(sock);
	setsockopt_so_sendbuf(sock, sendbuf);
	setsockopt_so_recvbuf(sock, recvbuf);
	return sock;
}
371
372 /**
373 * Reset the sending queue, by dequeueing all streams and freeing them. Return
374 * the number of streams freed.
375 *
376 * Args:
377 * ms: mgmt_msg_state for this process.
378 *
379 * Returns:
380 * Number of streams that were freed.
381 *
382 */
383 size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms)
384 {
385 struct stream *s;
386 size_t nproc = 0;
387
388 for (s = stream_fifo_pop(&ms->outq); s;
389 s = stream_fifo_pop(&ms->outq), nproc++)
390 stream_free(s);
391
392 return nproc;
393 }
394
395 void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
396 size_t max_write_buf, size_t max_msg_sz, const char *idtag)
397 {
398 memset(ms, 0, sizeof(*ms));
399 ms->ins = stream_new(max_msg_sz);
400 stream_fifo_init(&ms->inq);
401 stream_fifo_init(&ms->outq);
402 ms->max_read_buf = max_write_buf;
403 ms->max_write_buf = max_read_buf;
404 ms->max_msg_sz = max_msg_sz;
405 ms->idtag = strdup(idtag);
406 }
407
408 void mgmt_msg_destroy(struct mgmt_msg_state *ms)
409 {
410 mgmt_msg_reset_writes(ms);
411 if (ms->ins)
412 stream_free(ms->ins);
413 free(ms->idtag);
414 }