]> git.proxmox.com Git - mirror_frr.git/blobdiff - bgpd/bgp_packet.c
*: conform with COMMUNITY.md formatting rules, via 'make indent'
[mirror_frr.git] / bgpd / bgp_packet.c
index 5994df23ece07754002eeb11bff07fcfa3ca3d7d..cb702d80d1f1fd7f8afe97db90ece96fb56098ce 100644 (file)
@@ -1,4 +1,6 @@
 /* BGP packet management routine.
+ * Contains utility functions for constructing and consuming BGP messages.
+ * Copyright (C) 2017 Cumulus Networks
  * Copyright (C) 1999 Kunihiro Ishiguro
  *
  * This file is part of GNU Zebra.
@@ -34,7 +36,6 @@
 #include "plist.h"
 #include "queue.h"
 #include "filter.h"
-#include "lib/frr_pthread.h"
 
 #include "bgpd/bgpd.h"
 #include "bgpd/bgp_table.h"
 #include "bgpd/bgp_vty.h"
 #include "bgpd/bgp_updgrp.h"
 #include "bgpd/bgp_label.h"
+#include "bgpd/bgp_io.h"
+#include "bgpd/bgp_keepalives.h"
 
-/* Linked list of active peers */
-static pthread_mutex_t *plist_mtx;
-static pthread_cond_t *write_cond;
-static struct list *plist;
-
-/* periodically scheduled thread to generate update-group updates */
-static struct thread *t_generate_updgrp_packets;
-
-bool bgp_packet_writes_thread_run = false;
-
-/* Set up BGP packet marker and packet type. */
+/**
+ * Sets marker and type fields for a BGP message.
+ *
+ * @param s the stream containing the packet
+ * @param type the packet type
+ * @return the size of the stream
+ */
 int bgp_packet_set_marker(struct stream *s, u_char type)
 {
        int i;
@@ -86,8 +85,14 @@ int bgp_packet_set_marker(struct stream *s, u_char type)
        return stream_get_endp(s);
 }
 
-/* Set BGP packet header size entry.  If size is zero then use current
-   stream size. */
+/**
+ * Sets size field for a BGP message.
+ *
+ * Size field is set to the size of the stream passed.
+ *
+ * @param s the stream containing the packet
+ * @return the size of the stream
+ */
 int bgp_packet_set_size(struct stream *s)
 {
        int cp;
@@ -99,35 +104,15 @@ int bgp_packet_set_size(struct stream *s)
        return cp;
 }
 
-/**
- * Push a packet onto the beginning of the peer's output queue.
- * Must be externally synchronized around 'peer'.
- */
-static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s)
-{
-       /* Add packet to the end of list. */
-       stream_fifo_push(peer->obuf, s);
-       peer_writes_wake();
-}
-
 /*
  * Push a packet onto the beginning of the peer's output queue.
  * This function acquires the peer's write mutex before proceeding.
  */
 static void bgp_packet_add(struct peer *peer, struct stream *s)
 {
-       pthread_mutex_lock(&peer->obuf_mtx);
-       bgp_packet_add_unsafe(peer, s);
-       pthread_mutex_unlock(&peer->obuf_mtx);
-}
-
-/**
- * Pop a packet off the end of the peer's output queue.
- * Must be externally synchronized around 'peer'.
- */
-static void bgp_packet_delete_unsafe(struct peer *peer)
-{
-       stream_free(stream_fifo_pop(peer->obuf));
+       pthread_mutex_lock(&peer->io_mtx);
+       stream_fifo_push(peer->obuf, s);
+       pthread_mutex_unlock(&peer->io_mtx);
 }
 
 static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
@@ -172,45 +157,254 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
        return s;
 }
 
