]> git.proxmox.com Git - mirror_frr.git/commitdiff
bgpd: move packet writes into dedicated pthread
authorQuentin Young <qlyoung@cumulusnetworks.com>
Mon, 6 Feb 2017 23:39:06 +0000 (23:39 +0000)
committerQuentin Young <qlyoung@cumulusnetworks.com>
Thu, 30 Nov 2017 21:17:57 +0000 (16:17 -0500)
* BGP_WRITE_ON() removed
* BGP_WRITE_OFF() removed
* peer_writes_on() added
* peer_writes_off() added
* bgp_write_proceed_actions() removed

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
bgpd/bgp_fsm.c
bgpd/bgp_fsm.h
bgpd/bgp_main.c
bgpd/bgp_packet.c
bgpd/bgp_packet.h
bgpd/bgp_updgrp.c
bgpd/bgp_updgrp.h
bgpd/bgp_updgrp_adv.c
bgpd/bgp_updgrp_packet.c
bgpd/bgpd.c
bgpd/bgpd.h

index 8de7e970de0fe0022cb963c5a4a20f3e59f87c2c..bc4f8272f3615b8054245ab1dfcceae4af0af9a1 100644 (file)
@@ -125,9 +125,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
                           from_peer->host, from_peer, from_peer->fd, peer,
                           peer->fd);
 
-       BGP_WRITE_OFF(peer->t_write);
+       peer_writes_off(peer);
        BGP_READ_OFF(peer->t_read);
-       BGP_WRITE_OFF(from_peer->t_write);
+       peer_writes_off(from_peer);
        BGP_READ_OFF(from_peer->t_read);
 
        BGP_TIMER_OFF(peer->t_routeadv);
@@ -137,8 +137,18 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
        peer->fd = from_peer->fd;
        from_peer->fd = fd;
        stream_reset(peer->ibuf);
-       stream_fifo_clean(peer->obuf);
-       stream_fifo_clean(from_peer->obuf);
+
+       pthread_mutex_lock(&peer->obuf_mtx);
+       {
+               stream_fifo_clean(peer->obuf);
+       }
+       pthread_mutex_unlock(&peer->obuf_mtx);
+
+       pthread_mutex_lock(&from_peer->obuf_mtx);
+       {
+               stream_fifo_clean(from_peer->obuf);
+       }
+       pthread_mutex_unlock(&from_peer->obuf_mtx);
 
        peer->as = from_peer->as;
        peer->v_holdtime = from_peer->v_holdtime;
@@ -217,7 +227,7 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
        }
 
        BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
-       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
+       peer_writes_on(peer);
 
        if (from_peer)
                peer_xfer_stats(peer, from_peer);
@@ -433,8 +443,6 @@ int bgp_routeadv_timer(struct thread *thread)
 
        peer->synctime = bgp_clock();
 
-       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
-
        /* MRAI timer will be started again when FIFO is built, no need to
         * do it here.
         */
@@ -640,7 +648,6 @@ void bgp_adjust_routeadv(struct peer *peer)
                        BGP_TIMER_OFF(peer->t_routeadv);
 
                peer->synctime = bgp_clock();
-               BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
                return;
        }
 
@@ -956,6 +963,9 @@ int bgp_stop(struct peer *peer)
        char orf_name[BUFSIZ];
        int ret = 0;
 
+       // immediately remove this peer from the write thread's peer list
+       peer_writes_off(peer);
+
        if (peer_dynamic_neighbor(peer)
            && !(CHECK_FLAG(peer->flags, PEER_FLAG_DELETE))) {
                if (bgp_debug_neighbor_events(peer))
@@ -1037,7 +1047,7 @@ int bgp_stop(struct peer *peer)
 
        /* Stop read and write threads when exists. */
        BGP_READ_OFF(peer->t_read);
-       BGP_WRITE_OFF(peer->t_write);
+       peer_writes_off(peer);
 
        /* Stop all timers. */
        BGP_TIMER_OFF(peer->t_start);
@@ -1054,8 +1064,13 @@ int bgp_stop(struct peer *peer)
                stream_reset(peer->ibuf);
        if (peer->work)
                stream_reset(peer->work);
-       if (peer->obuf)
-               stream_fifo_clean(peer->obuf);
+
+       pthread_mutex_lock(&peer->obuf_mtx);
+       {
+               if (peer->obuf)
+                       stream_fifo_clean(peer->obuf);
+       }
+       pthread_mutex_unlock(&peer->obuf_mtx);
 
        /* Close of file descriptor. */
        if (peer->fd >= 0) {
@@ -1173,6 +1188,8 @@ static int bgp_connect_success(struct peer *peer)
                return -1;
        }
 
+       peer_writes_on(peer);
+
        if (bgp_getsockname(peer) < 0) {
                zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d",
                         __FUNCTION__, peer->host, peer->fd);
@@ -1313,7 +1330,7 @@ int bgp_start(struct peer *peer)
                        return -1;
                }
                BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
