/* bgpd/bgp_io.c */
/* BGP I/O.
 * Implements packet I/O in a consumer pthread.
 * Copyright (C) 2017 Cumulus Networks
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
 * MA 02110-1301 USA
 */

21 #include <zebra.h>
22 #include <pthread.h> // for pthread_mutex_unlock, pthread_mutex_lock
23
24 #include "frr_pthread.h" // for frr_pthread_get, frr_pthread
25 #include "linklist.h" // for list_delete, list_delete_all_node, lis...
26 #include "log.h" // for zlog_debug, safe_strerror, zlog_err
27 #include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
28 #include "network.h" // for ERRNO_IO_RETRY
29 #include "stream.h" // for stream_get_endp, stream_getw_from, str...
30 #include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
31 #include "zassert.h" // for assert
32
33 #include "bgpd/bgp_io.h"
34 #include "bgpd/bgp_debug.h" // for bgp_debug_neighbor_events, bgp_type_str
35 #include "bgpd/bgp_fsm.h" // for BGP_EVENT_ADD, bgp_event
36 #include "bgpd/bgp_packet.h" // for bgp_notify_send_with_data, bgp_notify...
37 #include "bgpd/bgpd.h" // for peer, BGP_MARKER_SIZE, bgp_master, bm
38
/* forward declarations */
static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *);
static int bgp_process_writes(struct thread *);
static int bgp_process_reads(struct thread *);
static bool validate_header(struct peer *);

/* generic i/o status codes */
#define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error

/* bgp_read() status codes */
#define BGP_IO_READ_HEADER (1 << 3) // when read a full packet header
#define BGP_IO_READ_FULLPACKET (1 << 4) // read a full packet

/* Start and stop routines for I/O pthread + control variables
 * ------------------------------------------------------------------------ */
bool bgp_packet_write_thread_run = false;
/* protects the cancel lists and task dispatch in bgp_io_start() */
pthread_mutex_t *work_mtx;

/* peers whose already-scheduled read/write tasks the I/O thread must skip */
static struct list *read_cancel;
static struct list *write_cancel;
62 void bgp_io_init()
63 {
64 work_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
65 pthread_mutex_init(work_mtx, NULL);
66
67 read_cancel = list_new();
68 write_cancel = list_new();
69 }
70
/*
 * Entry point for the BGP I/O pthread.
 *
 * Repeatedly fetches scheduled tasks from this pthread's thread master and
 * executes them under work_mtx.  Before running a task, checks whether the
 * task's peer has been placed on the matching cancellation list
 * (read_cancel for bgp_process_reads, write_cancel for bgp_process_writes)
 * and skips the task if so.
 *
 * NOTE(review): both cancel lists are emptied after inspecting only the one
 * fetched task — presumably safe because bgp_reads_off()/bgp_writes_off()
 * also THREAD_OFF the peer's scheduled tasks under the same mutex, but worth
 * confirming a cancellation can never be dropped before its task is fetched.
 *
 * @param arg unused
 * @return NULL, once bgp_packet_write_thread_run goes false
 */
