// SPDX-License-Identifier: GPL-2.0-or-later
/* BGP I/O.
 * Implements packet I/O in a pthread.
 * Copyright (C) 2017 Cumulus Networks
 * Quentin Young
 */

/* clang-format off */
#include <zebra.h>
#include <pthread.h>		// for pthread_mutex_unlock, pthread_mutex_lock
#include <sys/uio.h>		// for writev

#include "frr_pthread.h"
#include "linklist.h"		// for list_delete, list_delete_all_node, lis...
#include "log.h"		// for zlog_debug, safe_strerror, zlog_err
#include "memory.h"		// for MTYPE_TMP, XCALLOC, XFREE
#include "network.h"		// for ERRNO_IO_RETRY
#include "stream.h"		// for stream_get_endp, stream_getw_from, str...
#include "ringbuf.h"		// for ringbuf_remain, ringbuf_peek, ringbuf_...
#include "frrevent.h"		// for EVENT_OFF, EVENT_ARG, thread...

#include "bgpd/bgp_io.h"
#include "bgpd/bgp_debug.h"	// for bgp_debug_neighbor_events, bgp_type_str
#include "bgpd/bgp_errors.h"	// for expanded error reference information
#include "bgpd/bgp_fsm.h"	// for BGP_EVENT_ADD, bgp_event
#include "bgpd/bgp_packet.h"	// for bgp_notify_io_invalid...
#include "bgpd/bgp_trace.h"	// for frrtrace
#include "bgpd/bgpd.h"		// for peer, BGP_MARKER_SIZE, bgp_master, bm
/* clang-format on */

/* forward declarations */
static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *peer, int *code_p);
static void bgp_process_writes(struct event *event);
static void bgp_process_reads(struct event *event);
static bool validate_header(struct peer *);

/* generic i/o status codes */
#define BGP_IO_TRANS_ERR	(1 << 0) /* EAGAIN or similar occurred */
#define BGP_IO_FATAL_ERR	(1 << 1) /* some kind of fatal TCP error */
#define BGP_IO_WORK_FULL_ERR	(1 << 2) /* No room in work buffer */
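/*
 * bgp_read() and bgp_write() OR these flags into the status word they
 * return; callers test it with CHECK_FLAG(). A minimal sketch of the
 * pattern used by bgp_process_writes() below:
 *
 *	uint16_t status = bgp_write(peer);
 *	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
 *		(tear the session down)
 */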

/* Thread external API ----------------------------------------------------- */
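/*
 * A hedged usage sketch (not verbatim from this file): FSM code running
 * on the main pthread enables peer I/O once a connection is up, e.g.:
 *
 *	bgp_reads_on(peer);	// schedule socket reads on the I/O pthread
 *	bgp_writes_on(peer);	// schedule socket writes on the I/O pthread
 *
 * and disables it with bgp_reads_off() / bgp_writes_off() before the
 * peer or its buffers are torn down, as the asserts below assume.
 */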

void bgp_writes_on(struct peer *peer)
{
	struct frr_pthread *fpt = bgp_pth_io;
	assert(fpt->running);

	assert(peer->status != Deleted);
	assert(peer->obuf);
	assert(peer->ibuf);
	assert(peer->ibuf_work);
	assert(!peer->t_connect_check_r);
	assert(!peer->t_connect_check_w);
	assert(peer->fd);

	event_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
			&peer->t_write);
	SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_writes_off(struct peer *peer)
{
	struct frr_pthread *fpt = bgp_pth_io;
	assert(fpt->running);

	event_cancel_async(fpt->master, &peer->t_write, NULL);
	EVENT_OFF(peer->t_generate_updgrp_packets);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_reads_on(struct peer *peer)
{
	struct frr_pthread *fpt = bgp_pth_io;
	assert(fpt->running);

	assert(peer->status != Deleted);
	assert(peer->ibuf);
	assert(peer->ibuf_work);
	assert(peer->obuf);
	assert(!peer->t_connect_check_r);
	assert(!peer->t_connect_check_w);
	assert(peer->fd);

	event_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
		       &peer->t_read);

	SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}
void bgp_reads_off(struct peer *peer)
{
	struct frr_pthread *fpt = bgp_pth_io;
	assert(fpt->running);

	event_cancel_async(fpt->master, &peer->t_read, NULL);
	EVENT_OFF(peer->t_process_packet);
	EVENT_OFF(peer->t_process_packet_error);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

/* Thread internal functions ----------------------------------------------- */
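/*
 * Overview of the pipeline implemented below: bgp_read() pulls raw bytes
 * from the socket into the peer's ibuf_work ring buffer; read_ibuf_work()
 * slices complete, validated packets off that ring and pushes them onto
 * the peer->ibuf fifo for the main pthread to process; on the outbound
 * side, bgp_write() drains the peer->obuf fifo to the socket with writev().
 */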

/*
 * Called from the I/O pthread when a file descriptor has become ready for
 * writing.
 */
static void bgp_process_writes(struct event *thread)
{
	struct peer *peer = EVENT_ARG(thread);
	uint16_t status;
	bool reschedule;
	bool fatal = false;

	if (peer->fd < 0)
		return;

	struct frr_pthread *fpt = bgp_pth_io;

	frr_with_mutex (&peer->io_mtx) {
		status = bgp_write(peer);
		reschedule = (stream_fifo_head(peer->obuf) != NULL);
	}

	/* no problem */
	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
		/* transient error; simply try again on the next wakeup */
	}

	/* problem */
	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		reschedule = false;
		fatal = true;
	}

	/* If suppress-fib-pending is enabled, routes are advertised to peers
	 * once their status is received from the FIB. The delay is added to
	 * update-group packet generation, which allows more routes to be
	 * batched into each update message.
	 */
	if (reschedule) {
		event_add_write(fpt->master, bgp_process_writes, peer,
				peer->fd, &peer->t_write);
	} else if (!fatal) {
		BGP_UPDATE_GROUP_TIMER_ON(&peer->t_generate_updgrp_packets,
					  bgp_generate_updgrp_packets);
	}
}

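/*
 * Slice one packet off the peer's input ring buffer and, if it is complete
 * and its header validates, wrap it in a stream and append it to peer->ibuf
 * for processing on the main pthread.
 *
 * Returns the packet size on success, 0 if a complete packet has not yet
 * arrived, -ENOMEM if peer->ibuf is at the configured inq_limit, or
 * -EBADMSG if header validation failed (a notify has then been sent).
 */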
static int read_ibuf_work(struct peer *peer)
{
	/* shorter alias to peer's input buffer */
	struct ringbuf *ibw = peer->ibuf_work;
	/* packet size as given by header */
	uint16_t pktsize = 0;
	struct stream *pkt;

	/* don't exceed the configured input queue depth */
	frr_with_mutex (&peer->io_mtx) {
		if (peer->ibuf->count >= bm->inq_limit)
			return -ENOMEM;
	}

	/* check that we have enough data for a header */
	if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
		return 0;

	/* check that header is valid */
	if (!validate_header(peer))
		return -EBADMSG;

	/* header is valid; retrieve packet size */
	ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));

	pktsize = ntohs(pktsize);

	/* if this fails we are seriously screwed */
	assert(pktsize <= peer->max_packet_size);

	/*
	 * If we have that much data, chuck it into its own
	 * stream and append to input queue for processing.
	 *
	 * Otherwise, come back later.
	 */
	if (ringbuf_remain(ibw) < pktsize)
		return 0;

	pkt = stream_new(pktsize);
	assert(STREAM_WRITEABLE(pkt) == pktsize);
	assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize);
	stream_set_endp(pkt, pktsize);

	frrtrace(2, frr_bgp, packet_read, peer, pkt);
	frr_with_mutex (&peer->io_mtx) {
		stream_fifo_push(peer->ibuf, pkt);
	}

	return pktsize;
}

/*
 * Called from the I/O pthread when a file descriptor has become ready for
 * reading, or has hung up.
 *
 * We read as much data as possible, process as many packets as we can and
 * place them on peer->ibuf for secondary processing by the main thread.
 */
static void bgp_process_reads(struct event *thread)
{
	/* clang-format off */
	struct peer *peer;            /* peer to read from */
	uint16_t status;              /* bgp_read status code */
	bool fatal = false;           /* whether fatal error occurred */
	bool added_pkt = false;       /* whether we pushed onto ->ibuf */
	int code = 0;                 /* FSM code if error occurred */
	bool ibuf_full = false;       /* whether the peer input fifo is full */
	static bool ibuf_full_logged; /* whether we already logged that
				       * condition (persists across calls) */
	int ret = 1;
	/* clang-format on */

	peer = EVENT_ARG(thread);

	if (peer->fd < 0 || bm->terminating)
		return;

	struct frr_pthread *fpt = bgp_pth_io;

	frr_with_mutex (&peer->io_mtx) {
		status = bgp_read(peer, &code);
	}

	/* error checking phase */
	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
		/* no problem; just don't process packets */
		goto done;
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		/* problem; tear down session */
		fatal = true;

		/* Handle the error in the main pthread, including the
		 * specific state change from 'bgp_read'.
		 */
		event_add_event(bm->master, bgp_packet_process_error, peer,
				code, &peer->t_process_packet_error);
		goto done;
	}

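	/* slice as many complete packets as possible off the ring buffer */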
	while (true) {
		ret = read_ibuf_work(peer);
		if (ret <= 0)
			break;

		added_pkt = true;
	}

	switch (ret) {
	case -EBADMSG:
		fatal = true;
		break;
	case -ENOMEM:
		ibuf_full = true;
		if (!ibuf_full_logged) {
			if (bgp_debug_neighbor_events(peer))
				zlog_debug(
					"%s [Event] Peer Input-Queue is full: limit (%u)",
					peer->host, bm->inq_limit);

			ibuf_full_logged = true;
		}
		break;
	default:
		ibuf_full_logged = false;
		break;
	}

done:
	/* handle invalid header */
	if (fatal) {
		/* wipe buffer just in case someone screwed up */
		ringbuf_wipe(peer->ibuf_work);
		return;
	}

	/* ringbuf should be fully drained unless ibuf is full */
	if (!ibuf_full)
		assert(ringbuf_space(peer->ibuf_work) >= peer->max_packet_size);

	event_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
		       &peer->t_read);
	if (added_pkt)
		event_add_event(bm->master, bgp_process_packet, peer, 0,
				&peer->t_process_packet);
}

/*
 * Flush peer output buffer.
 *
 * This function pops packets off of peer->obuf and writes them to peer->fd.
 * The number of packets written is equal to the minimum of peer->wpkt_quanta
 * and the number of packets on the output buffer, unless an error occurs.
 *
 * If write() returns an error, the appropriate FSM event is generated.
 *
 * The return value is a bitfield of the BGP_IO_* status codes defined at the
 * top of this file (zero if no error occurred).
 */
static uint16_t bgp_write(struct peer *peer)
{
	uint8_t type;
	struct stream *s;
	int update_last_write = 0;
	unsigned int count;
	uint32_t uo = 0;
	uint16_t status = 0;
	uint32_t wpkt_quanta_old;

	int writenum = 0;
	int num;
	unsigned int iovsz;
	unsigned int strmsz;
	unsigned int total_written;
	time_t now;

	wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
					       memory_order_relaxed);
	struct stream *ostreams[wpkt_quanta_old];
	struct stream **streams = ostreams;
	struct iovec iov[wpkt_quanta_old];

	s = stream_fifo_head(peer->obuf);

	if (!s)
		goto done;

	count = iovsz = 0;
	while (count < wpkt_quanta_old && iovsz < array_size(iov) && s) {
		ostreams[iovsz] = s;
		iov[iovsz].iov_base = stream_pnt(s);
		iov[iovsz].iov_len = STREAM_READABLE(s);
		writenum += STREAM_READABLE(s);
		s = s->next;
		++iovsz;
		++count;
	}

	strmsz = iovsz;
	total_written = 0;

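	/*
	 * Gather-write the queued packets with writev(). A short write is
	 * handled by dropping fully-written messages from the front of the
	 * iovec, advancing the get pointer of the partially-written stream,
	 * and retrying until all requested bytes are written or the socket
	 * returns an error.
	 */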
	do {
		num = writev(peer->fd, iov, iovsz);

		if (num < 0) {
			if (!ERRNO_IO_RETRY(errno)) {
				BGP_EVENT_ADD(peer, TCP_fatal_error);
				SET_FLAG(status, BGP_IO_FATAL_ERR);
			} else {
				SET_FLAG(status, BGP_IO_TRANS_ERR);
			}

			break;
		} else if (num != writenum) {
			unsigned int msg_written = 0;
			unsigned int ic = iovsz;

			for (unsigned int i = 0; i < ic; i++) {
				size_t ss = iov[i].iov_len;

				if (ss > (unsigned int)num)
					break;

				msg_written++;
				iovsz--;
				writenum -= ss;
				num -= ss;
			}

			total_written += msg_written;

			assert(total_written < count);

			memmove(&iov, &iov[msg_written],
				sizeof(iov[0]) * iovsz);
			streams = &streams[msg_written];
			stream_forward_getp(streams[0], num);
			iov[0].iov_base = stream_pnt(streams[0]);
			iov[0].iov_len = STREAM_READABLE(streams[0]);

			writenum -= num;
			num = 0;
			assert(writenum > 0);
		} else {
			total_written = strmsz;
		}

	} while (num != writenum);

	/* Handle statistics */
	for (unsigned int i = 0; i < total_written; i++) {
		s = stream_fifo_pop(peer->obuf);

		assert(s == ostreams[i]);

		/* Retrieve BGP packet type. */
		stream_set_getp(s, BGP_MARKER_SIZE + 2);
		type = stream_getc(s);

		switch (type) {
		case BGP_MSG_OPEN:
			atomic_fetch_add_explicit(&peer->open_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_UPDATE:
			atomic_fetch_add_explicit(&peer->update_out, 1,
						  memory_order_relaxed);
			uo++;
			break;
		case BGP_MSG_NOTIFY:
			atomic_fetch_add_explicit(&peer->notify_out, 1,
						  memory_order_relaxed);
			/* Double start timer. */
			peer->v_start *= 2;

			/* Overflow check. */
			if (peer->v_start >= (60 * 2))
				peer->v_start = (60 * 2);

			/*
			 * Handle Graceful Restart case where the state changes
			 * to Connect instead of Idle.
			 */
			BGP_EVENT_ADD(peer, BGP_Stop);
			goto done;

		case BGP_MSG_KEEPALIVE:
			atomic_fetch_add_explicit(&peer->keepalive_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_ROUTE_REFRESH_NEW:
		case BGP_MSG_ROUTE_REFRESH_OLD:
			atomic_fetch_add_explicit(&peer->refresh_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_CAPABILITY:
			atomic_fetch_add_explicit(&peer->dynamic_cap_out, 1,
						  memory_order_relaxed);
			break;
		}

		stream_free(s);
		ostreams[i] = NULL;
		update_last_write = 1;
	}

done: {
	now = monotime(NULL);
	/*
	 * Update last_update if UPDATEs were written.
	 * Note that these are only updated at the end,
	 * not per message (i.e., per loop iteration).
	 */
	if (uo)
		atomic_store_explicit(&peer->last_update, now,
				      memory_order_relaxed);

	/* If we TXed any flavor of packet */
	if (update_last_write) {
		atomic_store_explicit(&peer->last_write, now,
				      memory_order_relaxed);
		peer->last_sendq_ok = now;
	}
}

	return status;
}

/*
 * Reads a chunk of data from peer->fd into peer->ibuf_work.
 *
 * code_p
 *    Pointer to location to store FSM event code in case of fatal error.
 *
 * @return status flag (see top-of-file)
 */
static uint16_t bgp_read(struct peer *peer, int *code_p)
{
	size_t readsize;	/* how many bytes we want to read */
	ssize_t nbytes;		/* how many bytes we actually read */
	size_t ibuf_work_space; /* space we can read into the work buf */
	uint16_t status = 0;

	ibuf_work_space = ringbuf_space(peer->ibuf_work);

	if (ibuf_work_space == 0) {
		SET_FLAG(status, BGP_IO_WORK_FULL_ERR);
		return status;
	}

	readsize = MIN(ibuf_work_space, sizeof(peer->ibuf_scratch));

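	/*
	 * Read into the fixed-size scratch buffer first; the bytes are then
	 * transferred into the ring buffer below, since read() needs one
	 * contiguous destination.
	 */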
	nbytes = read(peer->fd, peer->ibuf_scratch, readsize);

	/* EAGAIN or EWOULDBLOCK; come back later */
	if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
		SET_FLAG(status, BGP_IO_TRANS_ERR);
	} else if (nbytes < 0) {
		/* Fatal error; tear down session */
		flog_err(EC_BGP_UPDATE_RCV,
			 "%s [Error] bgp_read_packet error: %s", peer->host,
			 safe_strerror(errno));

		/* Handle the error in the main pthread. */
		if (code_p)
			*code_p = TCP_fatal_error;

		SET_FLAG(status, BGP_IO_FATAL_ERR);

	} else if (nbytes == 0) {
		/* Received EOF / TCP session closed */
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s [Event] BGP connection closed fd %d",
				   peer->host, peer->fd);

		/* Handle the error in the main pthread. */
		if (code_p)
			*code_p = TCP_connection_closed;

		SET_FLAG(status, BGP_IO_FATAL_ERR);
	} else {
		assert(ringbuf_put(peer->ibuf_work, peer->ibuf_scratch,
				   nbytes) == (size_t)nbytes);
	}

	return status;
}

/*
 * Called after we have read a BGP packet header. Validates marker, message
 * type and packet length. If any of these aren't correct, sends a notify.
 *
 * Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input
 * buffer.
 */
static bool validate_header(struct peer *peer)
{
	uint16_t size;
	uint8_t type;
	struct ringbuf *pkt = peer->ibuf_work;

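	/* the marker is 16 bytes of all ones (RFC 4271, section 4.1) */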
	static const uint8_t m_correct[BGP_MARKER_SIZE] = {
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
	uint8_t m_rx[BGP_MARKER_SIZE] = {0x00};

	if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE)
		return false;

	if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) {
		bgp_notify_io_invalid(peer, BGP_NOTIFY_HEADER_ERR,
				      BGP_NOTIFY_HEADER_NOT_SYNC, NULL, 0);
		return false;
	}

	/* Get size and type in network byte order. */
	ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size));
	ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type));

	size = ntohs(size);

	/* BGP type check. */
	if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
	    && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
	    && type != BGP_MSG_ROUTE_REFRESH_NEW
	    && type != BGP_MSG_ROUTE_REFRESH_OLD
	    && type != BGP_MSG_CAPABILITY) {
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s unknown message type 0x%02x", peer->host,
				   type);

		bgp_notify_io_invalid(peer, BGP_NOTIFY_HEADER_ERR,
				      BGP_NOTIFY_HEADER_BAD_MESTYPE, &type, 1);
		return false;
	}

	/* Minimum packet length check. */
	if ((size < BGP_HEADER_SIZE) || (size > peer->max_packet_size)
	    || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
	    || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
	    || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
	    || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_NEW
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_OLD
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_CAPABILITY
		&& size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
		if (bgp_debug_neighbor_events(peer)) {
			zlog_debug("%s bad message length - %d for %s",
				   peer->host, size,
				   type == 128 ? "ROUTE-REFRESH"
					       : bgp_type_str[(int)type]);
		}

		uint16_t nsize = htons(size);

		bgp_notify_io_invalid(peer, BGP_NOTIFY_HEADER_ERR,
				      BGP_NOTIFY_HEADER_BAD_MESLEN,
				      (unsigned char *)&nsize, 2);
		return false;
	}

	return true;
}