/* BGP I/O.
 * Implements packet I/O in a pthread.
 * Copyright (C) 2017 Cumulus Networks
 * Quentin Young
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
 * MA 02110-1301 USA
 */

/* clang-format off */
#include <zebra.h>
#include <pthread.h>		// for pthread_mutex_unlock, pthread_mutex_lock

#include "frr_pthread.h"	// for frr_pthread_get, frr_pthread
#include "linklist.h"		// for list_delete, list_delete_all_node, lis...
#include "log.h"		// for zlog_debug, safe_strerror, zlog_err
#include "memory.h"		// for MTYPE_TMP, XCALLOC, XFREE
#include "network.h"		// for ERRNO_IO_RETRY
#include "stream.h"		// for stream_get_endp, stream_getw_from, str...
#include "thread.h"		// for THREAD_OFF, THREAD_ARG, thread, thread...
#include "zassert.h"		// for assert

#include "bgpd/bgp_io.h"
#include "bgpd/bgp_debug.h"	// for bgp_debug_neighbor_events, bgp_type_str
#include "bgpd/bgp_fsm.h"	// for BGP_EVENT_ADD, bgp_event
#include "bgpd/bgp_packet.h"	// for bgp_notify_send_with_data, bgp_notify...
#include "bgpd/bgpd.h"		// for peer, BGP_MARKER_SIZE, bgp_master, bm
/* clang-format on */

/* forward declarations */
static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *);
static int bgp_process_writes(struct thread *);
static int bgp_process_reads(struct thread *);
static bool validate_header(struct peer *);

/* generic i/o status codes */
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error
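
/* These flags are returned by bgp_read() / bgp_write() and checked by
 * bgp_process_reads() / bgp_process_writes() to decide whether to reschedule
 * the task or to let the FSM tear the session down.
 */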

/* Start and stop routines for I/O pthread + control variables
 * ------------------------------------------------------------------------ */
_Atomic bool bgp_io_thread_run;
_Atomic bool bgp_io_thread_started;

void bgp_io_init(void)
{
	bgp_io_thread_run = false;
	bgp_io_thread_started = false;
}

/* Unused callback for thread_add_read() */
static int bgp_io_dummy(struct thread *thread) { return 0; }

void *bgp_io_start(void *arg)
{
	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
	fpt->master->owner = pthread_self();

	// fd so we can sleep in poll()
	int sleeper[2];
	if (pipe(sleeper) < 0)
		zlog_err("%s: could not initialize sleeper pipe: %s",
			 __func__, safe_strerror(errno));
	thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);
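	/* Nothing in this file ever writes to the sleeper pipe; its read end
	 * merely gives the event loop a file descriptor to block on when no
	 * peer fds are registered. */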

	// we definitely don't want to handle signals
	fpt->master->handle_signals = false;

	struct thread task;

	atomic_store_explicit(&bgp_io_thread_run, true, memory_order_seq_cst);
	atomic_store_explicit(&bgp_io_thread_started, true,
			      memory_order_seq_cst);

	while (bgp_io_thread_run) {
		if (thread_fetch(fpt->master, &task)) {
			thread_call(&task);
		}
	}

	close(sleeper[1]);
	close(sleeper[0]);

	return NULL;
}

static int bgp_io_finish(struct thread *thread)
{
	atomic_store_explicit(&bgp_io_thread_run, false, memory_order_seq_cst);
	return 0;
}

int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
	thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
	pthread_join(fpt->thread, result);
	return 0;
}

/* Extern API -------------------------------------------------------------- */
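/* Blocks the calling pthread until the I/O pthread has started up and set
 * bgp_io_thread_started.
 */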
void bgp_io_running(void)
{
	while (!atomic_load_explicit(&bgp_io_thread_started,
				     memory_order_seq_cst))
		frr_pthread_yield();
}

void bgp_writes_on(struct peer *peer)
{
	assert(peer->status != Deleted);
	assert(peer->obuf);
	assert(peer->ibuf);
	assert(peer->ibuf_work);
	assert(!peer->t_connect_check_r);
	assert(!peer->t_connect_check_w);
	assert(peer->fd);

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
			 &peer->t_write);
	SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

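/* Turn off writes for a peer. Cancellation of the write task must go through
 * thread_cancel_async(), since peer->t_write is scheduled on the I/O
 * pthread's event loop rather than on the caller's.
 */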
void bgp_writes_off(struct peer *peer)
{
	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_cancel_async(fpt->master, &peer->t_write, NULL);
	THREAD_OFF(peer->t_generate_updgrp_packets);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_reads_on(struct peer *peer)
{
	assert(peer->status != Deleted);
	assert(peer->ibuf);
	assert(peer->ibuf_work);
	assert(peer->obuf);
	assert(!peer->t_connect_check_r);
	assert(!peer->t_connect_check_w);
	assert(peer->fd);

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
			&peer->t_read);

	SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

void bgp_reads_off(struct peer *peer)
{
	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	thread_cancel_async(fpt->master, &peer->t_read, NULL);
	THREAD_OFF(peer->t_process_packet);

	UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

/* Internal functions ------------------------------------------------------- */

/**
 * Called from I/O pthread when a file descriptor has become ready for writing.
 */
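/*
 * In outline: flush as much of peer->obuf as the write quanta allows (under
 * peer->io_mtx), then either reschedule this task if data remains, or poke
 * the main thread to generate more update-group packets.
 */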
static int bgp_process_writes(struct thread *thread)
{
	struct peer *peer = THREAD_ARG(thread);
	uint16_t status;
	bool reschedule;
	bool fatal = false;

	if (peer->fd < 0)
		return -1;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_write(peer);
		reschedule = (stream_fifo_head(peer->obuf) != NULL);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		reschedule = false; /* problem */
		fatal = true;
	}

	if (reschedule) {
		thread_add_write(fpt->master, bgp_process_writes, peer,
				 peer->fd, &peer->t_write);
	} else if (!fatal) {
		BGP_TIMER_ON(peer->t_generate_updgrp_packets,
			     bgp_generate_updgrp_packets, 0);
	}

	return 0;
}

/**
 * Called from I/O pthread when a file descriptor has become ready for reading,
 * or has hung up.
 *
 * We read as much data as possible, process as many packets as we can and
 * place them on peer->ibuf for secondary processing by the main thread.
 */
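/*
 * In outline, each invocation has three phases: (1) read from the socket into
 * peer->ibuf_work under io_mtx; (2) slice complete packets off ibuf_work and
 * push them onto peer->ibuf; (3) compact ibuf_work, reschedule, and hand
 * finished packets to the main thread via bgp_process_packet().
 */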
static int bgp_process_reads(struct thread *thread)
{
	/* clang-format off */
	struct peer *peer;		// peer to read from
	uint16_t status;		// bgp_read status code
	bool more = true;		// whether we got more data
	bool fatal = false;		// whether fatal error occurred
	bool added_pkt = false;		// whether we pushed onto ->ibuf
	bool header_valid = true;	// whether header is valid
	/* clang-format on */

	peer = THREAD_ARG(thread);

	if (peer->fd < 0)
		return -1;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_read(peer);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	/* error checking phase */
	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
		/* no problem; just don't process packets */
		more = false;
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
		/* problem; tear down session */
		more = false;
		fatal = true;
	}

	while (more) {
		/* static buffer for transferring packets */
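		/* (static storage is tolerable here only because a single I/O
		 * pthread executes this function) */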
		static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
		/* shorter alias to peer's input buffer */
		struct stream *ibw = peer->ibuf_work;
		/* offset of start of current packet */
		size_t offset = stream_get_getp(ibw);
		/* packet size as given by header */
		uint16_t pktsize = 0;

		/* check that we have enough data for a header */
		if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
			break;

		/* validate header */
		header_valid = validate_header(peer);

		if (!header_valid) {
			fatal = true;
			break;
		}

		/* header is valid; retrieve packet size */
		pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);

		/* if this fails we are seriously screwed */
		assert(pktsize <= BGP_MAX_PACKET_SIZE);

		/* If we have that much data, chuck it into its own
		 * stream and append to input queue for processing. */
		if (STREAM_READABLE(ibw) >= pktsize) {
			struct stream *pkt = stream_new(pktsize);
			stream_get(pktbuf, ibw, pktsize);
			stream_put(pkt, pktbuf, pktsize);

			pthread_mutex_lock(&peer->io_mtx);
			{
				stream_fifo_push(peer->ibuf, pkt);
			}
			pthread_mutex_unlock(&peer->io_mtx);

			added_pkt = true;
		} else
			break;
	}

	/*
	 * After reading:
	 * 1. Move unread data to stream start to make room for more.
	 * 2. Reschedule and return when we have additional data.
	 *
	 * XXX: Heavy abuse of stream API. This needs a ring buffer.
	 */
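	/* For example (hypothetical numbers): with getp == 23 and endp == 100,
	 * the 77 unread bytes are memmove'd to offset 0, getp becomes 0 and
	 * endp becomes 77, leaving maximal headroom for the next read. */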
	if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
		void *from = stream_pnt(peer->ibuf_work);
		void *to = peer->ibuf_work->data;
		size_t siz = STREAM_READABLE(peer->ibuf_work);
		memmove(to, from, siz);
		stream_set_getp(peer->ibuf_work, 0);
		stream_set_endp(peer->ibuf_work, siz);
	}

	assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);

	/* handle invalid header or fatal I/O error */
	if (fatal) {
		/* wipe buffer just in case someone screwed up */
		stream_reset(peer->ibuf_work);
	} else {
		thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
				&peer->t_read);
		if (added_pkt)
			thread_add_timer_msec(bm->master, bgp_process_packet,
					      peer, 0, &peer->t_process_packet);
	}

	return 0;
}

/**
 * Flush peer output buffer.
 *
 * This function pops packets off of peer->obuf and writes them to peer->fd.
 * The number of packets written is equal to the minimum of peer->wpkt_quanta
 * and the number of packets on the output buffer, unless an error occurs.
 *
 * If write() returns an error, the appropriate FSM event is generated.
 *
 * @return status flag (see top-of-file)
 */
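/*
 * Note that the inner do/while below handles short writes: if write() moves
 * only part of a packet, getp is advanced and the remainder is retried until
 * the whole packet is on the wire or an error comes back.
 */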
static uint16_t bgp_write(struct peer *peer)
{
	uint8_t type;
	struct stream *s;
	int num;
	int update_last_write = 0;
	unsigned int count = 0;
	uint32_t oc;
	uint32_t uo;
	uint16_t status = 0;
	uint32_t wpkt_quanta_old;

	// save current # updates sent
	oc = atomic_load_explicit(&peer->update_out, memory_order_relaxed);

	// cache current write quanta
	wpkt_quanta_old =
		atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed);

	while (count < wpkt_quanta_old && (s = stream_fifo_head(peer->obuf))) {
		int writenum;
		do {
			writenum = stream_get_endp(s) - stream_get_getp(s);
			num = write(peer->fd, STREAM_PNT(s), writenum);

			if (num < 0) {
				if (!ERRNO_IO_RETRY(errno)) {
					BGP_EVENT_ADD(peer, TCP_fatal_error);
					SET_FLAG(status, BGP_IO_FATAL_ERR);
				} else {
					SET_FLAG(status, BGP_IO_TRANS_ERR);
				}

				goto done;
			} else if (num != writenum) // incomplete write
				stream_forward_getp(s, num);

		} while (num != writenum);

		/* Retrieve BGP packet type. */
		stream_set_getp(s, BGP_MARKER_SIZE + 2);
		type = stream_getc(s);

		switch (type) {
		case BGP_MSG_OPEN:
			atomic_fetch_add_explicit(&peer->open_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_UPDATE:
			atomic_fetch_add_explicit(&peer->update_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_NOTIFY:
			atomic_fetch_add_explicit(&peer->notify_out, 1,
						  memory_order_relaxed);
			/* Double start timer. */
			peer->v_start *= 2;

			/* Overflow check. */
			if (peer->v_start >= (60 * 2))
				peer->v_start = (60 * 2);

			/* Handle Graceful Restart case where the state changes
			 * to Connect instead of Idle */
			BGP_EVENT_ADD(peer, BGP_Stop);
			goto done;

		case BGP_MSG_KEEPALIVE:
			atomic_fetch_add_explicit(&peer->keepalive_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_ROUTE_REFRESH_NEW:
		case BGP_MSG_ROUTE_REFRESH_OLD:
			atomic_fetch_add_explicit(&peer->refresh_out, 1,
						  memory_order_relaxed);
			break;
		case BGP_MSG_CAPABILITY:
			atomic_fetch_add_explicit(&peer->dynamic_cap_out, 1,
						  memory_order_relaxed);
			break;
		}

		count++;

		stream_free(stream_fifo_pop(peer->obuf));
		update_last_write = 1;
	}

done: {
	/* Update last_update if UPDATEs were written. */
	uo = atomic_load_explicit(&peer->update_out, memory_order_relaxed);
	if (uo > oc)
		atomic_store_explicit(&peer->last_update, bgp_clock(),
				      memory_order_relaxed);

	/* If we TXed any flavor of packet */
	if (update_last_write)
		atomic_store_explicit(&peer->last_write, bgp_clock(),
				      memory_order_relaxed);
}

	return status;
}

/**
 * Reads a chunk of data from peer->fd into peer->ibuf_work.
 *
 * @return status flag (see top-of-file)
 */
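/*
 * stream_read_try()'s return convention drives the switch below: -1 is a
 * fatal error, 0 is EOF (the peer closed the TCP session), -2 means EAGAIN /
 * EWOULDBLOCK, and a positive value is the number of bytes read.
 */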
static uint16_t bgp_read(struct peer *peer)
{
	size_t readsize;	// how many bytes we want to read
	ssize_t nbytes;		// how many bytes we actually read
	uint16_t status = 0;

	readsize = STREAM_WRITEABLE(peer->ibuf_work);

	nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);

	switch (nbytes) {
	/* Fatal error; tear down session */
	case -1:
		zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
			 safe_strerror(errno));

		if (peer->status == Established) {
			if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
				peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
				SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
			} else
				peer->last_reset = PEER_DOWN_CLOSE_SESSION;
		}

		BGP_EVENT_ADD(peer, TCP_fatal_error);
		SET_FLAG(status, BGP_IO_FATAL_ERR);
		break;

	/* Received EOF / TCP session closed */
	case 0:
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s [Event] BGP connection closed fd %d",
				   peer->host, peer->fd);

		if (peer->status == Established) {
			if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
				peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
				SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
			} else
				peer->last_reset = PEER_DOWN_CLOSE_SESSION;
		}

		BGP_EVENT_ADD(peer, TCP_connection_closed);
		SET_FLAG(status, BGP_IO_FATAL_ERR);
		break;

	/* EAGAIN or EWOULDBLOCK; come back later */
	case -2:
		SET_FLAG(status, BGP_IO_TRANS_ERR);
		break;
	default:
		break;
	}

	return status;
}

/*
 * Called after we have read a BGP packet header. Validates marker, message
 * type and packet length. If any of these aren't correct, sends a notify.
 */
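/*
 * For reference, the BGP message header (RFC 4271, section 4.1) is a 16-octet
 * all-ones marker, followed by a 2-octet total length and a 1-octet message
 * type -- BGP_HEADER_SIZE (19) octets in all. The length field counts the
 * header itself, hence the minimum-size checks below.
 */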
static bool validate_header(struct peer *peer)
{
	uint16_t size;
	uint8_t type;
	struct stream *pkt = peer->ibuf_work;
	size_t getp = stream_get_getp(pkt);

	static const uint8_t marker[BGP_MARKER_SIZE] = {
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

	if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
		bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
				BGP_NOTIFY_HEADER_NOT_SYNC);
		return false;
	}

	/* Get size and type in host byte order. */
	size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
	type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);

	/* BGP type check. */
	if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
	    && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
	    && type != BGP_MSG_ROUTE_REFRESH_NEW
	    && type != BGP_MSG_ROUTE_REFRESH_OLD
	    && type != BGP_MSG_CAPABILITY) {
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s unknown message type 0x%02x", peer->host,
				   type);

		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESTYPE,
					  &type, 1);
		return false;
	}

	/* Minimum packet length check. */
	if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
	    || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
	    || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
	    || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
	    || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_NEW
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_ROUTE_REFRESH_OLD
		&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
	    || (type == BGP_MSG_CAPABILITY
		&& size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
		if (bgp_debug_neighbor_events(peer)) {
			zlog_debug("%s bad message length - %d for %s",
				   peer->host, size,
				   type == 128 ? "ROUTE-REFRESH"
					       : bgp_type_str[(int) type]);
		}

		uint16_t nsize = htons(size);

		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESLEN,
					  (unsigned char *) &nsize, 2);
		return false;
	}

	return true;
}