-               BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
+               peer_writes_on(peer);
                break;
        }
        return 0;
index 51d5d7aaa84273780a69f54940b567d5697307bb..a7abfdb2f4964d9389ce81f475546f85d662cf56 100644 (file)
                        THREAD_READ_OFF(T);                                    \
        } while (0)
 
-#define BGP_WRITE_ON(T, F, V)                                                  \
-       do {                                                                   \
-               if ((peer)->status != Deleted)                                 \
-                       thread_add_write(bm->master, (F), (peer), (V), &(T));  \
-       } while (0)
-
-#define BGP_PEER_WRITE_ON(T, F, V, peer)                                       \
-       do {                                                                   \
-               if ((peer)->status != Deleted)                                 \
-                       thread_add_write(bm->master, (F), (peer), (V), &(T));  \
-       } while (0)
-
 #define BGP_WRITE_OFF(T)                                                       \
        do {                                                                   \
                if (T)                                                         \
index 1fac2936eb7d2e5ca25305fa66c5dcecded48863..b4ea45b3e0581d888ead9d75aa0b3180bcdaa765 100644 (file)
@@ -54,6 +54,7 @@
 #include "bgpd/bgp_debug.h"
 #include "bgpd/bgp_filter.h"
 #include "bgpd/bgp_zebra.h"
+#include "bgpd/bgp_packet.h"
 
 #ifdef ENABLE_BGP_VNC
 #include "bgpd/rfapi/rfapi_backend.h"
@@ -392,6 +393,9 @@ int main(int argc, char **argv)
        snprintf(bgpd_di.startinfo, sizeof(bgpd_di.startinfo), ", bgp@%s:%d",
                 (bm->address ? bm->address : "<all>"), bm->port);
 
+       pthread_t packet_writes;
+       pthread_create(&packet_writes, NULL, &peer_writes_start, NULL);
+
        frr_config_fork();
        frr_run(bm->master);
 
index a955b3512c9a96d1e754d557581879d24a06ef53..e27b416d071426f816e0f699d8c6c3560576b158 100644 (file)
@@ -19,6 +19,7 @@
  */
 
 #include <zebra.h>
+#include <sys/time.h>
 
 #include "thread.h"
 #include "stream.h"
 #include "bgpd/bgp_updgrp.h"
 #include "bgpd/bgp_label.h"
 
+/* Linked list of active peers */
+static pthread_mutex_t plist_mtx = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
+static struct list *plist;
+
+bool bgp_packet_writes_thread_run;
+
 /* Set up BGP packet marker and packet type. */
 int bgp_packet_set_marker(struct stream *s, u_char type)
 {
@@ -87,21 +95,40 @@ int bgp_packet_set_size(struct stream *s)
        return cp;
 }
 
-/* Add new packet to the peer. */
-void bgp_packet_add(struct peer *peer, struct stream *s)
+/**
+ * Push a packet onto the end of the peer's output queue.
+ * Must be externally synchronized around 'peer'.
+ */
+static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s)
 {
        /* Add packet to the end of list. */
        stream_fifo_push(peer->obuf, s);
+       peer_writes_wake();
+}
+
+/*
+ * Push a packet onto the end of the peer's output queue.
+ * This function acquires the peer's write mutex before proceeding.
+ */
+static void bgp_packet_add(struct peer *peer, struct stream *s)
+{
+       pthread_mutex_lock(&peer->obuf_mtx);
+       bgp_packet_add_unsafe(peer, s);
+       pthread_mutex_unlock(&peer->obuf_mtx);
 }
 
