]> git.proxmox.com Git - mirror_frr.git/blobdiff - bgpd/bgp_io.c
bgpd: update last_update whenever obuf sent
[mirror_frr.git] / bgpd / bgp_io.c
index 581973d37ebefd534a7114d43a7019610cbb5434..f4bfc90b7e37adb8b7e0c719b6d736cfc95e0479 100644 (file)
@@ -1,6 +1,7 @@
 /* BGP I/O.
- * Implements packet I/O in a consumer pthread.
+ * Implements packet I/O in a pthread.
  * Copyright (C) 2017  Cumulus Networks
+ * Quentin Young
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * MA 02110-1301 USA
  */
 
+/* clang-format off */
 #include <zebra.h>
-#include <pthread.h> // for pthread_mutex_unlock, pthread_mutex_lock
-
-#include "frr_pthread.h" // for frr_pthread_get, frr_pthread
-#include "linklist.h"    // for list_delete, list_delete_all_node, lis...
-#include "log.h"        // for zlog_debug, safe_strerror, zlog_err
-#include "memory.h"      // for MTYPE_TMP, XCALLOC, XFREE
-#include "network.h"     // for ERRNO_IO_RETRY
-#include "stream.h"      // for stream_get_endp, stream_getw_from, str...
-#include "thread.h"      // for THREAD_OFF, THREAD_ARG, thread, thread...
-#include "zassert.h"     // for assert
+#include <pthread.h>           // for pthread_mutex_unlock, pthread_mutex_lock
+
+#include "frr_pthread.h"       // for frr_pthread_get, frr_pthread
+#include "linklist.h"          // for list_delete, list_delete_all_node, lis...
+#include "log.h"               // for zlog_debug, safe_strerror, zlog_err
+#include "memory.h"            // for MTYPE_TMP, XCALLOC, XFREE
+#include "network.h"           // for ERRNO_IO_RETRY
+#include "stream.h"            // for stream_get_endp, stream_getw_from, str...
+#include "ringbuf.h"           // for ringbuf_remain, ringbuf_peek, ringbuf_...
+#include "thread.h"            // for THREAD_OFF, THREAD_ARG, thread, thread...
+#include "zassert.h"           // for assert
 
 #include "bgpd/bgp_io.h"
-#include "bgpd/bgp_debug.h"  // for bgp_debug_neighbor_events, bgp_type_str
-#include "bgpd/bgp_fsm.h"    // for BGP_EVENT_ADD, bgp_event
-#include "bgpd/bgp_packet.h" // for bgp_notify_send_with_data, bgp_notify...
-#include "bgpd/bgpd.h"       // for peer, BGP_MARKER_SIZE, bgp_master, bm
+#include "bgpd/bgp_debug.h"    // for bgp_debug_neighbor_events, bgp_type_str
+#include "bgpd/bgp_fsm.h"      // for BGP_EVENT_ADD, bgp_event
+#include "bgpd/bgp_packet.h"   // for bgp_notify_send_with_data, bgp_notify...
+#include "bgpd/bgpd.h"         // for peer, BGP_MARKER_SIZE, bgp_master, bm
+/* clang-format on */
 
 /* forward declarations */
 static uint16_t bgp_write(struct peer *);
@@ -44,18 +48,39 @@ static int bgp_process_reads(struct thread *);
 static bool validate_header(struct peer *);
 
 /* generic i/o status codes */
-#define BGP_IO_TRANS_ERR        (1 << 0) // EAGAIN or similar occurred
-#define BGP_IO_FATAL_ERR        (1 << 1) // some kind of fatal TCP error
+#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
+#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error
 
-/* Start and stop routines for I/O pthread + control variables
+/* Plumbing & control variables for thread lifecycle
  * ------------------------------------------------------------------------ */
-_Atomic bool bgp_io_thread_run;
-_Atomic bool bgp_io_thread_started;
+bool bgp_io_thread_run;
+pthread_mutex_t *running_cond_mtx;
+pthread_cond_t *running_cond;
+
+/* Unused callback for thread_add_read() */
+static int bgp_io_dummy(struct thread *thread) { return 0; }
+
+/* Poison pill task */
+static int bgp_io_finish(struct thread *thread)
+{
+       bgp_io_thread_run = false;
+       return 0;
+}
 
