bgpd/bgp_io.c
/* BGP I/O.
 * Implements packet I/O in a pthread.
 * Copyright (C) 2017 Cumulus Networks
 * Quentin Young
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
 * MA 02110-1301 USA
 */

/* clang-format off */
#include <zebra.h>
#include <pthread.h>		// for pthread_mutex_unlock, pthread_mutex_lock

#include "frr_pthread.h"	// for frr_pthread_get, frr_pthread
#include "linklist.h"		// for list_delete, list_delete_all_node, lis...
#include "log.h"		// for zlog_debug, safe_strerror, zlog_err
#include "memory.h"		// for MTYPE_TMP, XCALLOC, XFREE
#include "network.h"		// for ERRNO_IO_RETRY
#include "stream.h"		// for stream_get_endp, stream_getw_from, str...
#include "thread.h"		// for THREAD_OFF, THREAD_ARG, thread, thread...
#include "zassert.h"		// for assert

#include "bgpd/bgp_io.h"
#include "bgpd/bgp_debug.h"	// for bgp_debug_neighbor_events, bgp_type_str
#include "bgpd/bgp_fsm.h"	// for BGP_EVENT_ADD, bgp_event
#include "bgpd/bgp_packet.h"	// for bgp_notify_send_with_data, bgp_notify...
#include "bgpd/bgpd.h"		// for peer, BGP_MARKER_SIZE, bgp_master, bm
/* clang-format on */

/* forward declarations */
static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *);
static int bgp_process_writes(struct thread *);
static int bgp_process_reads(struct thread *);
static bool validate_header(struct peer *);

/* generic i/o status codes */
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error
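/* These bits are OR'd into the uint16_t status returned by bgp_read() and
 * bgp_write(), and tested with CHECK_FLAG() in bgp_process_reads() and
 * bgp_process_writes(). */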

/* Start and stop routines for I/O pthread + control variables
 * ------------------------------------------------------------------------ */
_Atomic bool bgp_io_thread_run;
_Atomic bool bgp_io_thread_started;

void bgp_io_init()
{
	bgp_io_thread_run = false;
	bgp_io_thread_started = false;
}

/* Unused callback for thread_add_read() */
static int bgp_io_dummy(struct thread *thread) { return 0; }

void *bgp_io_start(void *arg)
{
	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
	fpt->master->owner = pthread_self();

	// fd so we can sleep in poll()
	int sleeper[2];
	pipe(sleeper);
	thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);
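	/* The pipe gives the event loop a descriptor to block on in poll()
	 * even when no peer fds are registered yet. Nothing in this file ever
	 * writes to sleeper[1], so bgp_io_dummy() never actually runs. */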

	// we definitely don't want to handle signals
	fpt->master->handle_signals = false;

	struct thread task;

	atomic_store_explicit(&bgp_io_thread_run, true, memory_order_seq_cst);
	atomic_store_explicit(&bgp_io_thread_started, true,
			      memory_order_seq_cst);

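	/* Core event loop: pull tasks off this thread's master and run them
	 * until bgp_io_finish() clears bgp_io_thread_run. */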
	while (bgp_io_thread_run) {
		if (thread_fetch(fpt->master, &task)) {
			thread_call(&task);
		}
	}

	close(sleeper[1]);
	close(sleeper[0]);

	return NULL;
}

static int bgp_io_finish(struct thread *thread)
{
	atomic_store_explicit(&bgp_io_thread_run, false, memory_order_seq_cst);
	return 0;
}

int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
	thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
	pthread_join(fpt->thread, result);
	return 0;
}
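/* Shutdown handshake: bgp_io_stop() queues bgp_io_finish() as an event on the
 * I/O thread's master; when it runs it clears bgp_io_thread_run, the loop in
 * bgp_io_start() exits, and pthread_join() reaps the thread. */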

/* Extern API -------------------------------------------------------------- */
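/* These functions toggle per-peer read/write processing on the I/O pthread.
 * Each first spins until bgp_io_thread_started is set, so that no task is
 * scheduled onto the I/O thread's master before bgp_io_start() has finished
 * initializing it. */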

void bgp_writes_on(struct peer *peer)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	assert(peer->status != Deleted);
	assert(peer->obuf);
	assert(peer->ibuf);
	assert(peer->ibuf_work);
	assert(!peer->t_connect_check_r);
	assert(!peer->t_connect_check_w);
	assert(peer->fd);

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
			 &peer->t_write);
	SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_writes_off(struct peer *peer)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_cancel_async(fpt->master, &peer->t_write, NULL);
	THREAD_OFF(peer->t_generate_updgrp_packets);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_reads_on(struct peer *peer)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	assert(peer->status != Deleted);
	assert(peer->ibuf);
	assert(peer->fd);
	assert(peer->ibuf_work);
	assert(peer->obuf);
	assert(!peer->t_connect_check_r);
	assert(!peer->t_connect_check_w);
	assert(peer->fd);

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
			&peer->t_read);

	SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

void bgp_reads_off(struct peer *peer)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_cancel_async(fpt->master, &peer->t_read, NULL);
	THREAD_OFF(peer->t_process_packet);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

/* Internal functions ------------------------------------------------------ */

/**
 * Called from I/O pthread when a file descriptor has become ready for writing.
 */
static int bgp_process_writes(struct thread *thread)
{
	static struct peer *peer;
	peer = THREAD_ARG(thread);
	uint16_t status;
	bool reschedule;
	bool fatal = false;

	if (peer->fd < 0)
		return -1;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

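	/* peer->obuf is shared with the pthread that enqueues outbound
	 * packets, so both draining it and checking whether anything remains
	 * must happen under peer->io_mtx. */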
	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_write(peer);
		reschedule = (stream_fifo_head(peer->obuf) != NULL);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		reschedule = false; /* problem */
		fatal = true;
	}

	if (reschedule) {
		thread_add_write(fpt->master, bgp_process_writes, peer,
				 peer->fd, &peer->t_write);
	} else if (!fatal) {
		BGP_TIMER_ON(peer->t_generate_updgrp_packets,
			     bgp_generate_updgrp_packets, 0);
	}

	return 0;
}

/**
 * Called from I/O pthread when a file descriptor has become ready for reading,
 * or has hung up.
 *
 * We read as much data as possible, process as many packets as we can and
 * place them on peer->ibuf for secondary processing by the main thread.
 */
static int bgp_process_reads(struct thread *thread)
{
	/* clang-format off */
	static struct peer *peer;	// peer to read from
	uint16_t status;		// bgp_read status code
	bool more = true;		// whether we got more data
	bool fatal = false;		// whether fatal error occurred
	bool added_pkt = false;		// whether we pushed onto ->ibuf
	bool header_valid = true;	// whether header is valid
	/* clang-format on */

	peer = THREAD_ARG(thread);

	if (peer->fd < 0)
		return -1;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_read(peer);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	/* error checking phase */
	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
		/* no problem; just don't process packets */
		more = false;
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		/* problem; tear down session */
		more = false;
		fatal = true;
	}

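	/* Packet-slicing loop: while a complete header is available, validate
	 * it, read the length field, and once the whole packet is present copy
	 * it into a fresh stream and push it onto peer->ibuf for the main
	 * pthread. Stop when only a partial packet remains in ibuf_work. */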
	while (more) {
		/* static buffer for transferring packets */
		static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
		/* shorter alias to peer's input buffer */
		struct stream *ibw = peer->ibuf_work;
		/* offset of start of current packet */
		size_t offset = stream_get_getp(ibw);
		/* packet size as given by header */
		u_int16_t pktsize = 0;

		/* check that we have enough data for a header */
		if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
			break;

		/* validate header */
		header_valid = validate_header(peer);

		if (!header_valid) {
			fatal = true;
			break;
		}

		/* header is valid; retrieve packet size */
		pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);

		/* if this fails we are seriously screwed */
		assert(pktsize <= BGP_MAX_PACKET_SIZE);

		/* If we have that much data, chuck it into its own
		 * stream and append to input queue for processing. */
		if (STREAM_READABLE(ibw) >= pktsize) {
			struct stream *pkt = stream_new(pktsize);
			stream_get(pktbuf, ibw, pktsize);
			stream_put(pkt, pktbuf, pktsize);

			pthread_mutex_lock(&peer->io_mtx);
			{
				stream_fifo_push(peer->ibuf, pkt);
			}
			pthread_mutex_unlock(&peer->io_mtx);

			added_pkt = true;
		} else
			break;
	}

	/*
	 * After reading:
	 * 1. Move unread data to stream start to make room for more.
	 * 2. Reschedule and return when we have additional data.
	 *
	 * XXX: Heavy abuse of stream API. This needs a ring buffer.
	 */
	if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
		void *from = stream_pnt(peer->ibuf_work);
		void *to = peer->ibuf_work->data;
		size_t siz = STREAM_READABLE(peer->ibuf_work);
		memmove(to, from, siz);
		stream_set_getp(peer->ibuf_work, 0);
		stream_set_endp(peer->ibuf_work, siz);
	}

	assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);

	/* handle invalid header */
	if (fatal) {
		/* wipe buffer just in case someone screwed up */
		stream_reset(peer->ibuf_work);
	} else {
		thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
				&peer->t_read);
		if (added_pkt)
			thread_add_timer_msec(bm->master, bgp_process_packet,
					      peer, 0, &peer->t_process_packet);
	}

	return 0;
}

/**
 * Flush peer output buffer.
 *
 * This function pops packets off of peer->obuf and writes them to peer->fd.
 * The number of packets written is equal to the minimum of peer->wpkt_quanta
 * and the number of packets on the output buffer, unless an error occurs.
 *
 * If write() returns an error, the appropriate FSM event is generated.
 *
 * The return value is a status bitfield built from the BGP_IO_* flags defined
 * at the top of this file; it is zero when no error occurred.
 */
static uint16_t bgp_write(struct peer *peer)
{
	u_char type;
	struct stream *s;
	int num;
	int update_last_write = 0;
	unsigned int count = 0;
	uint32_t oc;
	uint32_t uo;
	uint16_t status = 0;
	uint32_t wpkt_quanta_old;

	// save current # updates sent
	oc = atomic_load_explicit(&peer->update_out, memory_order_relaxed);

	// cache current write quanta
	wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
					       memory_order_relaxed);

	while (count < wpkt_quanta_old && (s = stream_fifo_head(peer->obuf))) {
		int writenum;
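		/* write() may accept only part of the packet; on a short
		 * write, advance getp past the bytes already sent and retry
		 * until the whole packet is on the wire or an error occurs. */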
		do {
			writenum = stream_get_endp(s) - stream_get_getp(s);
			num = write(peer->fd, STREAM_PNT(s), writenum);

			if (num < 0) {
				if (!ERRNO_IO_RETRY(errno)) {
					BGP_EVENT_ADD(peer, TCP_fatal_error);
					SET_FLAG(status, BGP_IO_FATAL_ERR);
				} else {
					SET_FLAG(status, BGP_IO_TRANS_ERR);
				}

				goto done;
			} else if (num != writenum) // incomplete write
				stream_forward_getp(s, num);

		} while (num != writenum);

		/* Retrieve BGP packet type. */
		stream_set_getp(s, BGP_MARKER_SIZE + 2);
		type = stream_getc(s);

		switch (type) {
		case BGP_MSG_OPEN:
			atomic_fetch_add_explicit(&peer->open_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_UPDATE:
			atomic_fetch_add_explicit(&peer->update_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_NOTIFY:
			atomic_fetch_add_explicit(&peer->notify_out, 1,
						  memory_order_relaxed);
			/* Double start timer. */
			peer->v_start *= 2;

			/* Overflow check. */
			if (peer->v_start >= (60 * 2))
				peer->v_start = (60 * 2);

			/* Handle Graceful Restart case where the state changes
			 * to Connect instead of Idle */
			BGP_EVENT_ADD(peer, BGP_Stop);
			goto done;

		case BGP_MSG_KEEPALIVE:
			atomic_fetch_add_explicit(&peer->keepalive_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_ROUTE_REFRESH_NEW:
		case BGP_MSG_ROUTE_REFRESH_OLD:
			atomic_fetch_add_explicit(&peer->refresh_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_CAPABILITY:
			atomic_fetch_add_explicit(&peer->dynamic_cap_out, 1,
						  memory_order_relaxed);
			break;
		}

		count++;

		stream_free(stream_fifo_pop(peer->obuf));
		update_last_write = 1;
	}

done : {
	/* Update last_update if UPDATEs were written. */
	uo = atomic_load_explicit(&peer->update_out, memory_order_relaxed);
	if (uo > oc)
		atomic_store_explicit(&peer->last_update, bgp_clock(),
				      memory_order_relaxed);

	/* If we TXed any flavor of packet */
	if (update_last_write)
		atomic_store_explicit(&peer->last_write, bgp_clock(),
				      memory_order_relaxed);
}

	return status;
}

/**
 * Reads a chunk of data from peer->fd into peer->ibuf_work.
 *
 * @return status flag (see top-of-file)
 */
static uint16_t bgp_read(struct peer *peer)
{
	size_t readsize; // how many bytes we want to read
	ssize_t nbytes;  // how many bytes we actually read
	uint16_t status = 0;

	readsize = STREAM_WRITEABLE(peer->ibuf_work);

	nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);

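	/* stream_read_try() appends at most readsize bytes to ibuf_work and
	 * returns the count read; -1 indicates a fatal socket error, 0 means
	 * the peer closed the connection, and -2 means EAGAIN/EWOULDBLOCK. */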
	switch (nbytes) {
	/* Fatal error; tear down session */
	case -1:
		zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
			 safe_strerror(errno));

		if (peer->status == Established) {
			if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
				peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
				SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
			} else
				peer->last_reset = PEER_DOWN_CLOSE_SESSION;
		}

		BGP_EVENT_ADD(peer, TCP_fatal_error);
		SET_FLAG(status, BGP_IO_FATAL_ERR);
		break;

	/* Received EOF / TCP session closed */
	case 0:
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s [Event] BGP connection closed fd %d",
				   peer->host, peer->fd);

		if (peer->status == Established) {
			if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
				peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
				SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
			} else
				peer->last_reset = PEER_DOWN_CLOSE_SESSION;
		}

		BGP_EVENT_ADD(peer, TCP_connection_closed);
		SET_FLAG(status, BGP_IO_FATAL_ERR);
		break;

	/* EAGAIN or EWOULDBLOCK; come back later */
	case -2:
		SET_FLAG(status, BGP_IO_TRANS_ERR);
		break;
	default:
		break;
	}

	return status;
}

/*
 * Called after we have read a BGP packet header. Validates marker, message
 * type and packet length. If any of these aren't correct, sends a notify.
 */
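/* The checks below rely on the fixed BGP message header layout (RFC 4271):
 * a 16-octet marker of all ones, a 2-octet length and a 1-octet type. */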
static bool validate_header(struct peer *peer)
{
	uint16_t size;
	uint8_t type;
	struct stream *pkt = peer->ibuf_work;
	size_t getp = stream_get_getp(pkt);

	static uint8_t marker[BGP_MARKER_SIZE] = {
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

	if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
		bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
				BGP_NOTIFY_HEADER_NOT_SYNC);
		return false;
	}

	/* Get size and type in host byte order. */
	size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
	type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);

	/* BGP type check. */
	if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
	    && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
	    && type != BGP_MSG_ROUTE_REFRESH_NEW
	    && type != BGP_MSG_ROUTE_REFRESH_OLD
	    && type != BGP_MSG_CAPABILITY) {
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s unknown message type 0x%02x", peer->host,
				   type);

		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESTYPE,
					  &type, 1);
		return false;
	}

	/* Minimum packet length check. */
	if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
	    || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
	    || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
	    || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
	    || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_NEW
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_OLD
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_CAPABILITY
		&& size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
		if (bgp_debug_neighbor_events(peer)) {
			zlog_debug("%s bad message length - %d for %s",
				   peer->host, size,
				   type == 128 ? "ROUTE-REFRESH"
					       : bgp_type_str[(int) type]);
		}

		uint16_t nsize = htons(size);

		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESLEN,
					  (unsigned char *) &nsize, 2);
		return false;
	}

	return true;
}