-/* Get next packet to be written.  */
-static struct stream *bgp_write_packet(struct peer *peer)
+/* Called when there is a change in the EOR(implicit or explicit) status of a
+ * peer. Ends the update-delay if all expected peers are done with EORs. */
+void bgp_check_update_delay(struct bgp *bgp)
+{
+       struct listnode *node, *nnode;
+       struct peer *peer = NULL;
+
+       if (bgp_debug_neighbor_events(peer))
+               zlog_debug("Checking update delay, T: %d R: %d I:%d E: %d",
+                          bgp->established, bgp->restarted_peers,
+                          bgp->implicit_eors, bgp->explicit_eors);
+
+       if (bgp->established
+           <= bgp->restarted_peers + bgp->implicit_eors + bgp->explicit_eors) {
+               /*
+                * This is an extra sanity check to make sure we wait for all
+                * the eligible configured peers. This check is performed if
+                * establish wait timer is on, or establish wait option is not
+                * given with the update-delay command
+                */
+               if (bgp->t_establish_wait
+                   || (bgp->v_establish_wait == bgp->v_update_delay))
+                       for (ALL_LIST_ELEMENTS(bgp->peer, node, nnode, peer)) {
+                               if (CHECK_FLAG(peer->flags,
+                                              PEER_FLAG_CONFIG_NODE)
+                                   && !CHECK_FLAG(peer->flags,
+                                                  PEER_FLAG_SHUTDOWN)
+                                   && !peer->update_delay_over) {
+                                       if (bgp_debug_neighbor_events(peer))
+                                               zlog_debug(
+                                                       " Peer %s pending, continuing read-only mode",
+                                                       peer->host);
+                                       return;
+                               }
+                       }
+
+               zlog_info(
+                       "Update delay ended, restarted: %d, EORs implicit: %d, explicit: %d",
+                       bgp->restarted_peers, bgp->implicit_eors,
+                       bgp->explicit_eors);
+               bgp_update_delay_end(bgp);
+       }
+}
+
+/*
+ * Called if peer is known to have restarted. The restart-state bit in
+ * Graceful-Restart capability is used for that
+ */
+void bgp_update_restarted_peers(struct peer *peer)
+{
+       if (!bgp_update_delay_active(peer->bgp))
+               return; /* BGP update delay has ended */
+       if (peer->update_delay_over)
+               return; /* This peer has already been considered */
+
+       if (bgp_debug_neighbor_events(peer))
+               zlog_debug("Peer %s: Checking restarted", peer->host);
+
+       if (peer->status == Established) {
+               peer->update_delay_over = 1;
+               peer->bgp->restarted_peers++;
+               bgp_check_update_delay(peer->bgp);
+       }
+}
+
+/*
+ * Called as peer receives a keep-alive. Determines if this occurence can be
+ * taken as an implicit EOR for this peer.
+ * NOTE: The very first keep-alive after the Established state of a peer is
+ * considered implicit EOR for the update-delay purposes
+ */
+void bgp_update_implicit_eors(struct peer *peer)
 {
-       struct stream *s = NULL;
+       if (!bgp_update_delay_active(peer->bgp))
+               return; /* BGP update delay has ended */
+       if (peer->update_delay_over)
+               return; /* This peer has already been considered */
+
+       if (bgp_debug_neighbor_events(peer))
+               zlog_debug("Peer %s: Checking implicit EORs", peer->host);
+
+       if (peer->status == Established) {
+               peer->update_delay_over = 1;
+               peer->bgp->implicit_eors++;
+               bgp_check_update_delay(peer->bgp);
+       }
+}
+
+/*
+ * Should be called only when there is a change in the EOR_RECEIVED status
+ * for any afi/safi on a peer.
+ */
+static void bgp_update_explicit_eors(struct peer *peer)
+{
+       afi_t afi;
+       safi_t safi;
+
+       if (!bgp_update_delay_active(peer->bgp))
+               return; /* BGP update delay has ended */
+       if (peer->update_delay_over)
+               return; /* This peer has already been considered */
+
+       if (bgp_debug_neighbor_events(peer))
+               zlog_debug("Peer %s: Checking explicit EORs", peer->host);
+
+       for (afi = AFI_IP; afi < AFI_MAX; afi++)
+               for (safi = SAFI_UNICAST; safi < SAFI_MAX; safi++) {
+                       if (peer->afc_nego[afi][safi]
+                           && !CHECK_FLAG(peer->af_sflags[afi][safi],
+                                          PEER_STATUS_EOR_RECEIVED)) {
+                               if (bgp_debug_neighbor_events(peer))
+                                       zlog_debug(
+                                               "   afi %d safi %d didnt receive EOR",
+                                               afi, safi);
+                               return;
+                       }
+               }
+
+       peer->update_delay_over = 1;
+       peer->bgp->explicit_eors++;
+       bgp_check_update_delay(peer->bgp);
+}
+
+/**
+ * Frontend for NLRI parsing, to fan-out to AFI/SAFI specific parsers.
+ *
+ * mp_withdraw, if set, is used to nullify attr structure on most of the
+ * calling safi function and for evpn, passed as parameter
+ */
+int bgp_nlri_parse(struct peer *peer, struct attr *attr,
+                  struct bgp_nlri *packet, int mp_withdraw)
+{
+       switch (packet->safi) {
+       case SAFI_UNICAST:
+       case SAFI_MULTICAST:
+               return bgp_nlri_parse_ip(peer, mp_withdraw ? NULL : attr,
+                                        packet);
+       case SAFI_LABELED_UNICAST:
+               return bgp_nlri_parse_label(peer, mp_withdraw ? NULL : attr,
+                                           packet);
+       case SAFI_MPLS_VPN:
+               return bgp_nlri_parse_vpn(peer, mp_withdraw ? NULL : attr,
+                                         packet);
+       case SAFI_EVPN:
+               return bgp_nlri_parse_evpn(peer, attr, packet, mp_withdraw);
+       }
+       return -1;
+}
+
+/*
+ * Checks a variety of conditions to determine whether the peer needs to be
+ * rescheduled for packet generation again, and does so if necessary.
+ *
+ * @param peer to check for rescheduling
+ */
+static void bgp_write_proceed_actions(struct peer *peer)
+{
+       afi_t afi;
+       safi_t safi;
        struct peer_af *paf;
        struct bpacket *next_pkt;
+       struct update_subgroup *subgrp;
+
+       FOREACH_AFI_SAFI (afi, safi) {
+               paf = peer_af_find(peer, afi, safi);
+               if (!paf)
+                       continue;
+               subgrp = paf->subgroup;
+               if (!subgrp)
+                       continue;
+
+               next_pkt = paf->next_pkt_to_send;
+               if (next_pkt && next_pkt->buffer) {
+                       BGP_TIMER_ON(peer->t_generate_updgrp_packets,
+                                    bgp_generate_updgrp_packets, 0);
+                       return;
+               }
+
+               /* No packets readily available for AFI/SAFI, are there
+                * subgroup packets
+                * that need to be generated? */
+               if (bpacket_queue_is_full(SUBGRP_INST(subgrp),
+                                         SUBGRP_PKTQ(subgrp))
+                   || subgroup_packets_to_build(subgrp)) {
+                       BGP_TIMER_ON(peer->t_generate_updgrp_packets,
+                                    bgp_generate_updgrp_packets, 0);
+                       return;
+               }
+
+               /* No packets to send, see if EOR is pending */
+               if (CHECK_FLAG(peer->cap, PEER_CAP_RESTART_RCV)) {
+                       if (!subgrp->t_coalesce && peer->afc_nego[afi][safi]
+                           && peer->synctime
+                           && !CHECK_FLAG(peer->af_sflags[afi][safi],
+                                          PEER_STATUS_EOR_SEND)
+                           && safi != SAFI_MPLS_VPN) {
+                               BGP_TIMER_ON(peer->t_generate_updgrp_packets,
+                                            bgp_generate_updgrp_packets, 0);
+                               return;
+                       }
+               }
+       }
+}
+
+/*
+ * Generate advertisement information (withdraws, updates, EOR) from each
+ * update group a peer belongs to, encode this information into packets, and
+ * enqueue the packets onto the peer's output buffer.
+ */
+int bgp_generate_updgrp_packets(struct thread *thread)
+{
+       struct peer *peer = THREAD_ARG(thread);
+
+       struct stream *s;
+       struct peer_af *paf;
+       struct bpacket *next_pkt;
+       uint32_t wpq;
+       uint32_t generated = 0;
        afi_t afi;
        safi_t safi;
 
+       wpq = atomic_load_explicit(&peer->bgp->wpkt_quanta,
+                                  memory_order_relaxed);
+
        /*
         * The code beyond this part deals with update packets, proceed only
         * if peer is Established and updates are not on hold (as part of
         * update-delay post processing).
         */
        if (peer->status != Established)
-               return NULL;
+               return 0;
 
        if (peer->bgp && peer->bgp->main_peers_update_hold)
-               return NULL;
-
-       FOREACH_AFI_SAFI (afi, safi) {
-               paf = peer_af_find(peer, afi, safi);
-               if (!paf || !PAF_SUBGRP(paf))
-                       continue;
-               next_pkt = paf->next_pkt_to_send;
+               return 0;
 
-               /* Try to generate a packet for the peer if we are at
-                * the end of
-                * the list. Always try to push out WITHDRAWs first. */
-               if (!next_pkt || !next_pkt->buffer) {
-                       next_pkt = subgroup_withdraw_packet(PAF_SUBGRP(paf));
-                       if (!next_pkt || !next_pkt->buffer)
-                               subgroup_update_packet(PAF_SUBGRP(paf));
+       do {
+               s = NULL;
+               FOREACH_AFI_SAFI (afi, safi) {
+                       paf = peer_af_find(peer, afi, safi);
+                       if (!paf || !PAF_SUBGRP(paf))
+                               continue;
                        next_pkt = paf->next_pkt_to_send;
-               }
 
-                       /* Try to generate a packet for the peer if we are at
-                        * the end of
-                        * the list. Always try to push out WITHDRAWs first. */
+                       /*
+                        * Try to generate a packet for the peer if we are at
+                        * the end of the list. Always try to push out
+                        * WITHDRAWs first.
+                        */
                        if (!next_pkt || !next_pkt->buffer) {
                                next_pkt = subgroup_withdraw_packet(
                                        PAF_SUBGRP(paf));
@@ -219,15 +413,12 @@ static struct stream *bgp_write_packet(struct peer *peer)
                                next_pkt = paf->next_pkt_to_send;
                        }
 
-                       /* If we still don't have a packet to send to the peer,
-                        * then
-                        * try to find out out if we have to send eor or if not,
-                        * skip to
-                        * the next AFI, SAFI.
-                        * Don't send the EOR prematurely... if the subgroup's
-                        * coalesce
-                        * timer is running, the adjacency-out structure is not
-                        * created
+                       /*
+                        * If we still don't have a packet to send to the peer,
+                        * then try to find out out if we have to send eor or
+                        * if not, skip to the next AFI, SAFI. Don't send the
+                        * EOR prematurely; if the subgroup's coalesce timer is
+                        * running, the adjacency-out structure is not created
                         * yet.
                         */
                        if (!next_pkt || !next_pkt->buffer) {
@@ -245,46 +436,33 @@ static struct stream *bgp_write_packet(struct peer *peer)
                                                         PEER_STATUS_EOR_SEND);
 
                                                if ((s = bgp_update_packet_eor(
-                                                            peer, afi, safi)))
+                                                            peer, afi,
+                                                            safi))) {
                                                        bgp_packet_add(peer, s);
-
-                                               return s;
+                                               }
                                        }
                                }
+                               continue;
                        }
-                       continue;
-               }
 
 
-                       /* Found a packet template to send, overwrite packet
-                        * with appropriate
-                        * attributes from peer and advance peer */
+                       /* Found a packet template to send, overwrite
+                        * packet with appropriate attributes from peer
+                        * and advance peer */
                        s = bpacket_reformat_for_peer(next_pkt, paf);
                        bgp_packet_add(peer, s);
                        bpacket_queue_advance_peer(paf);
-                       return s;
                }
+       } while (s && (++generated < wpq));
 
-       return NULL;
-}
+       if (generated)
+               bgp_writes_on(peer);
 
-static int bgp_generate_updgrp_packets(struct thread *thread)
-{
-       struct listnode *ln;
-       struct peer *peer;
-       pthread_mutex_lock(plist_mtx);
-       {
-               for (ALL_LIST_ELEMENTS_RO(plist, ln, peer))
-                       while (bgp_write_packet(peer))
-                               ;
+       bgp_write_proceed_actions(peer);
 
-               t_generate_updgrp_packets = NULL;
-       }
-       pthread_mutex_unlock(plist_mtx);
        return 0;
 }
 
-
 /*
  * Creates a BGP Keepalive packet and appends it to the peer's output queue.
  */
@@ -308,6 +486,8 @@ void bgp_keepalive_send(struct peer *peer)
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
 }
 
 /*
@@ -361,13 +541,101 @@ void bgp_open_send(struct peer *peer)
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
+}
+
+/*
+ * Writes NOTIFICATION message directly to a peer socket without waiting for
+ * the I/O thread.
+ *
+ * There must be exactly one stream on the peer->obuf FIFO, and the data within
+ * this stream must match the format of a BGP NOTIFICATION message.
+ * Transmission is best-effort.
+ *
+ * @requires peer->io_mtx
+ * @param peer
+ * @return 0
+ */
+static int bgp_write_notify(struct peer *peer)
+{
+       int ret, val;
+       u_char type;
+       struct stream *s;
+
+       /* There should be at least one packet. */
+       s = stream_fifo_pop(peer->obuf);
+
+       if (!s)
+               return 0;
+
+       assert(stream_get_endp(s) >= BGP_HEADER_SIZE);
+
+       /* Stop collecting data within the socket */
+       sockopt_cork(peer->fd, 0);
+
+       /*
+        * socket is in nonblocking mode, if we can't deliver the NOTIFY, well,
+        * we only care about getting a clean shutdown at this point.
+        */
+       ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s));
+
+       /*
+        * only connection reset/close gets counted as TCP_fatal_error, failure
+        * to write the entire NOTIFY doesn't get different FSM treatment
+        */
+       if (ret <= 0) {
+               stream_free(s);
+               BGP_EVENT_ADD(peer, TCP_fatal_error);
+               return 0;
+       }
+
+       /* Disable Nagle, make NOTIFY packet go out right away */
+       val = 1;
+       (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
+                        sizeof(val));
+
+       /* Retrieve BGP packet type. */
+       stream_set_getp(s, BGP_MARKER_SIZE + 2);
+       type = stream_getc(s);
+
+       assert(type == BGP_MSG_NOTIFY);
+
+       /* Type should be notify. */
+       atomic_fetch_add_explicit(&peer->notify_out, 1, memory_order_relaxed);
+       peer->notify_out++;
+
+       /* Double start timer. */
+       peer->v_start *= 2;
+
+       /* Overflow check. */
+       if (peer->v_start >= (60 * 2))
+               peer->v_start = (60 * 2);
+
+       /*
+        * Handle Graceful Restart case where the state changes to
+        * Connect instead of Idle
+        */
+       BGP_EVENT_ADD(peer, BGP_Stop);
+
+       stream_free(s);
+
+       return 0;
 }
 
 /*
  * Creates a BGP Notify and appends it to the peer's output queue.
  *
- * This function awakens the write thread to ensure the packet
- * gets out ASAP.
+ * This function attempts to write the packet from the thread it is called
+ * from, to ensure the packet gets out ASAP.
+ *
+ * This function may be called from multiple threads. Since the function
+ * modifies I/O buffer(s) in the peer, these are locked for the duration of the
+ * call to prevent tampering from other threads.
+ *
+ * Delivery of the NOTIFICATION is attempted once and is best-effort. After
+ * return, the peer structure *must* be reset; no assumptions about session
+ * state are valid.
  *
  * @param peer
  * @param code      BGP error code
@@ -381,6 +649,10 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
        struct stream *s;
        int length;
 
+       /* Lock I/O mutex to prevent other threads from pushing packets */
+       pthread_mutex_lock(&peer->io_mtx);
+       /* ============================================== */
+
        /* Allocate new stream. */
        s = stream_new(BGP_MAX_PACKET_SIZE);
 