-/* Free first packet. */
-static void bgp_packet_delete(struct peer *peer)
+/**
+ * Pop the first packet off the peer's output queue and free it.
+ * Must be externally synchronized around 'peer'.
+ */
+static void bgp_packet_delete_unsafe(struct peer *peer)
 {
        stream_free(stream_fifo_pop(peer->obuf));
 }
 
+
 /* Check file descriptor whether connect is established. */
-int bgp_connect_check(struct peer *peer, int change_state)
+static int bgp_connect_check(struct peer *peer, int change_state)
 {
        int status;
        socklen_t slen;
@@ -109,7 +136,6 @@ int bgp_connect_check(struct peer *peer, int change_state)
 
        /* Anyway I have to reset read and write thread. */
        BGP_READ_OFF(peer->t_read);
-       BGP_WRITE_OFF(peer->t_write);
 
        /* Check file descriptor. */
        slen = sizeof(status);
@@ -176,7 +202,7 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
        }
 
        bgp_packet_set_size(s);
-       bgp_packet_add(peer, s);
+       bgp_packet_add_unsafe(peer, s);
        return s;
 }
 
@@ -248,246 +274,21 @@ static struct stream *bgp_write_packet(struct peer *peer)
                }
 
 
-               /*
-                * Found a packet template to send, overwrite packet
-                * with appropriate
-                * attributes from peer and advance peer
-                */
-               s = bpacket_reformat_for_peer(next_pkt, paf);
-               bpacket_queue_advance_peer(paf);
-               return s;
-       }
-
-       return NULL;
-}
-
-/* The next action for the peer from a write perspective */
-static void bgp_write_proceed_actions(struct peer *peer)
-{
-       afi_t afi;
-       safi_t safi;
-       struct peer_af *paf;
-       struct bpacket *next_pkt;
-       int fullq_found = 0;
-       struct update_subgroup *subgrp;
-
-       if (stream_fifo_head(peer->obuf)) {
-               BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
-               return;
-       }
-
-       FOREACH_AFI_SAFI (afi, safi) {
-               paf = peer_af_find(peer, afi, safi);
-               if (!paf)
-                       continue;
-               subgrp = paf->subgroup;
-               if (!subgrp)
-                       continue;
-
-               next_pkt = paf->next_pkt_to_send;
-               if (next_pkt && next_pkt->buffer) {
-                       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
-                       return;
-               }
-
-               /* No packets readily available for AFI/SAFI, are there
-                * subgroup packets
-                * that need to be generated? */
-               if (bpacket_queue_is_full(SUBGRP_INST(subgrp),
-                                         SUBGRP_PKTQ(subgrp)))
-                       fullq_found = 1;
-               else if (subgroup_packets_to_build(subgrp)) {
-                       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
-                       return;
-               }
-
-               /* No packets to send, see if EOR is pending */
-               if (CHECK_FLAG(peer->cap, PEER_CAP_RESTART_RCV)) {
-                       if (!subgrp->t_coalesce && peer->afc_nego[afi][safi]
-                           && peer->synctime
-                           && !CHECK_FLAG(peer->af_sflags[afi][safi],
-                                          PEER_STATUS_EOR_SEND)
-                           && safi != SAFI_MPLS_VPN) {
-                               BGP_WRITE_ON(peer->t_write, bgp_write,
-                                            peer->fd);
-                               return;
-                       }
-               }
-       }
-       if (fullq_found) {
-               BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
-               return;
-       }
-}
-
-/* Write packet to the peer. */
-int bgp_write(struct thread *thread)
-{
-       struct peer *peer;
-       u_char type;
-       struct stream *s;
-       int num;
-       int update_last_write = 0;
-       unsigned int count = 0;
-       unsigned int oc = 0;
-
-       /* Yes first of all get peer pointer. */
-       peer = THREAD_ARG(thread);
-       peer->t_write = NULL;
-
-       /* For non-blocking IO check. */
-       if (peer->status == Connect) {
-               bgp_connect_check(peer, 1);
-               return 0;
-       }
-
-       s = bgp_write_packet(peer);
-       if (!s) {
-               bgp_write_proceed_actions(peer);
-               return 0;
-       }
-
-       sockopt_cork(peer->fd, 1);
-
-       oc = peer->update_out;
-
-       /* Nonblocking write until TCP output buffer is full.  */
-       do {
-               int writenum;
-
-               /* Number of bytes to be sent.  */
-               writenum = stream_get_endp(s) - stream_get_getp(s);
-
-               /* Call write() system call.  */
-               num = write(peer->fd, STREAM_PNT(s), writenum);
-               if (num < 0) {
-                       /* write failed either retry needed or error */
-                       if (ERRNO_IO_RETRY(errno))
-                               break;
-
-                       BGP_EVENT_ADD(peer, TCP_fatal_error);
-                       return 0;
-               }
-
-               if (num != writenum) {
-                       /* Partial write */
-                       stream_forward_getp(s, num);
-                       break;
-               }
-
-               /* Retrieve BGP packet type. */
-               stream_set_getp(s, BGP_MARKER_SIZE + 2);
-               type = stream_getc(s);
-
-               switch (type) {
-               case BGP_MSG_OPEN:
-                       peer->open_out++;
-                       break;
-               case BGP_MSG_UPDATE:
-                       peer->update_out++;
-                       break;
-               case BGP_MSG_NOTIFY:
-                       peer->notify_out++;
-                       /* Double start timer. */
-                       peer->v_start *= 2;
-
-                       /* Overflow check. */
-                       if (peer->v_start >= (60 * 2))
-                               peer->v_start = (60 * 2);
-
-                       /* Flush any existing events */
-                       BGP_EVENT_ADD(peer, BGP_Stop);
-                       goto done;
-
-               case BGP_MSG_KEEPALIVE:
-                       peer->keepalive_out++;
-                       break;
-               case BGP_MSG_ROUTE_REFRESH_NEW:
-               case BGP_MSG_ROUTE_REFRESH_OLD:
-                       peer->refresh_out++;
-                       break;
-               case BGP_MSG_CAPABILITY:
-                       peer->dynamic_cap_out++;
-                       break;
+                       /* Found a packet template to send, overwrite packet
+                        * with appropriate
+                        * attributes from peer and advance peer */
+                       s = bpacket_reformat_for_peer(next_pkt, paf);
+                       bgp_packet_add_unsafe(peer, s);
+                       bpacket_queue_advance_peer(paf);
+                       return s;
                }
 
-               /* OK we send packet so delete it. */
-               bgp_packet_delete(peer);
-               update_last_write = 1;
-       } while (++count < peer->bgp->wpkt_quanta
-                && (s = bgp_write_packet(peer)) != NULL);
-
-       bgp_write_proceed_actions(peer);
-
-done:
-       /* Update last_update if UPDATEs were written. */
-       if (peer->update_out > oc)
-               peer->last_update = bgp_clock();
-
-       /* If we TXed any flavor of packet update last_write */
-       if (update_last_write)
-               peer->last_write = bgp_clock();
-
-       sockopt_cork(peer->fd, 0);
-       return 0;
-}
-
-/* This is only for sending NOTIFICATION message to neighbor. */
-static int bgp_write_notify(struct peer *peer)
-{
-       int ret, val;
-       u_char type;
-       struct stream *s;
-
-       /* There should be at least one packet. */
-       s = stream_fifo_head(peer->obuf);
-       if (!s)
-               return 0;
-       assert(stream_get_endp(s) >= BGP_HEADER_SIZE);
-
-       /* Stop collecting data within the socket */
-       sockopt_cork(peer->fd, 0);
-
-       /* socket is in nonblocking mode, if we can't deliver the NOTIFY, well,
-        * we only care about getting a clean shutdown at this point. */
-       ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s));
-
-       /* only connection reset/close gets counted as TCP_fatal_error, failure
-        * to write the entire NOTIFY doesn't get different FSM treatment */
-       if (ret <= 0) {
-               BGP_EVENT_ADD(peer, TCP_fatal_error);
-               return 0;
-       }
-
-       /* Disable Nagle, make NOTIFY packet go out right away */
-       val = 1;
-       (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
-                        sizeof(val));
-
-       /* Retrieve BGP packet type. */
-       stream_set_getp(s, BGP_MARKER_SIZE + 2);
-       type = stream_getc(s);
-
-       assert(type == BGP_MSG_NOTIFY);
-
-       /* Type should be notify. */
-       peer->notify_out++;
-
-       /* Double start timer. */
-       peer->v_start *= 2;
-
-       /* Overflow check. */
-       if (peer->v_start >= (60 * 2))
-               peer->v_start = (60 * 2);
-
-       /* Handle Graceful Restart case where the state changes to
-          Connect instead of Idle */
-       BGP_EVENT_ADD(peer, BGP_Stop);
-
-       return 0;
+       return NULL;
 }
 
