/* BGP I/O.
 * Implements packet I/O in a pthread.
 * Copyright (C) 2017 Cumulus Networks
 * Quentin Young
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
 * MA 02110-1301 USA
 */

/* clang-format off */
#include <zebra.h>
#include <pthread.h>		// for pthread_mutex_unlock, pthread_mutex_lock

#include "frr_pthread.h"	// for frr_pthread_get, frr_pthread
#include "linklist.h"		// for list_delete, list_delete_all_node, lis...
#include "log.h"		// for zlog_debug, safe_strerror, zlog_err
#include "memory.h"		// for MTYPE_TMP, XCALLOC, XFREE
#include "network.h"		// for ERRNO_IO_RETRY
#include "stream.h"		// for stream_get_endp, stream_getw_from, str...
#include "thread.h"		// for THREAD_OFF, THREAD_ARG, thread, thread...
#include "zassert.h"		// for assert

#include "bgpd/bgp_io.h"
#include "bgpd/bgp_debug.h"	// for bgp_debug_neighbor_events, bgp_type_str
#include "bgpd/bgp_fsm.h"	// for BGP_EVENT_ADD, bgp_event
#include "bgpd/bgp_packet.h"	// for bgp_notify_send_with_data, bgp_notify...
#include "bgpd/bgpd.h"		// for peer, BGP_MARKER_SIZE, bgp_master, bm
/* clang-format on */

/* forward declarations */
static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *);
static int bgp_process_writes(struct thread *);
static int bgp_process_reads(struct thread *);
static bool validate_header(struct peer *);

/* generic i/o status codes */
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error

/* Start and stop routines for I/O pthread + control variables
 * ------------------------------------------------------------------------ */
_Atomic bool bgp_io_thread_run;
_Atomic bool bgp_io_thread_started;

void bgp_io_init(void)
{
	bgp_io_thread_run = false;
	bgp_io_thread_started = false;
}

/* Unused callback for thread_add_read() */
static int bgp_io_dummy(struct thread *thread) { return 0; }

void *bgp_io_start(void *arg)
{
	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
	fpt->master->owner = pthread_self();

	// fd so we can sleep in poll()
	int sleeper[2];
	if (pipe(sleeper) < 0)
		zlog_err("%s: could not create pipe", __func__);
	thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);
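	/* the dummy task above gives poll() at least one fd to wait on, so
	 * thread_fetch() sleeps instead of spinning when there is no work */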

	// we definitely don't want to handle signals
	fpt->master->handle_signals = false;

	struct thread task;

	atomic_store_explicit(&bgp_io_thread_run, true, memory_order_seq_cst);
	atomic_store_explicit(&bgp_io_thread_started, true,
			      memory_order_seq_cst);

	while (bgp_io_thread_run) {
		if (thread_fetch(fpt->master, &task)) {
			thread_call(&task);
		}
	}

	close(sleeper[1]);
	close(sleeper[0]);

	return NULL;
}

static int bgp_io_finish(struct thread *thread)
{
	atomic_store_explicit(&bgp_io_thread_run, false, memory_order_seq_cst);
	return 0;
}

int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
	thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
	pthread_join(fpt->thread, result);
	return 0;
}

/* Extern API -------------------------------------------------------------- */

void bgp_writes_on(struct peer *peer)
{
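	/* wait until bgp_io_start() has finished initializing; the flag is
	 * set exactly once, so this spin is brief */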
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	assert(peer->status != Deleted);
	assert(peer->obuf);
	assert(peer->ibuf);
	assert(peer->ibuf_work);
	assert(!peer->t_connect_check);
	assert(peer->fd);

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
			 &peer->t_write);
	SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_writes_off(struct peer *peer)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

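	/* t_write lives on the I/O thread's event loop; cancellation must
	 * happen over there, so queue it with thread_cancel_async() */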
	thread_cancel_async(fpt->master, &peer->t_write, NULL);
	THREAD_OFF(peer->t_generate_updgrp_packets);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_reads_on(struct peer *peer)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	assert(peer->status != Deleted);
	assert(peer->ibuf);
	assert(peer->fd);
	assert(peer->ibuf_work);
	assert(stream_get_endp(peer->ibuf_work) == 0);
	assert(peer->obuf);
	assert(!peer->t_connect_check);

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
			&peer->t_read);

	SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

void bgp_reads_off(struct peer *peer)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_cancel_async(fpt->master, &peer->t_read, NULL);
	THREAD_OFF(peer->t_process_packet);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

/* Internal functions ------------------------------------------------------- */

/**
 * Called from I/O pthread when a file descriptor has become ready for writing.
 */
static int bgp_process_writes(struct thread *thread)
{
	struct peer *peer = THREAD_ARG(thread);
	uint16_t status;
	bool reschedule;
	bool fatal = false;

	if (peer->fd < 0)
		return -1;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

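	/* peer->obuf is shared with the main pthread, which enqueues packets
	 * onto it; all access to the fifo must hold io_mtx */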
	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_write(peer);
		reschedule = (stream_fifo_head(peer->obuf) != NULL);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		reschedule = false; /* problem */
		fatal = true;
	}

	if (reschedule) {
		thread_add_write(fpt->master, bgp_process_writes, peer,
				 peer->fd, &peer->t_write);
	}

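	/* generating more update-group packets is the main pthread's job, so
	 * schedule that task on bm->master rather than on this loop */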
	if (!fatal) {
		thread_add_timer_msec(bm->master, bgp_generate_updgrp_packets,
				      peer, 0,
				      &peer->t_generate_updgrp_packets);
	}

	return 0;
}

/**
 * Called from I/O pthread when a file descriptor has become ready for reading,
 * or has hung up.
 *
 * We read as much data as possible, process as many packets as we can and
 * place them on peer->ibuf for secondary processing by the main thread.
 */
static int bgp_process_reads(struct thread *thread)
{
	/* clang-format off */
	struct peer *peer;		// peer to read from
	uint16_t status;		// bgp_read status code
	bool more = true;		// whether we got more data
	bool fatal = false;		// whether fatal error occurred
	bool added_pkt = false;		// whether we pushed onto ->ibuf
	bool header_valid = true;	// whether header is valid
	/* clang-format on */

	peer = THREAD_ARG(thread);

	if (peer->fd < 0)
		return -1;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_read(peer);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	/* error checking phase */
	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
		/* no problem; just don't process packets */
		more = false;
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		/* problem; tear down session */
		more = false;
		fatal = true;
	}

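	/* packet slicing: carve individual BGP messages off of ibuf_work and
	 * hand each one to the main pthread in its own stream */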
	while (more) {
		/* static buffer for transferring packets */
		static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
		/* shorter alias to peer's input buffer */
		struct stream *ibw = peer->ibuf_work;
		/* offset of start of current packet */
		size_t offset = stream_get_getp(ibw);
		/* packet size as given by header */
		uint16_t pktsize = 0;

		/* check that we have enough data for a header */
		if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
			break;

		/* validate header */
		header_valid = validate_header(peer);

		if (!header_valid) {
			fatal = true;
			break;
		}

		/* header is valid; retrieve packet size */
		pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);

		/* if this fails we are seriously screwed */
		assert(pktsize <= BGP_MAX_PACKET_SIZE);

		/* If we have that much data, chuck it into its own
		 * stream and append to input queue for processing. */
		if (STREAM_READABLE(ibw) >= pktsize) {
			struct stream *pkt = stream_new(pktsize);
			stream_get(pktbuf, ibw, pktsize);
			stream_put(pkt, pktbuf, pktsize);

			pthread_mutex_lock(&peer->io_mtx);
			{
				stream_fifo_push(peer->ibuf, pkt);
			}
			pthread_mutex_unlock(&peer->io_mtx);

			added_pkt = true;
		} else
			break;
	}

	/*
	 * After reading:
	 * 1. Move unread data to stream start to make room for more.
	 * 2. Reschedule and return when we have additional data.
	 *
	 * XXX: Heavy abuse of stream API. This needs a ring buffer.
	 */
	if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
		void *from = stream_pnt(peer->ibuf_work);
		void *to = peer->ibuf_work->data;
		size_t siz = STREAM_READABLE(peer->ibuf_work);
		memmove(to, from, siz);
		stream_set_getp(peer->ibuf_work, 0);
		stream_set_endp(peer->ibuf_work, siz);
	}

	assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);

	/* handle invalid header or fatal read error */
	if (fatal) {
		/* wipe buffer just in case someone screwed up */
		stream_reset(peer->ibuf_work);
	} else {
		thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
				&peer->t_read);
		if (added_pkt)
			thread_add_event(bm->master, bgp_process_packet, peer,
					 0, NULL);
	}

	return 0;
}

/**
 * Flush peer output buffer.
 *
 * This function pops packets off of peer->obuf and writes them to peer->fd.
 * The number of packets written is equal to the minimum of peer->wpkt_quanta
 * and the number of packets on the output buffer, unless an error occurs.
 *
 * If write() returns an error, the appropriate FSM event is generated.
 *
 * The return value is equal to the number of packets written
 * (which may be zero).
 */
static uint16_t bgp_write(struct peer *peer)
{
	u_char type;
	struct stream *s;
	int num;
	int update_last_write = 0;
	unsigned int count = 0;
	unsigned int oc = 0;
	uint16_t status = 0;
	uint32_t wpkt_quanta_old;

	// cache current write quanta
	wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
					       memory_order_relaxed);
	// cache the sent-UPDATE count so done: can tell if any were written
	oc = atomic_load_explicit(&peer->update_out, memory_order_relaxed);

	while (count < wpkt_quanta_old && (s = stream_fifo_head(peer->obuf))) {
		int writenum;
		do {
			writenum = stream_get_endp(s) - stream_get_getp(s);
			num = write(peer->fd, STREAM_PNT(s), writenum);

			if (num < 0) {
				if (!ERRNO_IO_RETRY(errno)) {
					BGP_EVENT_ADD(peer, TCP_fatal_error);
					SET_FLAG(status, BGP_IO_FATAL_ERR);
				} else {
					SET_FLAG(status, BGP_IO_TRANS_ERR);
				}

				goto done;
			} else if (num != writenum) // incomplete write
				stream_forward_getp(s, num);

		} while (num != writenum);

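		/* the BGP header is a 16-byte marker, a 2-byte length and a
		 * 1-byte type octet; seek straight to the type */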
		/* Retrieve BGP packet type. */
		stream_set_getp(s, BGP_MARKER_SIZE + 2);
		type = stream_getc(s);

		switch (type) {
		case BGP_MSG_OPEN:
			atomic_fetch_add_explicit(&peer->open_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_UPDATE:
			atomic_fetch_add_explicit(&peer->update_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_NOTIFY:
			atomic_fetch_add_explicit(&peer->notify_out, 1,
						  memory_order_relaxed);
			/* Double start timer. */
			peer->v_start *= 2;

			/* Overflow check. */
			if (peer->v_start >= (60 * 2))
				peer->v_start = (60 * 2);

			/* Handle Graceful Restart case where the state changes
			 * to Connect instead of Idle */
			BGP_EVENT_ADD(peer, BGP_Stop);
			goto done;

		case BGP_MSG_KEEPALIVE:
			atomic_fetch_add_explicit(&peer->keepalive_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_ROUTE_REFRESH_NEW:
		case BGP_MSG_ROUTE_REFRESH_OLD:
			atomic_fetch_add_explicit(&peer->refresh_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_CAPABILITY:
			atomic_fetch_add_explicit(&peer->dynamic_cap_out, 1,
						  memory_order_relaxed);
			break;
		}

		count++;

		stream_free(stream_fifo_pop(peer->obuf));
		update_last_write = 1;
	}

done: {
	/* Update last_update if UPDATEs were written. */
	if (peer->update_out > oc)
		atomic_store_explicit(&peer->last_update, bgp_clock(),
				      memory_order_relaxed);

	/* If we TXed any flavor of packet update last_write */
	if (update_last_write)
		atomic_store_explicit(&peer->last_write, bgp_clock(),
				      memory_order_relaxed);
}

	return status;
}

/**
 * Reads a chunk of data from peer->fd into peer->ibuf_work.
 *
 * @return status flag (see top-of-file)
 */
static uint16_t bgp_read(struct peer *peer)
{
	size_t readsize;	// how many bytes we want to read
	ssize_t nbytes;		// how many bytes we actually read
	uint16_t status = 0;

	readsize = STREAM_WRITEABLE(peer->ibuf_work);

	nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
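	/* stream_read_try() returns the byte count on success, 0 on EOF,
	 * -1 on a fatal error and -2 when the read would block */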

	switch (nbytes) {
	/* Fatal error; tear down session */
	case -1:
		zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
			 safe_strerror(errno));

		if (peer->status == Established) {
			if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
				peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
				SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
			} else
				peer->last_reset = PEER_DOWN_CLOSE_SESSION;
		}

		BGP_EVENT_ADD(peer, TCP_fatal_error);
		SET_FLAG(status, BGP_IO_FATAL_ERR);
		break;

	/* Received EOF / TCP session closed */
	case 0:
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s [Event] BGP connection closed fd %d",
				   peer->host, peer->fd);

		if (peer->status == Established) {
			if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
				peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
				SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
			} else
				peer->last_reset = PEER_DOWN_CLOSE_SESSION;
		}

		BGP_EVENT_ADD(peer, TCP_connection_closed);
		SET_FLAG(status, BGP_IO_FATAL_ERR);
		break;

	/* EAGAIN or EWOULDBLOCK; come back later */
	case -2:
		SET_FLAG(status, BGP_IO_TRANS_ERR);
		break;
	default:
		break;
	}

	return status;
}

/*
 * Called after we have read a BGP packet header. Validates marker, message
 * type and packet length. If any of these aren't correct, sends a notify.
 */
static bool validate_header(struct peer *peer)
{
	uint16_t size, type;
	struct stream *pkt = peer->ibuf_work;
	size_t getp = stream_get_getp(pkt);

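	/* per RFC 4271, the header marker is 16 bytes of all ones */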
	static uint8_t marker[BGP_MARKER_SIZE] = {
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

	if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
		bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
				BGP_NOTIFY_HEADER_NOT_SYNC);
		return false;
	}

	/* Get size and type. */
	size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
	type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);

	/* BGP type check. */
	if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
	    && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
	    && type != BGP_MSG_ROUTE_REFRESH_NEW
	    && type != BGP_MSG_ROUTE_REFRESH_OLD
	    && type != BGP_MSG_CAPABILITY) {
		if (bgp_debug_neighbor_events(peer)) {
			// XXX: zlog is not MT-safe
			zlog_debug("%s unknown message type 0x%02x",
				   peer->host, type);
		}

		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESTYPE,
					  (u_char *)&type, 1);
		return false;
	}

	/* Minimum packet length check. */
	if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
	    || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
	    || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
	    || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
	    || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_NEW
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_OLD
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_CAPABILITY
		&& size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
		if (bgp_debug_neighbor_events(peer)) {
			zlog_debug("%s bad message length - %d for %s",
				   peer->host, size,
				   type == 128 ? "ROUTE-REFRESH"
					       : bgp_type_str[(int) type]);
		}

		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESLEN,
					  (u_char *)&size, 2);
		return false;
	}

	return true;
}