@@ -398,10 +670,20 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
        /* Set BGP packet length. */
        length = bgp_packet_set_size(s);
 
-       /* Add packet to the peer. */
-       pthread_mutex_lock(&peer->obuf_mtx);
+       /* wipe output buffer */
        stream_fifo_clean(peer->obuf);
-       pthread_mutex_unlock(&peer->obuf_mtx);
+
+       /*
+        * If possible, store last packet for debugging purposes. This check is
+        * in place because we are sometimes called with a doppelganger peer,
+        * who tends to have a plethora of fields nulled out.
+        */
+       if (peer->curr && peer->last_reset_cause_size) {
+               size_t packetsize = stream_get_endp(peer->curr);
+               assert(packetsize <= peer->last_reset_cause_size);
+               memcpy(peer->last_reset_cause, peer->curr->data, packetsize);
+               peer->last_reset_cause_size = packetsize;
+       }
 
        /* For debug */
        {
@@ -453,16 +735,19 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
                peer->last_reset = PEER_DOWN_NOTIFY_SEND;
 
        /* Add packet to peer's output queue */
-       bgp_packet_add(peer, s);
-       /* Wake up the write thread to get the notify out ASAP */
-       peer_writes_wake();
+       stream_fifo_push(peer->obuf, s);
+
+       bgp_write_notify(peer);
+
+       /* ============================================== */
+       pthread_mutex_unlock(&peer->io_mtx);
 }
 
 /*
  * Creates a BGP Notify and appends it to the peer's output queue.
  *
- * This function awakens the write thread to ensure the packet
- * gets out ASAP.
+ * This function attempts to write the packet from the thread it is called
+ * from, to ensure the packet gets out ASAP.
  *
  * @param peer
  * @param code      BGP error code
@@ -570,6 +855,8 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
 }
 
 /*
@@ -619,6 +906,8 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
 }
 
 /* RFC1771 6.8 Connection collision detection. */
@@ -705,6 +994,42 @@ static int bgp_collision_detect(struct peer *new, struct in_addr remote_id)
        return 0;
 }
 
+/* Packet processing routines ---------------------------------------------- */
+/*
+ * This is a family of functions designed to be called from
+ * bgp_process_packet(). These functions all share similar behavior and should
+ * adhere to the following invariants and restrictions:
+ *
+ * Return codes
+ * ------------
+ * The return code of any one of those functions should be one of the FSM event
+ * codes specified in bgpd.h. If a NOTIFY was sent, this event code MUST be
+ * BGP_Stop. Otherwise, the code SHOULD correspond to the function's expected
+ * packet type. For example, bgp_open_receive() should return BGP_Stop upon
+ * error and Receive_OPEN_message otherwise.
+ *
+ * If no action is necessary, the correct return code is BGP_PACKET_NOOP as
+ * defined below.
+ *
+ * Side effects
+ * ------------
+ * - May send NOTIFY messages
+ * - May not modify peer->status
+ * - May not call bgp_event_update()
+ */
+
+#define BGP_PACKET_NOOP 0
+
+/**
+ * Process BGP OPEN message for peer.
+ *
+ * If any errors are encountered in the OPEN message, immediately sends NOTIFY
+ * and returns BGP_Stop.
+ *
+ * @param peer
+ * @param size size of the packet
+ * @return as in summary
+ */
 static int bgp_open_receive(struct peer *peer, bgp_size_t size)
 {
        int ret;
@@ -722,13 +1047,13 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
        u_int16_t *holdtime_ptr;
 
        /* Parse open packet. */
-       version = stream_getc(peer->ibuf);
-       memcpy(notify_data_remote_as, stream_pnt(peer->ibuf), 2);
-       remote_as = stream_getw(peer->ibuf);
-       holdtime_ptr = (u_int16_t *)stream_pnt(peer->ibuf);
-       holdtime = stream_getw(peer->ibuf);
-       memcpy(notify_data_remote_id, stream_pnt(peer->ibuf), 4);
-       remote_id.s_addr = stream_get_ipv4(peer->ibuf);
+       version = stream_getc(peer->curr);
+       memcpy(notify_data_remote_as, stream_pnt(peer->curr), 2);
+       remote_as = stream_getw(peer->curr);
+       holdtime_ptr = (u_int16_t *)stream_pnt(peer->curr);
+       holdtime = stream_getw(peer->curr);
+       memcpy(notify_data_remote_id, stream_pnt(peer->curr), 4);
+       remote_id.s_addr = stream_get_ipv4(peer->curr);
 
        /* Receive OPEN message log  */
        if (bgp_debug_neighbor_events(peer))
@@ -740,14 +1065,14 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
 
        /* BEGIN to read the capability here, but dont do it yet */
        mp_capability = 0;
-       optlen = stream_getc(peer->ibuf);
+       optlen = stream_getc(peer->curr);
 
        if (optlen != 0) {
                /* If not enough bytes, it is an error. */
-               if (STREAM_READABLE(peer->ibuf) < optlen) {
+               if (STREAM_READABLE(peer->curr) < optlen) {
                        bgp_notify_send(peer, BGP_NOTIFY_OPEN_ERR,
                                        BGP_NOTIFY_OPEN_MALFORMED_ATTR);
-                       return -1;
+                       return BGP_Stop;
                }
 
                /* We need the as4 capability value *right now* because
@@ -767,7 +1092,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                          BGP_NOTIFY_OPEN_BAD_PEER_AS,
                                          notify_data_remote_as4, 4);
-               return -1;
+               return BGP_Stop;
        }
 
        if (remote_as == BGP_AS_TRANS) {
@@ -782,7 +1107,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                        bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                                  BGP_NOTIFY_OPEN_BAD_PEER_AS,
                                                  notify_data_remote_as4, 4);
-                       return -1;
+                       return BGP_Stop;
                }
 
                if (!as4 && BGP_DEBUG(as4, AS4))
@@ -812,7 +1137,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                        bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                                  BGP_NOTIFY_OPEN_BAD_PEER_AS,
                                                  notify_data_remote_as4, 4);
-                       return -1;
+                       return BGP_Stop;
                }
        }
 
@@ -825,7 +1150,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                          BGP_NOTIFY_OPEN_BAD_BGP_IDENT,
                                          notify_data_remote_id, 4);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Set remote router-id */
@@ -843,7 +1168,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                          BGP_NOTIFY_OPEN_UNSUP_VERSION,
                                          (u_int8_t *)&maxver, 2);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Check neighbor as number. */
@@ -855,7 +1180,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                          BGP_NOTIFY_OPEN_BAD_PEER_AS,
                                          notify_data_remote_as, 2);
-               return -1;
+               return BGP_Stop;
        } else if (peer->as_type == AS_INTERNAL) {
                if (remote_as != peer->bgp->as) {
                        if (bgp_debug_neighbor_events(peer))
@@ -865,7 +1190,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                        bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                                  BGP_NOTIFY_OPEN_BAD_PEER_AS,
                                                  notify_data_remote_as, 2);
-                       return -1;
+                       return BGP_Stop;
                }
                peer->as = peer->local_as;
        } else if (peer->as_type == AS_EXTERNAL) {
@@ -877,7 +1202,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                        bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                                  BGP_NOTIFY_OPEN_BAD_PEER_AS,
                                                  notify_data_remote_as, 2);