-/* Make keepalive packet and send it to the peer. */
+/*
+ * Creates a BGP Keepalive packet and appends it to the peer's output queue.
+ */
 void bgp_keepalive_send(struct peer *peer)
 {
        struct stream *s;
@@ -508,11 +309,12 @@ void bgp_keepalive_send(struct peer *peer)
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
-
-       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
 }
 
-/* Make open packet and send it to the peer. */
+/*
+ * Creates a BGP Open packet and appends it to the peer's output queue.
+ * Sets capabilities as necessary.
+ */
 void bgp_open_send(struct peer *peer)
 {
        struct stream *s;
@@ -560,11 +362,20 @@ void bgp_open_send(struct peer *peer)
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
-
-       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
 }
 
-/* Send BGP notify packet with data potion. */
+/*
+ * Creates a BGP Notify and appends it to the peer's output queue.
+ *
+ * This function awakens the write thread to ensure the packet
+ * gets out ASAP.
+ *
+ * @param peer
+ * @param code      BGP error code
+ * @param sub_code  BGP error subcode
+ * @param data      Data portion
+ * @param datalen   length of data portion
+ */
 void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
                               u_char *data, size_t datalen)
 {
@@ -574,7 +385,7 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
        /* Allocate new stream. */
        s = stream_new(BGP_MAX_PACKET_SIZE);
 
-       /* Make nitify packet. */
+       /* Make notify packet. */
        bgp_packet_set_marker(s, BGP_MSG_NOTIFY);
 
        /* Set notify packet values. */
@@ -589,8 +400,9 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
        length = bgp_packet_set_size(s);
 
        /* Add packet to the peer. */
+       pthread_mutex_lock(&peer->obuf_mtx);
        stream_fifo_clean(peer->obuf);
-       bgp_packet_add(peer, s);
+       pthread_mutex_unlock(&peer->obuf_mtx);
 
        /* For debug */
        {
@@ -641,19 +453,37 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
        } else
                peer->last_reset = PEER_DOWN_NOTIFY_SEND;
 
-       /* Call immediately. */
-       BGP_WRITE_OFF(peer->t_write);
-
-       bgp_write_notify(peer);
+       /* Add packet to peer's output queue */
+       bgp_packet_add(peer, s);
+       /* Wake up the write thread to get the notify out ASAP */
+       peer_writes_wake();
 }
 