void *bgp_io_start(void *arg)
{
	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	// we definitely don't want to handle signals
	fpt->master->handle_signals = false;

	bgp_packet_write_thread_run = true;
	struct thread task;

	while (bgp_packet_write_thread_run) {
		if (thread_fetch(fpt->master, &task)) {
			pthread_mutex_lock(work_mtx);
			{
				bool cancel = false;
				struct peer *peer = THREAD_ARG(&task);
				/* skip the task if its peer was cancelled */
				if ((task.func == bgp_process_reads
				     && listnode_lookup(read_cancel, peer))
				    || (task.func == bgp_process_writes
					&& listnode_lookup(write_cancel, peer)))
					cancel = true;

				list_delete_all_node(write_cancel);
				list_delete_all_node(read_cancel);

				if (!cancel)
					thread_call(&task);
			}
			pthread_mutex_unlock(work_mtx);
		}
	}

	return NULL;
}
105
106 int bgp_io_stop(void **result, struct frr_pthread *fpt)
107 {
108 fpt->master->spin = false;
109 bgp_packet_write_thread_run = false;
110 pthread_kill(fpt->thread, SIGINT);
111 pthread_join(fpt->thread, result);
112
113 pthread_mutex_unlock(work_mtx);
114 pthread_mutex_destroy(work_mtx);
115
116 list_delete(read_cancel);
117 list_delete(write_cancel);
118 XFREE(MTYPE_TMP, work_mtx);
119 return 0;
120 }
121 /* ------------------------------------------------------------------------ */
122
123 void bgp_writes_on(struct peer *peer)
124 {
125 assert(peer->status != Deleted);
126 assert(peer->obuf);
127 assert(peer->ibuf);
128 assert(peer->ibuf_work);
129 assert(!peer->t_connect_check);
130 assert(peer->fd);
131
132 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
133
134 pthread_mutex_lock(work_mtx);
135 {
136 listnode_delete(write_cancel, peer);
137 thread_add_write(fpt->master, bgp_process_writes, peer,
138 peer->fd, &peer->t_write);
139 SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
140 }
141 pthread_mutex_unlock(work_mtx);
142 }
143
144 void bgp_writes_off(struct peer *peer)
145 {
146 pthread_mutex_lock(work_mtx);
147 {
148 THREAD_OFF(peer->t_write);
149 THREAD_OFF(peer->t_generate_updgrp_packets);
150 listnode_add(write_cancel, peer);
151
152 // peer access by us after this point will result in pain
153 UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
154 }
155 pthread_mutex_unlock(work_mtx);
156 /* upon return, i/o thread must not access the peer */
157 }
158
159 void bgp_reads_on(struct peer *peer)
160 {
161 assert(peer->status != Deleted);
162 assert(peer->ibuf);
163 assert(peer->fd);
164 assert(peer->ibuf_work);
165 assert(stream_get_endp(peer->ibuf_work) == 0);
166 assert(peer->obuf);
167 assert(!peer->t_connect_check);
168 assert(peer->fd);
169
170 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
171
172 pthread_mutex_lock(work_mtx);
173 {
174 listnode_delete(read_cancel, peer);
175 thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
176 &peer->t_read);
177 SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
178 }
179 pthread_mutex_unlock(work_mtx);
180 }
181
182 void bgp_reads_off(struct peer *peer)
183 {
184 pthread_mutex_lock(work_mtx);
185 {
186 THREAD_OFF(peer->t_read);
187 THREAD_OFF(peer->t_process_packet);
188 listnode_add(read_cancel, peer);
189
190 // peer access by us after this point will result in pain
191 UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
192 }
193 pthread_mutex_unlock(work_mtx);
194 }
195
196 /**
197 * Called from PTHREAD_IO when select() or poll() determines that the file
198 * descriptor is ready to be written to.
199 */
200 static int bgp_process_writes(struct thread *thread)
201 {
202 static struct peer *peer;
203 peer = THREAD_ARG(thread);
204 uint16_t status;
205
206 if (peer->fd < 0)
207 return -1;
208
209 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
210
211 bool reschedule;
212 pthread_mutex_lock(&peer->io_mtx);
213 {
214 status = bgp_write(peer);
215 reschedule = (stream_fifo_head(peer->obuf) != NULL);
216 }
217 pthread_mutex_unlock(&peer->io_mtx);
218
219 if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
220 }
221
222 if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
223 reschedule = 0; // problem
224
225 if (reschedule) {
226 thread_add_write(fpt->master, bgp_process_writes, peer,
227 peer->fd, &peer->t_write);
228 thread_add_background(bm->master, bgp_generate_updgrp_packets,
229 peer, 0,
230 &peer->t_generate_updgrp_packets);
231 }
232
233 return 0;
234 }
235
236 /**
237 * Called from PTHREAD_IO when select() or poll() determines that the file
238 * descriptor is ready to be read from.
239 */
240 static int bgp_process_reads(struct thread *thread)
241 {
242 static struct peer *peer;
243 peer = THREAD_ARG(thread);
244 uint16_t status;
245
246 if (peer->fd < 0)
247 return -1;
248
249 struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
250
251 bool reschedule = true;
252
253 // execute read
254 pthread_mutex_lock(&peer->io_mtx);
255 {
256 status = bgp_read(peer);
257 }
258 pthread_mutex_unlock(&peer->io_mtx);
259
260 // check results of read
261 bool header_valid = true;
262
263 if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
264 }
265
266 if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
267 reschedule = false; // problem
268
269 if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) {
270 header_valid = validate_header(peer);
271 if (!header_valid) {
272 bgp_size_t packetsize =
273 MIN((int)stream_get_endp(peer->ibuf_work),
274 BGP_MAX_PACKET_SIZE);
275 memcpy(peer->last_reset_cause, peer->ibuf_work->data,
276 packetsize);
277 peer->last_reset_cause_size = packetsize;
278 // We're tearing the session down, no point in
279 // rescheduling.
280 // Additionally, bgp_read() will use the TLV if it's
281 // present to
282 // determine how much to read; if this is corrupt, we'll
283 // crash the
284 // program.
285 reschedule = false;
286 }
287 }
288
289 // if we read a full packet, push it onto peer->ibuf, reset our WiP
290 // buffer
291 // and schedule a job to process it on the main thread
292 if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) {
293 pthread_mutex_lock(&peer->io_mtx);
294 {
295 stream_fifo_push(peer->ibuf,
296 stream_dup(peer->ibuf_work));
297 }
298 pthread_mutex_unlock(&peer->io_mtx);
299 stream_reset(peer->ibuf_work);
300 assert(stream_get_endp(peer->ibuf_work) == 0);
301
302 thread_add_background(bm->master, bgp_process_packet, peer, 0,
303 &peer->t_process_packet);
304 }
305
306 if (reschedule)
307 thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
308 &peer->t_read);
309
310 return 0;
311 }
312
313 /**
314 * Flush peer output buffer.
315 *
316 * This function pops packets off of peer->obuf and writes them to peer->fd.
317 * The amount of packets written is equal to the minimum of peer->wpkt_quanta
318 * and the number of packets on the output buffer, unless an error occurs.
319 *
320 * If write() returns an error, the appropriate FSM event is generated.
321 *
322 * The return value is equal to the number of packets written
323 * (which may be zero).
324 */
325 static uint16_t bgp_write(struct peer *peer)
326 {
327 u_char type;
328 struct stream *s;
329 int num;
330 int update_last_write = 0;
331 unsigned int count = 0;
332 unsigned int oc = 0;
333 uint16_t status = 0;
334
335 while (count < peer->bgp->wpkt_quanta
336 && (s = stream_fifo_head(peer->obuf))) {
337 int writenum;
338 do {
339 writenum = stream_get_endp(s) - stream_get_getp(s);
340 num = write(peer->fd, STREAM_PNT(s), writenum);
341
342 if (num < 0) {
343 if (!ERRNO_IO_RETRY(errno)) {
344 BGP_EVENT_ADD(peer, TCP_fatal_error);
345 SET_FLAG(status, BGP_IO_FATAL_ERR);
346 } else {
347 SET_FLAG(status, BGP_IO_TRANS_ERR);
348 }
349
350 goto done;
351 } else if (num != writenum) // incomplete write
352 stream_forward_getp(s, num);
353
354 } while (num != writenum);
355
356 /* Retrieve BGP packet type. */
357 stream_set_getp(s, BGP_MARKER_SIZE + 2);
358 type = stream_getc(s);
359
360 switch (type) {
361 case BGP_MSG_OPEN:
362 peer->open_out++;
363 break;
364 case BGP_MSG_UPDATE:
365 peer->update_out++;
366 break;
367 case BGP_MSG_NOTIFY:
368 peer->notify_out++;
369 /* Double start timer. */
370 peer->v_start *= 2;
371
372 /* Overflow check. */
373 if (peer->v_start >= (60 * 2))
374 peer->v_start = (60 * 2);
375
376 /* Handle Graceful Restart case where the state changes
377 to
378 Connect instead of Idle */
379 /* Flush any existing events */
380 BGP_EVENT_ADD(peer, BGP_Stop);
381 goto done;
382
383 case BGP_MSG_KEEPALIVE:
384 peer->keepalive_out++;
385 break;
386 case BGP_MSG_ROUTE_REFRESH_NEW:
387 case BGP_MSG_ROUTE_REFRESH_OLD:
388 peer->refresh_out++;
389 break;
390 case BGP_MSG_CAPABILITY:
391 peer->dynamic_cap_out++;
392 break;
393 }
394
395 count++;
396
397 stream_free(stream_fifo_pop(peer->obuf));
398 update_last_write = 1;
399 }
400
401 done : {
402 /* Update last_update if UPDATEs were written. */
403 if (peer->update_out > oc)
404 peer->last_update = bgp_clock();
405
406 /* If we TXed any flavor of packet update last_write */
407 if (update_last_write)
408 peer->last_write = bgp_clock();
409 }
410
411 return status;
412 }
413
414 /**
415 * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
416 *
417 * @return whether a full packet was read
418 */
/**
 * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
 *
 * First reads up to BGP_HEADER_SIZE bytes; once a full header is buffered,
 * reads up to the packet length given in the header's length field.  Never
 * reads past the end of one packet.
 *
 * @return bitfield of BGP_IO_* status flags: BGP_IO_READ_HEADER when a full
 *         header has just been completed (caller must validate it),
 *         BGP_IO_READ_FULLPACKET when a full packet is buffered, and
 *         BGP_IO_TRANS_ERR / BGP_IO_FATAL_ERR on errors
 */