-                       return -1;
+                       return BGP_Stop;
                }
                peer->as = remote_as;
        } else if ((peer->as_type == AS_SPECIFIED) && (remote_as != peer->as)) {
@@ -887,7 +1212,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                          BGP_NOTIFY_OPEN_BAD_PEER_AS,
                                          notify_data_remote_as, 2);
-               return -1;
+               return BGP_Stop;
        }
 
        /* From the rfc: Upon receipt of an OPEN message, a BGP speaker MUST
@@ -901,7 +1226,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                bgp_notify_send_with_data(peer, BGP_NOTIFY_OPEN_ERR,
                                          BGP_NOTIFY_OPEN_UNACEP_HOLDTIME,
                                          (u_char *)holdtime_ptr, 2);
-               return -1;
+               return BGP_Stop;
        }
 
        /* From the rfc: A reasonable maximum time between KEEPALIVE messages
@@ -930,7 +1255,7 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
        if (optlen != 0) {
                if ((ret = bgp_open_option_parse(peer, optlen, &mp_capability))
                    < 0)
-                       return ret;
+                       return BGP_Stop;
        } else {
                if (bgp_debug_neighbor_events(peer))
                        zlog_debug("%s rcvd OPEN w/ OPTION parameter len: 0",
@@ -960,213 +1285,85 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                        peer->afc[AFI_L2VPN][SAFI_EVPN];
        }
 
-       /* When collision is detected and this peer is closed.  Retrun
-          immidiately. */
-       ret = bgp_collision_detect(peer, remote_id);
-       if (ret < 0)
-               return ret;
-
-       /* Get sockname. */
-       if ((ret = bgp_getsockname(peer)) < 0) {
-               zlog_err("%s: bgp_getsockname() failed for peer: %s",
-                        __FUNCTION__, peer->host);
-               return (ret);
-       }
-
-       /* Verify valid local address present based on negotiated
-        * address-families. */
-       if (peer->afc_nego[AFI_IP][SAFI_UNICAST]
-           || peer->afc_nego[AFI_IP][SAFI_LABELED_UNICAST]
-           || peer->afc_nego[AFI_IP][SAFI_MULTICAST]
-           || peer->afc_nego[AFI_IP][SAFI_MPLS_VPN]
-           || peer->afc_nego[AFI_IP][SAFI_ENCAP]) {
-               if (!peer->nexthop.v4.s_addr) {
-#if defined(HAVE_CUMULUS)
-                       zlog_err(
-                               "%s: No local IPv4 addr resetting connection, fd %d",
-                               peer->host, peer->fd);
-                       bgp_notify_send(peer, BGP_NOTIFY_CEASE,
-                                       BGP_NOTIFY_SUBCODE_UNSPECIFIC);
-                       return -1;
-#endif
-               }
-       }
-       if (peer->afc_nego[AFI_IP6][SAFI_UNICAST]
-           || peer->afc_nego[AFI_IP6][SAFI_LABELED_UNICAST]
-           || peer->afc_nego[AFI_IP6][SAFI_MULTICAST]
-           || peer->afc_nego[AFI_IP6][SAFI_MPLS_VPN]
-           || peer->afc_nego[AFI_IP6][SAFI_ENCAP]) {
-               if (IN6_IS_ADDR_UNSPECIFIED(&peer->nexthop.v6_global)) {
-#if defined(HAVE_CUMULUS)
-                       zlog_err(
-                               "%s: No local IPv6 addr resetting connection, fd %d",
-                               peer->host, peer->fd);
-                       bgp_notify_send(peer, BGP_NOTIFY_CEASE,
-                                       BGP_NOTIFY_SUBCODE_UNSPECIFIC);
-                       return -1;
-#endif
-               }
-       }
-       peer->rtt = sockopt_tcp_rtt(peer->fd);
-
-       if ((ret = bgp_event_update(peer, Receive_OPEN_message)) < 0) {
-               zlog_err("%s: BGP event update failed for peer: %s",
-                        __FUNCTION__, peer->host);
-               /* DD: bgp send notify and reset state */
-               return (ret);
-       }
-
-       peer->packet_size = 0;
-       if (peer->ibuf)
-               stream_reset(peer->ibuf);
-
-       return 0;
-}
-
-/* Called when there is a change in the EOR(implicit or explicit) status of a
-   peer.
-   Ends the update-delay if all expected peers are done with EORs. */
-void bgp_check_update_delay(struct bgp *bgp)
-{
-       struct listnode *node, *nnode;
-       struct peer *peer = NULL;
-
-       if (bgp_debug_neighbor_events(peer))
-               zlog_debug("Checking update delay, T: %d R: %d I:%d E: %d",
-                          bgp->established, bgp->restarted_peers,
-                          bgp->implicit_eors, bgp->explicit_eors);
-
-       if (bgp->established
-           <= bgp->restarted_peers + bgp->implicit_eors + bgp->explicit_eors) {
-               /* This is an extra sanity check to make sure we wait for all
-                  the
-                  eligible configured peers. This check is performed if
-                  establish wait
-                  timer is on, or establish wait option is not given with the
-                  update-delay command */
-               if (bgp->t_establish_wait
-                   || (bgp->v_establish_wait == bgp->v_update_delay))
-                       for (ALL_LIST_ELEMENTS(bgp->peer, node, nnode, peer)) {
-                               if (CHECK_FLAG(peer->flags,
-                                              PEER_FLAG_CONFIG_NODE)
-                                   && !CHECK_FLAG(peer->flags,
-                                                  PEER_FLAG_SHUTDOWN)
-                                   && !peer->update_delay_over) {
-                                       if (bgp_debug_neighbor_events(peer))
-                                               zlog_debug(
-                                                       " Peer %s pending, continuing read-only mode",
-                                                       peer->host);
-                                       return;
-                               }
-                       }
-
-               zlog_info(
-                       "Update delay ended, restarted: %d, EORs implicit: %d, explicit: %d",
-                       bgp->restarted_peers, bgp->implicit_eors,
-                       bgp->explicit_eors);
-               bgp_update_delay_end(bgp);
-       }
-}
-
-/* Called if peer is known to have restarted. The restart-state bit in
-   Graceful-Restart capability is used for that */
-void bgp_update_restarted_peers(struct peer *peer)
-{
-       if (!bgp_update_delay_active(peer->bgp))
-               return; /* BGP update delay has ended */
-       if (peer->update_delay_over)
-               return; /* This peer has already been considered */
-
-       if (bgp_debug_neighbor_events(peer))
-               zlog_debug("Peer %s: Checking restarted", peer->host);
-
-       if (peer->status == Established) {
-               peer->update_delay_over = 1;
-               peer->bgp->restarted_peers++;
-               bgp_check_update_delay(peer->bgp);
-       }
-}
-
-/* Called as peer receives a keep-alive. Determines if this occurence can be
-   taken as an implicit EOR for this peer.
-   NOTE: The very first keep-alive after the Established state of a peer is
-        considered implicit EOR for the update-delay purposes */
-void bgp_update_implicit_eors(struct peer *peer)
-{
-       if (!bgp_update_delay_active(peer->bgp))
-               return; /* BGP update delay has ended */
-       if (peer->update_delay_over)
-               return; /* This peer has already been considered */
-
-       if (bgp_debug_neighbor_events(peer))
-               zlog_debug("Peer %s: Checking implicit EORs", peer->host);
-
-       if (peer->status == Established) {
-               peer->update_delay_over = 1;
-               peer->bgp->implicit_eors++;
-               bgp_check_update_delay(peer->bgp);
-       }
-}
-
-/* Should be called only when there is a change in the EOR_RECEIVED status
-   for any afi/safi on a peer */
-static void bgp_update_explicit_eors(struct peer *peer)
-{
-       afi_t afi;
-       safi_t safi;
-
-       if (!bgp_update_delay_active(peer->bgp))
-               return; /* BGP update delay has ended */
-       if (peer->update_delay_over)
-               return; /* This peer has already been considered */
+       /* When collision is detected and this peer is closed.  Retrun
+          immidiately. */
+       ret = bgp_collision_detect(peer, remote_id);
+       if (ret < 0)
+               return BGP_Stop;
 
-       if (bgp_debug_neighbor_events(peer))
-               zlog_debug("Peer %s: Checking explicit EORs", peer->host);
+       /* Get sockname. */
+       if ((ret = bgp_getsockname(peer)) < 0) {
+               zlog_err("%s: bgp_getsockname() failed for peer: %s",
+                        __FUNCTION__, peer->host);
+               return BGP_Stop;
+       }
 
-       FOREACH_AFI_SAFI (afi, safi) {
-               if (peer->afc_nego[afi][safi]
-                   && !CHECK_FLAG(peer->af_sflags[afi][safi],
-                                  PEER_STATUS_EOR_RECEIVED)) {
-                       if (bgp_debug_neighbor_events(peer))
-                               zlog_debug(
-                                       "   afi %d safi %d didnt receive EOR",
-                                       afi, safi);
-                       return;
+       /* Verify valid local address present based on negotiated
+        * address-families. */
+       if (peer->afc_nego[AFI_IP][SAFI_UNICAST]
+           || peer->afc_nego[AFI_IP][SAFI_LABELED_UNICAST]
+           || peer->afc_nego[AFI_IP][SAFI_MULTICAST]
+           || peer->afc_nego[AFI_IP][SAFI_MPLS_VPN]
+           || peer->afc_nego[AFI_IP][SAFI_ENCAP]) {
+               if (!peer->nexthop.v4.s_addr) {
+#if defined(HAVE_CUMULUS)
+                       zlog_err(
+                               "%s: No local IPv4 addr resetting connection, fd %d",
+                               peer->host, peer->fd);
+                       bgp_notify_send(peer, BGP_NOTIFY_CEASE,
+                                       BGP_NOTIFY_SUBCODE_UNSPECIFIC);
+                       return BGP_Stop;
+#endif
+               }
+       }
+       if (peer->afc_nego[AFI_IP6][SAFI_UNICAST]
+           || peer->afc_nego[AFI_IP6][SAFI_LABELED_UNICAST]
+           || peer->afc_nego[AFI_IP6][SAFI_MULTICAST]
+           || peer->afc_nego[AFI_IP6][SAFI_MPLS_VPN]
+           || peer->afc_nego[AFI_IP6][SAFI_ENCAP]) {
+               if (IN6_IS_ADDR_UNSPECIFIED(&peer->nexthop.v6_global)) {
+#if defined(HAVE_CUMULUS)
+                       zlog_err(
+                               "%s: No local IPv6 addr resetting connection, fd %d",
+                               peer->host, peer->fd);
+                       bgp_notify_send(peer, BGP_NOTIFY_CEASE,
+                                       BGP_NOTIFY_SUBCODE_UNSPECIFIC);
+                       return BGP_Stop;
+#endif
                }
        }
+       peer->rtt = sockopt_tcp_rtt(peer->fd);
 
-       peer->update_delay_over = 1;
-       peer->bgp->explicit_eors++;
-       bgp_check_update_delay(peer->bgp);
+       return Receive_OPEN_message;
 }
 
-/* Frontend for NLRI parsing, to fan-out to AFI/SAFI specific parsers
- * mp_withdraw, if set, is used to nullify attr structure on most of the calling
- * safi function
- * and for evpn, passed as parameter
+/**
+ * Process BGP KEEPALIVE message for peer.
+ *
+ * @param peer
+ * @param size size of the packet
+ * @return as in summary
  */
-int bgp_nlri_parse(struct peer *peer, struct attr *attr,
-                  struct bgp_nlri *packet, int mp_withdraw)
+static int bgp_keepalive_receive(struct peer *peer, bgp_size_t size)
 {
-       switch (packet->safi) {
-       case SAFI_UNICAST:
-       case SAFI_MULTICAST:
-               return bgp_nlri_parse_ip(peer, mp_withdraw ? NULL : attr,
-                                        packet);
-       case SAFI_LABELED_UNICAST:
-               return bgp_nlri_parse_label(peer, mp_withdraw ? NULL : attr,
-                                           packet);
-       case SAFI_MPLS_VPN:
-               return bgp_nlri_parse_vpn(peer, mp_withdraw ? NULL : attr,
-                                         packet);
-       case SAFI_EVPN:
-               return bgp_nlri_parse_evpn(peer, attr, packet, mp_withdraw);
-       default:
-               return -1;
-       }
+       if (bgp_debug_keepalive(peer))
+               zlog_debug("%s KEEPALIVE rcvd", peer->host);
+
+       bgp_update_implicit_eors(peer);
+
+       return Receive_KEEPALIVE_message;
 }
 
-/* Parse BGP Update packet and make attribute object. */
+
+/**
+ * Process BGP UPDATE message for peer.
+ *
+ * Parses UPDATE and creates attribute object.
+ *
+ * @param peer
+ * @param size size of the packet
+ * @return as in summary
+ */
 static int bgp_update_receive(struct peer *peer, bgp_size_t size)
 {
        int ret, nlri_ret;
@@ -1192,7 +1389,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
                         peer->host,
                         lookup_msg(bgp_status_msg, peer->status, NULL));
                bgp_notify_send(peer, BGP_NOTIFY_FSM_ERR, 0);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Set initial values. */
@@ -1203,7 +1400,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
        memset(peer->rcvd_attr_str, 0, BUFSIZ);
        peer->rcvd_attr_printed = 0;
 
-       s = peer->ibuf;
+       s = peer->curr;
        end = stream_pnt(s) + size;
 
        /* RFC1771 6.3 If the Unfeasible Routes Length or Total Attribute
@@ -1217,7 +1414,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
                        peer->host);
                bgp_notify_send(peer, BGP_NOTIFY_UPDATE_ERR,
                                BGP_NOTIFY_UPDATE_MAL_ATTR);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Unfeasible Route Length. */
@@ -1231,7 +1428,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
                        peer->host, withdraw_len);
                bgp_notify_send(peer, BGP_NOTIFY_UPDATE_ERR,
                                BGP_NOTIFY_UPDATE_MAL_ATTR);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Unfeasible Route packet format check. */
@@ -1251,7 +1448,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
                        peer->host);
                bgp_notify_send(peer, BGP_NOTIFY_UPDATE_ERR,
                                BGP_NOTIFY_UPDATE_MAL_ATTR);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Fetch attribute total length. */
