]> git.proxmox.com Git - mirror_frr.git/blob - lib/mgmt_msg.c
Merge pull request #13414 from LabNConsulting/chopps/no-mgmtd-nowrite-on-off
[mirror_frr.git] / lib / mgmt_msg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * March 6 2023, Christian Hopps <chopps@labn.net>
4 *
5 * Copyright (C) 2021 Vmware, Inc.
6 * Pushpasis Sarkar <spushpasis@vmware.com>
7 * Copyright (c) 2023, LabN Consulting, L.L.C.
8 */
9 #include <zebra.h>
10 #include "debug.h"
11 #include "network.h"
12 #include "sockopt.h"
13 #include "stream.h"
14 #include "frrevent.h"
15 #include "mgmt_msg.h"
16
17
18 #define MGMT_MSG_DBG(dbgtag, fmt, ...) \
19 do { \
20 if (dbgtag) \
21 zlog_debug("%s: %s: " fmt, dbgtag, __func__, \
22 ##__VA_ARGS__); \
23 } while (0)
24
25 #define MGMT_MSG_ERR(ms, fmt, ...) \
26 zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__)
27
28 DEFINE_MTYPE(LIB, MSG_CONN, "msg connection state");
29
30 /**
31 * Read data from a socket into streams containing 1 or more full msgs headed by
32 * mgmt_msg_hdr which contain API messages (currently protobuf).
33 *
34 * Args:
35 * ms: mgmt_msg_state for this process.
36 * fd: socket/file to read data from.
37 * debug: true to enable debug logging.
38 *
39 * Returns:
40 * MPP_DISCONNECT - socket should be closed and connect retried.
41 * MSV_SCHED_STREAM - this call should be rescheduled to run.
42 * MPP_SCHED_BOTH - this call and the procmsg buf should be scheduled to
43 *run.
44 */
45 enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
46 bool debug)
47 {
48 const char *dbgtag = debug ? ms->idtag : NULL;
49 size_t avail = STREAM_WRITEABLE(ms->ins);
50 struct mgmt_msg_hdr *mhdr = NULL;
51 size_t total = 0;
52 size_t mcount = 0;
53 ssize_t n, left;
54
55 assert(ms && fd != -1);
56
57 /*
58 * Read as much as we can into the stream.
59 */
60 while (avail > sizeof(struct mgmt_msg_hdr)) {
61 n = stream_read_try(ms->ins, fd, avail);
62 MGMT_MSG_DBG(dbgtag, "got %zd bytes", n);
63
64 /* -2 is normal nothing read, and to retry */
65 if (n == -2)
66 break;
67 if (n <= 0) {
68 if (n == 0)
69 MGMT_MSG_ERR(ms, "got EOF/disconnect");
70 else
71 MGMT_MSG_ERR(ms,
72 "got error while reading: '%s'",
73 safe_strerror(errno));
74 return MSR_DISCONNECT;
75 }
76 ms->nrxb += n;
77 avail -= n;
78 }
79
80 /*
81 * Check if we have read a complete messages or not.
82 */
83 assert(stream_get_getp(ms->ins) == 0);
84 left = stream_get_endp(ms->ins);
85 while (left > (long)sizeof(struct mgmt_msg_hdr)) {
86 mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
87 if (!MGMT_MSG_IS_MARKER(mhdr->marker)) {
88 MGMT_MSG_DBG(dbgtag, "recv corrupt buffer, disconnect");
89 return MSR_DISCONNECT;
90 }
91 if ((ssize_t)mhdr->len > left)
92 break;
93
94 MGMT_MSG_DBG(dbgtag, "read full message len %u", mhdr->len);
95 total += mhdr->len;
96 left -= mhdr->len;
97 mcount++;
98 }
99
100 if (!mcount)
101 return MSR_SCHED_STREAM;
102
103 /*
104 * We have read at least one message into the stream, queue it up.
105 */
106 mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
107 stream_set_endp(ms->ins, total);
108 stream_fifo_push(&ms->inq, ms->ins);
109 ms->ins = stream_new(ms->max_msg_sz);
110 if (left) {
111 stream_put(ms->ins, mhdr, left);
112 stream_set_endp(ms->ins, left);
113 }
114
115 return MSR_SCHED_BOTH;
116 }
117
118 /**
119 * Process streams containing whole messages that have been pushed onto the
120 * FIFO. This should be called from an event/timer handler and should be
121 * reschedulable.
122 *
123 * Args:
124 * ms: mgmt_msg_state for this process.
125 * handle_mgs: function to call for each received message.
126 * user: opaque value passed through to handle_msg.
127 * debug: true to enable debug logging.
128 *
129 * Returns:
130 * true if more to process (so reschedule) else false
131 */
132 bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
133 void (*handle_msg)(uint8_t version, uint8_t *msg,
134 size_t msglen, void *user),
135 void *user, bool debug)
136 {
137 const char *dbgtag = debug ? ms->idtag : NULL;
138 struct mgmt_msg_hdr *mhdr;
139 struct stream *work;
140 uint8_t *data;
141 size_t left, nproc;
142
143 MGMT_MSG_DBG(dbgtag, "Have %zu streams to process", ms->inq.count);
144
145 nproc = 0;
146 while (nproc < ms->max_read_buf) {
147 work = stream_fifo_pop(&ms->inq);
148 if (!work)
149 break;
150
151 data = STREAM_DATA(work);
152 left = stream_get_endp(work);
153 MGMT_MSG_DBG(dbgtag, "Processing stream of len %zu", left);
154
155 for (; left > sizeof(struct mgmt_msg_hdr);
156 left -= mhdr->len, data += mhdr->len) {
157 mhdr = (struct mgmt_msg_hdr *)data;
158
159 assert(MGMT_MSG_IS_MARKER(mhdr->marker));
160 assert(left >= mhdr->len);
161
162 handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker),
163 (uint8_t *)(mhdr + 1),
164 mhdr->len - sizeof(struct mgmt_msg_hdr),
165 user);
166 ms->nrxm++;
167 nproc++;
168 }
169
170 if (work != ms->ins)
171 stream_free(work); /* Free it up */
172 else
173 stream_reset(work); /* Reset stream for next read */
174 }
175
176 /* return true if should reschedule b/c more to process. */
177 return stream_fifo_head(&ms->inq) != NULL;
178 }
179
180 /**
181 * Write data from a onto the socket, using streams that have been queued for
182 * sending by mgmt_msg_send_msg. This function should be reschedulable.
183 *
184 * Args:
185 * ms: mgmt_msg_state for this process.
186 * fd: socket/file to read data from.
187 * debug: true to enable debug logging.
188 *
189 * Returns:
190 * MSW_SCHED_NONE - do not reschedule anything.
191 * MSW_SCHED_STREAM - this call should be rescheduled to run again.
192 * MSW_SCHED_WRITES_OFF - writes should be disabled with a timer to
193 * re-enable them a short time later
194 * MSW_DISCONNECT - socket should be closed and reconnect retried.
195 *run.
196 */
197 enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,
198 bool debug)
199 {
200 const char *dbgtag = debug ? ms->idtag : NULL;
201 struct stream *s;
202 size_t nproc = 0;
203 ssize_t left;
204 ssize_t n;
205
206 if (ms->outs) {
207 MGMT_MSG_DBG(dbgtag,
208 "found unqueued stream with %zu bytes, queueing",
209 stream_get_endp(ms->outs));
210 stream_fifo_push(&ms->outq, ms->outs);
211 ms->outs = NULL;
212 }
213
214 for (s = stream_fifo_head(&ms->outq); s && nproc < ms->max_write_buf;
215 s = stream_fifo_head(&ms->outq)) {
216 left = STREAM_READABLE(s);
217 assert(left);
218
219 n = stream_flush(s, fd);
220 if (n <= 0) {
221 if (n == 0)
222 MGMT_MSG_ERR(ms,
223 "connection closed while writing");
224 else if (ERRNO_IO_RETRY(errno)) {
225 MGMT_MSG_DBG(
226 dbgtag,
227 "retry error while writing %zd bytes: %s (%d)",
228 left, safe_strerror(errno), errno);
229 return MSW_SCHED_STREAM;
230 } else
231 MGMT_MSG_ERR(
232 ms,
233 "error while writing %zd bytes: %s (%d)",
234 left, safe_strerror(errno), errno);
235
236 n = mgmt_msg_reset_writes(ms);
237 MGMT_MSG_DBG(dbgtag, "drop and freed %zd streams", n);
238
239 return MSW_DISCONNECT;
240 }
241
242 ms->ntxb += n;
243 if (n != left) {
244 MGMT_MSG_DBG(dbgtag, "short stream write %zd of %zd", n,
245 left);
246 stream_forward_getp(s, n);
247 return MSW_SCHED_STREAM;
248 }
249
250 stream_free(stream_fifo_pop(&ms->outq));
251 MGMT_MSG_DBG(dbgtag, "wrote stream of %zd bytes", n);
252 nproc++;
253 }
254 if (s) {
255 MGMT_MSG_DBG(
256 dbgtag,
257 "reached %zu buffer writes, pausing with %zu streams left",
258 ms->max_write_buf, ms->outq.count);
259 return MSW_SCHED_STREAM;
260 }
261 MGMT_MSG_DBG(dbgtag, "flushed all streams from output q");
262 return MSW_SCHED_NONE;
263 }
264
265
266 /**
267 * Send a message by enqueueing it to be written over the socket by
268 * mgmt_msg_write.
269 *
270 * Args:
271 * ms: mgmt_msg_state for this process.
272 * version: version of this message, will be given to receiving side.
273 * msg: the message to be sent.
274 * len: the length of the message.
275 * packf: a function to pack the message.
276 * debug: true to enable debug logging.
277 *
278 * Returns:
279 * 0 on success, otherwise -1 on failure. The only failure mode is if a
280 * the message exceeds the maximum message size configured on init.
281 */
282 int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version, void *msg,
283 size_t len, size_t (*packf)(void *msg, void *buf),
284 bool debug)
285 {
286 const char *dbgtag = debug ? ms->idtag : NULL;
287 struct mgmt_msg_hdr *mhdr;
288 struct stream *s;
289 uint8_t *dstbuf;
290 size_t endp, n;
291 size_t mlen = len + sizeof(*mhdr);
292
293 if (mlen > ms->max_msg_sz) {
294 MGMT_MSG_ERR(ms, "Message %zu > max size %zu, dropping", mlen,
295 ms->max_msg_sz);
296 return -1;
297 }
298
299 if (!ms->outs) {
300 MGMT_MSG_DBG(dbgtag, "creating new stream for msg len %zu",
301 len);
302 ms->outs = stream_new(ms->max_msg_sz);
303 } else if (STREAM_WRITEABLE(ms->outs) < mlen) {
304 MGMT_MSG_DBG(
305 dbgtag,
306 "enq existing stream len %zu and creating new stream for msg len %zu",
307 STREAM_WRITEABLE(ms->outs), mlen);
308 stream_fifo_push(&ms->outq, ms->outs);
309 ms->outs = stream_new(ms->max_msg_sz);
310 } else {
311 MGMT_MSG_DBG(
312 dbgtag,
313 "using existing stream with avail %zu for msg len %zu",
314 STREAM_WRITEABLE(ms->outs), mlen);
315 }
316 s = ms->outs;
317
318 /* We have a stream with space, pack the message into it. */
319 mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(s) + s->endp);
320 mhdr->marker = MGMT_MSG_MARKER(version);
321 mhdr->len = mlen;
322 stream_forward_endp(s, sizeof(*mhdr));
323 endp = stream_get_endp(s);
324 dstbuf = STREAM_DATA(s) + endp;
325 if (packf)
326 n = packf(msg, dstbuf);
327 else {
328 memcpy(dstbuf, msg, len);
329 n = len;
330 }
331 stream_set_endp(s, endp + n);
332 ms->ntxm++;
333
334 return 0;
335 }
336
337 /**
338 * Create and open a unix domain stream socket on the given path
339 * setting non-blocking and send and receive buffer sizes.
340 *
341 * Args:
342 * path: path of unix domain socket to connect to.
343 * sendbuf: size of socket send buffer.
344 * recvbuf: size of socket receive buffer.
345 * dbgtag: if non-NULL enable log debug, and use this tag.
346 *
347 * Returns:
348 * socket fd or -1 on error.
349 */
350 int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,
351 const char *dbgtag)
352 {
353 int ret, sock, len;
354 struct sockaddr_un addr;
355
356 MGMT_MSG_DBG(dbgtag, "connecting to server on %s", path);
357 sock = socket(AF_UNIX, SOCK_STREAM, 0);
358 if (sock < 0) {
359 MGMT_MSG_DBG(dbgtag, "socket failed: %s", safe_strerror(errno));
360 return -1;
361 }
362
363 memset(&addr, 0, sizeof(struct sockaddr_un));
364 addr.sun_family = AF_UNIX;
365 strlcpy(addr.sun_path, path, sizeof(addr.sun_path));
366 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
367 len = addr.sun_len = SUN_LEN(&addr);
368 #else
369 len = sizeof(addr.sun_family) + strlen(addr.sun_path);
370 #endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
371 ret = connect(sock, (struct sockaddr *)&addr, len);
372 if (ret < 0) {
373 MGMT_MSG_DBG(dbgtag, "failed to connect on %s: %s", path,
374 safe_strerror(errno));
375 close(sock);
376 return -1;
377 }
378
379 MGMT_MSG_DBG(dbgtag, "connected to server on %s", path);
380 set_nonblocking(sock);
381 setsockopt_so_sendbuf(sock, sendbuf);
382 setsockopt_so_recvbuf(sock, recvbuf);
383 return sock;
384 }
385
386 /**
387 * Reset the sending queue, by dequeueing all streams and freeing them. Return
388 * the number of streams freed.
389 *
390 * Args:
391 * ms: mgmt_msg_state for this process.
392 *
393 * Returns:
394 * Number of streams that were freed.
395 *
396 */
397 size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms)
398 {
399 struct stream *s;
400 size_t nproc = 0;
401
402 for (s = stream_fifo_pop(&ms->outq); s;
403 s = stream_fifo_pop(&ms->outq), nproc++)
404 stream_free(s);
405
406 return nproc;
407 }
408
409
410 void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
411 size_t max_write_buf, size_t max_msg_sz, const char *idtag)
412 {
413 memset(ms, 0, sizeof(*ms));
414 ms->ins = stream_new(max_msg_sz);
415 stream_fifo_init(&ms->inq);
416 stream_fifo_init(&ms->outq);
417 ms->max_read_buf = max_write_buf;
418 ms->max_write_buf = max_read_buf;
419 ms->max_msg_sz = max_msg_sz;
420 ms->idtag = strdup(idtag);
421 }
422
423 void mgmt_msg_destroy(struct mgmt_msg_state *ms)
424 {
425 mgmt_msg_reset_writes(ms);
426 if (ms->ins)
427 stream_free(ms->ins);
428 free(ms->idtag);
429 }
430
431 /*
432 * Connections
433 */
434
435 #define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250
436 #define MSG_CONN_SEND_BUF_SIZE (1u << 16)
437 #define MSG_CONN_RECV_BUF_SIZE (1u << 16)
438
439 static void msg_client_sched_connect(struct msg_client *client,
440 unsigned long msec);
441
442 static void msg_conn_sched_proc_msgs(struct msg_conn *conn);
443 static void msg_conn_sched_read(struct msg_conn *conn);
444 static void msg_conn_sched_write(struct msg_conn *conn);
445
446 static void msg_conn_write(struct event *thread)
447 {
448 struct msg_conn *conn = EVENT_ARG(thread);
449 enum mgmt_msg_wsched rv;
450
451 rv = mgmt_msg_write(&conn->mstate, conn->fd, conn->debug);
452 if (rv == MSW_SCHED_STREAM)
453 msg_conn_sched_write(conn);
454 else if (rv == MSW_DISCONNECT)
455 msg_conn_disconnect(conn, conn->is_client);
456 else
457 assert(rv == MSW_SCHED_NONE);
458 }
459
460 static void msg_conn_read(struct event *thread)
461 {
462 struct msg_conn *conn = EVENT_ARG(thread);
463 enum mgmt_msg_rsched rv;
464
465 rv = mgmt_msg_read(&conn->mstate, conn->fd, conn->debug);
466 if (rv == MSR_DISCONNECT) {
467 msg_conn_disconnect(conn, conn->is_client);
468 return;
469 }
470 if (rv == MSR_SCHED_BOTH)
471 msg_conn_sched_proc_msgs(conn);
472 msg_conn_sched_read(conn);
473 }
474
475 /* collapse this into mgmt_msg_procbufs */
476 static void msg_conn_proc_msgs(struct event *thread)
477 {
478 struct msg_conn *conn = EVENT_ARG(thread);
479
480 if (mgmt_msg_procbufs(&conn->mstate,
481 (void (*)(uint8_t, uint8_t *, size_t,
482 void *))conn->handle_msg,
483 conn, conn->debug))
484 /* there's more, schedule handling more */
485 msg_conn_sched_proc_msgs(conn);
486 }
487
488 static void msg_conn_sched_read(struct msg_conn *conn)
489 {
490 event_add_read(conn->loop, msg_conn_read, conn, conn->fd,
491 &conn->read_ev);
492 }
493
494 static void msg_conn_sched_write(struct msg_conn *conn)
495 {
496 event_add_write(conn->loop, msg_conn_write, conn, conn->fd,
497 &conn->write_ev);
498 }
499
500 static void msg_conn_sched_proc_msgs(struct msg_conn *conn)
501 {
502 event_add_event(conn->loop, msg_conn_proc_msgs, conn, 0,
503 &conn->proc_msg_ev);
504 }
505
506
507 void msg_conn_disconnect(struct msg_conn *conn, bool reconnect)
508 {
509
510 /* disconnect short-circuit if present */
511 if (conn->remote_conn) {
512 conn->remote_conn->remote_conn = NULL;
513 conn->remote_conn = NULL;
514 }
515
516 if (conn->fd != -1) {
517 close(conn->fd);
518 conn->fd = -1;
519
520 /* Notify client through registered callback (if any) */
521 if (conn->notify_disconnect)
522 (void)(*conn->notify_disconnect)(conn);
523 }
524
525 if (reconnect) {
526 assert(conn->is_client);
527 msg_client_sched_connect(
528 container_of(conn, struct msg_client, conn),
529 MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
530 }
531 }
532
533 int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg,
534 size_t mlen, size_t (*packf)(void *, void *),
535 bool short_circuit_ok)
536 {
537 const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
538
539 if (conn->fd == -1) {
540 MGMT_MSG_ERR(&conn->mstate,
541 "can't send message on closed connection");
542 return -1;
543 }
544
545 /* immediately handle the message if short-circuit is present */
546 if (conn->remote_conn && short_circuit_ok) {
547 uint8_t *buf = msg;
548 size_t n = mlen;
549
550 if (packf) {
551 buf = XMALLOC(MTYPE_TMP, mlen);
552 n = packf(msg, buf);
553 }
554
555 MGMT_MSG_DBG(dbgtag, "SC send: depth %u msg: %p",
556 ++conn->short_circuit_depth, msg);
557
558 conn->remote_conn->handle_msg(version, buf, n,
559 conn->remote_conn);
560
561 MGMT_MSG_DBG(dbgtag, "SC return from depth: %u msg: %p",
562 conn->short_circuit_depth--, msg);
563
564 if (packf)
565 XFREE(MTYPE_TMP, buf);
566 return 0;
567 }
568
569 int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf,
570 conn->debug);
571
572 msg_conn_sched_write(conn);
573
574 return rv;
575 }
576
577 void msg_conn_cleanup(struct msg_conn *conn)
578 {
579 struct mgmt_msg_state *ms = &conn->mstate;
580
581 /* disconnect short-circuit if present */
582 if (conn->remote_conn) {
583 conn->remote_conn->remote_conn = NULL;
584 conn->remote_conn = NULL;
585 }
586
587 if (conn->fd != -1) {
588 close(conn->fd);
589 conn->fd = -1;
590 }
591
592 EVENT_OFF(conn->read_ev);
593 EVENT_OFF(conn->write_ev);
594 EVENT_OFF(conn->proc_msg_ev);
595
596 mgmt_msg_destroy(ms);
597 }
598
599 /*
600 * Client Connections
601 */
602
603 DECLARE_LIST(msg_server_list, struct msg_server, link);
604
605 static struct msg_server_list_head msg_servers;
606
607 static void msg_client_connect(struct msg_client *conn);
608
609 static void msg_client_connect_timer(struct event *thread)
610 {
611 msg_client_connect(EVENT_ARG(thread));
612 }
613
614 static void msg_client_sched_connect(struct msg_client *client,
615 unsigned long msec)
616 {
617 struct msg_conn *conn = &client->conn;
618 const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
619
620 MGMT_MSG_DBG(dbgtag, "connection retry in %lu msec", msec);
621 if (msec)
622 event_add_timer_msec(conn->loop, msg_client_connect_timer,
623 client, msec, &client->conn_retry_tmr);
624 else
625 event_add_event(conn->loop, msg_client_connect_timer, client, 0,
626 &client->conn_retry_tmr);
627 }
628
629 static bool msg_client_connect_short_circuit(struct msg_client *client)
630 {
631 struct msg_conn *server_conn;
632 struct msg_server *server;
633 const char *dbgtag =
634 client->conn.debug ? client->conn.mstate.idtag : NULL;
635 union sockunion su = {0};
636 int sockets[2];
637
638 frr_each (msg_server_list, &msg_servers, server)
639 if (!strcmp(server->sopath, client->sopath))
640 break;
641 if (!server) {
642 MGMT_MSG_DBG(dbgtag,
643 "no short-circuit connection available for %s",
644 client->sopath);
645
646 return false;
647 }
648
649 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets)) {
650 MGMT_MSG_ERR(
651 &client->conn.mstate,
652 "socketpair failed trying to short-circuit connection on %s: %s",
653 client->sopath, safe_strerror(errno));
654 return false;
655 }
656
657 /* client side */
658 client->conn.fd = sockets[0];
659 set_nonblocking(sockets[0]);
660 setsockopt_so_sendbuf(sockets[0], client->conn.mstate.max_write_buf);
661 setsockopt_so_recvbuf(sockets[0], client->conn.mstate.max_read_buf);
662 client->conn.is_short_circuit = true;
663
664 /* server side */
665 memset(&su, 0, sizeof(union sockunion));
666 server_conn = server->create(sockets[1], &su);
667 server_conn->is_short_circuit = true;
668
669 client->conn.remote_conn = server_conn;
670 server_conn->remote_conn = &client->conn;
671
672 MGMT_MSG_DBG(
673 dbgtag,
674 "short-circuit connection on %s server %s:%d to client %s:%d",
675 client->sopath, server_conn->mstate.idtag, server_conn->fd,
676 client->conn.mstate.idtag, client->conn.fd);
677
678 MGMT_MSG_DBG(
679 server_conn->debug ? server_conn->mstate.idtag : NULL,
680 "short-circuit connection on %s client %s:%d to server %s:%d",
681 client->sopath, client->conn.mstate.idtag, client->conn.fd,
682 server_conn->mstate.idtag, server_conn->fd);
683
684 return true;
685 }
686
687
688 /* Connect and start reading from the socket */
689 static void msg_client_connect(struct msg_client *client)
690 {
691 struct msg_conn *conn = &client->conn;
692 const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
693
694 if (!client->short_circuit_ok ||
695 !msg_client_connect_short_circuit(client))
696 conn->fd =
697 mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,
698 MSG_CONN_RECV_BUF_SIZE, dbgtag);
699
700 if (conn->fd == -1)
701 /* retry the connection */
702 msg_client_sched_connect(client,
703 MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
704 else if (client->notify_connect && client->notify_connect(client))
705 /* client connect notify failed */
706 msg_conn_disconnect(conn, true);
707 else
708 /* start reading */
709 msg_conn_sched_read(conn);
710 }
711
712 void msg_client_init(struct msg_client *client, struct event_loop *tm,
713 const char *sopath,
714 int (*notify_connect)(struct msg_client *client),
715 int (*notify_disconnect)(struct msg_conn *client),
716 void (*handle_msg)(uint8_t version, uint8_t *data,
717 size_t len, struct msg_conn *client),
718 size_t max_read_buf, size_t max_write_buf,
719 size_t max_msg_sz, bool short_circuit_ok,
720 const char *idtag, bool debug)
721 {
722 struct msg_conn *conn = &client->conn;
723 memset(client, 0, sizeof(*client));
724
725 conn->loop = tm;
726 conn->fd = -1;
727 conn->handle_msg = handle_msg;
728 conn->notify_disconnect = notify_disconnect;
729 conn->is_client = true;
730 conn->debug = debug;
731 client->short_circuit_ok = short_circuit_ok;
732 client->sopath = strdup(sopath);
733 client->notify_connect = notify_connect;
734
735 mgmt_msg_init(&conn->mstate, max_read_buf, max_write_buf, max_msg_sz,
736 idtag);
737
738 /* XXX maybe just have client kick this off */
739 /* Start trying to connect to server */
740 msg_client_sched_connect(client, 0);
741 }
742
743 void msg_client_cleanup(struct msg_client *client)
744 {
745 assert(client->conn.is_client);
746
747 EVENT_OFF(client->conn_retry_tmr);
748 free(client->sopath);
749
750 msg_conn_cleanup(&client->conn);
751 }
752
753
754 /*
755 * Server-side connections
756 */
757
758 static void msg_server_accept(struct event *event)
759 {
760 struct msg_server *server = EVENT_ARG(event);
761 int fd;
762 union sockunion su;
763
764 if (server->fd < 0)
765 return;
766
767 /* We continue hearing server listen socket. */
768 event_add_read(server->loop, msg_server_accept, server, server->fd,
769 &server->listen_ev);
770
771 memset(&su, 0, sizeof(union sockunion));
772
773 /* We can handle IPv4 or IPv6 socket. */
774 fd = sockunion_accept(server->fd, &su);
775 if (fd < 0) {
776 zlog_err("Failed to accept %s client connection: %s",
777 server->idtag, safe_strerror(errno));
778 return;
779 }
780 set_nonblocking(fd);
781 set_cloexec(fd);
782
783 DEBUGD(server->debug, "Accepted new %s connection", server->idtag);
784
785 server->create(fd, &su);
786 }
787
788 int msg_server_init(struct msg_server *server, const char *sopath,
789 struct event_loop *loop,
790 struct msg_conn *(*create)(int fd, union sockunion *su),
791 const char *idtag, struct debug *debug)
792 {
793 int ret;
794 int sock;
795 struct sockaddr_un addr;
796 mode_t old_mask;
797
798 memset(server, 0, sizeof(*server));
799 server->fd = -1;
800
801 sock = socket(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
802 if (sock < 0) {
803 zlog_err("Failed to create %s server socket: %s", server->idtag,
804 safe_strerror(errno));
805 goto fail;
806 }
807
808 addr.sun_family = AF_UNIX,
809 strlcpy(addr.sun_path, sopath, sizeof(addr.sun_path));
810 unlink(addr.sun_path);
811 old_mask = umask(0077);
812 ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr));
813 if (ret < 0) {
814 zlog_err("Failed to bind %s server socket to '%s': %s",
815 server->idtag, addr.sun_path, safe_strerror(errno));
816 umask(old_mask);
817 goto fail;
818 }
819 umask(old_mask);
820
821 ret = listen(sock, MGMTD_MAX_CONN);
822 if (ret < 0) {
823 zlog_err("Failed to listen on %s server socket: %s",
824 server->idtag, safe_strerror(errno));
825 goto fail;
826 }
827
828 server->fd = sock;
829 server->loop = loop;
830 server->sopath = strdup(sopath);
831 server->idtag = strdup(idtag);
832 server->create = create;
833 server->debug = debug;
834
835 msg_server_list_add_head(&msg_servers, server);
836
837 event_add_read(server->loop, msg_server_accept, server, server->fd,
838 &server->listen_ev);
839
840
841 DEBUGD(debug, "Started %s server, listening on %s", idtag, sopath);
842 return 0;
843
844 fail:
845 if (sock >= 0)
846 close(sock);
847 server->fd = -1;
848 return -1;
849 }
850
851 void msg_server_cleanup(struct msg_server *server)
852 {
853 DEBUGD(server->debug, "Closing %s server", server->idtag);
854
855 if (server->listen_ev)
856 EVENT_OFF(server->listen_ev);
857
858 msg_server_list_del(&msg_servers, server);
859
860 if (server->fd >= 0)
861 close(server->fd);
862 free((char *)server->sopath);
863 free((char *)server->idtag);
864
865 memset(server, 0, sizeof(*server));
866 server->fd = -1;
867 }
868
869 /*
870 * Initialize and start reading from the accepted socket
871 *
872 * notify_connect - only called for disconnect i.e., connected == false
873 */
874 void msg_conn_accept_init(struct msg_conn *conn, struct event_loop *tm, int fd,
875 int (*notify_disconnect)(struct msg_conn *conn),
876 void (*handle_msg)(uint8_t version, uint8_t *data,
877 size_t len, struct msg_conn *conn),
878 size_t max_read, size_t max_write, size_t max_size,
879 const char *idtag)
880 {
881 conn->loop = tm;
882 conn->fd = fd;
883 conn->notify_disconnect = notify_disconnect;
884 conn->handle_msg = handle_msg;
885 conn->is_client = false;
886
887 mgmt_msg_init(&conn->mstate, max_read, max_write, max_size, idtag);
888
889 /* start reading */
890 msg_conn_sched_read(conn);
891
892 /* Make socket non-blocking. */
893 set_nonblocking(conn->fd);
894 setsockopt_so_sendbuf(conn->fd, MSG_CONN_SEND_BUF_SIZE);
895 setsockopt_so_recvbuf(conn->fd, MSG_CONN_RECV_BUF_SIZE);
896 }
897
898 struct msg_conn *
899 msg_server_conn_create(struct event_loop *tm, int fd,
900 int (*notify_disconnect)(struct msg_conn *conn),
901 void (*handle_msg)(uint8_t version, uint8_t *data,
902 size_t len, struct msg_conn *conn),
903 size_t max_read, size_t max_write, size_t max_size,
904 void *user, const char *idtag)
905 {
906 struct msg_conn *conn = XMALLOC(MTYPE_MSG_CONN, sizeof(*conn));
907 memset(conn, 0, sizeof(*conn));
908 msg_conn_accept_init(conn, tm, fd, notify_disconnect, handle_msg,
909 max_read, max_write, max_size, idtag);
910 conn->user = user;
911 return conn;
912 }
913
914 void msg_server_conn_delete(struct msg_conn *conn)
915 {
916 if (!conn)
917 return;
918 msg_conn_cleanup(conn);
919 XFREE(MTYPE_MSG_CONN, conn);
920 }