static uint16_t bgp_read(struct peer *peer)
{
	int readsize; // how many bytes we want to read
	int nbytes; // how many bytes we actually read
	bool have_header = false;
	uint16_t status = 0;

	if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE)
		readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work);
	else {
		// retrieve packet length from tlv and compute # bytes we still
		// need
		u_int16_t mlen =
			stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
		readsize = mlen - stream_get_endp(peer->ibuf_work);
		have_header = true;
	}

	nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);

	if (nbytes <= 0) // handle errors
	{
		switch (nbytes) {
		case -1: // fatal error; tear down the session
			zlog_err("%s [Error] bgp_read_packet error: %s",
				 peer->host, safe_strerror(errno));

			/* preserve routes across restart when in NSF mode */
			if (peer->status == Established) {
				if (CHECK_FLAG(peer->sflags,
					       PEER_STATUS_NSF_MODE)) {
					peer->last_reset =
						PEER_DOWN_NSF_CLOSE_SESSION;
					SET_FLAG(peer->sflags,
						 PEER_STATUS_NSF_WAIT);
				} else
					peer->last_reset =
						PEER_DOWN_CLOSE_SESSION;
			}

			BGP_EVENT_ADD(peer, TCP_fatal_error);
			SET_FLAG(status, BGP_IO_FATAL_ERR);
			break;

		case 0: // TCP session closed
			if (bgp_debug_neighbor_events(peer))
				zlog_debug(
					"%s [Event] BGP connection closed fd %d",
					peer->host, peer->fd);

			/* same NSF handling as the fatal-error case above */
			if (peer->status == Established) {
				if (CHECK_FLAG(peer->sflags,
					       PEER_STATUS_NSF_MODE)) {
					peer->last_reset =
						PEER_DOWN_NSF_CLOSE_SESSION;
					SET_FLAG(peer->sflags,
						 PEER_STATUS_NSF_WAIT);
				} else
					peer->last_reset =
						PEER_DOWN_CLOSE_SESSION;
			}

			BGP_EVENT_ADD(peer, TCP_connection_closed);
			SET_FLAG(status, BGP_IO_FATAL_ERR);
			break;

		case -2: // temporary error; come back later
			SET_FLAG(status, BGP_IO_TRANS_ERR);
			break;
		default:
			break;
		}

		return status;
	}

	// If we didn't have the header before read(), and now we do, set the
	// appropriate flag. The caller must validate the header for us.
	if (!have_header
	    && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) {
		SET_FLAG(status, BGP_IO_READ_HEADER);
		have_header = true;
	}
	// If we read the # of bytes specified in the tlv, we have read a full
	// packet.
	//
	// Note that the header may not have been validated here. This flag
	// means ONLY that we read the # of bytes specified in the header; if
	// the header is not valid, the packet MUST NOT be processed further.
	if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE)
			    == stream_get_endp(peer->ibuf_work)))
		SET_FLAG(status, BGP_IO_READ_FULLPACKET);

	return status;
}
515
516 /*
517 * Called after we have read a BGP packet header. Validates marker, message
518 * type and packet length. If any of these aren't correct, sends a notify.
519 */
520 static bool validate_header(struct peer *peer)
521 {
522 u_int16_t size, type;
523
524 static uint8_t marker[BGP_MARKER_SIZE] = {
525 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
526 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
527
528 if (memcmp(marker, peer->ibuf_work->data, BGP_MARKER_SIZE) != 0) {
529 bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
530 BGP_NOTIFY_HEADER_NOT_SYNC);
531 return false;
532 }
533
534 /* Get size and type. */
535 size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
536 type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2);
537
538 /* BGP type check. */
539 if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
540 && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
541 && type != BGP_MSG_ROUTE_REFRESH_NEW
542 && type != BGP_MSG_ROUTE_REFRESH_OLD
543 && type != BGP_MSG_CAPABILITY) {
544 if (bgp_debug_neighbor_events(peer))
545 zlog_debug("%s unknown message type 0x%02x", peer->host,
546 type);
547
548 bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
549 BGP_NOTIFY_HEADER_BAD_MESTYPE,
550 (u_char *)&type, 1);
551 return false;
552 }
553
554 /* Mimimum packet length check. */
555 if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
556 || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
557 || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
558 || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
559 || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
560 || (type == BGP_MSG_ROUTE_REFRESH_NEW
561 && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
562 || (type == BGP_MSG_ROUTE_REFRESH_OLD
563 && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
564 || (type == BGP_MSG_CAPABILITY
565 && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
566 if (bgp_debug_neighbor_events(peer))
567 zlog_debug("%s bad message length - %d for %s",
568 peer->host, size,
569 type == 128 ? "ROUTE-REFRESH"
570 : bgp_type_str[(int)type]);
571
572 bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
573 BGP_NOTIFY_HEADER_BAD_MESLEN,
574 (u_char *)&size, 2);
575 return false;
576 }
577
578 return true;
579 }