-/* Send BGP notify packet. */
+/*
+ * Creates a BGP Notify and appends it to the peer's output queue.
+ *
+ * This function awakens the write thread to ensure the packet
+ * gets out ASAP.
+ *
+ * @param peer
+ * @param code      BGP error code
+ * @param sub_code  BGP error subcode
+ */
 void bgp_notify_send(struct peer *peer, u_char code, u_char sub_code)
 {
        bgp_notify_send_with_data(peer, code, sub_code, NULL, 0);
 }
 
-/* Send route refresh message to the peer. */
+/*
+ * Creates BGP Route Refresh packet and appends it to the peer's output queue.
+ *
+ * @param peer
+ * @param afi               Address Family Identifier
+ * @param safi              Subsequent Address Family Identifier
+ * @param orf_type          Outbound Route Filtering type
+ * @param when_to_refresh   Whether to refresh immediately or defer
+ * @param remove            Whether to remove ORF for specified AFI/SAFI
+ */
 void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
                            u_char orf_type, u_char when_to_refresh, int remove)
 {
@@ -741,11 +571,17 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
-
-       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
 }
 
-/* Send capability message to the peer. */
+/*
+ * Create a BGP Capability packet and append it to the peer's output queue.
+ *
+ * @param peer
+ * @param afi              Address Family Identifier
+ * @param safi             Subsequent Address Family Identifier
+ * @param capability_code  BGP Capability Code
+ * @param action           Set or Remove capability
+ */
 void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
                         int capability_code, int action)
 {
@@ -784,8 +620,6 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
-
-       BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
 }
 
 /* RFC1771 6.8 Connection collision detection. */
@@ -2340,3 +2174,226 @@ done:
 
        return 0;
 }