@@ -1265,7 +1462,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
                        peer->host, attribute_len);
                bgp_notify_send(peer, BGP_NOTIFY_UPDATE_ERR,
                                BGP_NOTIFY_UPDATE_MAL_ATTR);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Certain attribute parsing errors should not be considered bad enough
@@ -1288,7 +1485,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
                                                &nlris[NLRI_MP_WITHDRAW]);
                if (attr_parse_ret == BGP_ATTR_PARSE_ERROR) {
                        bgp_attr_unintern_sub(&attr);
-                       return -1;
+                       return BGP_Stop;
                }
        }
 
@@ -1367,7 +1564,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
                                                ? BGP_NOTIFY_UPDATE_INVAL_NETWORK
                                                : BGP_NOTIFY_UPDATE_OPT_ATTR_ERR);
                        bgp_attr_unintern_sub(&attr);
-                       return -1;
+                       return BGP_Stop;
                }
        }
 
@@ -1376,9 +1573,8 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
         * Non-MP IPv4/Unicast EoR is a completely empty UPDATE
         * and MP EoR should have only an empty MP_UNREACH
         */
-       if ((!update_len && !withdraw_len &&
-            nlris[NLRI_MP_UPDATE].length == 0) ||
-           (attr_parse_ret == BGP_ATTR_PARSE_EOR)) {
+       if ((!update_len && !withdraw_len && nlris[NLRI_MP_UPDATE].length == 0)
+           || (attr_parse_ret == BGP_ATTR_PARSE_EOR)) {
                afi_t afi = 0;
                safi_t safi;
 
@@ -1423,24 +1619,23 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
           interned in bgp_attr_parse(). */
        bgp_attr_unintern_sub(&attr);
 
-       /* If peering is stopped due to some reason, do not generate BGP
-          event.  */
-       if (peer->status != Established)
-               return 0;
-
-       /* Increment packet counter. */
-       peer->update_in++;
        peer->update_time = bgp_clock();
 
        /* Rearm holdtime timer */
        BGP_TIMER_OFF(peer->t_holdtime);
        bgp_timer_set(peer);
 
-       return 0;
+       return Receive_UPDATE_message;
 }
 
-/* Notify message treatment function. */
-static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
+/**
+ * Process BGP NOTIFY message for peer.
+ *
+ * @param peer
+ * @param size size of the packet
+ * @return as in summary
+ */
+static int bgp_notify_receive(struct peer *peer, bgp_size_t size)
 {
        struct bgp_notify bgp_notify;
 
@@ -1450,8 +1645,8 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
                peer->notify.length = 0;
        }
 
-       bgp_notify.code = stream_getc(peer->ibuf);
-       bgp_notify.subcode = stream_getc(peer->ibuf);
+       bgp_notify.code = stream_getc(peer->curr);
+       bgp_notify.subcode = stream_getc(peer->curr);
        bgp_notify.length = size - 2;
        bgp_notify.data = NULL;
 
@@ -1462,7 +1657,7 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
        if (bgp_notify.length) {
                peer->notify.length = size - 2;
                peer->notify.data = XMALLOC(MTYPE_TMP, size - 2);
-               memcpy(peer->notify.data, stream_pnt(peer->ibuf), size - 2);
+               memcpy(peer->notify.data, stream_pnt(peer->curr), size - 2);
        }
 
        /* For debug */
@@ -1477,12 +1672,12 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
                        for (i = 0; i < bgp_notify.length; i++)
                                if (first) {
                                        sprintf(c, " %02x",
-                                               stream_getc(peer->ibuf));
+                                               stream_getc(peer->curr));
                                        strcat(bgp_notify.data, c);
                                } else {
                                        first = 1;
                                        sprintf(c, "%02x",
-                                               stream_getc(peer->ibuf));
+                                               stream_getc(peer->curr));
                                        strcpy(bgp_notify.data, c);
                                }
                        bgp_notify.raw_data = (u_char *)peer->notify.data;
@@ -1497,7 +1692,7 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
        }
 
        /* peer count update */
-       peer->notify_in++;
+       atomic_fetch_add_explicit(&peer->notify_in, 1, memory_order_relaxed);
 
        peer->last_reset = PEER_DOWN_NOTIFY_RECEIVED;
 