+/* Extern lifecycle control functions. init -> start -> stop
+ * ------------------------------------------------------------------------ */
 void bgp_io_init()
 {
        bgp_io_thread_run = false;
-       bgp_io_thread_started = false;
+
+       running_cond_mtx = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
+       running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_cond_t));
+
+       pthread_mutex_init(running_cond_mtx, NULL);
+       pthread_cond_init(running_cond, NULL);
+
+       /* unlocked in bgp_io_wait_running() */
+       pthread_mutex_lock(running_cond_mtx);
 }
 
 void *bgp_io_start(void *arg)
@@ -63,14 +88,22 @@ void *bgp_io_start(void *arg)
        struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
        fpt->master->owner = pthread_self();
 
+       // fd so we can sleep in poll()
+       int sleeper[2];
+       pipe(sleeper);
+       thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);
+
        // we definitely don't want to handle signals
        fpt->master->handle_signals = false;
 
        struct thread task;
 
-       atomic_store_explicit(&bgp_io_thread_run, true, memory_order_relaxed);
-       atomic_store_explicit(&bgp_io_thread_started, true,
-                             memory_order_relaxed);
+       pthread_mutex_lock(running_cond_mtx);
+       {
+               bgp_io_thread_run = true;
+               pthread_cond_signal(running_cond);
+       }
+       pthread_mutex_unlock(running_cond_mtx);
 
        while (bgp_io_thread_run) {
                if (thread_fetch(fpt->master, &task)) {
@@ -78,34 +111,47 @@ void *bgp_io_start(void *arg)
                }
        }
 
+       close(sleeper[1]);
+       close(sleeper[0]);
+
        return NULL;
 }
 
-int bgp_io_stop(void **result, struct frr_pthread *fpt)
+void bgp_io_wait_running()
 {
+       while (!bgp_io_thread_run)
+               pthread_cond_wait(running_cond, running_cond_mtx);
 
-       bgp_io_thread_run = false;
-       /* let the loop break */
-       fpt->master->spin = false;
-       /* break poll */
-       pthread_kill(fpt->thread, SIGINT);
+       /* locked in bgp_io_init() */
+       pthread_mutex_unlock(running_cond_mtx);
+}
+
+int bgp_io_stop(void **result, struct frr_pthread *fpt)
+{
+       thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
        pthread_join(fpt->thread, result);
 
+       pthread_mutex_destroy(running_cond_mtx);
+       pthread_cond_destroy(running_cond);
+
+       XFREE(MTYPE_PTHREAD_PRIM, running_cond_mtx);
+       XFREE(MTYPE_PTHREAD_PRIM, running_cond);
+
        return 0;
 }