+
+/* ------------- write thread ------------------ */
+
+/**
+ * Flush peer output buffer.
+ *
+ * This function pops packets off of peer->obuf and writes them to peer->fd.
+ * The number of packets written is equal to the minimum of
+ * peer->bgp->wpkt_quanta and the number of packets on the output buffer.
+ *
+ * If write() returns an error, the appropriate FSM event is generated.
+ *
+ * The return value is equal to the number of packets written
+ * (which may be zero).
+ */
+static int bgp_write(struct peer *peer)
+{
+       u_char type;
+       struct stream *s;
+       int num;
+       int update_last_write = 0;
+       unsigned int count = 0;
+       unsigned int oc = 0;
+
+       /* For non-blocking IO check. */
+       if (peer->status == Connect) {
+               bgp_connect_check(peer, 1);
+               return 0;
+       }
+
+       /* Write packets. The number of packets written is the value of
+        * bgp->wpkt_quanta or the size of the output buffer, whichever is
+        * smaller.*/
+       while (count < peer->bgp->wpkt_quanta
+              && (s = bgp_write_packet(peer)) != NULL) {
+               int writenum;
+               do { // write a full packet, or return on error
+                       writenum = stream_get_endp(s) - stream_get_getp(s);
+                       num = write(peer->fd, STREAM_PNT(s), writenum);
+
+                       if (num < 0) {
+                               if (ERRNO_IO_RETRY(errno))
+                                       continue;
+
+                               BGP_EVENT_ADD(peer, TCP_fatal_error);
+                               goto done;
+                       } else if (num != writenum) // incomplete write
+                               stream_forward_getp(s, num);
+
+               } while (num != writenum);
+
+               /* Retrieve BGP packet type. */
+               stream_set_getp(s, BGP_MARKER_SIZE + 2);
+               type = stream_getc(s);
+
+               switch (type) {
+               case BGP_MSG_OPEN:
+                       peer->open_out++;
+                       break;
+               case BGP_MSG_UPDATE:
+                       peer->update_out++;
+                       break;
+               case BGP_MSG_NOTIFY:
+                       peer->notify_out++;
+                       /* Double start timer. */
+                       peer->v_start *= 2;
+
+                       /* Overflow check. */
+                       if (peer->v_start >= (60 * 2))
+                               peer->v_start = (60 * 2);
+
+                       /* Handle Graceful Restart case where the state
+                        * changes to Connect instead of Idle.
+                        */
+                       /* Flush any existing events */
+                       BGP_EVENT_ADD(peer, BGP_Stop);
+                       goto done;
+
+               case BGP_MSG_KEEPALIVE:
+                       peer->keepalive_out++;
+                       break;
+               case BGP_MSG_ROUTE_REFRESH_NEW:
+               case BGP_MSG_ROUTE_REFRESH_OLD:
+                       peer->refresh_out++;
+                       break;
+               case BGP_MSG_CAPABILITY:
+                       peer->dynamic_cap_out++;
+                       break;
+               }
+
+               count++;
+               /* OK we send packet so delete it. */
+               bgp_packet_delete_unsafe(peer);
+               update_last_write = 1;
+       }
+
+done : {
+       /* Update last_update if UPDATEs were written. */
+       if (peer->update_out > oc)
+               peer->last_update = bgp_clock();
+
+       /* If we TXed any flavor of packet update last_write */
+       if (update_last_write)
+               peer->last_write = bgp_clock();
+}
+
+       return count;
+}
+
+static void cleanup_handler(void *arg)
+{
+       if (plist)
+               list_delete(plist);
+
+       plist = NULL;
+
+       pthread_mutex_unlock(&plist_mtx);
+}
+
+/**
+ * Entry function for peer packet flushing pthread.
+ *
+ * The plist is allocated here, under plist_mtx, before the write loop starts.
+ */
+void *peer_writes_start(void *arg)
+{
+       struct timeval currtime = {0, 0};
+       struct timeval sleeptime = {0, 500};
+       struct timespec next_update = {0, 0};
+
+       // initialize
+       pthread_mutex_lock(&plist_mtx);
+       plist = list_new();
+
+       struct listnode *ln;
+       struct peer *peer;
+
+       pthread_cleanup_push(&cleanup_handler, NULL);
+
+       bgp_packet_writes_thread_run = true;
+
+       while (bgp_packet_writes_thread_run) { // wait around until next update
+                                              // time
+               if (plist->count > 0)
+                       pthread_cond_timedwait(&write_cond, &plist_mtx,
+                                              &next_update);
+               else // wait around until we have some peers
+                       while (plist->count == 0
+                              && bgp_packet_writes_thread_run)
+                               pthread_cond_wait(&write_cond, &plist_mtx);
+
+               for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) {
+                       pthread_mutex_lock(&peer->obuf_mtx);
+                       {
+                               bgp_write(peer);
+                       }
+                       pthread_mutex_unlock(&peer->obuf_mtx);
+               }
+
+               gettimeofday(&currtime, NULL);
+               timeradd(&currtime, &sleeptime, &currtime);
+               TIMEVAL_TO_TIMESPEC(&currtime, &next_update);
+       }
+
+       // clean up
+       pthread_cleanup_pop(1);
+
+       return NULL;
+}
+
+/**
+ * Turns on packet writing for a peer.
+ */
+void peer_writes_on(struct peer *peer)
+{
+       if (peer->status == Deleted)
+               return;
+
+       pthread_mutex_lock(&plist_mtx);
+       {
+               struct listnode *ln, *nn;
+               struct peer *p;
+
+               // make sure this peer isn't already in the list
+               for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
+                       if (p == peer) {
+                               pthread_mutex_unlock(&plist_mtx);
+                               return;
+                       }
+
+               peer_lock(peer);
+               listnode_add(plist, peer);
+       }
+       pthread_mutex_unlock(&plist_mtx);
+       peer_writes_wake();
+}
+
+/**
+ * Turns off packet writing for a peer.
+ */
+void peer_writes_off(struct peer *peer)
+{
+       struct listnode *ln, *nn;
+       struct peer *p;
+       pthread_mutex_lock(&plist_mtx);
+       {
+               for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
+                       if (p == peer) {
+                               list_delete_node(plist, ln);
+                               peer_unlock(peer);
+                               break;
+                       }
+       }
+       pthread_mutex_unlock(&plist_mtx);
+}
+
+/**
+ * Wakes up the write thread to do work.
+ */
+void peer_writes_wake()
+{
+       pthread_cond_signal(&write_cond);
+}
index 7bf498c37cfc2ba044f0cb2508ebc61df4cf4b30..c18e4a2ebd72ccded5cf29041a876746434bd074 100644 (file)
@@ -39,8 +39,6 @@
 
 /* Packet send and receive function prototypes. */
 extern int bgp_read(struct thread *);