@@ -1509,20 +1704,17 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
            && bgp_notify.subcode == BGP_NOTIFY_OPEN_UNSUP_PARAM)
                UNSET_FLAG(peer->sflags, PEER_STATUS_CAPABILITY_OPEN);
 
-       BGP_EVENT_ADD(peer, Receive_NOTIFICATION_message);
-}
-
-/* Keepalive treatment function -- get keepalive send keepalive */
-static void bgp_keepalive_receive(struct peer *peer, bgp_size_t size)
-{
-       if (bgp_debug_keepalive(peer))
-               zlog_debug("%s KEEPALIVE rcvd", peer->host);
-
-       BGP_EVENT_ADD(peer, Receive_KEEPALIVE_message);
+       return Receive_NOTIFICATION_message;
 }
 
-/* Route refresh message is received. */
-static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
+/**
+ * Process BGP ROUTEREFRESH message for peer.
+ *
+ * @param peer
+ * @param size size of the packet
+ * @return as in summary
+ */
+static int bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
 {
        iana_afi_t pkt_afi;
        afi_t afi;
@@ -1539,7 +1731,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
                         peer->host);
                bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
                                BGP_NOTIFY_HEADER_BAD_MESTYPE);
-               return;
+               return BGP_Stop;
        }
 
        /* Status must be Established. */
@@ -1549,10 +1741,10 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
                        peer->host,
                        lookup_msg(bgp_status_msg, peer->status, NULL));
                bgp_notify_send(peer, BGP_NOTIFY_FSM_ERR, 0);
-               return;
+               return BGP_Stop;
        }
 
-       s = peer->ibuf;
+       s = peer->curr;
 
        /* Parse packet. */
        pkt_afi = stream_getw(s);
@@ -1568,7 +1760,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
                zlog_info(
                        "%s REFRESH_REQ for unrecognized afi/safi: %d/%d - ignored",
                        peer->host, pkt_afi, pkt_safi);
-               return;
+               return BGP_PACKET_NOOP;
        }
 
        if (size != BGP_MSG_ROUTE_REFRESH_MIN_SIZE - BGP_HEADER_SIZE) {
@@ -1582,7 +1774,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
                        zlog_info("%s ORF route refresh length error",
                                  peer->host);
                        bgp_notify_send(peer, BGP_NOTIFY_CEASE, 0);
-                       return;
+                       return BGP_Stop;
                }
 
                when_to_refresh = stream_getc(s);
@@ -1753,7 +1945,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
                                           ? "Defer"
                                           : "Immediate");
                if (when_to_refresh == REFRESH_DEFER)
-                       return;
+                       return BGP_PACKET_NOOP;
        }
 
        /* First update is deferred until ORF or ROUTE-REFRESH is received */
@@ -1784,8 +1976,18 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
 
        /* Perform route refreshment to the peer */
        bgp_announce_route(peer, afi, safi);
+
+       /* No FSM action necessary */
+       return BGP_PACKET_NOOP;
 }
 
+/**
+ * Parse BGP CAPABILITY message for peer.
+ *
+ * @param peer
+ * @param size size of the packet
+ * @return as in summary
+ */
 static int bgp_capability_msg_parse(struct peer *peer, u_char *pnt,
                                    bgp_size_t length)
 {
@@ -1806,7 +2008,7 @@ static int bgp_capability_msg_parse(struct peer *peer, u_char *pnt,
                if (pnt + 3 > end) {
                        zlog_info("%s Capability length error", peer->host);
                        bgp_notify_send(peer, BGP_NOTIFY_CEASE, 0);
-                       return -1;
+                       return BGP_Stop;
                }
                action = *pnt;
                hdr = (struct capability_header *)(pnt + 1);
@@ -1817,7 +2019,7 @@ static int bgp_capability_msg_parse(struct peer *peer, u_char *pnt,
                        zlog_info("%s Capability Action Value error %d",
                                  peer->host, action);
                        bgp_notify_send(peer, BGP_NOTIFY_CEASE, 0);
-                       return -1;
+                       return BGP_Stop;
                }
 
                if (bgp_debug_neighbor_events(peer))
@@ -1829,7 +2031,7 @@ static int bgp_capability_msg_parse(struct peer *peer, u_char *pnt,
                if ((pnt + hdr->length + 3) > end) {
                        zlog_info("%s Capability length error", peer->host);
                        bgp_notify_send(peer, BGP_NOTIFY_CEASE, 0);
-                       return -1;
+                       return BGP_Stop;
                }
 
                /* Fetch structure to the byte stream. */
@@ -1880,7 +2082,7 @@ static int bgp_capability_msg_parse(struct peer *peer, u_char *pnt,
                                if (peer_active_nego(peer))
                                        bgp_clear_route(peer, afi, safi);
                                else
-                                       BGP_EVENT_ADD(peer, BGP_Stop);
+                                       return BGP_Stop;
                        }
                } else {
                        zlog_warn(
@@ -1888,19 +2090,26 @@ static int bgp_capability_msg_parse(struct peer *peer, u_char *pnt,
                                peer->host, hdr->code);
                }
        }
-       return 0;
+
+       /* No FSM action necessary */
+       return BGP_PACKET_NOOP;
 }
 
-/* Dynamic Capability is received.
+/**
+ * Parse BGP CAPABILITY message for peer.
+ *
+ * Exported for unit testing.
  *
- * This is exported for unit-test purposes
+ * @param peer
+ * @param size size of the packet
+ * @return as in summary
  */
 int bgp_capability_receive(struct peer *peer, bgp_size_t size)
 {
        u_char *pnt;
 
        /* Fetch pointer. */
-       pnt = stream_pnt(peer->ibuf);
+       pnt = stream_pnt(peer->curr);
 
        if (bgp_debug_neighbor_events(peer))
                zlog_debug("%s rcv CAPABILITY", peer->host);
@@ -1911,7 +2120,7 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
                         peer->host);
                bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
                                BGP_NOTIFY_HEADER_BAD_MESTYPE);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Status must be Established. */
@@ -1921,513 +2130,176 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
                        peer->host,
                        lookup_msg(bgp_status_msg, peer->status, NULL));
                bgp_notify_send(peer, BGP_NOTIFY_FSM_ERR, 0);
-               return -1;
+               return BGP_Stop;
        }
 
        /* Parse packet. */
        return bgp_capability_msg_parse(peer, pnt, size);
 }
 
-/* BGP read utility function. */
-static int bgp_read_packet(struct peer *peer)
+/**
+ * Processes a peer's input buffer.
+ *
+ * This function sidesteps the event loop and directly calls bgp_event_update()
+ * after processing each BGP message. This is necessary to ensure proper
+ * ordering of FSM events and unifies the behavior that was present previously,
+ * whereby some of the packet handling functions would update the FSM and some
+ * would not, making event flow difficult to understand. Please think twice
+ * before hacking this.
+ *
+ * Thread type: THREAD_EVENT
+ * @param thread
+ * @return 0
+ */
+int bgp_process_packet(struct thread *thread)
 {
-       int nbytes;
-       int readsize;
+       /* Yes first of all get peer pointer. */
+       struct peer *peer;      // peer
+       uint32_t rpkt_quanta_old; // how many packets to read
+       int fsm_update_result;    // return code of bgp_event_update()
+       int mprc;                 // message processing return code
 
-       readsize = peer->packet_size - stream_get_endp(peer->ibuf);
+       peer = THREAD_ARG(thread);
+       rpkt_quanta_old = atomic_load_explicit(&peer->bgp->rpkt_quanta,
+                                              memory_order_relaxed);
+       fsm_update_result = 0;
 
-       /* If size is zero then return. */
-       if (!readsize)
+       /* Guard against scheduled events that occur after peer deletion. */
+       if (peer->status == Deleted || peer->status == Clearing)
                return 0;
 
-       /* Read packet from fd. */
-       nbytes = stream_read_try(peer->ibuf, peer->fd, readsize);
-
-       /* If read byte is smaller than zero then error occured. */
-       if (nbytes < 0) {
-               /* Transient error should retry */
-               if (nbytes == -2)
-                       return -1;
-
-               zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
-                        safe_strerror(errno));
-
-               if (peer->status == Established) {
-                       if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
-                               peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
-                               SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
-                       } else
-                               peer->last_reset = PEER_DOWN_CLOSE_SESSION;
-               }
+       unsigned int processed = 0;
 