-/* ------------------------------------------------------------------------ */
+
+/* Extern API -------------------------------------------------------------- */
 
 void bgp_writes_on(struct peer *peer)
 {
-       while (!atomic_load_explicit(&bgp_io_thread_started,
-                                    memory_order_relaxed))
-               ;
+       assert(bgp_io_thread_run);
 
        assert(peer->status != Deleted);
        assert(peer->obuf);
        assert(peer->ibuf);
        assert(peer->ibuf_work);
-       assert(!peer->t_connect_check);
+       assert(!peer->t_connect_check_r);
+       assert(!peer->t_connect_check_w);
        assert(peer->fd);
 
        struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
@@ -117,9 +163,7 @@ void bgp_writes_on(struct peer *peer)
 
 void bgp_writes_off(struct peer *peer)
 {
-       while (!atomic_load_explicit(&bgp_io_thread_started,
-                                    memory_order_relaxed))
-               ;
+       assert(bgp_io_thread_run);
 
        struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
 
@@ -131,17 +175,15 @@ void bgp_writes_off(struct peer *peer)
 
 void bgp_reads_on(struct peer *peer)
 {
-       while (!atomic_load_explicit(&bgp_io_thread_started,
-                                    memory_order_relaxed))
-               ;
+       assert(bgp_io_thread_run);
 
        assert(peer->status != Deleted);
        assert(peer->ibuf);
        assert(peer->fd);
        assert(peer->ibuf_work);
-       assert(stream_get_endp(peer->ibuf_work) == 0);
        assert(peer->obuf);
-       assert(!peer->t_connect_check);
+       assert(!peer->t_connect_check_r);
+       assert(!peer->t_connect_check_w);
        assert(peer->fd);
 
        struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
@@ -154,9 +196,7 @@ void bgp_reads_on(struct peer *peer)
 
 void bgp_reads_off(struct peer *peer)
 {
-       while (!atomic_load_explicit(&bgp_io_thread_started,
-                                    memory_order_relaxed))
-               ;
+       assert(bgp_io_thread_run);
 
        struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
 
@@ -166,9 +206,10 @@ void bgp_reads_off(struct peer *peer)
        UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
 }
 
+/* Internal functions ------------------------------------------------------- */
+
 /**
- * Called from PTHREAD_IO when select() or poll() determines that the file
- * descriptor is ready to be written to.
+ * Called from I/O pthread when a file descriptor has become ready for writing.
  */
 static int bgp_process_writes(struct thread *thread)
 {
@@ -176,6 +217,7 @@ static int bgp_process_writes(struct thread *thread)
        peer = THREAD_ARG(thread);
        uint16_t status;
        bool reschedule;
+       bool fatal = false;
 
        if (peer->fd < 0)
                return -1;
@@ -192,35 +234,39 @@ static int bgp_process_writes(struct thread *thread)
        if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
        }
 
-       if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
+       if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
                reschedule = false; /* problem */
+               fatal = true;
+       }
 
        if (reschedule) {
                thread_add_write(fpt->master, bgp_process_writes, peer,
                                 peer->fd, &peer->t_write);
-               thread_add_timer_msec(bm->master, bgp_generate_updgrp_packets,
-                                     peer, 0,
-                                     &peer->t_generate_updgrp_packets);
+       } else if (!fatal) {
+               BGP_TIMER_ON(peer->t_generate_updgrp_packets,
+                            bgp_generate_updgrp_packets, 0);
        }
 
        return 0;
 }
 
 /**
- * Called from PTHREAD_IO when select() or poll() determines that the file
- * descriptor is ready to be read from.
+ * Called from I/O pthread when a file descriptor has become ready for reading,
+ * or has hung up.
  *
  * We read as much data as possible, process as many packets as we can and
  * place them on peer->ibuf for secondary processing by the main thread.
  */
 static int bgp_process_reads(struct thread *thread)
 {
-       static struct peer *peer; // peer to read from
-       uint16_t status;          // bgp_read status code
-       bool more = true;        // whether we got more data
-       bool fatal = false;       // whether fatal error occurred
-       bool added_pkt = false;   // whether we pushed onto ->ibuf
-       bool header_valid = true; // whether header is valid
+       /* clang-format off */
+       static struct peer *peer;       // peer to read from
+       uint16_t status;                // bgp_read status code
+       bool more = true;               // whether we got more data
+       bool fatal = false;             // whether fatal error occurred
+       bool added_pkt = false;         // whether we pushed onto ->ibuf
+       bool header_valid = true;       // whether header is valid
+       /* clang-format on */
 
        peer = THREAD_ARG(thread);
 
@@ -251,14 +297,12 @@ static int bgp_process_reads(struct thread *thread)
                /* static buffer for transferring packets */
                static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
                /* shorter alias to peer's input buffer */
-               struct stream *ibw = peer->ibuf_work;
-               /* offset of start of current packet */
-               size_t offset = stream_get_getp(ibw);
+               struct ringbuf *ibw = peer->ibuf_work;
                /* packet size as given by header */
-               u_int16_t pktsize = 0;
+               uint16_t pktsize = 0;
 
                /* check that we have enough data for a header */
-               if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
+               if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
                        break;
 
                /* validate header */
@@ -270,16 +314,18 @@ static int bgp_process_reads(struct thread *thread)
                }
 
                /* header is valid; retrieve packet size */
-               pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
+               ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));
+
+               pktsize = ntohs(pktsize);
 
                /* if this fails we are seriously screwed */
                assert(pktsize <= BGP_MAX_PACKET_SIZE);
 
                /* If we have that much data, chuck it into its own
                 * stream and append to input queue for processing. */
