/* BGP I/O.
 * Implements packet I/O in a consumer pthread.
 * Copyright (C) 2017 Cumulus Networks
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
 * MA 02110-1301 USA
 */
22 #include <pthread.h> // for pthread_mutex_unlock, pthread_mutex_lock
24 #include "frr_pthread.h" // for frr_pthread_get, frr_pthread
25 #include "linklist.h" // for list_delete, list_delete_all_node, lis...
26 #include "log.h" // for zlog_debug, safe_strerror, zlog_err
27 #include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
28 #include "network.h" // for ERRNO_IO_RETRY
29 #include "stream.h" // for stream_get_endp, stream_getw_from, str...
30 #include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
31 #include "zassert.h" // for assert
33 #include "bgpd/bgp_io.h"
34 #include "bgpd/bgp_debug.h" // for bgp_debug_neighbor_events, bgp_type_str
35 #include "bgpd/bgp_fsm.h" // for BGP_EVENT_ADD, bgp_event
36 #include "bgpd/bgp_packet.h" // for bgp_notify_send_with_data, bgp_notify...
37 #include "bgpd/bgpd.h" // for peer, BGP_MARKER_SIZE, bgp_master, bm
39 /* forward declarations */
40 static uint16_t bgp_write(struct peer
*);
41 static uint16_t bgp_read(struct peer
*);
42 static int bgp_process_writes(struct thread
*);
43 static int bgp_process_reads(struct thread
*);
44 static bool validate_header(struct peer
*);
46 /* generic i/o status codes */
47 #define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred
48 #define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error
50 /* bgp_read() status codes */
51 #define BGP_IO_READ_HEADER (1 << 3) // when read a full packet header
52 #define BGP_IO_READ_FULLPACKET (1 << 4) // read a full packet
54 * Start and stop routines for I/O pthread + control variables
55 * ------------------------------------------------------------------------ */
/* Loop-control flag for the I/O pthread's main loop; cleared by
 * bgp_io_stop() to make the thread exit. */
56 bool bgp_packet_write_thread_run
= false;
/* Guards the two cancellation lists below; allocated and initialized in the
 * I/O init routine, destroyed and freed in bgp_io_stop(). */
57 pthread_mutex_t
*work_mtx
;
/* Peers whose already-queued read tasks must be discarded by the I/O
 * pthread; populated by bgp_reads_off(). */
59 static struct list
*read_cancel
;
/* Peers whose already-queued write tasks must be discarded by the I/O
 * pthread; populated by bgp_writes_off(). */