-               BGP_EVENT_ADD(peer, TCP_fatal_error);
-               return -1;
-       }
+       while (processed < rpkt_quanta_old) {
+               u_char type = 0;
+               bgp_size_t size;
+               char notify_data_length[2];
 
-       /* When read byte is zero : clear bgp peer and return */
-       if (nbytes == 0) {
-               if (bgp_debug_neighbor_events(peer))
-                       zlog_debug("%s [Event] BGP connection closed fd %d",
-                                  peer->host, peer->fd);
-
-               if (peer->status == Established) {
-                       if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
-                               peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
-                               SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
-                       } else
-                               peer->last_reset = PEER_DOWN_CLOSE_SESSION;
+               pthread_mutex_lock(&peer->io_mtx);
+               {
+                       peer->curr = stream_fifo_pop(peer->ibuf);
                }
+               pthread_mutex_unlock(&peer->io_mtx);
 
-               BGP_EVENT_ADD(peer, TCP_connection_closed);
-               return -1;
-       }
-
-       /* We read partial packet. */
-       if (stream_get_endp(peer->ibuf) != peer->packet_size)
-               return -1;
-
-       return 0;
-}
-
-/* Marker check. */
-static int bgp_marker_all_one(struct stream *s, int length)
-{
-       int i;
-
-       for (i = 0; i < length; i++)
-               if (s->data[i] != 0xff)
+               if (peer->curr == NULL) // no packets to process, hmm...
                        return 0;
 
-       return 1;
-}
-
-/* Starting point of packet process function. */
-int bgp_read(struct thread *thread)
-{
-       int ret;
-       u_char type = 0;
-       struct peer *peer;
-       bgp_size_t size;
-       char notify_data_length[2];
-       u_int32_t notify_out;
-
-       /* Yes first of all get peer pointer. */
-       peer = THREAD_ARG(thread);
-       peer->t_read = NULL;
-
-       /* Note notify_out so we can check later to see if we sent another one
-        */
-       notify_out = peer->notify_out;
-
-       if (peer->fd < 0) {
-               zlog_err("bgp_read(): peer's fd is negative value %d",
-                        peer->fd);
-               return -1;
-       }
-
-       BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
-
-       /* Read packet header to determine type of the packet */
-       if (peer->packet_size == 0)
-               peer->packet_size = BGP_HEADER_SIZE;
-
-       if (stream_get_endp(peer->ibuf) < BGP_HEADER_SIZE) {
-               ret = bgp_read_packet(peer);
-
-               /* Header read error or partial read packet. */
-               if (ret < 0)
-                       goto done;
-
-               /* Get size and type. */
-               stream_forward_getp(peer->ibuf, BGP_MARKER_SIZE);
-               memcpy(notify_data_length, stream_pnt(peer->ibuf), 2);
-               size = stream_getw(peer->ibuf);
-               type = stream_getc(peer->ibuf);
-
-               /* Marker check */
-               if (((type == BGP_MSG_OPEN) || (type == BGP_MSG_KEEPALIVE))
-                   && !bgp_marker_all_one(peer->ibuf, BGP_MARKER_SIZE)) {
-                       bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
-                                       BGP_NOTIFY_HEADER_NOT_SYNC);
-                       goto done;
-               }
-
-               /* BGP type check. */
-               if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
-                   && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
-                   && type != BGP_MSG_ROUTE_REFRESH_NEW
-                   && type != BGP_MSG_ROUTE_REFRESH_OLD
-                   && type != BGP_MSG_CAPABILITY) {
-                       if (bgp_debug_neighbor_events(peer))
-                               zlog_debug("%s unknown message type 0x%02x",
-                                          peer->host, type);
-                       bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
-                                                 BGP_NOTIFY_HEADER_BAD_MESTYPE,
-                                                 &type, 1);
-                       goto done;
-               }
-               /* Mimimum packet length check. */
-               if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
-                   || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
-                   || (type == BGP_MSG_UPDATE
-                       && size < BGP_MSG_UPDATE_MIN_SIZE)
-                   || (type == BGP_MSG_NOTIFY
-                       && size < BGP_MSG_NOTIFY_MIN_SIZE)
-                   || (type == BGP_MSG_KEEPALIVE
-                       && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
-                   || (type == BGP_MSG_ROUTE_REFRESH_NEW
-                       && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
-                   || (type == BGP_MSG_ROUTE_REFRESH_OLD
-                       && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
-                   || (type == BGP_MSG_CAPABILITY
-                       && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
-                       if (bgp_debug_neighbor_events(peer))
-                               zlog_debug("%s bad message length - %d for %s",
-                                          peer->host, size,
-                                          type == 128
-                                                  ? "ROUTE-REFRESH"
-                                                  : bgp_type_str[(int)type]);
-                       bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
-                                                 BGP_NOTIFY_HEADER_BAD_MESLEN,
-                                                 (u_char *)notify_data_length,
-                                                 2);
-                       goto done;
-               }
-
-               /* Adjust size to message length. */
-               peer->packet_size = size;
-       }
-
-       ret = bgp_read_packet(peer);
-       if (ret < 0)
-               goto done;
-
-       /* Get size and type again. */
-       (void)stream_getw_from(peer->ibuf, BGP_MARKER_SIZE);
-       type = stream_getc_from(peer->ibuf, BGP_MARKER_SIZE + 2);
-
-       /* BGP packet dump function. */
-       bgp_dump_packet(peer, type, peer->ibuf);
-
-       size = (peer->packet_size - BGP_HEADER_SIZE);
-
-       /* Read rest of the packet and call each sort of packet routine */
-       switch (type) {
-       case BGP_MSG_OPEN:
-               peer->open_in++;
-               bgp_open_receive(peer, size); /* XXX return value ignored! */
-               break;
-       case BGP_MSG_UPDATE:
-               peer->readtime = monotime(NULL);
-               bgp_update_receive(peer, size);
-               break;
-       case BGP_MSG_NOTIFY:
-               bgp_notify_receive(peer, size);
-               break;
-       case BGP_MSG_KEEPALIVE:
-               peer->readtime = monotime(NULL);
-               bgp_keepalive_receive(peer, size);
-               break;
-       case BGP_MSG_ROUTE_REFRESH_NEW:
-       case BGP_MSG_ROUTE_REFRESH_OLD:
-               peer->refresh_in++;
-               bgp_route_refresh_receive(peer, size);
-               break;
-       case BGP_MSG_CAPABILITY:
-               peer->dynamic_cap_in++;
-               bgp_capability_receive(peer, size);
-               break;
-       }
-
-       /* If reading this packet caused us to send a NOTIFICATION then store a
-        * copy
-        * of the packet for troubleshooting purposes
-        */
-       if (notify_out < peer->notify_out) {
-               memcpy(peer->last_reset_cause, peer->ibuf->data,
-                      peer->packet_size);
-               peer->last_reset_cause_size = peer->packet_size;
-               notify_out = peer->notify_out;
-       }
-
-       /* Clear input buffer. */
-       peer->packet_size = 0;
-       if (peer->ibuf)
-               stream_reset(peer->ibuf);
-
-done:
-       /* If reading this packet caused us to send a NOTIFICATION then store a
-        * copy
-        * of the packet for troubleshooting purposes
-        */
-       if (notify_out < peer->notify_out) {
-               memcpy(peer->last_reset_cause, peer->ibuf->data,
-                      peer->packet_size);
-               peer->last_reset_cause_size = peer->packet_size;
-       }
+               /* skip the marker and copy the packet length */
+               stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
+               memcpy(notify_data_length, stream_pnt(peer->curr), 2);
 
-       return 0;
-}
+               /* read in the packet length and type */
+               size = stream_getw(peer->curr);
+               type = stream_getc(peer->curr);
 
-/* ------------- write thread ------------------ */
+               /* BGP packet dump function. */
+               bgp_dump_packet(peer, type, peer->curr);
 