-               if (STREAM_READABLE(ibw) >= pktsize) {
+               if (ringbuf_remain(ibw) >= pktsize) {
                        struct stream *pkt = stream_new(pktsize);
-                       stream_get(pktbuf, ibw, pktsize);
+                       assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
                        stream_put(pkt, pktbuf, pktsize);
 
                        pthread_mutex_lock(&peer->io_mtx);
@@ -293,40 +339,18 @@ static int bgp_process_reads(struct thread *thread)
                        break;
        }
 
-       /* After reading:
-        * 1. Move unread data to stream start to make room for more.
-        * 2. Reschedule and return when we have additional data.
-        *
-        * XXX: Heavy abuse of stream API. This needs a ring buffer.
-        */
-       if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
-               void *from = stream_pnt(peer->ibuf_work);
-               void *to = peer->ibuf_work->data;
-               size_t siz = STREAM_READABLE(peer->ibuf_work);
-               memmove(to, from, siz);
-               stream_set_getp(peer->ibuf_work, 0);
-               stream_set_endp(peer->ibuf_work, siz);
-       }
-
-       assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
+       assert(ringbuf_space(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
 
        /* handle invalid header */
        if (fatal) {
-               if (!header_valid) {
-                       bgp_size_t pktsize = BGP_HEADER_SIZE;
-                       stream_get(peer->last_reset_cause, peer->ibuf_work,
-                                  pktsize);
-                       peer->last_reset_cause_size = pktsize;
-               }
-
                /* wipe buffer just in case someone screwed up */
-               stream_reset(peer->ibuf_work);
+               ringbuf_wipe(peer->ibuf_work);
        } else {
                thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
                                &peer->t_read);
                if (added_pkt)
-                       thread_add_event(bm->master, bgp_process_packet, peer,
-                                        0, NULL);
+                       thread_add_timer_msec(bm->master, bgp_process_packet,
+                                             peer, 0, &peer->t_process_packet);
        }
 
        return 0;
@@ -351,13 +375,13 @@ static uint16_t bgp_write(struct peer *peer)
        int num;
        int update_last_write = 0;
        unsigned int count = 0;
-       unsigned int oc = 0;
+       uint32_t uo = 0;
        uint16_t status = 0;
        uint32_t wpkt_quanta_old;
 
        // cache current write quanta
-       wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
-                                              memory_order_relaxed);
+       wpkt_quanta_old =
+           atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed);
 
        while (count < wpkt_quanta_old && (s = stream_fifo_head(peer->obuf))) {
                int writenum;
@@ -385,13 +409,17 @@ static uint16_t bgp_write(struct peer *peer)
 
                switch (type) {
                case BGP_MSG_OPEN:
-                       peer->open_out++;
+                       atomic_fetch_add_explicit(&peer->open_out, 1,
+                                                 memory_order_relaxed);
                        break;
                case BGP_MSG_UPDATE:
-                       peer->update_out++;
+                       atomic_fetch_add_explicit(&peer->update_out, 1,
+                                                 memory_order_relaxed);
+                       uo++;
                        break;
                case BGP_MSG_NOTIFY:
-                       peer->notify_out++;
+                       atomic_fetch_add_explicit(&peer->notify_out, 1,
+                                                 memory_order_relaxed);
                        /* Double start timer. */
                        peer->v_start *= 2;
 
@@ -400,21 +428,22 @@ static uint16_t bgp_write(struct peer *peer)
                                peer->v_start = (60 * 2);
 
                        /* Handle Graceful Restart case where the state changes
-                          to
-                          Connect instead of Idle */
-                       /* Flush any existing events */
+                        * to Connect instead of Idle */
                        BGP_EVENT_ADD(peer, BGP_Stop);
                        goto done;
 
                case BGP_MSG_KEEPALIVE:
-                       peer->keepalive_out++;
+                       atomic_fetch_add_explicit(&peer->keepalive_out, 1,
+                                                 memory_order_relaxed);
                        break;
                case BGP_MSG_ROUTE_REFRESH_NEW:
                case BGP_MSG_ROUTE_REFRESH_OLD:
-                       peer->refresh_out++;
+                       atomic_fetch_add_explicit(&peer->refresh_out, 1,
+                                                 memory_order_relaxed);
                        break;
                case BGP_MSG_CAPABILITY:
-                       peer->dynamic_cap_out++;
+                       atomic_fetch_add_explicit(&peer->dynamic_cap_out, 1,
+                                                 memory_order_relaxed);
                        break;
                }
 
@@ -425,86 +454,76 @@ static uint16_t bgp_write(struct peer *peer)
        }
 
 done : {
-       /* Update last_update if UPDATEs were written. */
-       if (peer->update_out > oc)
-               peer->last_update = bgp_clock();
+       /*
+        * Update last_update if UPDATEs were written.
+        * Note: that these are only updated at end,
+        *       not per message (i.e., per loop)
+        */
+       if (uo)
+               atomic_store_explicit(&peer->last_update, bgp_clock(),
+                                     memory_order_relaxed);
 
-       /* If we TXed any flavor of packet update last_write */
+       /* If we TXed any flavor of packet */
        if (update_last_write)
-               peer->last_write = bgp_clock();
+               atomic_store_explicit(&peer->last_write, bgp_clock(),
+                                     memory_order_relaxed);
 }
 
        return status;
 }
 
 /**
- * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
+ * Reads a chunk of data from peer->fd into peer->ibuf_work.
  *
- * @return whether a full packet was read
+ * @return status flag (see top-of-file)
  */
 static uint16_t bgp_read(struct peer *peer)
 {
        size_t readsize; // how many bytes we want to read
        ssize_t nbytes;  // how many bytes we actually read
        uint16_t status = 0;
+       static uint8_t ibw[BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX];
+
+       readsize = MIN(ringbuf_space(peer->ibuf_work), sizeof(ibw));
+       nbytes = read(peer->fd, ibw, readsize);
+
+       /* EAGAIN or EWOULDBLOCK; come back later */
+       if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
+               SET_FLAG(status, BGP_IO_TRANS_ERR);
+       /* Fatal error; tear down session */
+       } else if (nbytes < 0) {
+               zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
+                        safe_strerror(errno));
+
+               if (peer->status == Established) {
+                       if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
+                               peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
+                               SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
+                       } else
+                               peer->last_reset = PEER_DOWN_CLOSE_SESSION;
+               }
 
