zserv_encode_interface(s, ifp);
client->ifadd_cnt++;
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/* Interface deletion from zebra daemon. */
zserv_encode_interface(s, ifp);
client->ifdel_cnt++;
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
int zsend_vrf_add(struct zserv *client, struct zebra_vrf *zvrf)
zserv_encode_vrf(s, zvrf);
client->vrfadd_cnt++;
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/* VRF deletion from zebra daemon. */
zserv_encode_vrf(s, zvrf);
client->vrfdel_cnt++;
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
int zsend_interface_link_params(struct zserv *client, struct interface *ifp)
/* Write packet size. */
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/* Interface address is added/deleted. Send ZEBRA_INTERFACE_ADDRESS_ADD or
stream_putw_at(s, 0, stream_get_endp(s));
client->connected_rt_add_cnt++;
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
static int zsend_interface_nbr_address(int cmd, struct zserv *client,
/* Write packet size. */
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/* Interface address addition. */
stream_putw_at(s, 0, stream_get_endp(s));
client->if_vrfchg_cnt++;
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/* Add new nbr connected IPv6 address */
else
client->ifdown_cnt++;
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
int zsend_redistribute_route(int cmd, struct zserv *client, struct prefix *p,
zebra_route_string(api.type), api.vrf_id,
buf_prefix);
}
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/*
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
int zsend_route_notify_owner(struct route_entry *re, struct prefix *p,
struct stream *s;
uint8_t blen;
- client = zebra_find_client(re->type, re->instance);
+ client = zserv_find_client(re->type, re->instance);
if (!client || !client->notify_owner) {
if (IS_ZEBRA_DEBUG_PACKET) {
char buff[PREFIX_STRLEN];
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
void zsend_rule_notify_owner(struct zebra_pbr_rule *rule,
stream_putw_at(s, 0, stream_get_endp(s));
- zebra_server_send_message(client, s);
+ zserv_send_message(client, s);
}
void zsend_ipset_notify_owner(struct zebra_pbr_ipset *ipset,
stream_put(s, ipset->ipset_name, ZEBRA_IPSET_NAME_SIZE);
stream_putw_at(s, 0, stream_get_endp(s));
- zebra_server_send_message(client, s);
+ zserv_send_message(client, s);
}
void zsend_ipset_entry_notify_owner(struct zebra_pbr_ipset_entry *ipset,
stream_put(s, ipset->backpointer->ipset_name, ZEBRA_IPSET_NAME_SIZE);
stream_putw_at(s, 0, stream_get_endp(s));
- zebra_server_send_message(client, s);
+ zserv_send_message(client, s);
}
void zsend_iptable_notify_owner(struct zebra_pbr_iptable *iptable,
stream_putl(s, iptable->unique);
stream_putw_at(s, 0, stream_get_endp(s));
- zebra_server_send_message(client, s);
+ zserv_send_message(client, s);
}
/* Router-id is updated. Send ZEBRA_ROUTER_ID_ADD to client. */
/* Write packet size. */
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/*
/* Put length at the first point of the stream. */
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/* Send response to a get label chunk request to client */
/* Write packet size. */
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
static int zsend_table_manager_connect_response(struct zserv *client,
stream_putw_at(s, 0, stream_get_endp(s));
- return zebra_server_send_message(client, s);
+ return zserv_send_message(client, s);
}
/* Inbound message handling ------------------------------------------------ */
stream_putl(s, multipath_num);
stream_putw_at(s, 0, stream_get_endp(s));
- zebra_server_send_message(client, s);
+ zserv_send_message(client, s);
}
/* Tie up route-type and client->sock */
[ZEBRA_IPTABLE_DELETE] = zread_iptable,
};
-void zserv_handle_commands(struct zserv *client, struct zmsghdr *hdr,
- struct stream *msg, struct zebra_vrf *zvrf)
+#if defined(HANDLE_ZAPI_FUZZING)
+extern struct zebra_privs_t zserv_privs;
+
+static void zserv_write_incoming(struct stream *orig, uint16_t command)
{
- if (hdr->command > array_size(zserv_handlers)
- || zserv_handlers[hdr->command] == NULL)
- zlog_info("Zebra received unknown command %d", hdr->command);
- else
- zserv_handlers[hdr->command](client, hdr, msg, zvrf);
+ char fname[MAXPATHLEN];
+ struct stream *copy;
+ int fd = -1;
+
+ copy = stream_dup(orig);
+ stream_set_getp(copy, 0);
+
+ zserv_privs.change(ZPRIVS_RAISE);
+ snprintf(fname, MAXPATHLEN, "%s/%u", DAEMON_VTY_DIR, command);
+ fd = open(fname, O_CREAT | O_WRONLY | O_EXCL, 0644);
+ stream_flush(copy, fd);
+ close(fd);
+ zserv_privs.change(ZPRIVS_LOWER);
+ stream_free(copy);
+}
+#endif
+
+void zserv_handle_commands(struct zserv *client, struct stream *msg)
+{
+ struct zmsghdr hdr;
+ struct zebra_vrf *zvrf;
- stream_free(msg);
+ zapi_parse_header(msg, &hdr);
+
+#if defined(HANDLE_ZAPI_FUZZING)
+ zserv_write_incoming(msg, hdr.command);
+#endif
+
+ hdr.length -= ZEBRA_HEADER_SIZE;
+
+ /* lookup vrf */
+ zvrf = zebra_vrf_lookup_by_id(hdr.vrf_id);
+ if (!zvrf) {
+ if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
+ zlog_warn("ZAPI message specifies unknown VRF: %d",
+ hdr.vrf_id);
+ return;
+ }
+
+ if (hdr.command > array_size(zserv_handlers)
+ || zserv_handlers[hdr.command] == NULL)
+ zlog_info("Zebra received unknown command %d", hdr.command);
+ else
+ zserv_handlers[hdr.command](client, &hdr, msg, zvrf);
}
#include "lib/vty.h" /* for vty_out, vty (ptr only) */
#include "lib/zassert.h" /* for assert */
#include "lib/zclient.h" /* for zmsghdr, ZEBRA_HEADER_SIZE, ZEBRA... */
+#include "lib/frr_pthread.h" /* for frr_pthread_new, frr_pthread_stop... */
+#include "lib/frratomic.h" /* for atomic_load_explicit, atomic_stor... */
#include "zebra/debug.h" /* for various debugging macros */
#include "zebra/rib.h" /* for rib_score_proto */
#include "zebra/zserv.h" /* for zserv */
/* clang-format on */
-/* Event list of zebra. */
-enum event { ZEBRA_READ, ZEBRA_WRITE };
/* privileges */
extern struct zebra_privs_t zserv_privs;
-/* post event into client */
-static void zebra_event(struct zserv *client, enum event event);
-
-
-/* Public interface --------------------------------------------------------- */
-
-int zebra_server_send_message(struct zserv *client, struct stream *msg)
-{
- stream_fifo_push(client->obuf_fifo, msg);
- zebra_event(client, ZEBRA_WRITE);
- return 0;
-}
-
-/* Lifecycle ---------------------------------------------------------------- */
-
-/* Hooks for client connect / disconnect */
-DEFINE_HOOK(zapi_client_connect, (struct zserv *client), (client));
-DEFINE_KOOH(zapi_client_close, (struct zserv *client), (client));
-
-/* free zebra client information. */
-static void zebra_client_free(struct zserv *client)
-{
- hook_call(zapi_client_close, client);
-
- /* Close file descriptor. */
- if (client->sock) {
- unsigned long nroutes;
-
- close(client->sock);
- nroutes = rib_score_proto(client->proto, client->instance);
- zlog_notice(
- "client %d disconnected. %lu %s routes removed from the rib",
- client->sock, nroutes,
- zebra_route_string(client->proto));
- client->sock = -1;
- }
-
- /* Free stream buffers. */
- if (client->ibuf_work)
- stream_free(client->ibuf_work);
- if (client->obuf_work)
- stream_free(client->obuf_work);
- if (client->ibuf_fifo)
- stream_fifo_free(client->ibuf_fifo);
- if (client->obuf_fifo)
- stream_fifo_free(client->obuf_fifo);
- if (client->wb)
- buffer_free(client->wb);
-
- /* Release threads. */
- if (client->t_read)
- thread_cancel(client->t_read);
- if (client->t_write)
- thread_cancel(client->t_write);
- if (client->t_suicide)
- thread_cancel(client->t_suicide);
-
- /* Free bitmaps. */
- for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
- for (int i = 0; i < ZEBRA_ROUTE_MAX; i++)
- vrf_bitmap_free(client->redist[afi][i]);
-
- vrf_bitmap_free(client->redist_default);
- vrf_bitmap_free(client->ifinfo);
- vrf_bitmap_free(client->ridinfo);
-
- XFREE(MTYPE_TMP, client);
-}
/*
- * Called from client thread to terminate itself.
+ * Client thread events.
+ *
+ * These are used almost exclusively by client threads to drive their own event
+ * loops. The only exception is in zebra_client_create(), which pushes an
+ * initial ZSERV_CLIENT_READ event to start the API handler loop.
*/
-static void zebra_client_close(struct zserv *client)
-{
- listnode_delete(zebrad.client_list, client);
- zebra_client_free(client);
-}
-
-/* Make new client. */
-static void zebra_client_create(int sock)
-{
- struct zserv *client;
- int i;
- afi_t afi;
-
- client = XCALLOC(MTYPE_TMP, sizeof(struct zserv));
-
- /* Make client input/output buffer. */
- client->sock = sock;
- client->ibuf_fifo = stream_fifo_new();
- client->obuf_fifo = stream_fifo_new();
- client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
- client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
- client->wb = buffer_new(0);
-
- /* Set table number. */
- client->rtm_table = zebrad.rtm_table_default;
-
- client->connect_time = monotime(NULL);
- /* Initialize flags */
- for (afi = AFI_IP; afi < AFI_MAX; afi++)
- for (i = 0; i < ZEBRA_ROUTE_MAX; i++)
- client->redist[afi][i] = vrf_bitmap_init();
- client->redist_default = vrf_bitmap_init();
- client->ifinfo = vrf_bitmap_init();
- client->ridinfo = vrf_bitmap_init();
-
- /* by default, it's not a synchronous client */
- client->is_synchronous = 0;
+enum zserv_client_event {
+ /* Schedule a socket read */
+ ZSERV_CLIENT_READ,
+ /* Schedule a buffer write */
+ ZSERV_CLIENT_WRITE,
+};
- /* Add this client to linked list. */
- listnode_add(zebrad.client_list, client);
-
- zebra_vrf_update_all(client);
+/*
+ * Main thread events.
+ *
+ * These are used by client threads to notify the main thread about various
+ * events and to make processing requests.
+ */
+enum zserv_event {
+ /* Schedule listen job on Zebra API socket */
+ ZSERV_ACCEPT,
+ /* The calling client has packets on its input buffer */
+ ZSERV_PROCESS_MESSAGES,
+ /* The calling client wishes to be killed */
+ ZSERV_HANDLE_CLOSE,
+};
- hook_call(zapi_client_connect, client);
+/*
+ * Zebra server event driver for all client threads.
+ *
+ * This is essentially a wrapper around thread_add_event() that centralizes
+ * those scheduling calls into one place.
+ *
+ * All calls to this function schedule an event on the pthread running the
+ * provided client.
+ *
+ * client
+ * the client in question, and thread target
+ *
+ * event
+ * the event to notify them about
+ */
+static void zserv_client_event(struct zserv *client,
+ enum zserv_client_event event);
- /* start read loop */
- zebra_event(client, ZEBRA_READ);
-}
+/*
+ * Zebra server event driver for the main thread.
+ *
+ * This is essentially a wrapper around thread_add_event() that centralizes
+ * those scheduling calls into one place.
+ *
+ * All calls to this function schedule an event on Zebra's main pthread.
+ *
+ * client
+ * the client in question
+ *
+ * event
+ * the event to notify the main thread about
+ */
+static void zserv_event(struct zserv *client, enum zserv_event event);
-static int zserv_delayed_close(struct thread *thread)
-{
- struct zserv *client = THREAD_ARG(thread);
- client->t_suicide = NULL;
- zebra_client_close(client);
- return 0;
-}
+/* Client thread lifecycle -------------------------------------------------- */
/*
* Log zapi message to zlog.
zlog_hexdump(msg->data, STREAM_READABLE(msg));
}
-static int zserv_flush_data(struct thread *thread)
+/*
+ * Gracefully shut down a client connection.
+ *
+ * Cancel any pending tasks for the client's thread. Then schedule a task on the
+ * main thread to shut down the calling thread.
+ *
+ * Must be called from the client pthread, never the main thread.
+ */
+static void zserv_client_close(struct zserv *client)
{
- struct zserv *client = THREAD_ARG(thread);
-
- client->t_write = NULL;
- if (client->t_suicide) {
- zebra_client_close(client);
- return -1;
- }
- switch (buffer_flush_available(client->wb, client->sock)) {
- case BUFFER_ERROR:
- zlog_warn(
- "%s: buffer_flush_available failed on zserv client fd %d, closing",
- __func__, client->sock);
- zebra_client_close(client);
- client = NULL;
- break;
- case BUFFER_PENDING:
- client->t_write = NULL;
- thread_add_write(zebrad.master, zserv_flush_data, client,
- client->sock, &client->t_write);
- break;
- case BUFFER_EMPTY:
- break;
- }
-
- if (client)
- client->last_write_time = monotime(NULL);
- return 0;
+ atomic_store_explicit(&client->pthread->running, false,
+ memory_order_seq_cst);
+ THREAD_OFF(client->t_read);
+ THREAD_OFF(client->t_write);
+ zserv_event(client, ZSERV_HANDLE_CLOSE);
}
/*
- * Write a single packet.
+ * Write all pending messages to client socket.
+ *
+ * This function first attempts to flush any buffered data. If unsuccessful,
+ * the function reschedules itself and returns. If successful, it pops all
+ * available messages from the output queue and continues to write data
+ * directly to the socket until the socket would block. If the socket never
+ * blocks and all data is written, the function returns without rescheduling
+ * itself. If the socket ends up throwing EWOULDBLOCK, the remaining data is
+ * buffered and the function reschedules itself.
+ *
+ * The utility of the buffer is that it allows us to vastly reduce lock
+ * contention by allowing us to pop *all* messages off the output queue at once
+ * instead of locking and unlocking each time we want to pop a single message
+ * off the queue. The same thing could arguably be accomplished faster by
+ * allowing the main thread to write directly into the buffer instead of
+ * enqueuing packets onto an intermediary queue, but the intermediary queue
+ * allows us to expose information about input and output queues to the user in
+ * terms of number of packets rather than size of data.
*/
static int zserv_write(struct thread *thread)
{
struct zserv *client = THREAD_ARG(thread);
struct stream *msg;
- int writerv;
-
- if (client->t_suicide)
- return -1;
-
- if (client->is_synchronous)
- return 0;
-
- msg = stream_fifo_pop(client->obuf_fifo);
- stream_set_getp(msg, 0);
- client->last_write_cmd = stream_getw_from(msg, 6);
+ uint32_t wcmd;
+ struct stream_fifo *cache;
- writerv = buffer_write(client->wb, client->sock, STREAM_DATA(msg),
- stream_get_endp(msg));
-
- stream_free(msg);
-
- switch (writerv) {
+ /* If we have any data pending, try to flush it first */
+ switch (buffer_flush_all(client->wb, client->sock)) {
case BUFFER_ERROR:
- zlog_warn(
- "%s: buffer_write failed to zserv client fd %d, closing",
- __func__, client->sock);
- /*
- * Schedule a delayed close since many of the functions that
- * call this one do not check the return code. They do not
- * allow for the possibility that an I/O error may have caused
- * the client to be deleted.
- */
- client->t_suicide = NULL;
- thread_add_event(zebrad.master, zserv_delayed_close, client, 0,
- &client->t_suicide);
- return -1;
- case BUFFER_EMPTY:
- THREAD_OFF(client->t_write);
- break;
+ goto zwrite_fail;
case BUFFER_PENDING:
- thread_add_write(zebrad.master, zserv_flush_data, client,
- client->sock, &client->t_write);
+ atomic_store_explicit(&client->last_write_time,
+ (uint32_t)monotime(NULL),
+ memory_order_relaxed);
+ zserv_client_event(client, ZSERV_CLIENT_WRITE);
+ return 0;
+ case BUFFER_EMPTY:
break;
}
- if (client->obuf_fifo->count)
- zebra_event(client, ZEBRA_WRITE);
-
- client->last_write_time = monotime(NULL);
- return 0;
-}
-
-#if defined(HANDLE_ZAPI_FUZZING)
-static void zserv_write_incoming(struct stream *orig, uint16_t command)
-{
- char fname[MAXPATHLEN];
- struct stream *copy;
- int fd = -1;
+ cache = stream_fifo_new();
- copy = stream_dup(orig);
- stream_set_getp(copy, 0);
+ pthread_mutex_lock(&client->obuf_mtx);
+ {
+ while (stream_fifo_head(client->obuf_fifo))
+ stream_fifo_push(cache,
+ stream_fifo_pop(client->obuf_fifo));
+ }
+ pthread_mutex_unlock(&client->obuf_mtx);
- zserv_privs.change(ZPRIVS_RAISE);
- snprintf(fname, MAXPATHLEN, "%s/%u", DAEMON_VTY_DIR, command);
- fd = open(fname, O_CREAT | O_WRONLY | O_EXCL, 0644);
- stream_flush(copy, fd);
- close(fd);
- zserv_privs.change(ZPRIVS_LOWER);
- stream_free(copy);
-}
-#endif
+ if (cache->tail) {
+ msg = cache->tail;
+ stream_set_getp(msg, 0);
+ wcmd = stream_getw_from(msg, 6);
+ }
-static int zserv_process_messages(struct thread *thread)
-{
- struct zserv *client = THREAD_ARG(thread);
- struct zebra_vrf *zvrf;
- struct zmsghdr hdr;
- struct stream *msg;
- bool hdrvalid;
+ while (stream_fifo_head(cache)) {
+ msg = stream_fifo_pop(cache);
+ buffer_put(client->wb, STREAM_DATA(msg), stream_get_endp(msg));
+ stream_free(msg);
+ }
- do {
- msg = stream_fifo_pop(client->ibuf_fifo);
+ stream_fifo_free(cache);
- /* break if out of messages */
- if (!msg)
- continue;
+ /* If we have any data pending, try to flush it first */
+ switch (buffer_flush_all(client->wb, client->sock)) {
+ case BUFFER_ERROR:
+ goto zwrite_fail;
+ case BUFFER_PENDING:
+ atomic_store_explicit(&client->last_write_time,
+ (uint32_t)monotime(NULL),
+ memory_order_relaxed);
+ zserv_client_event(client, ZSERV_CLIENT_WRITE);
+ return 0;
+ case BUFFER_EMPTY:
+ break;
+ }
- /* read & check header */
- hdrvalid = zapi_parse_header(msg, &hdr);
- if (!hdrvalid && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
- const char *emsg = "Message has corrupt header";
- zserv_log_message(emsg, msg, NULL);
- }
- if (!hdrvalid)
- continue;
-
- hdr.length -= ZEBRA_HEADER_SIZE;
- /* lookup vrf */
- zvrf = zebra_vrf_lookup_by_id(hdr.vrf_id);
- if (!zvrf && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
- const char *emsg = "Message specifies unknown VRF";
- zserv_log_message(emsg, msg, &hdr);
- }
- if (!zvrf)
- continue;
+ atomic_store_explicit(&client->last_write_cmd, wcmd,
+ memory_order_relaxed);
- /* process commands */
- zserv_handle_commands(client, &hdr, msg, zvrf);
+ atomic_store_explicit(&client->last_write_time,
+ (uint32_t)monotime(NULL), memory_order_relaxed);
- } while (msg);
+ return 0;
+zwrite_fail:
+ zlog_warn("%s: could not write to %s [fd = %d], closing.", __func__,
+ zebra_route_string(client->proto), client->sock);
+ zserv_client_close(client);
return 0;
}
-/* Handler of zebra service request. */
+/*
+ * Read and process data from a client socket.
+ *
+ * The responsibilities here are to read raw data from the client socket,
+ * validate the header, encapsulate it into a single stream object, push it
+ * onto the input queue and then notify the main thread that there is new data
+ * available.
+ *
+ * This function first looks for any data in the client structure's working
+ * input buffer. If data is present, it is assumed that reading stopped in a
+ * previous invocation of this task and needs to be resumed to finish a message.
+ * Otherwise, the socket data stream is assumed to be at the beginning of a new
+ * ZAPI message (specifically at the header). The header is read and validated.
+ * If the header passed validation then the length field found in the header is
+ * used to compute the total length of the message. That much data is read (but
+ * not inspected), appended to the header, placed into a stream and pushed onto
+ * the client's input queue. A task is then scheduled on the main thread to
+ * process the client's input queue. Finally, if all of this was successful,
+ * this task reschedules itself.
+ *
+ * Any failure in any of these actions is handled by terminating the client.
+ */
static int zserv_read(struct thread *thread)
{
+ struct zserv *client = THREAD_ARG(thread);
int sock;
- struct zserv *client;
size_t already;
-#if defined(HANDLE_ZAPI_FUZZING)
- int packets = 1;
-#else
- int packets = zebrad.packets_to_process;
-#endif
- /* Get thread data. Reset reading thread because I'm running. */
- sock = THREAD_FD(thread);
- client = THREAD_ARG(thread);
+ struct stream_fifo *cache;
+ uint32_t p2p_orig;
- if (client->t_suicide) {
- zebra_client_close(client);
- return -1;
- }
+ uint32_t p2p;
+ struct zmsghdr hdr;
- while (packets) {
- struct zmsghdr hdr;
+ p2p_orig = atomic_load_explicit(&zebrad.packets_to_process,
+ memory_order_relaxed);
+ cache = stream_fifo_new();
+ p2p = p2p_orig;
+ sock = THREAD_FD(thread);
+
+ while (p2p) {
ssize_t nb;
bool hdrvalid;
char errmsg[256];
"Message has corrupt header\n%s: socket %d message length %u exceeds buffer size %lu",
__func__, sock, hdr.length,
(unsigned long)STREAM_SIZE(client->ibuf_work));
+ zserv_log_message(errmsg, client->ibuf_work, &hdr);
goto zread_fail;
}
}
}
-#if defined(HANDLE_ZAPI_FUZZING)
- zserv_write_incoming(client->ibuf_work, command);
-#endif
-
/* Debug packet information. */
if (IS_ZEBRA_DEBUG_EVENT)
zlog_debug("zebra message comes from socket [%d]",
if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
zserv_log_message(NULL, client->ibuf_work, &hdr);
- client->last_read_time = monotime(NULL);
- client->last_read_cmd = hdr.command;
-
stream_set_getp(client->ibuf_work, 0);
struct stream *msg = stream_dup(client->ibuf_work);
- stream_fifo_push(client->ibuf_fifo, msg);
+ stream_fifo_push(cache, msg);
+ stream_reset(client->ibuf_work);
+ p2p--;
+ }
- if (client->t_suicide)
- goto zread_fail;
+ if (p2p < p2p_orig) {
+ /* update session statistics */
+ atomic_store_explicit(&client->last_read_time, monotime(NULL),
+ memory_order_relaxed);
+ atomic_store_explicit(&client->last_read_cmd, hdr.command,
+ memory_order_relaxed);
+
+ /* publish read packets on client's input queue */
+ pthread_mutex_lock(&client->ibuf_mtx);
+ {
+ while (cache->head)
+ stream_fifo_push(client->ibuf_fifo,
+ stream_fifo_pop(cache));
+ }
+ pthread_mutex_unlock(&client->ibuf_mtx);
+
+ /* Schedule job to process those packets */
+ zserv_event(client, ZSERV_PROCESS_MESSAGES);
- --packets;
- stream_reset(client->ibuf_work);
}
if (IS_ZEBRA_DEBUG_PACKET)
- zlog_debug("Read %d packets",
- zebrad.packets_to_process - packets);
-
- /* Schedule job to process those packets */
- thread_add_event(zebrad.master, &zserv_process_messages, client, 0,
- NULL);
+ zlog_debug("Read %d packets", p2p_orig - p2p);
/* Reschedule ourselves */
- zebra_event(client, ZEBRA_READ);
+ zserv_client_event(client, ZSERV_CLIENT_READ);
+
+ stream_fifo_free(cache);
return 0;
zread_fail:
- zebra_client_close(client);
+ stream_fifo_free(cache);
+ zserv_client_close(client);
return -1;
}
-static void zebra_event(struct zserv *client, enum event event)
+static void zserv_client_event(struct zserv *client,
+ enum zserv_client_event event)
{
switch (event) {
- case ZEBRA_READ:
- thread_add_read(zebrad.master, zserv_read, client, client->sock,
- &client->t_read);
+ case ZSERV_CLIENT_READ:
+ thread_add_read(client->pthread->master, zserv_read, client,
+ client->sock, &client->t_read);
break;
- case ZEBRA_WRITE:
- thread_add_write(zebrad.master, zserv_write, client,
+ case ZSERV_CLIENT_WRITE:
+ thread_add_write(client->pthread->master, zserv_write, client,
client->sock, &client->t_write);
break;
}
}
-/* Accept code of zebra server socket. */
-static int zebra_accept(struct thread *thread)
+/* Main thread lifecycle ---------------------------------------------------- */
+
+/*
+ * Read and process messages from a client.
+ *
+ * This task runs on the main pthread. It is scheduled by client pthreads when
+ * they have new messages available on their input queues. The client is passed
+ * as the task argument.
+ *
+ * Each message is popped off the client's input queue and the action associated
+ * with the message is executed. This proceeds until there are no more messages,
+ * an error occurs, or the processing limit is reached.
+ *
+ * The client's I/O thread can push at most zebrad.packets_to_process messages
+ * onto the input buffer before notifying us there are packets to read. As long
+ * as we always process zebrad.packets_to_process messages here, then we can
+ * rely on the read thread to handle queuing this task enough times to process
+ * everything on the input queue.
+ */
+static int zserv_process_messages(struct thread *thread)
+{
+ struct zserv *client = THREAD_ARG(thread);
+ struct stream *msg;
+ struct stream_fifo *cache = stream_fifo_new();
+
+ uint32_t p2p = zebrad.packets_to_process;
+
+ pthread_mutex_lock(&client->ibuf_mtx);
+ {
+ uint32_t i;
+ for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo);
+ ++i) {
+ msg = stream_fifo_pop(client->ibuf_fifo);
+ stream_fifo_push(cache, msg);
+ }
+
+ msg = NULL;
+ }
+ pthread_mutex_unlock(&client->ibuf_mtx);
+
+ while (stream_fifo_head(cache)) {
+ msg = stream_fifo_pop(cache);
+ zserv_handle_commands(client, msg);
+ stream_free(msg);
+ }
+
+ stream_fifo_free(cache);
+
+ return 0;
+}
+
+int zserv_send_message(struct zserv *client, struct stream *msg)
+{
+ /*
+ * This is a somewhat poorly named variable added with Zebra's portion
+ * of the label manager. That component does not use the regular
+ * zserv/zapi_msg interface for handling its messages, as the client
+ * itself runs in-process. Instead it uses synchronous writes on the
+ * zserv client's socket directly in the zread* handlers for its
+ * message types. Furthermore, it cannot handle the usual messages
+ * Zebra sends (such as those for interface changes) and so has added
+ * this flag and check here as a hack to suppress all messages that it
+ * does not explicitly know about.
+ *
+ * In any case this needs to be cleaned up at some point.
+ *
+ * See also:
+ * zread_label_manager_request
+ * zsend_label_manager_connect_response
+ * zsend_assign_label_chunk_response
+ * ...
+ */
+ if (client->is_synchronous)
+ return 0;
+
+ pthread_mutex_lock(&client->obuf_mtx);
+ {
+ stream_fifo_push(client->obuf_fifo, msg);
+ }
+ pthread_mutex_unlock(&client->obuf_mtx);
+
+ zserv_client_event(client, ZSERV_CLIENT_WRITE);
+
+ return 0;
+}
+
+
+/* Hooks for client connect / disconnect */
+DEFINE_HOOK(zserv_client_connect, (struct zserv *client), (client));
+DEFINE_KOOH(zserv_client_close, (struct zserv *client), (client));
+
+/*
+ * Deinitialize zebra client.
+ *
+ * - Deregister and deinitialize related internal resources
+ * - Gracefully close socket
+ * - Free associated resources
+ * - Free client structure
+ *
+ * This does *not* take any action on the struct thread * fields. These are
+ * managed by the owning pthread and any tasks associated with them must have
+ * been stopped prior to invoking this function.
+ */
+static void zserv_client_free(struct zserv *client)
+{
+ hook_call(zserv_client_close, client);
+
+ /* Close file descriptor. */
+ if (client->sock) {
+ unsigned long nroutes;
+
+ close(client->sock);
+ nroutes = rib_score_proto(client->proto, client->instance);
+ zlog_notice(
+ "client %d disconnected. %lu %s routes removed from the rib",
+ client->sock, nroutes,
+ zebra_route_string(client->proto));
+ client->sock = -1;
+ }
+
+ /* Free stream buffers. */
+ if (client->ibuf_work)
+ stream_free(client->ibuf_work);
+ if (client->obuf_work)
+ stream_free(client->obuf_work);
+ if (client->ibuf_fifo)
+ stream_fifo_free(client->ibuf_fifo);
+ if (client->obuf_fifo)
+ stream_fifo_free(client->obuf_fifo);
+ if (client->wb)
+ buffer_free(client->wb);
+
+ /* Free buffer mutexes */
+ pthread_mutex_destroy(&client->obuf_mtx);
+ pthread_mutex_destroy(&client->ibuf_mtx);
+
+ /* Free bitmaps. */
+ for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
+ for (int i = 0; i < ZEBRA_ROUTE_MAX; i++)
+ vrf_bitmap_free(client->redist[afi][i]);
+
+ vrf_bitmap_free(client->redist_default);
+ vrf_bitmap_free(client->ifinfo);
+ vrf_bitmap_free(client->ridinfo);
+
+ XFREE(MTYPE_TMP, client);
+}
+
+/*
+ * Finish closing a client.
+ *
+ * This task is scheduled by a ZAPI client pthread on the main pthread when it
+ * wants to stop itself. When this executes, the client connection should
+ * already have been closed. This task's responsibility is to gracefully
+ * terminate the client thread, update relevant internal datastructures and
+ * free any resources allocated by the main thread.
+ */
+static int zserv_handle_client_close(struct thread *thread)
+{
+ struct zserv *client = THREAD_ARG(thread);
+
+ /*
+ * Ensure these have been nulled. This does not equate to the
+ * associated task(s) being scheduled or unscheduled on the client
+ * pthread's threadmaster.
+ */
+ assert(!client->t_read);
+ assert(!client->t_write);
+
+ /* synchronously stop thread */
+ frr_pthread_stop(client->pthread, NULL);
+
+ /* destroy frr_pthread */
+ frr_pthread_destroy(client->pthread);
+ client->pthread = NULL;
+
+ listnode_delete(zebrad.client_list, client);
+ zserv_client_free(client);
+ return 0;
+}
+
+/*
+ * Create a new client.
+ *
+ * This is called when a new connection is accept()'d on the ZAPI socket. It
+ * initializes new client structure, notifies any subscribers of the connection
+ * event and spawns the client's thread.
+ *
+ * sock
+ * client's socket file descriptor
+ */
+static void zserv_client_create(int sock)
+{
+ struct zserv *client;
+ int i;
+ afi_t afi;
+
+ client = XCALLOC(MTYPE_TMP, sizeof(struct zserv));
+
+ /* Make client input/output buffer. */
+ client->sock = sock;
+ client->ibuf_fifo = stream_fifo_new();
+ client->obuf_fifo = stream_fifo_new();
+ client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+ client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+ pthread_mutex_init(&client->ibuf_mtx, NULL);
+ pthread_mutex_init(&client->obuf_mtx, NULL);
+ client->wb = buffer_new(0);
+
+ /* Set table number. */
+ client->rtm_table = zebrad.rtm_table_default;
+
+ atomic_store_explicit(&client->connect_time, (uint32_t) monotime(NULL),
+ memory_order_relaxed);
+
+ /* Initialize flags */
+ for (afi = AFI_IP; afi < AFI_MAX; afi++)
+ for (i = 0; i < ZEBRA_ROUTE_MAX; i++)
+ client->redist[afi][i] = vrf_bitmap_init();
+ client->redist_default = vrf_bitmap_init();
+ client->ifinfo = vrf_bitmap_init();
+ client->ridinfo = vrf_bitmap_init();
+
+ /* by default, it's not a synchronous client */
+ client->is_synchronous = 0;
+
+ /* Add this client to linked list. */
+ listnode_add(zebrad.client_list, client);
+
+ struct frr_pthread_attr zclient_pthr_attrs = {
+ .id = frr_pthread_get_id(),
+ .start = frr_pthread_attr_default.start,
+ .stop = frr_pthread_attr_default.stop
+ };
+ client->pthread =
+ frr_pthread_new(&zclient_pthr_attrs, "Zebra API client thread");
+
+ zebra_vrf_update_all(client);
+
+ /* start read loop */
+ zserv_client_event(client, ZSERV_CLIENT_READ);
+
+ /* call callbacks */
+ hook_call(zserv_client_connect, client);
+
+ /* start pthread */
+ frr_pthread_run(client->pthread, NULL);
+}
+
+/*
+ * Accept socket connection.
+ */
+static int zserv_accept(struct thread *thread)
{
int accept_sock;
int client_sock;
accept_sock = THREAD_FD(thread);
/* Reregister myself. */
- thread_add_read(zebrad.master, zebra_accept, NULL, accept_sock, NULL);
+ zserv_event(NULL, ZSERV_ACCEPT);
len = sizeof(struct sockaddr_in);
client_sock = accept(accept_sock, (struct sockaddr *)&client, &len);
set_nonblocking(client_sock);
/* Create new zebra client. */
- zebra_client_create(client_sock);
+ zserv_client_create(client_sock);
return 0;
}
-/* Make zebra server socket, wiping any existing one (see bug #403). */
-void zebra_zserv_socket_init(char *path)
+void zserv_start(char *path)
{
int ret;
- int sock;
mode_t old_mask;
struct sockaddr_storage sa;
socklen_t sa_len;
old_mask = umask(0077);
/* Make UNIX domain socket. */
- sock = socket(sa.ss_family, SOCK_STREAM, 0);
- if (sock < 0) {
+ zebrad.sock = socket(sa.ss_family, SOCK_STREAM, 0);
+ if (zebrad.sock < 0) {
zlog_warn("Can't create zserv socket: %s",
safe_strerror(errno));
zlog_warn(
}
if (sa.ss_family != AF_UNIX) {
- sockopt_reuseaddr(sock);
- sockopt_reuseport(sock);
+ sockopt_reuseaddr(zebrad.sock);
+ sockopt_reuseport(zebrad.sock);
} else {
struct sockaddr_un *suna = (struct sockaddr_un *)&sa;
if (suna->sun_path[0])
}
zserv_privs.change(ZPRIVS_RAISE);
- setsockopt_so_recvbuf(sock, 1048576);
- setsockopt_so_sendbuf(sock, 1048576);
+ setsockopt_so_recvbuf(zebrad.sock, 1048576);
+ setsockopt_so_sendbuf(zebrad.sock, 1048576);
zserv_privs.change(ZPRIVS_LOWER);
if (sa.ss_family != AF_UNIX && zserv_privs.change(ZPRIVS_RAISE))
zlog_err("Can't raise privileges");
- ret = bind(sock, (struct sockaddr *)&sa, sa_len);
+ ret = bind(zebrad.sock, (struct sockaddr *)&sa, sa_len);
if (ret < 0) {
zlog_warn("Can't bind zserv socket on %s: %s", path,
safe_strerror(errno));
zlog_warn(
"zebra can't provide full functionality due to above error");
- close(sock);
+ close(zebrad.sock);
+ zebrad.sock = -1;
return;
}
if (sa.ss_family != AF_UNIX && zserv_privs.change(ZPRIVS_LOWER))
zlog_err("Can't lower privileges");
- ret = listen(sock, 5);
+ ret = listen(zebrad.sock, 5);
if (ret < 0) {
zlog_warn("Can't listen to zserv socket %s: %s", path,
safe_strerror(errno));
zlog_warn(
"zebra can't provide full functionality due to above error");
- close(sock);
+ close(zebrad.sock);
+ zebrad.sock = -1;
return;
}
umask(old_mask);
- thread_add_read(zebrad.master, zebra_accept, NULL, sock, NULL);
+ zserv_event(NULL, ZSERV_ACCEPT);
+}
+
+void zserv_event(struct zserv *client, enum zserv_event event)
+{
+ switch (event) {
+ case ZSERV_ACCEPT:
+ thread_add_read(zebrad.master, zserv_accept, NULL, zebrad.sock,
+ NULL);
+ break;
+ case ZSERV_PROCESS_MESSAGES:
+ thread_add_event(zebrad.master, zserv_process_messages, client,
+ 0, NULL);
+ break;
+ case ZSERV_HANDLE_CLOSE:
+ thread_add_event(zebrad.master, zserv_handle_client_close,
+ client, 0, NULL);
+ }
}
+
+/* General purpose ---------------------------------------------------------- */
+
#define ZEBRA_TIME_BUF 32
static char *zserv_time_buf(time_t *time1, char *buf, int buflen)
{
{
char cbuf[ZEBRA_TIME_BUF], rbuf[ZEBRA_TIME_BUF];
char wbuf[ZEBRA_TIME_BUF], nhbuf[ZEBRA_TIME_BUF], mbuf[ZEBRA_TIME_BUF];
+ time_t connect_time, last_read_time, last_write_time;
+ uint16_t last_read_cmd, last_write_cmd;
vty_out(vty, "Client: %s", zebra_route_string(client->proto));
if (client->instance)
vty_out(vty, "FD: %d \n", client->sock);
vty_out(vty, "Route Table ID: %d \n", client->rtm_table);
+ connect_time = (time_t) atomic_load_explicit(&client->connect_time,
+ memory_order_relaxed);
+
vty_out(vty, "Connect Time: %s \n",
- zserv_time_buf(&client->connect_time, cbuf, ZEBRA_TIME_BUF));
+ zserv_time_buf(&connect_time, cbuf, ZEBRA_TIME_BUF));
if (client->nh_reg_time) {
vty_out(vty, "Nexthop Registry Time: %s \n",
zserv_time_buf(&client->nh_reg_time, nhbuf,
} else
vty_out(vty, "Not registered for Nexthop Updates\n");
+ last_read_time = (time_t)atomic_load_explicit(&client->last_read_time,
+ memory_order_relaxed);
+ last_write_time = (time_t)atomic_load_explicit(&client->last_write_time,
+ memory_order_relaxed);
+
+ last_read_cmd = atomic_load_explicit(&client->last_read_cmd,
+ memory_order_relaxed);
+ last_write_cmd = atomic_load_explicit(&client->last_write_cmd,
+ memory_order_relaxed);
+
vty_out(vty, "Last Msg Rx Time: %s \n",
- zserv_time_buf(&client->last_read_time, rbuf, ZEBRA_TIME_BUF));
+ zserv_time_buf(&last_read_time, rbuf, ZEBRA_TIME_BUF));
vty_out(vty, "Last Msg Tx Time: %s \n",
- zserv_time_buf(&client->last_write_time, wbuf, ZEBRA_TIME_BUF));
- if (client->last_read_time)
+ zserv_time_buf(&last_write_time, wbuf, ZEBRA_TIME_BUF));
+ if (last_read_cmd)
vty_out(vty, "Last Rcvd Cmd: %s \n",
- zserv_command_string(client->last_read_cmd));
- if (client->last_write_time)
+ zserv_command_string(last_read_cmd));
+ if (last_write_cmd)
vty_out(vty, "Last Sent Cmd: %s \n",
- zserv_command_string(client->last_write_cmd));
+ zserv_command_string(last_write_cmd));
vty_out(vty, "\n");
vty_out(vty, "Type Add Update Del \n");
{
char cbuf[ZEBRA_TIME_BUF], rbuf[ZEBRA_TIME_BUF];
char wbuf[ZEBRA_TIME_BUF];
+ time_t connect_time, last_read_time, last_write_time;
+
+ connect_time = (time_t)atomic_load_explicit(&client->connect_time,
+ memory_order_relaxed);
+ last_read_time = (time_t)atomic_load_explicit(&client->last_read_time,
+ memory_order_relaxed);
+ last_write_time = (time_t)atomic_load_explicit(&client->last_write_time,
+ memory_order_relaxed);
vty_out(vty, "%-8s%12s %12s%12s%8d/%-8d%8d/%-8d\n",
zebra_route_string(client->proto),
- zserv_time_buf(&client->connect_time, cbuf, ZEBRA_TIME_BUF),
- zserv_time_buf(&client->last_read_time, rbuf, ZEBRA_TIME_BUF),
- zserv_time_buf(&client->last_write_time, wbuf, ZEBRA_TIME_BUF),
+ zserv_time_buf(&connect_time, cbuf, ZEBRA_TIME_BUF),
+ zserv_time_buf(&last_read_time, rbuf, ZEBRA_TIME_BUF),
+ zserv_time_buf(&last_write_time, wbuf, ZEBRA_TIME_BUF),
client->v4_route_add_cnt + client->v4_route_upd8_cnt,
client->v4_route_del_cnt,
client->v6_route_add_cnt + client->v6_route_upd8_cnt,
client->v6_route_del_cnt);
}
-struct zserv *zebra_find_client(uint8_t proto, unsigned short instance)
+struct zserv *zserv_find_client(uint8_t proto, unsigned short instance)
{
struct listnode *node, *nnode;
struct zserv *client;
struct thread t;
zebra_client_create(-1);
- client = zebrad.client_list->head->data;
+
+ frr_pthread_stop(client->pthread, NULL);
+ frr_pthread_destroy(client->pthread);
+ client->pthread = NULL;
+
t.arg = client;
fd = open(input, O_RDONLY | O_NONBLOCK);
t.u.fd = fd;
- zebra_client_read(&t);
+ zserv_read(&t);
close(fd);
}
{
/* Client list init. */
zebrad.client_list = list_new();
- zebrad.client_list->del = (void (*)(void *))zebra_client_free;
+ zebrad.client_list->del = (void (*)(void *)) zserv_client_free;
+
+ /* Misc init. */
+ zebrad.sock = -1;
install_element(ENABLE_NODE, &show_zebra_client_cmd);
install_element(ENABLE_NODE, &show_zebra_client_summary_cmd);