-extern int bgp_write(struct thread *);
-extern int bgp_connect_check(struct peer *, int change_state);
 
 extern void bgp_keepalive_send(struct peer *);
 extern void bgp_open_send(struct peer *);
@@ -65,6 +63,13 @@ extern void bgp_check_update_delay(struct bgp *);
 
 extern int bgp_packet_set_marker(struct stream *s, u_char type);
 extern int bgp_packet_set_size(struct stream *s);
-extern void bgp_packet_add(struct peer *peer, struct stream *s);
+
+/* Control variable for write thread. */
+extern bool bgp_packet_writes_thread_run;
+
+extern void *peer_writes_start(void *arg);
+extern void peer_writes_on(struct peer *peer);
+extern void peer_writes_off(struct peer *peer);
+extern void peer_writes_wake(void);
 
 #endif /* _QUAGGA_BGP_PACKET_H */
index 8f6729060098032e7100ff74b8c070cd6672477a..585c6cebbce9f14ec9b27a14ca60f1c3b8ae4435 100644 (file)
@@ -1867,23 +1867,6 @@ void peer_af_announce_route(struct peer_af *paf, int combine)
                            subgrp->peer_count - 1);
 }
 
-void subgroup_trigger_write(struct update_subgroup *subgrp)
-{
-       struct peer_af *paf;
-
-#if 0
-  if (bgp_debug_update(NULL, NULL, subgrp->update_group, 0))
-    zlog_debug("u%llu:s%llu scheduling write thread for peers",
-               subgrp->update_group->id, subgrp->id);
-#endif
-       SUBGRP_FOREACH_PEER (subgrp, paf) {
-               if (paf->peer->status == Established) {
-                       BGP_PEER_WRITE_ON(paf->peer->t_write, bgp_write,
-                                         paf->peer->fd, paf->peer);
-               }
-       }
-}
-
 int update_group_clear_update_dbg(struct update_group *updgrp, void *arg)
 {
        UPDGRP_PEER_DBG_OFF(updgrp);
index 52a21679b895c40a9b398629a490fcaddb4b9e6a..a50bc05fedb5fdb19554d88f07028682190738df 100644 (file)
@@ -442,7 +442,6 @@ extern void bgp_adj_out_unset_subgroup(struct bgp_node *rn,
                                       char withdraw, u_int32_t addpath_tx_id);
 void subgroup_announce_table(struct update_subgroup *subgrp,
                             struct bgp_table *table);
-extern void subgroup_trigger_write(struct update_subgroup *subgrp);
 
 extern int update_group_clear_update_dbg(struct update_group *updgrp,
                                         void *arg);
index b4f18c9f5eb05fc4f334fa9d9fa62b844500a89d..1ec9915ee561bee39ff131f935a158ef23a9d278 100644 (file)
@@ -483,7 +483,6 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn,
 {
        struct bgp_adj_out *adj;
        struct bgp_advertise *adv;
-       char trigger_write;
 
        if (DISABLE_BGP_ANNOUNCE)
                return;
@@ -501,20 +500,9 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn,
                        adv->rn = rn;
                        adv->adj = adj;
 
-                       /* Note if we need to trigger a packet write */
-                       if (BGP_ADV_FIFO_EMPTY(&subgrp->sync->withdraw))
-                               trigger_write = 1;
-                       else
-                               trigger_write = 0;
-
                        /* Add to synchronization entry for withdraw
                         * announcement.  */
                        BGP_ADV_FIFO_ADD(&subgrp->sync->withdraw, &adv->fifo);
-
-                       /* Schedule packet write, if FIFO is getting its first
-                        * entry. */
-                       if (trigger_write)
-                               subgroup_trigger_write(subgrp);
                } else {
                        /* Remove myself from adjacency. */
                        BGP_ADJ_OUT_DEL(rn, adj);
index a35d814e477cc8c00f4473135aefcdecf61a57f6..77b3ce1937df5a59c27c0dd9d60ba2c5cbd9b6fb 100644 (file)
@@ -633,7 +633,6 @@ struct stream *bpacket_reformat_for_peer(struct bpacket *pkt,
                }
        }
 
-       bgp_packet_add(peer, s);
        return s;
 }
 