-/**
- * Flush peer output buffer.
- *
- * This function pops packets off of peer->obuf and writes them to peer->fd.
- * The amount of packets written is equal to the minimum of peer->wpkt_quanta
- * and the number of packets on the output buffer.
- *
- * If write() returns an error, the appropriate FSM event is generated.
- *
- * The return value is equal to the number of packets written
- * (which may be zero).
- */
-static int bgp_write(struct peer *peer)
-{
-       u_char type;
-       struct stream *s;
-       int num;
-       int update_last_write = 0;
-       unsigned int count = 0;
-       unsigned int oc = 0;
-
-       /* Write packets. The number of packets written is the value of
-        * bgp->wpkt_quanta or the size of the output buffer, whichever is
-        * smaller.*/
-       while (count < peer->bgp->wpkt_quanta
-              && (s = stream_fifo_head(peer->obuf))) {
-               int writenum;
-               do { // write a full packet, or return on error
-                       writenum = stream_get_endp(s) - stream_get_getp(s);
-                       num = write(peer->fd, STREAM_PNT(s), writenum);
-
-                       if (num < 0) {
-                               if (!ERRNO_IO_RETRY(errno))
-                                       BGP_EVENT_ADD(peer, TCP_fatal_error);
-
-                               goto done;
-                       } else if (num != writenum) // incomplete write
-                               stream_forward_getp(s, num);
-
-               } while (num != writenum);
-
-               /* Retrieve BGP packet type. */
-               stream_set_getp(s, BGP_MARKER_SIZE + 2);
-               type = stream_getc(s);
+               /* adjust size to exclude the marker + length + type */
+               size -= BGP_HEADER_SIZE;
 
+               /* Read rest of the packet and call each sort of packet routine
+                */
                switch (type) {
                case BGP_MSG_OPEN:
-                       peer->open_out++;
+                       atomic_fetch_add_explicit(&peer->open_in, 1,
+                                                 memory_order_relaxed);
+                       mprc = bgp_open_receive(peer, size);
+                       if (mprc == BGP_Stop)
+                               zlog_err(
+                                       "%s: BGP OPEN receipt failed for peer: %s",
+                                       __FUNCTION__, peer->host);
                        break;
                case BGP_MSG_UPDATE:
-                       peer->update_out++;
+                       atomic_fetch_add_explicit(&peer->update_in, 1,
+                                                 memory_order_relaxed);
+                       peer->readtime = monotime(NULL);
+                       mprc = bgp_update_receive(peer, size);
+                       if (mprc == BGP_Stop)
+                               zlog_err(
+                                       "%s: BGP UPDATE receipt failed for peer: %s",
+                                       __FUNCTION__, peer->host);
                        break;
                case BGP_MSG_NOTIFY:
-                       peer->notify_out++;
-                       /* Double start timer. */
-                       peer->v_start *= 2;
-
-                       /* Overflow check. */
-                       if (peer->v_start >= (60 * 2))
-                               peer->v_start = (60 * 2);
-
-                       /* Handle Graceful Restart case where the state changes
-                          to
-                          Connect instead of Idle */
-                       /* Flush any existing events */
-                       BGP_EVENT_ADD(peer, BGP_Stop);
-                       goto done;
-
+                       atomic_fetch_add_explicit(&peer->notify_in, 1,
+                                                 memory_order_relaxed);
+                       mprc = bgp_notify_receive(peer, size);
+                       if (mprc == BGP_Stop)
+                               zlog_err(
+                                       "%s: BGP NOTIFY receipt failed for peer: %s",
+                                       __FUNCTION__, peer->host);
+                       break;
                case BGP_MSG_KEEPALIVE:
-                       peer->keepalive_out++;
+                       peer->readtime = monotime(NULL);
+                       atomic_fetch_add_explicit(&peer->keepalive_in, 1,
+                                                 memory_order_relaxed);
+                       mprc = bgp_keepalive_receive(peer, size);
+                       if (mprc == BGP_Stop)
+                               zlog_err(
+                                       "%s: BGP KEEPALIVE receipt failed for peer: %s",
+                                       __FUNCTION__, peer->host);
                        break;
                case BGP_MSG_ROUTE_REFRESH_NEW:
                case BGP_MSG_ROUTE_REFRESH_OLD:
-                       peer->refresh_out++;
+                       atomic_fetch_add_explicit(&peer->refresh_in, 1,
+                                                 memory_order_relaxed);
+                       mprc = bgp_route_refresh_receive(peer, size);
+                       if (mprc == BGP_Stop)
+                               zlog_err(
+                                       "%s: BGP ROUTEREFRESH receipt failed for peer: %s",
+                                       __FUNCTION__, peer->host);
                        break;
                case BGP_MSG_CAPABILITY:
-                       peer->dynamic_cap_out++;
+                       atomic_fetch_add_explicit(&peer->dynamic_cap_in, 1,
+                                                 memory_order_relaxed);
+                       mprc = bgp_capability_receive(peer, size);
+                       if (mprc == BGP_Stop)
+                               zlog_err(
+                                       "%s: BGP CAPABILITY receipt failed for peer: %s",
+                                       __FUNCTION__, peer->host);
                        break;
+               default:
+                       /*
+                        * The message type should have been sanitized before
+                        * we ever got here. Receipt of a message with an
+                        * invalid header at this point is indicative of a
+                        * security issue.
+                        */
+                       assert (!"Message of invalid type received during input processing");
                }
 
-               count++;
-               /* OK we send packet so delete it. */
-               bgp_packet_delete_unsafe(peer);
-               update_last_write = 1;
-       }
-
-done : {
-       /* Update last_update if UPDATEs were written. */
-       if (peer->update_out > oc)
-               peer->last_update = bgp_clock();
-
-       /* If we TXed any flavor of packet update last_write */
-       if (update_last_write)
-               peer->last_write = bgp_clock();
-}
-
-       return count;
-}
-
-void peer_writes_init(void)
-{
-       plist_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t));
-       write_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t));
-
-       // initialize mutex
-       pthread_mutex_init(plist_mtx, NULL);
-
-       // use monotonic clock with condition variable
-       pthread_condattr_t attrs;
-       pthread_condattr_init(&attrs);
-       pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
-       pthread_cond_init(write_cond, &attrs);
-       pthread_condattr_destroy(&attrs);
-
-       // initialize peerlist
-       plist = list_new();
-}
-
-static void peer_writes_finish(void *arg)
-{
-       bgp_packet_writes_thread_run = false;
-
-       if (plist)
-               list_delete(plist);
-
-       plist = NULL;
-
-       pthread_mutex_unlock(plist_mtx);
-       pthread_mutex_destroy(plist_mtx);
-       pthread_cond_destroy(write_cond);
-
-       XFREE(MTYPE_PTHREAD, plist_mtx);
-       XFREE(MTYPE_PTHREAD, write_cond);
-}
-
-/**
- * Entry function for peer packet flushing pthread.
- *
- * peer_writes_init() must be called prior to this.
- */
-void *peer_writes_start(void *arg)
-{
-       struct timeval currtime = {0, 0};
-       struct timeval sleeptime = {0, 500};
-       struct timespec next_update = {0, 0};
-
-       struct listnode *ln;
-       struct peer *peer;
-
-       pthread_mutex_lock(plist_mtx);
-
-       // register cleanup handler
-       pthread_cleanup_push(&peer_writes_finish, NULL);
-
-       bgp_packet_writes_thread_run = true;
-
-       while (bgp_packet_writes_thread_run) { // wait around until next update
-                                              // time
-               if (plist->count > 0)
-                       pthread_cond_timedwait(write_cond, plist_mtx,
-                                              &next_update);
-               else // wait around until we have some peers
-                       while (plist->count == 0
-                              && bgp_packet_writes_thread_run)
-                               pthread_cond_wait(write_cond, plist_mtx);
-
-               for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) {
-                       pthread_mutex_lock(&peer->obuf_mtx);
-                       {
-                               bgp_write(peer);
-                       }
-                       pthread_mutex_unlock(&peer->obuf_mtx);
-
-                       if (!bgp_packet_writes_thread_run)
-                               break;
-               }
-
-               // schedule update packet generation on main thread
-               if (!t_generate_updgrp_packets)
-                       t_generate_updgrp_packets = thread_add_event(
-                               bm->master, bgp_generate_updgrp_packets, NULL,
-                               0);
-
-               monotime(&currtime);
-               timeradd(&currtime, &sleeptime, &currtime);
-               TIMEVAL_TO_TIMESPEC(&currtime, &next_update);
-       }
-
-       // clean up
-       pthread_cleanup_pop(1);
-
-       return NULL;
-}
-
-/**
- * Turns on packet writing for a peer.
- */
-void peer_writes_on(struct peer *peer)
-{
-       if (peer->status == Deleted)
-               return;
-
-       pthread_mutex_lock(plist_mtx);
-       {
-               struct listnode *ln, *nn;
-               struct peer *p;
-
-               // make sure this peer isn't already in the list
-               for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
-                       if (p == peer) {
-                               pthread_mutex_unlock(plist_mtx);
-                               return;
-                       }
+               /* delete processed packet */
+               stream_free(peer->curr);
+               peer->curr = NULL;
+               processed++;
 
-               peer_lock(peer);
-               listnode_add(plist, peer);
+               /* Update FSM */
+               if (mprc != BGP_PACKET_NOOP)
+                       fsm_update_result = bgp_event_update(peer, mprc);
+               else
+                       continue;
 
-               SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+               /*
+                * If peer was deleted, do not process any more packets. This
+                * is usually due to executing BGP_Stop or a stub deletion.
+                */
+               if (fsm_update_result == FSM_PEER_TRANSFERRED
+                   || fsm_update_result == FSM_PEER_STOPPED)
+                       break;
        }
-       pthread_mutex_unlock(plist_mtx);
-       peer_writes_wake();
-}
-
-/**
- * Turns off packet writing for a peer.
- */
-void peer_writes_off(struct peer *peer)
-{
-       struct listnode *ln, *nn;
-       struct peer *p;
-       pthread_mutex_lock(plist_mtx);
-       {
-               for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
-                       if (p == peer) {
-                               list_delete_node(plist, ln);
-                               peer_unlock(peer);
-                               break;
-                       }
 
-               UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+       if (fsm_update_result != FSM_PEER_TRANSFERRED
+           && fsm_update_result != FSM_PEER_STOPPED) {
+               pthread_mutex_lock(&peer->io_mtx);
+               {
+                       // more work to do, come back later
+                       if (peer->ibuf->count > 0)
+                               thread_add_timer_msec(
+                                       bm->master, bgp_process_packet, peer, 0,
+                                       &peer->t_process_packet);
+               }
+               pthread_mutex_unlock(&peer->io_mtx);
        }
-       pthread_mutex_unlock(plist_mtx);
-}
-
-/**
- * Wakes up the write thread to do work.
- */
-void peer_writes_wake()
-{
-       pthread_cond_signal(write_cond);
-}
 
-int peer_writes_stop(void **result)
-{
-       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_WRITE);
-       bgp_packet_writes_thread_run = false;
-       peer_writes_wake();
-       pthread_join(fpt->thread, result);
        return 0;
 }