]> git.proxmox.com Git - mirror_frr.git/blob - pceplib/pcep_socket_comm_loop.c
Merge pull request #12720 from opensourcerouting/fix/ecommunity_ipv6_missing_token
[mirror_frr.git] / pceplib / pcep_socket_comm_loop.c
1 /*
2 * This file is part of the PCEPlib, a PCEP protocol library.
3 *
4 * Copyright (C) 2020 Volta Networks https://voltanet.io/
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 *
19 * Author : Brady Johnson <brady@voltanet.io>
20 *
21 */
22
23
24 #ifdef HAVE_CONFIG_H
25 #include "config.h"
26 #endif
27
28 #include <errno.h>
29 #include <stdbool.h>
30 #include <stddef.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <assert.h>
34
35 #include "pcep_socket_comm_internals.h"
36 #include "pcep_socket_comm_loop.h"
37 #include "pcep_utils_logging.h"
38 #include "pcep_utils_ordered_list.h"
39 #include "pcep_utils_logging.h"
40 #include "pcep_utils_memory.h"
41
42 void write_message(int socket_fd, const char *message, unsigned int msg_length);
43 unsigned int read_message(int socket_fd, char *received_message,
44 unsigned int max_message_size);
45 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle);
46 void handle_writes(pcep_socket_comm_handle *socket_comm_handle);
47 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle);
48
49 bool comm_session_exists(pcep_socket_comm_handle *socket_comm_handle,
50 pcep_socket_comm_session *socket_comm_session)
51 {
52 if (socket_comm_handle == NULL) {
53 return false;
54 }
55
56 return (ordered_list_find(socket_comm_handle->session_list,
57 socket_comm_session)
58 != NULL);
59 }
60
61
62 bool comm_session_exists_locking(pcep_socket_comm_handle *socket_comm_handle,
63 pcep_socket_comm_session *socket_comm_session)
64 {
65 if (socket_comm_handle == NULL) {
66 return false;
67 }
68
69 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
70 bool exists =
71 comm_session_exists(socket_comm_handle, socket_comm_session);
72 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
73
74 return exists;
75 }
76
77
78 void write_message(int socket_fd, const char *message, unsigned int msg_length)
79 {
80 ssize_t bytes_sent = 0;
81 unsigned int total_bytes_sent = 0;
82
83 while ((uint32_t)bytes_sent < msg_length) {
84 bytes_sent = write(socket_fd, message + total_bytes_sent,
85 msg_length);
86
87 pcep_log(
88 LOG_INFO,
89 "%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%d]",
90 __func__, time(NULL), pthread_self(), socket_fd,
91 msg_length, bytes_sent);
92
93 if (bytes_sent < 0) {
94 if (errno != EAGAIN && errno != EWOULDBLOCK) {
95 pcep_log(LOG_WARNING, "%s: send() failure",
96 __func__);
97
98 return;
99 }
100 } else {
101 total_bytes_sent += bytes_sent;
102 }
103 }
104 }
105
106
107 unsigned int read_message(int socket_fd, char *received_message,
108 unsigned int max_message_size)
109 {
110 /* TODO what if bytes_read == max_message_size? there could be more to
111 * read */
112 unsigned int bytes_read =
113 read(socket_fd, received_message, max_message_size);
114 pcep_log(
115 LOG_INFO,
116 "%s: [%ld-%ld] socket_comm read message bytes_read [%u] on socket fd [%d]",
117 __func__, time(NULL), pthread_self(), bytes_read, socket_fd);
118
119 return bytes_read;
120 }
121
122
123 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle)
124 {
125 int max_fd = 0;
126
127 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
128
129 FD_ZERO(&socket_comm_handle->except_master_set);
130 FD_ZERO(&socket_comm_handle->read_master_set);
131 ordered_list_node *node = socket_comm_handle->read_list->head;
132 pcep_socket_comm_session *comm_session;
133 while (node != NULL) {
134 comm_session = (pcep_socket_comm_session *)node->data;
135 if (comm_session->socket_fd > max_fd) {
136 max_fd = comm_session->socket_fd;
137 } else if (comm_session->socket_fd < 0) {
138 pcep_log(LOG_ERR, "%s: Negative fd", __func__);
139 assert(comm_session->socket_fd > 0);
140 }
141
142 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
143 ready_toRead
144 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
145 FD_SET(comm_session->socket_fd,
146 &socket_comm_handle->read_master_set);
147 FD_SET(comm_session->socket_fd,
148 &socket_comm_handle->except_master_set);
149 node = node->next_node;
150 }
151
152 FD_ZERO(&socket_comm_handle->write_master_set);
153 node = socket_comm_handle->write_list->head;
154 while (node != NULL) {
155 comm_session = (pcep_socket_comm_session *)node->data;
156 if (comm_session->socket_fd > max_fd) {
157 max_fd = comm_session->socket_fd;
158 } else if (comm_session->socket_fd < 0) {
159 pcep_log(LOG_ERR, "%s: Negative fd", __func__);
160 assert(comm_session->socket_fd > 0);
161 }
162
163 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
164 ready_toWrite [%d]", __func__, time(NULL),
165 comm_session->socket_fd);*/
166 FD_SET(comm_session->socket_fd,
167 &socket_comm_handle->write_master_set);
168 FD_SET(comm_session->socket_fd,
169 &socket_comm_handle->except_master_set);
170 node = node->next_node;
171 }
172
173 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
174
175 return max_fd + 1;
176 }
177
178
179 void handle_reads(pcep_socket_comm_handle *socket_comm_handle)
180 {
181
182 /*
183 * iterate all the socket_fd's in the read_list. it may be that not
184 * all of them have something to read. dont remove the socket_fd
185 * from the read_list since messages could come at any time.
186 */
187
188 /* Notice: Only locking the mutex when accessing the read_list,
189 * since the read callbacks may end up calling back into the socket
190 * comm module to write messages which could be a deadlock. */
191 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
192 ordered_list_node *node = socket_comm_handle->read_list->head;
193 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
194
195 while (node != NULL) {
196 pcep_socket_comm_session *comm_session =
197 (pcep_socket_comm_session *)node->data;
198
199 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
200 node = node->next_node;
201 if (!comm_session_exists(socket_comm_handle, comm_session)) {
202 /* This comm_session has been deleted, move on to the
203 * next one */
204 pthread_mutex_unlock(
205 &(socket_comm_handle->socket_comm_mutex));
206 continue;
207 }
208
209 int is_set = FD_ISSET(comm_session->socket_fd,
210 &(socket_comm_handle->read_master_set));
211 /* Upon read failure, the comm_session might be free'd, so we
212 * cant store the received_bytes in the comm_session, until we
213 * know the read was successful. */
214 int received_bytes = 0;
215 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
216
217 if (is_set) {
218 FD_CLR(comm_session->socket_fd,
219 &(socket_comm_handle->read_master_set));
220
221 /* either read the message locally, or call the
222 * message_ready_handler to read it */
223 if (comm_session->message_handler != NULL) {
224 received_bytes = read_message(
225 comm_session->socket_fd,
226 comm_session->received_message,
227 MAX_RECVD_MSG_SIZE);
228 if (received_bytes > 0) {
229 /* Send the received message to the
230 * handler */
231 comm_session->received_bytes =
232 received_bytes;
233 comm_session->message_handler(
234 comm_session->session_data,
235 comm_session->received_message,
236 comm_session->received_bytes);
237 }
238 } else {
239 /* Tell the handler a message is ready to be
240 * read. The comm_session may be destroyed in
241 * this call, if
242 * there is an error reading or if the socket is
243 * closed. */
244 received_bytes =
245 comm_session
246 ->message_ready_to_read_handler(
247 comm_session
248 ->session_data,
249 comm_session
250 ->socket_fd);
251 }
252
253 /* handle the read results */
254 if (received_bytes == 0) {
255 if (comm_session_exists_locking(
256 socket_comm_handle, comm_session)) {
257 comm_session->received_bytes = 0;
258 /* the socket was closed */
259 /* TODO should we define a socket except
260 * enum? or will the only time we call
261 * this is when the socket is closed??
262 */
263 if (comm_session->conn_except_notifier
264 != NULL) {
265 comm_session->conn_except_notifier(
266 comm_session
267 ->session_data,
268 comm_session
269 ->socket_fd);
270 }
271
272 /* stop reading from the socket if its
273 * closed */
274 pthread_mutex_lock(
275 &(socket_comm_handle
276 ->socket_comm_mutex));
277 ordered_list_remove_first_node_equals(
278 socket_comm_handle->read_list,
279 comm_session);
280 pthread_mutex_unlock(
281 &(socket_comm_handle
282 ->socket_comm_mutex));
283 }
284 } else if (received_bytes < 0) {
285 /* TODO should we call conn_except_notifier()
286 * here ? */
287 pcep_log(
288 LOG_WARNING,
289 "%s: Error on socket fd [%d] : errno [%d][%s]",
290 __func__, comm_session->socket_fd,
291 errno, strerror(errno));
292 } else {
293 comm_session->received_bytes = received_bytes;
294 }
295 }
296 }
297 }
298
299
300 void handle_writes(pcep_socket_comm_handle *socket_comm_handle)
301 {
302 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
303
304 /*
305 * iterate all the socket_fd's in the write_list. it may be that not
306 * all of them are ready to be written to. only remove the socket_fd
307 * from the list if it is ready to be written to.
308 */
309
310 ordered_list_node *node = socket_comm_handle->write_list->head;
311 pcep_socket_comm_session *comm_session;
312 bool msg_written;
313 while (node != NULL) {
314 comm_session = (pcep_socket_comm_session *)node->data;
315 node = node->next_node;
316 msg_written = false;
317
318 if (!comm_session_exists(socket_comm_handle, comm_session)) {
319 /* This comm_session has been deleted, move on to the
320 * next one */
321 continue;
322 }
323
324 if (FD_ISSET(comm_session->socket_fd,
325 &(socket_comm_handle->write_master_set))) {
326 /* only remove the entry from the list, if it is written
327 * to */
328 ordered_list_remove_first_node_equals(
329 socket_comm_handle->write_list, comm_session);
330 FD_CLR(comm_session->socket_fd,
331 &(socket_comm_handle->write_master_set));
332
333 /* dequeue all the comm_session messages and send them
334 */
335 pcep_socket_comm_queued_message *queued_message =
336 queue_dequeue(comm_session->message_queue);
337 while (queued_message != NULL) {
338 msg_written = true;
339 write_message(comm_session->socket_fd,
340 queued_message->encoded_message,
341 queued_message->msg_length);
342 if (queued_message->free_after_send) {
343 pceplib_free(PCEPLIB_MESSAGES,
344 (void *)queued_message
345 ->encoded_message);
346 }
347 pceplib_free(PCEPLIB_MESSAGES, queued_message);
348 queued_message = queue_dequeue(
349 comm_session->message_queue);
350 }
351 }
352
353 /* check if the socket should be closed after writing */
354 if (comm_session->close_after_write == true) {
355 if (comm_session->message_queue->num_entries == 0) {
356 /* TODO check to make sure modifying the
357 * write_list while iterating it doesn't cause
358 * problems. */
359 pcep_log(
360 LOG_DEBUG,
361 "%s: handle_writes close() socket fd [%d]",
362 __func__, comm_session->socket_fd);
363 ordered_list_remove_first_node_equals(
364 socket_comm_handle->read_list,
365 comm_session);
366 ordered_list_remove_first_node_equals(
367 socket_comm_handle->write_list,
368 comm_session);
369 close(comm_session->socket_fd);
370 comm_session->socket_fd = -1;
371 }
372 }
373
374 if (comm_session->message_sent_handler != NULL
375 && msg_written == true) {
376 /* Unlocking to allow the message_sent_handler to
377 * make calls like destroy_socket_comm_session */
378 pthread_mutex_unlock(
379 &(socket_comm_handle->socket_comm_mutex));
380 comm_session->message_sent_handler(
381 comm_session->session_data,
382 comm_session->socket_fd);
383 pthread_mutex_lock(
384 &(socket_comm_handle->socket_comm_mutex));
385 }
386 }
387
388 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
389 }
390
391
392 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle)
393 {
394 /* TODO finish this */
395 (void)socket_comm_handle;
396 }
397
398
399 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
400 * invoke this method */
401 void *socket_comm_loop(void *data)
402 {
403 if (data == NULL) {
404 pcep_log(
405 LOG_WARNING,
406 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
407 __func__);
408 return NULL;
409 }
410
411 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Starting socket_comm_loop thread",
412 __func__, time(NULL), pthread_self());
413
414 pcep_socket_comm_handle *socket_comm_handle =
415 (pcep_socket_comm_handle *)data;
416 struct timeval timer;
417 int max_fd;
418
419 while (socket_comm_handle->active) {
420 /* check the FD's every 1/4 sec, 250 milliseconds */
421 timer.tv_sec = 0;
422 timer.tv_usec = 250000;
423 max_fd = build_fd_sets(socket_comm_handle);
424
425 if (select(max_fd, &(socket_comm_handle->read_master_set),
426 &(socket_comm_handle->write_master_set),
427 &(socket_comm_handle->except_master_set), &timer)
428 < 0) {
429 /* TODO handle the error */
430 pcep_log(
431 LOG_WARNING,
432 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
433 __func__, errno, strerror(errno));
434 }
435
436 handle_reads(socket_comm_handle);
437 handle_writes(socket_comm_handle);
438 handle_excepts(socket_comm_handle);
439 }
440
441 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Finished socket_comm_loop thread",
442 __func__, time(NULL), pthread_self());
443
444 return NULL;
445 }
446
447 int pceplib_external_socket_read(int fd, void *payload)
448 {
449 pcep_socket_comm_handle *socket_comm_handle =
450 (pcep_socket_comm_handle *)payload;
451 if (socket_comm_handle == NULL) {
452 return -1;
453 }
454
455 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
456 FD_SET(fd, &(socket_comm_handle->read_master_set));
457 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
458
459 handle_reads(socket_comm_handle);
460
461 /* Get the socket_comm_session */
462 pcep_socket_comm_session find_session = {.socket_fd = fd};
463 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
464 ordered_list_node *node =
465 ordered_list_find(socket_comm_handle->read_list, &find_session);
466
467 /* read again */
468 if (node != NULL) {
469 socket_comm_handle->socket_read_func(
470 socket_comm_handle->external_infra_data,
471 &((pcep_socket_comm_session *)node)
472 ->external_socket_data,
473 fd, socket_comm_handle);
474 }
475 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
476
477 return 0;
478 }
479
480 int pceplib_external_socket_write(int fd, void *payload)
481 {
482 pcep_socket_comm_handle *socket_comm_handle =
483 (pcep_socket_comm_handle *)payload;
484 if (socket_comm_handle == NULL) {
485 return -1;
486 }
487
488 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
489 FD_SET(fd, &(socket_comm_handle->write_master_set));
490 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
491
492 handle_writes(socket_comm_handle);
493
494 /* TODO do we need to cancel this FD from writing?? */
495
496 return 0;
497 }