]> git.proxmox.com Git - mirror_frr.git/blame - bgpd/bgp_io.c
bgpd: bye bye THREAD_BACKGROUND
[mirror_frr.git] / bgpd / bgp_io.c
CommitLineData
958b450c
QY
1/* BGP I/O.
2 * Implements packet I/O in a consumer pthread.
3 * Copyright (C) 2017 Cumulus Networks
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; see the file COPYING; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
18 * MA 02110-1301 USA
56257a44
QY
19 */
20
42cf651e 21#include <zebra.h>
cfdc170e 22#include <pthread.h> // for pthread_mutex_unlock, pthread_mutex_lock
56257a44 23
cfdc170e
QY
24#include "frr_pthread.h" // for frr_pthread_get, frr_pthread
25#include "linklist.h" // for list_delete, list_delete_all_node, lis...
26#include "log.h" // for zlog_debug, safe_strerror, zlog_err
27#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
28#include "network.h" // for ERRNO_IO_RETRY
29#include "stream.h" // for stream_get_endp, stream_getw_from, str...
30#include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
31#include "zassert.h" // for assert
56257a44 32
42cf651e 33#include "bgpd/bgp_io.h"
cfdc170e
QY
34#include "bgpd/bgp_debug.h" // for bgp_debug_neighbor_events, bgp_type_str
35#include "bgpd/bgp_fsm.h" // for BGP_EVENT_ADD, bgp_event
36#include "bgpd/bgp_packet.h" // for bgp_notify_send_with_data, bgp_notify...
37#include "bgpd/bgpd.h" // for peer, BGP_MARKER_SIZE, bgp_master, bm
56257a44 38
424ab01d
QY
/* Forward declarations of file-local helpers. */
static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *);
static int bgp_process_writes(struct thread *);
static int bgp_process_reads(struct thread *);
static bool validate_header(struct peer *);

/* Generic I/O status codes returned by bgp_read() / bgp_write().
 * These are bit flags; more than one may be set in a return value. */