-       readsize = STREAM_WRITEABLE(peer->ibuf_work);
-
-       nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
-
-       if (nbytes <= 0) // handle errors
-       {
-               switch (nbytes) {
-               case -1: // fatal error; tear down the session
-                       zlog_err("%s [Error] bgp_read_packet error: %s",
-                                peer->host, safe_strerror(errno));
-
-                       if (peer->status == Established) {
-                               if (CHECK_FLAG(peer->sflags,
-                                              PEER_STATUS_NSF_MODE)) {
-                                       peer->last_reset =
-                                               PEER_DOWN_NSF_CLOSE_SESSION;
-                                       SET_FLAG(peer->sflags,
-                                                PEER_STATUS_NSF_WAIT);
-                               } else
-                                       peer->last_reset =
-                                               PEER_DOWN_CLOSE_SESSION;
-                       }
-
-                       BGP_EVENT_ADD(peer, TCP_fatal_error);
-                       SET_FLAG(status, BGP_IO_FATAL_ERR);
-                       break;
-
-               case 0: // TCP session closed
-                       if (bgp_debug_neighbor_events(peer))
-                               zlog_debug(
-                                       "%s [Event] BGP connection closed fd %d",
-                                       peer->host, peer->fd);
-
-                       if (peer->status == Established) {
-                               if (CHECK_FLAG(peer->sflags,
-                                              PEER_STATUS_NSF_MODE)) {
-                                       peer->last_reset =
-                                               PEER_DOWN_NSF_CLOSE_SESSION;
-                                       SET_FLAG(peer->sflags,
-                                                PEER_STATUS_NSF_WAIT);
-                               } else
-                                       peer->last_reset =
-                                               PEER_DOWN_CLOSE_SESSION;
-                       }
-
-                       BGP_EVENT_ADD(peer, TCP_connection_closed);
-                       SET_FLAG(status, BGP_IO_FATAL_ERR);
-                       break;
-
-               case -2: // temporary error; come back later
-                       SET_FLAG(status, BGP_IO_TRANS_ERR);
-                       break;
-               default:
-                       break;
+               BGP_EVENT_ADD(peer, TCP_fatal_error);
+               SET_FLAG(status, BGP_IO_FATAL_ERR);
+       /* Received EOF / TCP session closed */
+       } else if (nbytes == 0) {
+               if (bgp_debug_neighbor_events(peer))
+                       zlog_debug("%s [Event] BGP connection closed fd %d",
+                                  peer->host, peer->fd);
+
+               if (peer->status == Established) {
+                       if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
+                               peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
+                               SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
+                       } else
+                               peer->last_reset = PEER_DOWN_CLOSE_SESSION;
                }
 
-               return status;
+               BGP_EVENT_ADD(peer, TCP_connection_closed);
+               SET_FLAG(status, BGP_IO_FATAL_ERR);
+       } else {
+               assert(ringbuf_put(peer->ibuf_work, ibw, nbytes)
+                      == (size_t)nbytes);
        }
 
        return status;
