]> git.proxmox.com Git - mirror_frr.git/blame - bgpd/bgp_io.c
bgpd: restyle
[mirror_frr.git] / bgpd / bgp_io.c
CommitLineData
958b450c
QY
1/* BGP I/O.
2 * Implements packet I/O in a consumer pthread.
3 * Copyright (C) 2017 Cumulus Networks
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; see the file COPYING; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
18 * MA 02110-1301 USA
56257a44
QY
19 */
20
95158b0c 21/* clang-format off */
42cf651e 22#include <zebra.h>
95158b0c 23#include <pthread.h> // for pthread_mutex_unlock, pthread_mutex_lock
56257a44 24
95158b0c
QY
25#include "frr_pthread.h" // for frr_pthread_get, frr_pthread
26#include "linklist.h" // for list_delete, list_delete_all_node, lis...
27#include "log.h" // for zlog_debug, safe_strerror, zlog_err
28#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
29#include "network.h" // for ERRNO_IO_RETRY
30#include "stream.h" // for stream_get_endp, stream_getw_from, str...
31#include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
32#include "zassert.h" // for assert
56257a44 33
42cf651e 34#include "bgpd/bgp_io.h"
95158b0c
QY
35#include "bgpd/bgp_debug.h" // for bgp_debug_neighbor_events, bgp_type_str
36#include "bgpd/bgp_fsm.h" // for BGP_EVENT_ADD, bgp_event
37#include "bgpd/bgp_packet.h" // for bgp_notify_send_with_data, bgp_notify...
38#include "bgpd/bgpd.h" // for peer, BGP_MARKER_SIZE, bgp_master, bm
39/* clang-format on */
56257a44 40
/* Prototypes for the file-local I/O routines defined below. */
static int bgp_process_writes(struct thread *thread);
static int bgp_process_reads(struct thread *thread);
static uint16_t bgp_write(struct peer *peer);
static uint16_t bgp_read(struct peer *peer);
static bool validate_header(struct peer *peer);

/*
 * Status bits returned by bgp_write() / bgp_read(); callers test them with
 * CHECK_FLAG().
 */
#define BGP_IO_TRANS_ERR 0x01 /* transient: EAGAIN or similar, retry later */
#define BGP_IO_FATAL_ERR 0x02 /* fatal TCP error; session must go down */
/* Start and stop routines for I/O pthread + control variables
 * ------------------------------------------------------------------------ */

/* Loop condition of bgp_io_start(); cleared by bgp_io_stop(). */
_Atomic bool bgp_io_thread_run;
/* Set by bgp_io_start(); bgp_{reads,writes}_{on,off}() spin until it is true. */
_Atomic bool bgp_io_thread_started;

/*
 * Reset both I/O pthread control flags to false.
 *
 * bgp_io_start() later sets both flags to true.
 */