#define BGP_IO_TRANS_ERR (1 << 1) /* EAGAIN or similar; retry later */
#define BGP_IO_FATAL_ERR (1 << 2) /* fatal TCP error; session must die */
56257a44 49
424ab01d 50/* Start and stop routines for I/O pthread + control variables
56257a44 51 * ------------------------------------------------------------------------ */
424ab01d
QY
52bool bgp_packet_write_thread_run = false;
53pthread_mutex_t *work_mtx;
56257a44 54
424ab01d 55void bgp_io_init()
56257a44 56{
424ab01d
QY
57 work_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
58 pthread_mutex_init(work_mtx, NULL);
56257a44 59}
56257a44 60
424ab01d 61void *bgp_io_start(void *arg)
56257a44 62{
424ab01d 63 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
72bb6e33 64 fpt->master->owner = pthread_self();
424ab01d
QY
65
66 // we definitely don't want to handle signals
67 fpt->master->handle_signals = false;
68
69 bgp_packet_write_thread_run = true;
70 struct thread task;
71
72 while (bgp_packet_write_thread_run) {
73 if (thread_fetch(fpt->master, &task)) {
74 pthread_mutex_lock(work_mtx);
75 {
151044ce 76 thread_call(&task);
424ab01d
QY
77 }
78 pthread_mutex_unlock(work_mtx);
79 }
80 }
56257a44 81
424ab01d 82 return NULL;
56257a44
QY
83}
84
424ab01d 85int bgp_io_stop(void **result, struct frr_pthread *fpt)
56257a44 86{
424ab01d
QY
87 fpt->master->spin = false;
88 bgp_packet_write_thread_run = false;
89 pthread_kill(fpt->thread, SIGINT);
90 pthread_join(fpt->thread, result);
56257a44 91
424ab01d
QY
92 pthread_mutex_unlock(work_mtx);
93 pthread_mutex_destroy(work_mtx);
56257a44 94
424ab01d
QY
95 XFREE(MTYPE_TMP, work_mtx);
96 return 0;
56257a44 97}
424ab01d 98/* ------------------------------------------------------------------------ */
56257a44 99
424ab01d 100void bgp_writes_on(struct peer *peer)
56257a44 101{
424ab01d
QY
102 assert(peer->status != Deleted);
103 assert(peer->obuf);
104 assert(peer->ibuf);
105 assert(peer->ibuf_work);
106 assert(!peer->t_connect_check);
107 assert(peer->fd);
56257a44 108
424ab01d 109 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
56257a44 110
424ab01d
QY
111 pthread_mutex_lock(work_mtx);
112 {
424ab01d
QY
113 thread_add_write(fpt->master, bgp_process_writes, peer,
114 peer->fd, &peer->t_write);
115 SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
56257a44 116 }
424ab01d
QY
117 pthread_mutex_unlock(work_mtx);
118}
56257a44 119
424ab01d
QY
120void bgp_writes_off(struct peer *peer)
121{
151044ce
QY
122 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
123
424ab01d
QY
124 pthread_mutex_lock(work_mtx);
125 {
151044ce 126 thread_cancel_async(fpt->master, &peer->t_write);
424ab01d 127 THREAD_OFF(peer->t_generate_updgrp_packets);
56257a44 128
424ab01d
QY
129 // peer access by us after this point will result in pain
130 UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
131 }
132 pthread_mutex_unlock(work_mtx);
133 /* upon return, i/o thread must not access the peer */
56257a44
QY
134}
135
424ab01d 136void bgp_reads_on(struct peer *peer)
56257a44 137{
424ab01d
QY
138 assert(peer->status != Deleted);
139 assert(peer->ibuf);
140 assert(peer->fd);
141 assert(peer->ibuf_work);
142 assert(stream_get_endp(peer->ibuf_work) == 0);
143 assert(peer->obuf);
144 assert(!peer->t_connect_check);
145 assert(peer->fd);
146
147 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
148
149 pthread_mutex_lock(work_mtx);
150 {
424ab01d
QY
151 thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
152 &peer->t_read);
a9794991 153 thread_add_timer_msec(bm->master, bgp_process_packet, peer, 0,
9eb217ff 154 &peer->t_process_packet);
424ab01d
QY
155 SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
156 }
157 pthread_mutex_unlock(work_mtx);
56257a44
QY
158}
159
424ab01d 160void bgp_reads_off(struct peer *peer)
56257a44 161{
151044ce
QY
162 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
163
164 /* this mutex ensures t_read is not being modified */
424ab01d 165 pthread_mutex_lock(work_mtx);
56257a44 166 {
151044ce 167 thread_cancel_async(fpt->master, &peer->t_read);
424ab01d 168 THREAD_OFF(peer->t_process_packet);
56257a44 169
424ab01d
QY
170 // peer access by us after this point will result in pain
171 UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
56257a44 172 }
424ab01d 173 pthread_mutex_unlock(work_mtx);
56257a44
QY
174}
175
424ab01d
QY
176/**
177 * Called from PTHREAD_IO when select() or poll() determines that the file
178 * descriptor is ready to be written to.
179 */
180static int bgp_process_writes(struct thread *thread)
56257a44 181{
424ab01d
QY
182 static struct peer *peer;
183 peer = THREAD_ARG(thread);
184 uint16_t status;
185
186 if (peer->fd < 0)
187 return -1;
188
189 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
190
191 bool reschedule;
192 pthread_mutex_lock(&peer->io_mtx);
56257a44 193 {
424ab01d
QY
194 status = bgp_write(peer);
195 reschedule = (stream_fifo_head(peer->obuf) != NULL);
196 }
197 pthread_mutex_unlock(&peer->io_mtx);
56257a44 198
424ab01d 199 if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
56257a44 200 }
56257a44 201
424ab01d
QY
202 if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
203 reschedule = 0; // problem
204
205 if (reschedule) {
206 thread_add_write(fpt->master, bgp_process_writes, peer,
207 peer->fd, &peer->t_write);
a9794991 208 thread_add_timer_msec(bm->master, bgp_generate_updgrp_packets,
424ab01d
QY
209 peer, 0,
210 &peer->t_generate_updgrp_packets);
211 }
212
213 return 0;
56257a44
QY
214}
215
216/**
424ab01d
QY
217 * Called from PTHREAD_IO when select() or poll() determines that the file
218 * descriptor is ready to be read from.
9eb217ff
QY
219 *
220 * We read as much data as possible, process as many packets as we can and
221 * place them on peer->ibuf for secondary processing by the main thread.
56257a44 222 */
424ab01d 223static int bgp_process_reads(struct thread *thread)
56257a44 224{
9eb217ff
QY
225 static struct peer *peer; // peer to read from
226 uint16_t status; // bgp_read status code
227 bool more = true; // whether we got more data
228 bool fatal = false; // whether fatal error occurred
229 bool added_pkt = false; // whether we pushed onto ->ibuf
230 bool header_valid = true; // whether header is valid
231
424ab01d 232 peer = THREAD_ARG(thread);
424ab01d
QY
233
234 if (peer->fd < 0)
235 return -1;
236
237 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
238
424ab01d 239 pthread_mutex_lock(&peer->io_mtx);
56257a44 240 {
424ab01d
QY
241 status = bgp_read(peer);
242 }
243 pthread_mutex_unlock(&peer->io_mtx);
244
9eb217ff
QY
245 /* error checking phase */
246 if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
247 /* no problem; just don't process packets */
248 more = false;
249 }
424ab01d 250
9eb217ff
QY
251 if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
252 /* problem; tear down session */
253 more = false;
254 fatal = true;
56257a44 255 }
56257a44 256
9eb217ff
QY
257 while (more) {
258 /* static buffer for transferring packets */
259 static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
260 /* shorter alias to peer's input buffer */
261 struct stream *ibw = peer->ibuf_work;
262 /* offset of start of current packet */
263 size_t offset = stream_get_getp(ibw);
264 /* packet size as given by header */
265 u_int16_t pktsize = 0;
266
267 /* check that we have enough data for a header */
268 if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
269 break;
424ab01d 270
9eb217ff 271 /* validate header */
424ab01d 272 header_valid = validate_header(peer);
9eb217ff 273
424ab01d 274 if (!header_valid) {
9eb217ff
QY
275 fatal = true;
276 break;
424ab01d 277 }
424ab01d 278
9eb217ff
QY
279 /* header is valid; retrieve packet size */
280 pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
424ab01d 281
9eb217ff
QY
282 /* if this fails we are seriously screwed */
283 assert(pktsize <= BGP_MAX_PACKET_SIZE);
284
285 /* If we have that much data, chuck it into its own
286 * stream and append to input queue for processing. */
287 if (STREAM_READABLE(ibw) >= pktsize) {
288 struct stream *pkt = stream_new(pktsize);
289 stream_get(pktbuf, ibw, pktsize);
290 stream_put(pkt, pktbuf, pktsize);
291
292 pthread_mutex_lock(&peer->io_mtx);
293 {
294 stream_fifo_push(peer->ibuf, pkt);
295 }
296 pthread_mutex_unlock(&peer->io_mtx);
297
298 added_pkt = true;
299 } else
300 break;
301 }
302
303 /* After reading:
304 * 1. Move unread data to stream start to make room for more.
305 * 2. Reschedule and return when we have additional data.
306 *
307 * XXX: Heavy abuse of stream API. This needs a ring buffer.
308 */
309 if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
310 void *from = stream_pnt(peer->ibuf_work);
311 void *to = peer->ibuf_work->data;
312 size_t siz = STREAM_READABLE(peer->ibuf_work);
313 memmove(to, from, siz);
314 stream_set_getp(peer->ibuf_work, 0);
315 stream_set_endp(peer->ibuf_work, siz);
424ab01d
QY
316 }
317
9eb217ff
QY
318 assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
319
320 /* handle invalid header */
321 if (fatal) {
322 if (!header_valid) {
323 bgp_size_t pktsize = BGP_HEADER_SIZE;
324 stream_get(peer->last_reset_cause, peer->ibuf_work,
325 pktsize);
326 peer->last_reset_cause_size = pktsize;
327 }
328
329 /* wipe buffer just in case someone screwed up */
330 stream_reset(peer->ibuf_work);
331 } else {
424ab01d
QY
332 thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
333 &peer->t_read);
9eb217ff
QY
334 if (added_pkt)
335 thread_add_event(bm->master, bgp_process_packet, peer,
336 0, NULL);
9eb217ff 337 }
424ab01d
QY
338
339 return 0;
56257a44
QY
340}
341
342/**
343 * Flush peer output buffer.
344 *
345 * This function pops packets off of peer->obuf and writes them to peer->fd.
346 * The amount of packets written is equal to the minimum of peer->wpkt_quanta
424ab01d 347 * and the number of packets on the output buffer, unless an error occurs.
56257a44
QY
348 *
349 * If write() returns an error, the appropriate FSM event is generated.
350 *
351 * The return value is equal to the number of packets written
352 * (which may be zero).
353 */
424ab01d 354static uint16_t bgp_write(struct peer *peer)
56257a44
QY
355{
356 u_char type;
357 struct stream *s;
358 int num;
359 int update_last_write = 0;
360 unsigned int count = 0;
361 unsigned int oc = 0;
424ab01d 362 uint16_t status = 0;
555e09d4 363 uint32_t wpkt_quanta_old;
56257a44 364
555e09d4
QY
365 // cache current write quanta
366 wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
367 memory_order_relaxed);
368
369 while (count < wpkt_quanta_old && (s = stream_fifo_head(peer->obuf))) {
56257a44
QY
370 int writenum;
371 do {
372 writenum = stream_get_endp(s) - stream_get_getp(s);
373 num = write(peer->fd, STREAM_PNT(s), writenum);
374
375 if (num < 0) {
424ab01d 376 if (!ERRNO_IO_RETRY(errno)) {
56257a44 377 BGP_EVENT_ADD(peer, TCP_fatal_error);
424ab01d
QY
378 SET_FLAG(status, BGP_IO_FATAL_ERR);
379 } else {
380 SET_FLAG(status, BGP_IO_TRANS_ERR);
381 }
56257a44
QY
382
383 goto done;
384 } else if (num != writenum) // incomplete write
385 stream_forward_getp(s, num);
386
387 } while (num != writenum);
388
389 /* Retrieve BGP packet type. */
390 stream_set_getp(s, BGP_MARKER_SIZE + 2);
391 type = stream_getc(s);
392
393 switch (type) {
394 case BGP_MSG_OPEN:
395 peer->open_out++;
396 break;
397 case BGP_MSG_UPDATE:
398 peer->update_out++;
399 break;
400 case BGP_MSG_NOTIFY:
401 peer->notify_out++;
402 /* Double start timer. */
403 peer->v_start *= 2;
404
405 /* Overflow check. */
406 if (peer->v_start >= (60 * 2))
407 peer->v_start = (60 * 2);
408
409 /* Handle Graceful Restart case where the state changes
410 to
411 Connect instead of Idle */
412 /* Flush any existing events */
413 BGP_EVENT_ADD(peer, BGP_Stop);
414 goto done;
415
416 case BGP_MSG_KEEPALIVE:
417 peer->keepalive_out++;
418 break;
419 case BGP_MSG_ROUTE_REFRESH_NEW:
420 case BGP_MSG_ROUTE_REFRESH_OLD:
421 peer->refresh_out++;
422 break;
423 case BGP_MSG_CAPABILITY:
424 peer->dynamic_cap_out++;
425 break;
426 }
427
428 count++;
424ab01d 429
56257a44
QY
430 stream_free(stream_fifo_pop(peer->obuf));
431 update_last_write = 1;
432 }
433
434done : {
435 /* Update last_update if UPDATEs were written. */
436 if (peer->update_out > oc)
437 peer->last_update = bgp_clock();
438
439 /* If we TXed any flavor of packet update last_write */
440 if (update_last_write)
441 peer->last_write = bgp_clock();
442}
443
424ab01d
QY
444 return status;
445}
446
447/**
448 * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
449 *
450 * @return whether a full packet was read
451 */
452static uint16_t bgp_read(struct peer *peer)
453{
454 int readsize; // how many bytes we want to read
455 int nbytes; // how many bytes we actually read
424ab01d
QY
456 uint16_t status = 0;
457
9eb217ff 458 readsize = STREAM_WRITEABLE(peer->ibuf_work);
424ab01d
QY
459
460 nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
461
462 if (nbytes <= 0) // handle errors
463 {
464 switch (nbytes) {
465 case -1: // fatal error; tear down the session
466 zlog_err("%s [Error] bgp_read_packet error: %s",
467 peer->host, safe_strerror(errno));
468
469 if (peer->status == Established) {
470 if (CHECK_FLAG(peer->sflags,
471 PEER_STATUS_NSF_MODE)) {
472 peer->last_reset =
473 PEER_DOWN_NSF_CLOSE_SESSION;
474 SET_FLAG(peer->sflags,
475 PEER_STATUS_NSF_WAIT);
476 } else
477 peer->last_reset =
478 PEER_DOWN_CLOSE_SESSION;
479 }
480
481 BGP_EVENT_ADD(peer, TCP_fatal_error);
482 SET_FLAG(status, BGP_IO_FATAL_ERR);
483 break;
484
485 case 0: // TCP session closed
486 if (bgp_debug_neighbor_events(peer))
487 zlog_debug(
488 "%s [Event] BGP connection closed fd %d",
489 peer->host, peer->fd);
490
491 if (peer->status == Established) {
492 if (CHECK_FLAG(peer->sflags,
493 PEER_STATUS_NSF_MODE)) {
494 peer->last_reset =
495 PEER_DOWN_NSF_CLOSE_SESSION;
496 SET_FLAG(peer->sflags,
497 PEER_STATUS_NSF_WAIT);
498 } else
499 peer->last_reset =
500 PEER_DOWN_CLOSE_SESSION;
501 }
502
503 BGP_EVENT_ADD(peer, TCP_connection_closed);
504 SET_FLAG(status, BGP_IO_FATAL_ERR);
505 break;
506
507 case -2: // temporary error; come back later
508 SET_FLAG(status, BGP_IO_TRANS_ERR);
509 break;
510 default:
511 break;
512 }
513
514 return status;
515 }
516
424ab01d
QY
517 return status;
518}
519
520/*
521 * Called after we have read a BGP packet header. Validates marker, message
522 * type and packet length. If any of these aren't correct, sends a notify.
523 */
524static bool validate_header(struct peer *peer)
525{
526 u_int16_t size, type;
9eb217ff
QY
527 struct stream *pkt = peer->ibuf_work;
528 size_t getp = stream_get_getp(pkt);
424ab01d 529
442c9afb
QY
530 static uint8_t marker[BGP_MARKER_SIZE] = {
531 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
532 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
533
9eb217ff 534 if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
442c9afb
QY
535 bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
536 BGP_NOTIFY_HEADER_NOT_SYNC);
537 return false;
538 }
424ab01d
QY
539
540 /* Get size and type. */
9eb217ff
QY
541 size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
542 type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
424ab01d
QY
543
544 /* BGP type check. */
545 if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
546 && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
547 && type != BGP_MSG_ROUTE_REFRESH_NEW
548 && type != BGP_MSG_ROUTE_REFRESH_OLD
549 && type != BGP_MSG_CAPABILITY) {
550 if (bgp_debug_neighbor_events(peer))
551 zlog_debug("%s unknown message type 0x%02x", peer->host,
552 type);
553
554 bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
555 BGP_NOTIFY_HEADER_BAD_MESTYPE,
556 (u_char *)&type, 1);
557 return false;
558 }
559
560 /* Mimimum packet length check. */
561 if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
562 || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
563 || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
564 || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
565 || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
566 || (type == BGP_MSG_ROUTE_REFRESH_NEW
567 && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
568 || (type == BGP_MSG_ROUTE_REFRESH_OLD
569 && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
570 || (type == BGP_MSG_CAPABILITY
571 && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
572 if (bgp_debug_neighbor_events(peer))
573 zlog_debug("%s bad message length - %d for %s",
574 peer->host, size,
575 type == 128 ? "ROUTE-REFRESH"
576 : bgp_type_str[(int)type]);
577
578 bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
579 BGP_NOTIFY_HEADER_BAD_MESLEN,
580 (u_char *)&size, 2);
581 return false;
582 }
583
584 return true;
56257a44 585}