60 static struct list
*write_cancel
;
64 work_mtx
= XCALLOC(MTYPE_TMP
, sizeof(pthread_mutex_t
));
65 pthread_mutex_init(work_mtx
, NULL
);
67 read_cancel
= list_new();
68 write_cancel
= list_new();
71 void *bgp_io_start(void *arg
)
73 struct frr_pthread
*fpt
= frr_pthread_get(PTHREAD_IO
);
75 // we definitely don't want to handle signals
76 fpt
->master
->handle_signals
= false;
78 bgp_packet_write_thread_run
= true;
81 while (bgp_packet_write_thread_run
) {
82 if (thread_fetch(fpt
->master
, &task
)) {
83 pthread_mutex_lock(work_mtx
);
86 struct peer
*peer
= THREAD_ARG(&task
);
87 if ((task
.func
== bgp_process_reads
88 && listnode_lookup(read_cancel
, peer
))
89 || (task
.func
== bgp_process_writes
90 && listnode_lookup(write_cancel
, peer
)))
93 list_delete_all_node(write_cancel
);
94 list_delete_all_node(read_cancel
);
99 pthread_mutex_unlock(work_mtx
);
106 int bgp_io_stop(void **result
, struct frr_pthread
*fpt
)
108 fpt
->master
->spin
= false;
109 bgp_packet_write_thread_run
= false;
110 pthread_kill(fpt
->thread
, SIGINT
);
111 pthread_join(fpt
->thread
, result
);
113 pthread_mutex_unlock(work_mtx
);
114 pthread_mutex_destroy(work_mtx
);
116 list_delete(read_cancel
);
117 list_delete(write_cancel
);
118 XFREE(MTYPE_TMP
, work_mtx
);
121 /* ------------------------------------------------------------------------ */
123 void bgp_writes_on(struct peer
*peer
)
125 assert(peer
->status
!= Deleted
);
128 assert(peer
->ibuf_work
);
129 assert(!peer
->t_connect_check
);
132 struct frr_pthread
*fpt
= frr_pthread_get(PTHREAD_IO
);
134 pthread_mutex_lock(work_mtx
);
136 listnode_delete(write_cancel
, peer
);
137 thread_add_write(fpt
->master
, bgp_process_writes
, peer
,
138 peer
->fd
, &peer
->t_write
);
139 SET_FLAG(peer
->thread_flags
, PEER_THREAD_WRITES_ON
);
141 pthread_mutex_unlock(work_mtx
);
144 void bgp_writes_off(struct peer
*peer
)
146 pthread_mutex_lock(work_mtx
);
148 THREAD_OFF(peer
->t_write
);
149 THREAD_OFF(peer
->t_generate_updgrp_packets
);
150 listnode_add(write_cancel
, peer
);
152 // peer access by us after this point will result in pain
153 UNSET_FLAG(peer
->thread_flags
, PEER_THREAD_WRITES_ON
);
155 pthread_mutex_unlock(work_mtx
);
156 /* upon return, i/o thread must not access the peer */
159 void bgp_reads_on(struct peer
*peer
)
161 assert(peer
->status
!= Deleted
);
164 assert(peer
->ibuf_work
);
165 assert(stream_get_endp(peer
->ibuf_work
) == 0);
167 assert(!peer
->t_connect_check
);
170 struct frr_pthread
*fpt
= frr_pthread_get(PTHREAD_IO
);
172 pthread_mutex_lock(work_mtx
);
174 listnode_delete(read_cancel
, peer
);
175 thread_add_read(fpt
->master
, bgp_process_reads
, peer
, peer
->fd
,
177 SET_FLAG(peer
->thread_flags
, PEER_THREAD_READS_ON
);
179 pthread_mutex_unlock(work_mtx
);
182 void bgp_reads_off(struct peer
*peer
)
184 pthread_mutex_lock(work_mtx
);
186 THREAD_OFF(peer
->t_read
);
187 THREAD_OFF(peer
->t_process_packet
);
188 listnode_add(read_cancel
, peer
);
190 // peer access by us after this point will result in pain
191 UNSET_FLAG(peer
->thread_flags
, PEER_THREAD_READS_ON
);
193 pthread_mutex_unlock(work_mtx
);
197 * Called from PTHREAD_IO when select() or poll() determines that the file
198 * descriptor is ready to be written to.
200 static int bgp_process_writes(struct thread
*thread
)
202 static struct peer
*peer
;
203 peer
= THREAD_ARG(thread
);
209 struct frr_pthread
*fpt
= frr_pthread_get(PTHREAD_IO
);
212 pthread_mutex_lock(&peer
->io_mtx
);
214 status
= bgp_write(peer
);
215 reschedule
= (stream_fifo_head(peer
->obuf
) != NULL
);
217 pthread_mutex_unlock(&peer
->io_mtx
);
219 if (CHECK_FLAG(status
, BGP_IO_TRANS_ERR
)) { /* no problem */
222 if (CHECK_FLAG(status
, BGP_IO_FATAL_ERR
))
223 reschedule
= 0; // problem
226 thread_add_write(fpt
->master
, bgp_process_writes
, peer
,
227 peer
->fd
, &peer
->t_write
);
228 thread_add_background(bm
->master
, bgp_generate_updgrp_packets
,
230 &peer
->t_generate_updgrp_packets
);
237 * Called from PTHREAD_IO when select() or poll() determines that the file
238 * descriptor is ready to be read from.
240 static int bgp_process_reads(struct thread
*thread
)
242 static struct peer
*peer
;
243 peer
= THREAD_ARG(thread
);
249 struct frr_pthread
*fpt
= frr_pthread_get(PTHREAD_IO
);
251 bool reschedule
= true;
254 pthread_mutex_lock(&peer
->io_mtx
);
256 status
= bgp_read(peer
);
258 pthread_mutex_unlock(&peer
->io_mtx
);
260 // check results of read
261 bool header_valid
= true;
263 if (CHECK_FLAG(status
, BGP_IO_TRANS_ERR
)) { /* no problem */
266 if (CHECK_FLAG(status
, BGP_IO_FATAL_ERR
))
267 reschedule
= false; // problem
269 if (CHECK_FLAG(status
, BGP_IO_READ_HEADER
)) {
270 header_valid
= validate_header(peer
);
272 bgp_size_t packetsize
=
273 MIN((int)stream_get_endp(peer
->ibuf_work
),
274 BGP_MAX_PACKET_SIZE
);
275 memcpy(peer
->last_reset_cause
, peer
->ibuf_work
->data
,
277 peer
->last_reset_cause_size
= packetsize
;
278 // We're tearing the session down, no point in
280 // Additionally, bgp_read() will use the TLV if it's
282 // determine how much to read; if this is corrupt, we'll
289 // if we read a full packet, push it onto peer->ibuf, reset our WiP
291 // and schedule a job to process it on the main thread
292 if (header_valid
&& CHECK_FLAG(status
, BGP_IO_READ_FULLPACKET
)) {
293 pthread_mutex_lock(&peer
->io_mtx
);
295 stream_fifo_push(peer
->ibuf
,
296 stream_dup(peer
->ibuf_work
));
298 pthread_mutex_unlock(&peer
->io_mtx
);
299 stream_reset(peer
->ibuf_work
);
300 assert(stream_get_endp(peer
->ibuf_work
) == 0);
302 thread_add_background(bm
->master
, bgp_process_packet
, peer
, 0,
303 &peer
->t_process_packet
);
307 thread_add_read(fpt
->master
, bgp_process_reads
, peer
, peer
->fd
,
314 * Flush peer output buffer.
316 * This function pops packets off of peer->obuf and writes them to peer->fd.
317 * The amount of packets written is equal to the minimum of peer->wpkt_quanta
318 * and the number of packets on the output buffer, unless an error occurs.
320 * If write() returns an error, the appropriate FSM event is generated.
322 * The return value is equal to the number of packets written
323 * (which may be zero).
325 static uint16_t bgp_write(struct peer
*peer
)
330 int update_last_write
= 0;
331 unsigned int count
= 0;
335 while (count
< peer
->bgp
->wpkt_quanta
336 && (s
= stream_fifo_head(peer
->obuf
))) {
339 writenum
= stream_get_endp(s
) - stream_get_getp(s
);
340 num
= write(peer
->fd
, STREAM_PNT(s
), writenum
);
343 if (!ERRNO_IO_RETRY(errno
)) {
344 BGP_EVENT_ADD(peer
, TCP_fatal_error
);
345 SET_FLAG(status
, BGP_IO_FATAL_ERR
);
347 SET_FLAG(status
, BGP_IO_TRANS_ERR
);
351 } else if (num
!= writenum
) // incomplete write
352 stream_forward_getp(s
, num
);
354 } while (num
!= writenum
);
356 /* Retrieve BGP packet type. */
357 stream_set_getp(s
, BGP_MARKER_SIZE
+ 2);
358 type
= stream_getc(s
);
369 /* Double start timer. */
372 /* Overflow check. */
373 if (peer
->v_start
>= (60 * 2))
374 peer
->v_start
= (60 * 2);
376 /* Handle Graceful Restart case where the state changes
378 Connect instead of Idle */
379 /* Flush any existing events */
380 BGP_EVENT_ADD(peer
, BGP_Stop
);
383 case BGP_MSG_KEEPALIVE
:
384 peer
->keepalive_out
++;
386 case BGP_MSG_ROUTE_REFRESH_NEW
:
387 case BGP_MSG_ROUTE_REFRESH_OLD
:
390 case BGP_MSG_CAPABILITY
:
391 peer
->dynamic_cap_out
++;
397 stream_free(stream_fifo_pop(peer
->obuf
));
398 update_last_write
= 1;
402 /* Update last_update if UPDATEs were written. */
403 if (peer
->update_out
> oc
)
404 peer
->last_update
= bgp_clock();
406 /* If we TXed any flavor of packet update last_write */
407 if (update_last_write
)
408 peer
->last_write
= bgp_clock();
415 * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
417 * @return whether a full packet was read
419 static uint16_t bgp_read(struct peer
*peer
)
421 int readsize
; // how many bytes we want to read
422 int nbytes
; // how many bytes we actually read
423 bool have_header
= false;
426 if (stream_get_endp(peer
->ibuf_work
) < BGP_HEADER_SIZE
)
427 readsize
= BGP_HEADER_SIZE
- stream_get_endp(peer
->ibuf_work
);
429 // retrieve packet length from tlv and compute # bytes we still
432 stream_getw_from(peer
->ibuf_work
, BGP_MARKER_SIZE
);
433 readsize
= mlen
- stream_get_endp(peer
->ibuf_work
);
437 nbytes
= stream_read_try(peer
->ibuf_work
, peer
->fd
, readsize
);
439 if (nbytes
<= 0) // handle errors
442 case -1: // fatal error; tear down the session
443 zlog_err("%s [Error] bgp_read_packet error: %s",
444 peer
->host
, safe_strerror(errno
));
446 if (peer
->status
== Established
) {
447 if (CHECK_FLAG(peer
->sflags
,
448 PEER_STATUS_NSF_MODE
)) {
450 PEER_DOWN_NSF_CLOSE_SESSION
;
451 SET_FLAG(peer
->sflags
,
452 PEER_STATUS_NSF_WAIT
);
455 PEER_DOWN_CLOSE_SESSION
;
458 BGP_EVENT_ADD(peer
, TCP_fatal_error
);
459 SET_FLAG(status
, BGP_IO_FATAL_ERR
);
462 case 0: // TCP session closed
463 if (bgp_debug_neighbor_events(peer
))
465 "%s [Event] BGP connection closed fd %d",
466 peer
->host
, peer
->fd
);
468 if (peer
->status
== Established
) {
469 if (CHECK_FLAG(peer
->sflags
,
470 PEER_STATUS_NSF_MODE
)) {
472 PEER_DOWN_NSF_CLOSE_SESSION
;
473 SET_FLAG(peer
->sflags
,
474 PEER_STATUS_NSF_WAIT
);
477 PEER_DOWN_CLOSE_SESSION
;
480 BGP_EVENT_ADD(peer
, TCP_connection_closed
);
481 SET_FLAG(status
, BGP_IO_FATAL_ERR
);
484 case -2: // temporary error; come back later
485 SET_FLAG(status
, BGP_IO_TRANS_ERR
);
494 // If we didn't have the header before read(), and now we do, set the
495 // appropriate flag. The caller must validate the header for us.
497 && stream_get_endp(peer
->ibuf_work
) >= BGP_HEADER_SIZE
) {
498 SET_FLAG(status
, BGP_IO_READ_HEADER
);
501 // If we read the # of bytes specified in the tlv, we have read a full
504 // Note that the header may not have been validated here. This flag
506 // ONLY that we read the # of bytes specified in the header; if the
508 // not valid, the packet MUST NOT be processed further.
509 if (have_header
&& (stream_getw_from(peer
->ibuf_work
, BGP_MARKER_SIZE
)
510 == stream_get_endp(peer
->ibuf_work
)))
511 SET_FLAG(status
, BGP_IO_READ_FULLPACKET
);
517 * Called after we have read a BGP packet header. Validates marker, message
518 * type and packet length. If any of these aren't correct, sends a notify.
520 static bool validate_header(struct peer
*peer
)
522 u_int16_t size
, type
;
524 static uint8_t marker
[BGP_MARKER_SIZE
] = {
525 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
526 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
528 if (memcmp(marker
, peer
->ibuf_work
->data
, BGP_MARKER_SIZE
) != 0) {
529 bgp_notify_send(peer
, BGP_NOTIFY_HEADER_ERR
,
530 BGP_NOTIFY_HEADER_NOT_SYNC
);
534 /* Get size and type. */
535 size
= stream_getw_from(peer
->ibuf_work
, BGP_MARKER_SIZE
);
536 type
= stream_getc_from(peer
->ibuf_work
, BGP_MARKER_SIZE
+ 2);
538 /* BGP type check. */
539 if (type
!= BGP_MSG_OPEN
&& type
!= BGP_MSG_UPDATE
540 && type
!= BGP_MSG_NOTIFY
&& type
!= BGP_MSG_KEEPALIVE
541 && type
!= BGP_MSG_ROUTE_REFRESH_NEW
542 && type
!= BGP_MSG_ROUTE_REFRESH_OLD
543 && type
!= BGP_MSG_CAPABILITY
) {
544 if (bgp_debug_neighbor_events(peer
))
545 zlog_debug("%s unknown message type 0x%02x", peer
->host
,
548 bgp_notify_send_with_data(peer
, BGP_NOTIFY_HEADER_ERR
,
549 BGP_NOTIFY_HEADER_BAD_MESTYPE
,
554 /* Mimimum packet length check. */
555 if ((size
< BGP_HEADER_SIZE
) || (size
> BGP_MAX_PACKET_SIZE
)
556 || (type
== BGP_MSG_OPEN
&& size
< BGP_MSG_OPEN_MIN_SIZE
)
557 || (type
== BGP_MSG_UPDATE
&& size
< BGP_MSG_UPDATE_MIN_SIZE
)
558 || (type
== BGP_MSG_NOTIFY
&& size
< BGP_MSG_NOTIFY_MIN_SIZE
)
559 || (type
== BGP_MSG_KEEPALIVE
&& size
!= BGP_MSG_KEEPALIVE_MIN_SIZE
)
560 || (type
== BGP_MSG_ROUTE_REFRESH_NEW
561 && size
< BGP_MSG_ROUTE_REFRESH_MIN_SIZE
)
562 || (type
== BGP_MSG_ROUTE_REFRESH_OLD
563 && size
< BGP_MSG_ROUTE_REFRESH_MIN_SIZE
)
564 || (type
== BGP_MSG_CAPABILITY
565 && size
< BGP_MSG_CAPABILITY_MIN_SIZE
)) {
566 if (bgp_debug_neighbor_events(peer
))
567 zlog_debug("%s bad message length - %d for %s",
569 type
== 128 ? "ROUTE-REFRESH"
570 : bgp_type_str
[(int)type
]);
572 bgp_notify_send_with_data(peer
, BGP_NOTIFY_HEADER_ERR
,
573 BGP_NOTIFY_HEADER_BAD_MESLEN
,