@@ -1149,7 +1148,6 @@ void subgroup_default_update_packet(struct update_subgroup *subgrp,
        bgp_packet_set_size(s);
 
        (void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, &vecarr);
-       subgroup_trigger_write(subgrp);
 }
 
 void subgroup_default_withdraw_packet(struct update_subgroup *subgrp)
@@ -1242,7 +1240,6 @@ void subgroup_default_withdraw_packet(struct update_subgroup *subgrp)
        bgp_packet_set_size(s);
 
        (void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, NULL);
-       subgroup_trigger_write(subgrp);
 }
 
 static void
index a4952be8a67a6ac66d0ebe12efd63094f221566c..cee73e2c43f3e7a5f09b02176f2a2ed92c2727eb 100644 (file)
@@ -990,7 +990,7 @@ static void peer_free(struct peer *peer)
         */
        bgp_timer_set(peer);
        BGP_READ_OFF(peer->t_read);
-       BGP_WRITE_OFF(peer->t_write);
+       peer_writes_off(peer);
        BGP_EVENT_FLUSH(peer);
 
        /* Free connected nexthop, if present */
@@ -1137,6 +1137,7 @@ struct peer *peer_new(struct bgp *bgp)
        /* Create buffers.  */
        peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE);
        peer->obuf = stream_fifo_new();
+       pthread_mutex_init(&peer->obuf_mtx, NULL);
 
        /* We use a larger buffer for peer->work in the event that:
         * - We RX a BGP_UPDATE where the attributes alone are just
index 36bdaf0125f41712b0e5037ece7c1584e8fd54e0..f427c5d9f5d56a04b053350e3d4756bff1a56ba9 100644 (file)
@@ -22,6 +22,8 @@
 #define _QUAGGA_BGPD_H
 
 #include "qobj.h"
+#include <pthread.h>
+
 #include "lib/json.h"
 #include "vrf.h"
 #include "vty.h"
@@ -584,6 +586,7 @@ struct peer {
 
        /* Packet receive and send buffer. */
        struct stream *ibuf;
+       pthread_mutex_t obuf_mtx;
        struct stream_fifo *obuf;
        struct stream *work;