]> git.proxmox.com Git - mirror_frr.git/blob - pceplib/pcep_socket_comm_loop.c
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / pceplib / pcep_socket_comm_loop.c
1 // SPDX-License-Identifier: LGPL-2.1-or-later
2 /*
3 * This file is part of the PCEPlib, a PCEP protocol library.
4 *
5 * Copyright (C) 2020 Volta Networks https://voltanet.io/
6 *
7 * Author : Brady Johnson <brady@voltanet.io>
8 *
9 */
10
11
12 #ifdef HAVE_CONFIG_H
13 #include "config.h"
14 #endif
15
16 #include <errno.h>
17 #include <stdbool.h>
18 #include <stddef.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <assert.h>
22
23 #include "pcep_socket_comm_internals.h"
24 #include "pcep_socket_comm_loop.h"
25 #include "pcep_utils_logging.h"
26 #include "pcep_utils_ordered_list.h"
27 #include "pcep_utils_logging.h"
28 #include "pcep_utils_memory.h"
29
30 void write_message(int socket_fd, const char *message, unsigned int msg_length);
31 unsigned int read_message(int socket_fd, char *received_message,
32 unsigned int max_message_size);
33 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle);
34 void handle_writes(pcep_socket_comm_handle *socket_comm_handle);
35 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle);
36
37 bool comm_session_exists(pcep_socket_comm_handle *socket_comm_handle,
38 pcep_socket_comm_session *socket_comm_session)
39 {
40 if (socket_comm_handle == NULL) {
41 return false;
42 }
43
44 return (ordered_list_find(socket_comm_handle->session_list,
45 socket_comm_session)
46 != NULL);
47 }
48
49
50 bool comm_session_exists_locking(pcep_socket_comm_handle *socket_comm_handle,
51 pcep_socket_comm_session *socket_comm_session)
52 {
53 if (socket_comm_handle == NULL) {
54 return false;
55 }
56
57 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
58 bool exists =
59 comm_session_exists(socket_comm_handle, socket_comm_session);
60 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
61
62 return exists;
63 }
64
65
66 void write_message(int socket_fd, const char *message, unsigned int msg_length)
67 {
68 ssize_t bytes_sent = 0;
69 unsigned int total_bytes_sent = 0;
70
71 while ((uint32_t)bytes_sent < msg_length) {
72 bytes_sent = write(socket_fd, message + total_bytes_sent,
73 msg_length);
74
75 pcep_log(
76 LOG_INFO,
77 "%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%d]",
78 __func__, time(NULL), pthread_self(), socket_fd,
79 msg_length, bytes_sent);
80
81 if (bytes_sent < 0) {
82 if (errno != EAGAIN && errno != EWOULDBLOCK) {
83 pcep_log(LOG_WARNING, "%s: send() failure",
84 __func__);
85
86 return;
87 }
88 } else {
89 total_bytes_sent += bytes_sent;
90 }
91 }
92 }
93
94
95 unsigned int read_message(int socket_fd, char *received_message,
96 unsigned int max_message_size)
97 {
98 /* TODO what if bytes_read == max_message_size? there could be more to
99 * read */
100 unsigned int bytes_read =
101 read(socket_fd, received_message, max_message_size);
102 pcep_log(
103 LOG_INFO,
104 "%s: [%ld-%ld] socket_comm read message bytes_read [%u] on socket fd [%d]",
105 __func__, time(NULL), pthread_self(), bytes_read, socket_fd);
106
107 return bytes_read;
108 }
109
110
111 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle)
112 {
113 int max_fd = 0;
114
115 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
116
117 FD_ZERO(&socket_comm_handle->except_master_set);
118 FD_ZERO(&socket_comm_handle->read_master_set);
119 ordered_list_node *node = socket_comm_handle->read_list->head;
120 pcep_socket_comm_session *comm_session;
121 while (node != NULL) {
122 comm_session = (pcep_socket_comm_session *)node->data;
123 if (comm_session->socket_fd > max_fd) {
124 max_fd = comm_session->socket_fd;
125 } else if (comm_session->socket_fd < 0) {
126 pcep_log(LOG_ERR, "%s: Negative fd", __func__);
127 assert(comm_session->socket_fd > 0);
128 }
129
130 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
131 ready_toRead
132 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
133 FD_SET(comm_session->socket_fd,
134 &socket_comm_handle->read_master_set);
135 FD_SET(comm_session->socket_fd,
136 &socket_comm_handle->except_master_set);
137 node = node->next_node;
138 }
139
140 FD_ZERO(&socket_comm_handle->write_master_set);
141 node = socket_comm_handle->write_list->head;
142 while (node != NULL) {
143 comm_session = (pcep_socket_comm_session *)node->data;
144 if (comm_session->socket_fd > max_fd) {
145 max_fd = comm_session->socket_fd;
146 } else if (comm_session->socket_fd < 0) {
147 pcep_log(LOG_ERR, "%s: Negative fd", __func__);
148 assert(comm_session->socket_fd > 0);
149 }
150
151 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
152 ready_toWrite [%d]", __func__, time(NULL),
153 comm_session->socket_fd);*/
154 FD_SET(comm_session->socket_fd,
155 &socket_comm_handle->write_master_set);
156 FD_SET(comm_session->socket_fd,
157 &socket_comm_handle->except_master_set);
158 node = node->next_node;
159 }
160
161 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
162
163 return max_fd + 1;
164 }
165
166
167 void handle_reads(pcep_socket_comm_handle *socket_comm_handle)
168 {
169
170 /*
171 * iterate all the socket_fd's in the read_list. it may be that not
172 * all of them have something to read. dont remove the socket_fd
173 * from the read_list since messages could come at any time.
174 */
175
176 /* Notice: Only locking the mutex when accessing the read_list,
177 * since the read callbacks may end up calling back into the socket
178 * comm module to write messages which could be a deadlock. */
179 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
180 ordered_list_node *node = socket_comm_handle->read_list->head;
181 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
182
183 while (node != NULL) {
184 pcep_socket_comm_session *comm_session =
185 (pcep_socket_comm_session *)node->data;
186
187 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
188 node = node->next_node;
189 if (!comm_session_exists(socket_comm_handle, comm_session)) {
190 /* This comm_session has been deleted, move on to the
191 * next one */
192 pthread_mutex_unlock(
193 &(socket_comm_handle->socket_comm_mutex));
194 continue;
195 }
196
197 int is_set = FD_ISSET(comm_session->socket_fd,
198 &(socket_comm_handle->read_master_set));
199 /* Upon read failure, the comm_session might be free'd, so we
200 * cant store the received_bytes in the comm_session, until we
201 * know the read was successful. */
202 int received_bytes = 0;
203 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
204
205 if (is_set) {
206 FD_CLR(comm_session->socket_fd,
207 &(socket_comm_handle->read_master_set));
208
209 /* either read the message locally, or call the
210 * message_ready_handler to read it */
211 if (comm_session->message_handler != NULL) {
212 received_bytes = read_message(
213 comm_session->socket_fd,
214 comm_session->received_message,
215 MAX_RECVD_MSG_SIZE);
216 if (received_bytes > 0) {
217 /* Send the received message to the
218 * handler */
219 comm_session->received_bytes =
220 received_bytes;
221 comm_session->message_handler(
222 comm_session->session_data,
223 comm_session->received_message,
224 comm_session->received_bytes);
225 }
226 } else {
227 /* Tell the handler a message is ready to be
228 * read. The comm_session may be destroyed in
229 * this call, if
230 * there is an error reading or if the socket is
231 * closed. */
232 received_bytes =
233 comm_session
234 ->message_ready_to_read_handler(
235 comm_session
236 ->session_data,
237 comm_session
238 ->socket_fd);
239 }
240
241 /* handle the read results */
242 if (received_bytes == 0) {
243 if (comm_session_exists_locking(
244 socket_comm_handle, comm_session)) {
245 comm_session->received_bytes = 0;
246 /* the socket was closed */
247 /* TODO should we define a socket except
248 * enum? or will the only time we call
249 * this is when the socket is closed??
250 */
251 if (comm_session->conn_except_notifier
252 != NULL) {
253 comm_session->conn_except_notifier(
254 comm_session
255 ->session_data,
256 comm_session
257 ->socket_fd);
258 }
259
260 /* stop reading from the socket if its
261 * closed */
262 pthread_mutex_lock(
263 &(socket_comm_handle
264 ->socket_comm_mutex));
265 ordered_list_remove_first_node_equals(
266 socket_comm_handle->read_list,
267 comm_session);
268 pthread_mutex_unlock(
269 &(socket_comm_handle
270 ->socket_comm_mutex));
271 }
272 } else if (received_bytes < 0) {
273 /* TODO should we call conn_except_notifier()
274 * here ? */
275 pcep_log(
276 LOG_WARNING,
277 "%s: Error on socket fd [%d] : errno [%d][%s]",
278 __func__, comm_session->socket_fd,
279 errno, strerror(errno));
280 } else {
281 comm_session->received_bytes = received_bytes;
282 }
283 }
284 }
285 }
286
287
288 void handle_writes(pcep_socket_comm_handle *socket_comm_handle)
289 {
290 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
291
292 /*
293 * iterate all the socket_fd's in the write_list. it may be that not
294 * all of them are ready to be written to. only remove the socket_fd
295 * from the list if it is ready to be written to.
296 */
297
298 ordered_list_node *node = socket_comm_handle->write_list->head;
299 pcep_socket_comm_session *comm_session;
300 bool msg_written;
301 while (node != NULL) {
302 comm_session = (pcep_socket_comm_session *)node->data;
303 node = node->next_node;
304 msg_written = false;
305
306 if (!comm_session_exists(socket_comm_handle, comm_session)) {
307 /* This comm_session has been deleted, move on to the
308 * next one */
309 continue;
310 }
311
312 if (FD_ISSET(comm_session->socket_fd,
313 &(socket_comm_handle->write_master_set))) {
314 /* only remove the entry from the list, if it is written
315 * to */
316 ordered_list_remove_first_node_equals(
317 socket_comm_handle->write_list, comm_session);
318 FD_CLR(comm_session->socket_fd,
319 &(socket_comm_handle->write_master_set));
320
321 /* dequeue all the comm_session messages and send them
322 */
323 pcep_socket_comm_queued_message *queued_message =
324 queue_dequeue(comm_session->message_queue);
325 while (queued_message != NULL) {
326 msg_written = true;
327 write_message(comm_session->socket_fd,
328 queued_message->encoded_message,
329 queued_message->msg_length);
330 if (queued_message->free_after_send) {
331 pceplib_free(PCEPLIB_MESSAGES,
332 (void *)queued_message
333 ->encoded_message);
334 }
335 pceplib_free(PCEPLIB_MESSAGES, queued_message);
336 queued_message = queue_dequeue(
337 comm_session->message_queue);
338 }
339 }
340
341 /* check if the socket should be closed after writing */
342 if (comm_session->close_after_write == true) {
343 if (comm_session->message_queue->num_entries == 0) {
344 /* TODO check to make sure modifying the
345 * write_list while iterating it doesn't cause
346 * problems. */
347 pcep_log(
348 LOG_DEBUG,
349 "%s: handle_writes close() socket fd [%d]",
350 __func__, comm_session->socket_fd);
351 ordered_list_remove_first_node_equals(
352 socket_comm_handle->read_list,
353 comm_session);
354 ordered_list_remove_first_node_equals(
355 socket_comm_handle->write_list,
356 comm_session);
357 close(comm_session->socket_fd);
358 comm_session->socket_fd = -1;
359 }
360 }
361
362 if (comm_session->message_sent_handler != NULL
363 && msg_written == true) {
364 /* Unlocking to allow the message_sent_handler to
365 * make calls like destroy_socket_comm_session */
366 pthread_mutex_unlock(
367 &(socket_comm_handle->socket_comm_mutex));
368 comm_session->message_sent_handler(
369 comm_session->session_data,
370 comm_session->socket_fd);
371 pthread_mutex_lock(
372 &(socket_comm_handle->socket_comm_mutex));
373 }
374 }
375
376 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
377 }
378
379
380 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle)
381 {
382 /* TODO finish this */
383 (void)socket_comm_handle;
384 }
385
386
387 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
388 * invoke this method */
389 void *socket_comm_loop(void *data)
390 {
391 if (data == NULL) {
392 pcep_log(
393 LOG_WARNING,
394 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
395 __func__);
396 return NULL;
397 }
398
399 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Starting socket_comm_loop thread",
400 __func__, time(NULL), pthread_self());
401
402 pcep_socket_comm_handle *socket_comm_handle =
403 (pcep_socket_comm_handle *)data;
404 struct timeval timer;
405 int max_fd;
406
407 while (socket_comm_handle->active) {
408 /* check the FD's every 1/4 sec, 250 milliseconds */
409 timer.tv_sec = 0;
410 timer.tv_usec = 250000;
411 max_fd = build_fd_sets(socket_comm_handle);
412
413 if (select(max_fd, &(socket_comm_handle->read_master_set),
414 &(socket_comm_handle->write_master_set),
415 &(socket_comm_handle->except_master_set), &timer)
416 < 0) {
417 /* TODO handle the error */
418 pcep_log(
419 LOG_WARNING,
420 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
421 __func__, errno, strerror(errno));
422 }
423
424 handle_reads(socket_comm_handle);
425 handle_writes(socket_comm_handle);
426 handle_excepts(socket_comm_handle);
427 }
428
429 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Finished socket_comm_loop thread",
430 __func__, time(NULL), pthread_self());
431
432 return NULL;
433 }
434
435 int pceplib_external_socket_read(int fd, void *payload)
436 {
437 pcep_socket_comm_handle *socket_comm_handle =
438 (pcep_socket_comm_handle *)payload;
439 if (socket_comm_handle == NULL) {
440 return -1;
441 }
442
443 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
444 FD_SET(fd, &(socket_comm_handle->read_master_set));
445 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
446
447 handle_reads(socket_comm_handle);
448
449 /* Get the socket_comm_session */
450 pcep_socket_comm_session find_session = {.socket_fd = fd};
451 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
452 ordered_list_node *node =
453 ordered_list_find(socket_comm_handle->read_list, &find_session);
454
455 /* read again */
456 if (node != NULL) {
457 socket_comm_handle->socket_read_func(
458 socket_comm_handle->external_infra_data,
459 &((pcep_socket_comm_session *)node)
460 ->external_socket_data,
461 fd, socket_comm_handle);
462 }
463 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
464
465 return 0;
466 }
467
468 int pceplib_external_socket_write(int fd, void *payload)
469 {
470 pcep_socket_comm_handle *socket_comm_handle =
471 (pcep_socket_comm_handle *)payload;
472 if (socket_comm_handle == NULL) {
473 return -1;
474 }
475
476 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
477 FD_SET(fd, &(socket_comm_handle->write_master_set));
478 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
479
480 handle_writes(socket_comm_handle);
481
482 /* TODO do we need to cancel this FD from writing?? */
483
484 return 0;
485 }