void bgp_io_init(void)
{
	bgp_io_thread_run = false;
	bgp_io_thread_started = false;
}
56257a44 62
424ab01d 63void *bgp_io_start(void *arg)
56257a44 64{
424ab01d 65 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
72bb6e33 66 fpt->master->owner = pthread_self();
424ab01d
QY
67
68 // we definitely don't want to handle signals
69 fpt->master->handle_signals = false;
70
424ab01d
QY
71 struct thread task;
72
b750b0ba
QY
73 atomic_store_explicit(&bgp_io_thread_run, true, memory_order_relaxed);
74 atomic_store_explicit(&bgp_io_thread_started, true,
75 memory_order_relaxed);
76
77 while (bgp_io_thread_run) {
424ab01d 78 if (thread_fetch(fpt->master, &task)) {
b750b0ba 79 thread_call(&task);
424ab01d
QY
80 }
81 }
56257a44 82
424ab01d 83 return NULL;
56257a44
QY
84}
85
424ab01d 86int bgp_io_stop(void **result, struct frr_pthread *fpt)
56257a44 87{
b750b0ba
QY
88
89 bgp_io_thread_run = false;
90 /* let the loop break */
424ab01d 91 fpt->master->spin = false;
b750b0ba 92 /* break poll */
424ab01d
QY
93 pthread_kill(fpt->thread, SIGINT);
94 pthread_join(fpt->thread, result);
56257a44 95
424ab01d 96 return 0;
56257a44 97}
424ab01d 98/* ------------------------------------------------------------------------ */
56257a44 99
424ab01d 100void bgp_writes_on(struct peer *peer)
56257a44 101{
b750b0ba
QY
102 while (!atomic_load_explicit(&bgp_io_thread_started,
103 memory_order_relaxed))
104 ;
105
424ab01d
QY
106 assert(peer->status != Deleted);
107 assert(peer->obuf);
108 assert(peer->ibuf);
109 assert(peer->ibuf_work);
110 assert(!peer->t_connect_check);
111 assert(peer->fd);
56257a44 112
424ab01d 113 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
56257a44 114
b750b0ba
QY
115 thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
116 &peer->t_write);
117 SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
424ab01d 118}
56257a44 119
424ab01d
QY
120void bgp_writes_off(struct peer *peer)
121{
b750b0ba
QY
122 while (!atomic_load_explicit(&bgp_io_thread_started,
123 memory_order_relaxed))
124 ;
125
151044ce
QY
126 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
127
b750b0ba
QY
128 thread_cancel_async(fpt->master, &peer->t_write, NULL);
129 THREAD_OFF(peer->t_generate_updgrp_packets);
56257a44 130
b750b0ba 131 UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
56257a44
QY
132}
133
424ab01d 134void bgp_reads_on(struct peer *peer)
56257a44 135{
b750b0ba
QY
136 while (!atomic_load_explicit(&bgp_io_thread_started,
137 memory_order_relaxed))
138 ;
139
424ab01d
QY
140 assert(peer->status != Deleted);
141 assert(peer->ibuf);
142 assert(peer->fd);
143 assert(peer->ibuf_work);
144 assert(stream_get_endp(peer->ibuf_work) == 0);
145 assert(peer->obuf);
146 assert(!peer->t_connect_check);
147 assert(peer->fd);
148
149 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
150
b750b0ba
QY
151 thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
152 &peer->t_read);
153
154 SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
56257a44
QY
155}
156
424ab01d 157void bgp_reads_off(struct peer *peer)
56257a44 158{
b750b0ba
QY
159 while (!atomic_load_explicit(&bgp_io_thread_started,
160 memory_order_relaxed))
161 ;
162
151044ce
QY
163 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
164
b750b0ba
QY
165 thread_cancel_async(fpt->master, &peer->t_read, NULL);
166 THREAD_OFF(peer->t_process_packet);
56257a44 167
b750b0ba 168 UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
56257a44
QY
169}
170
424ab01d
QY
171/**
172 * Called from PTHREAD_IO when select() or poll() determines that the file
173 * descriptor is ready to be written to.
174 */
175static int bgp_process_writes(struct thread *thread)
56257a44 176{
424ab01d
QY
177 static struct peer *peer;
178 peer = THREAD_ARG(thread);
179 uint16_t status;
b750b0ba 180 bool reschedule;
424ab01d
QY
181
182 if (peer->fd < 0)
183 return -1;
184
185 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
186
424ab01d 187 pthread_mutex_lock(&peer->io_mtx);
56257a44 188 {
424ab01d
QY
189 status = bgp_write(peer);
190 reschedule = (stream_fifo_head(peer->obuf) != NULL);
191 }
192 pthread_mutex_unlock(&peer->io_mtx);
56257a44 193
424ab01d 194 if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
56257a44 195 }
56257a44 196
424ab01d 197 if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
b750b0ba 198 reschedule = false; /* problem */
424ab01d
QY
199
200 if (reschedule) {
201 thread_add_write(fpt->master, bgp_process_writes, peer,
202 peer->fd, &peer->t_write);
a9794991 203 thread_add_timer_msec(bm->master, bgp_generate_updgrp_packets,
424ab01d
QY
204 peer, 0,
205 &peer->t_generate_updgrp_packets);
206 }
207
208 return 0;
56257a44
QY
209}
210
211/**
424ab01d
QY
212 * Called from PTHREAD_IO when select() or poll() determines that the file
213 * descriptor is ready to be read from.
9eb217ff
QY
214 *
215 * We read as much data as possible, process as many packets as we can and
216 * place them on peer->ibuf for secondary processing by the main thread.
56257a44 217 */
424ab01d 218static int bgp_process_reads(struct thread *thread)
56257a44 219{
9eb217ff
QY
220 static struct peer *peer; // peer to read from
221 uint16_t status; // bgp_read status code
222 bool more = true; // whether we got more data
223 bool fatal = false; // whether fatal error occurred
224 bool added_pkt = false; // whether we pushed onto ->ibuf
225 bool header_valid = true; // whether header is valid
226
424ab01d 227 peer = THREAD_ARG(thread);
424ab01d
QY
228
229 if (peer->fd < 0)
230 return -1;
231
232 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
233
424ab01d 234 pthread_mutex_lock(&peer->io_mtx);
56257a44 235 {
424ab01d
QY
236 status = bgp_read(peer);
237 }
238 pthread_mutex_unlock(&peer->io_mtx);
239
9eb217ff
QY
240 /* error checking phase */
241 if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
242 /* no problem; just don't process packets */
243 more = false;
244 }
424ab01d 245
9eb217ff
QY
246 if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
247 /* problem; tear down session */
248 more = false;
249 fatal = true;
56257a44 250 }
56257a44 251
9eb217ff
QY
252 while (more) {
253 /* static buffer for transferring packets */
254 static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
255 /* shorter alias to peer's input buffer */
256 struct stream *ibw = peer->ibuf_work;
257 /* offset of start of current packet */
258 size_t offset = stream_get_getp(ibw);
259 /* packet size as given by header */
260 u_int16_t pktsize = 0;
261
262 /* check that we have enough data for a header */
263 if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
264 break;
424ab01d 265
9eb217ff 266 /* validate header */
424ab01d 267 header_valid = validate_header(peer);
9eb217ff 268
424ab01d 269 if (!header_valid) {
9eb217ff
QY
270 fatal = true;
271 break;
424ab01d 272 }
424ab01d 273
9eb217ff
QY
274 /* header is valid; retrieve packet size */
275 pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
424ab01d 276
9eb217ff
QY
277 /* if this fails we are seriously screwed */
278 assert(pktsize <= BGP_MAX_PACKET_SIZE);
279
280 /* If we have that much data, chuck it into its own
281 * stream and append to input queue for processing. */
282 if (STREAM_READABLE(ibw) >= pktsize) {
283 struct stream *pkt = stream_new(pktsize);
284 stream_get(pktbuf, ibw, pktsize);
285 stream_put(pkt, pktbuf, pktsize);
286
287 pthread_mutex_lock(&peer->io_mtx);
288 {
289 stream_fifo_push(peer->ibuf, pkt);
290 }
291 pthread_mutex_unlock(&peer->io_mtx);
292
293 added_pkt = true;
294 } else
295 break;
296 }
297
298 /* After reading:
299 * 1. Move unread data to stream start to make room for more.
300 * 2. Reschedule and return when we have additional data.
301 *
302 * XXX: Heavy abuse of stream API. This needs a ring buffer.
303 */
304 if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
305 void *from = stream_pnt(peer->ibuf_work);
306 void *to = peer->ibuf_work->data;
307 size_t siz = STREAM_READABLE(peer->ibuf_work);
308 memmove(to, from, siz);
309 stream_set_getp(peer->ibuf_work, 0);
310 stream_set_endp(peer->ibuf_work, siz);
424ab01d
QY
311 }
312
9eb217ff
QY
313 assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
314
315 /* handle invalid header */
316 if (fatal) {
317 if (!header_valid) {
318 bgp_size_t pktsize = BGP_HEADER_SIZE;
319 stream_get(peer->last_reset_cause, peer->ibuf_work,
320 pktsize);
321 peer->last_reset_cause_size = pktsize;
322 }
323
324 /* wipe buffer just in case someone screwed up */
325 stream_reset(peer->ibuf_work);
326 } else {
424ab01d
QY
327 thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
328 &peer->t_read);
9eb217ff
QY
329 if (added_pkt)
330 thread_add_event(bm->master, bgp_process_packet, peer,
331 0, NULL);
9eb217ff 332 }
424ab01d
QY
333
334 return 0;
56257a44
QY
335}
336
337/**
338 * Flush peer output buffer.
339 *
340 * This function pops packets off of peer->obuf and writes them to peer->fd.
341 * The amount of packets written is equal to the minimum of peer->wpkt_quanta
424ab01d 342 * and the number of packets on the output buffer, unless an error occurs.
56257a44
QY
343 *
344 * If write() returns an error, the appropriate FSM event is generated.
345 *
346 * The return value is equal to the number of packets written
347 * (which may be zero).
348 */
424ab01d 349static uint16_t bgp_write(struct peer *peer)
56257a44
QY
350{
351 u_char type;
352 struct stream *s;
353 int num;
354 int update_last_write = 0;
355 unsigned int count = 0;
356 unsigned int oc = 0;
424ab01d 357 uint16_t status = 0;
555e09d4 358 uint32_t wpkt_quanta_old;
56257a44 359
555e09d4
QY
360 // cache current write quanta
361 wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
362 memory_order_relaxed);
363
364 while (count < wpkt_quanta_old && (s = stream_fifo_head(peer->obuf))) {
56257a44
QY
365 int writenum;
366 do {
367 writenum = stream_get_endp(s) - stream_get_getp(s);
368 num = write(peer->fd, STREAM_PNT(s), writenum);
369
370 if (num < 0) {
424ab01d 371 if (!ERRNO_IO_RETRY(errno)) {
56257a44 372 BGP_EVENT_ADD(peer, TCP_fatal_error);
424ab01d
QY
373 SET_FLAG(status, BGP_IO_FATAL_ERR);
374 } else {
375 SET_FLAG(status, BGP_IO_TRANS_ERR);
376 }
56257a44
QY
377
378 goto done;
379 } else if (num != writenum) // incomplete write
380 stream_forward_getp(s, num);
381
382 } while (num != writenum);
383
384 /* Retrieve BGP packet type. */
385 stream_set_getp(s, BGP_MARKER_SIZE + 2);
386 type = stream_getc(s);
387
388 switch (type) {
389 case BGP_MSG_OPEN:
390 peer->open_out++;
391 break;
392 case BGP_MSG_UPDATE:
393 peer->update_out++;
394 break;
395 case BGP_MSG_NOTIFY:
396 peer->notify_out++;
397 /* Double start timer. */
398 peer->v_start *= 2;
399
400 /* Overflow check. */
401 if (peer->v_start >= (60 * 2))
402 peer->v_start = (60 * 2);
403
404 /* Handle Graceful Restart case where the state changes
405 to
406 Connect instead of Idle */
407 /* Flush any existing events */
408 BGP_EVENT_ADD(peer, BGP_Stop);
409 goto done;
410
411 case BGP_MSG_KEEPALIVE:
412 peer->keepalive_out++;
413 break;
414 case BGP_MSG_ROUTE_REFRESH_NEW:
415 case BGP_MSG_ROUTE_REFRESH_OLD:
416 peer->refresh_out++;
417 break;
418 case BGP_MSG_CAPABILITY:
419 peer->dynamic_cap_out++;
420 break;
421 }
422
423 count++;
424ab01d 424
56257a44
QY
425 stream_free(stream_fifo_pop(peer->obuf));
426 update_last_write = 1;
427 }
428
429done : {
430 /* Update last_update if UPDATEs were written. */
431 if (peer->update_out > oc)
432 peer->last_update = bgp_clock();
433
434 /* If we TXed any flavor of packet update last_write */
435 if (update_last_write)
436 peer->last_write = bgp_clock();
437}
438
424ab01d
QY
439 return status;
440}
441
442/**
443 * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
444 *
445 * @return whether a full packet was read
446 */
447static uint16_t bgp_read(struct peer *peer)
448{
b750b0ba
QY
449 size_t readsize; // how many bytes we want to read
450 ssize_t nbytes; // how many bytes we actually read
424ab01d
QY
451 uint16_t status = 0;
452
9eb217ff 453 readsize = STREAM_WRITEABLE(peer->ibuf_work);
424ab01d
QY
454
455 nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
456
457 if (nbytes <= 0) // handle errors
458 {
459 switch (nbytes) {
460 case -1: // fatal error; tear down the session
461 zlog_err("%s [Error] bgp_read_packet error: %s",
462 peer->host, safe_strerror(errno));
463
464 if (peer->status == Established) {
465 if (CHECK_FLAG(peer->sflags,
466 PEER_STATUS_NSF_MODE)) {
467 peer->last_reset =
468 PEER_DOWN_NSF_CLOSE_SESSION;
469 SET_FLAG(peer->sflags,
470 PEER_STATUS_NSF_WAIT);
471 } else
472 peer->last_reset =
473 PEER_DOWN_CLOSE_SESSION;
474 }
475
476 BGP_EVENT_ADD(peer, TCP_fatal_error);
477 SET_FLAG(status, BGP_IO_FATAL_ERR);
478 break;
479
480 case 0: // TCP session closed
481 if (bgp_debug_neighbor_events(peer))
482 zlog_debug(
483 "%s [Event] BGP connection closed fd %d",
484 peer->host, peer->fd);
485
486 if (peer->status == Established) {
487 if (CHECK_FLAG(peer->sflags,
488 PEER_STATUS_NSF_MODE)) {
489 peer->last_reset =
490 PEER_DOWN_NSF_CLOSE_SESSION;
491 SET_FLAG(peer->sflags,
492 PEER_STATUS_NSF_WAIT);
493 } else
494 peer->last_reset =
495 PEER_DOWN_CLOSE_SESSION;
496 }
497
498 BGP_EVENT_ADD(peer, TCP_connection_closed);
499 SET_FLAG(status, BGP_IO_FATAL_ERR);
500 break;
501
502 case -2: // temporary error; come back later
503 SET_FLAG(status, BGP_IO_TRANS_ERR);
504 break;
505 default:
506 break;
507 }
508
509 return status;
510 }
511
424ab01d
QY
512 return status;
513}
514
515/*
516 * Called after we have read a BGP packet header. Validates marker, message
517 * type and packet length. If any of these aren't correct, sends a notify.
518 */
519static bool validate_header(struct peer *peer)
520{
521 u_int16_t size, type;
9eb217ff
QY
522 struct stream *pkt = peer->ibuf_work;
523 size_t getp = stream_get_getp(pkt);
424ab01d 524
442c9afb
QY
525 static uint8_t marker[BGP_MARKER_SIZE] = {
526 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
527 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
528
9eb217ff 529 if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
442c9afb
QY
530 bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
531 BGP_NOTIFY_HEADER_NOT_SYNC);
532 return false;
533 }
424ab01d
QY
534
535 /* Get size and type. */
9eb217ff
QY
536 size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
537 type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
424ab01d
QY
538
539 /* BGP type check. */
540 if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
541 && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
542 && type != BGP_MSG_ROUTE_REFRESH_NEW
543 && type != BGP_MSG_ROUTE_REFRESH_OLD
544 && type != BGP_MSG_CAPABILITY) {
545 if (bgp_debug_neighbor_events(peer))
546 zlog_debug("%s unknown message type 0x%02x", peer->host,
547 type);
548
549 bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
550 BGP_NOTIFY_HEADER_BAD_MESTYPE,
551 (u_char *)&type, 1);
552 return false;
553 }
554
555 /* Mimimum packet length check. */
556 if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
557 || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
558 || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
559 || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
560 || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
561 || (type == BGP_MSG_ROUTE_REFRESH_NEW
562 && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
563 || (type == BGP_MSG_ROUTE_REFRESH_OLD
564 && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
565 || (type == BGP_MSG_CAPABILITY
566 && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
567 if (bgp_debug_neighbor_events(peer))
568 zlog_debug("%s bad message length - %d for %s",
569 peer->host, size,
570 type == 128 ? "ROUTE-REFRESH"
571 : bgp_type_str[(int)type]);
572
573 bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
574 BGP_NOTIFY_HEADER_BAD_MESLEN,
575 (u_char *)&size, 2);
576 return false;
577 }
578
579 return true;
56257a44 580}