@@ -513,26 +532,35 @@ static uint16_t bgp_read(struct peer *peer)
 /*
  * Called after we have read a BGP packet header. Validates marker, message
  * type and packet length. If any of these aren't correct, sends a notify.
+ *
+ * Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input
+ * buffer.
  */
 static bool validate_header(struct peer *peer)
 {
-       u_int16_t size, type;
-       struct stream *pkt = peer->ibuf_work;
-       size_t getp = stream_get_getp(pkt);
+       uint16_t size;
+       uint8_t type;
+       struct ringbuf *pkt = peer->ibuf_work;
 
-       static uint8_t marker[BGP_MARKER_SIZE] = {
+       static uint8_t m_correct[BGP_MARKER_SIZE] = {
                0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
                0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+       uint8_t m_rx[BGP_MARKER_SIZE] = {0x00};
+
+       if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE)
+               return false;
 
-       if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
+       if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) {
                bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
                                BGP_NOTIFY_HEADER_NOT_SYNC);
                return false;
        }
 
-       /* Get size and type. */
-       size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
-       type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
+       /* Get size and type in network byte order. */
+       ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size));
+       ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type));
+
+       size = ntohs(size);
 
        /* BGP type check. */
        if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
@@ -546,11 +574,11 @@ static bool validate_header(struct peer *peer)
 
                bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
                                          BGP_NOTIFY_HEADER_BAD_MESTYPE,
-                                         (u_char *)&type, 1);
+                                         &type, 1);
                return false;
        }
 
-       /* Mimimum packet length check. */
+       /* Minimum packet length check. */
        if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
            || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
            || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
@@ -562,15 +590,18 @@ static bool validate_header(struct peer *peer)
                && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
            || (type == BGP_MSG_CAPABILITY
                && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
-               if (bgp_debug_neighbor_events(peer))
+               if (bgp_debug_neighbor_events(peer)) {
                        zlog_debug("%s bad message length - %d for %s",
                                   peer->host, size,
                                   type == 128 ? "ROUTE-REFRESH"
-                                              : bgp_type_str[(int)type]);
+                                              : bgp_type_str[(int) type]);
+               }
+
+               uint16_t nsize = htons(size);
 
                bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
                                          BGP_NOTIFY_HEADER_BAD_MESLEN,
-                                         (u_char *)&size, 2);
+                                         (unsigned char *) &nsize, 2);
                return false;
        }