]> git.proxmox.com Git - mirror_frr.git/blob - pceplib/pcep_socket_comm_loop.c
Merge pull request #8257 from donaldsharp/fix_bgp_metric
[mirror_frr.git] / pceplib / pcep_socket_comm_loop.c
1 /*
2 * This file is part of the PCEPlib, a PCEP protocol library.
3 *
4 * Copyright (C) 2020 Volta Networks https://voltanet.io/
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 *
19 * Author : Brady Johnson <brady@voltanet.io>
20 *
21 */
22
23
24 #include <errno.h>
25 #include <stdbool.h>
26 #include <stddef.h>
27 #include <string.h>
28 #include <unistd.h>
29
30 #include "pcep_socket_comm_internals.h"
31 #include "pcep_socket_comm_loop.h"
32 #include "pcep_utils_logging.h"
33 #include "pcep_utils_ordered_list.h"
34 #include "pcep_utils_logging.h"
35 #include "pcep_utils_memory.h"
36
37 void write_message(int socket_fd, const char *message, unsigned int msg_length);
38 unsigned int read_message(int socket_fd, char *received_message,
39 unsigned int max_message_size);
40 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle);
41 void handle_writes(pcep_socket_comm_handle *socket_comm_handle);
42 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle);
43
44 bool comm_session_exists(pcep_socket_comm_handle *socket_comm_handle,
45 pcep_socket_comm_session *socket_comm_session)
46 {
47 if (socket_comm_handle == NULL) {
48 return false;
49 }
50
51 return (ordered_list_find(socket_comm_handle->session_list,
52 socket_comm_session)
53 != NULL);
54 }
55
56
57 bool comm_session_exists_locking(pcep_socket_comm_handle *socket_comm_handle,
58 pcep_socket_comm_session *socket_comm_session)
59 {
60 if (socket_comm_handle == NULL) {
61 return false;
62 }
63
64 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
65 bool exists =
66 comm_session_exists(socket_comm_handle, socket_comm_session);
67 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
68
69 return exists;
70 }
71
72
73 void write_message(int socket_fd, const char *message, unsigned int msg_length)
74 {
75 ssize_t bytes_sent = 0;
76 unsigned int total_bytes_sent = 0;
77
78 while ((uint32_t)bytes_sent < msg_length) {
79 bytes_sent = write(socket_fd, message + total_bytes_sent,
80 msg_length);
81
82 pcep_log(
83 LOG_INFO,
84 "%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%d]",
85 __func__, time(NULL), pthread_self(), socket_fd,
86 msg_length, bytes_sent);
87
88 if (bytes_sent < 0) {
89 if (errno != EAGAIN && errno != EWOULDBLOCK) {
90 pcep_log(LOG_WARNING, "%s: send() failure",
91 __func__);
92
93 return;
94 }
95 } else {
96 total_bytes_sent += bytes_sent;
97 }
98 }
99 }
100
101
102 unsigned int read_message(int socket_fd, char *received_message,
103 unsigned int max_message_size)
104 {
105 /* TODO what if bytes_read == max_message_size? there could be more to
106 * read */
107 unsigned int bytes_read =
108 read(socket_fd, received_message, max_message_size);
109 pcep_log(
110 LOG_INFO,
111 "%s: [%ld-%ld] socket_comm read message bytes_read [%u] on socket fd [%d]",
112 __func__, time(NULL), pthread_self(), bytes_read, socket_fd);
113
114 return bytes_read;
115 }
116
117
118 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle)
119 {
120 int max_fd = 0;
121
122 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
123
124 FD_ZERO(&socket_comm_handle->except_master_set);
125 FD_ZERO(&socket_comm_handle->read_master_set);
126 ordered_list_node *node = socket_comm_handle->read_list->head;
127 pcep_socket_comm_session *comm_session;
128 while (node != NULL) {
129 comm_session = (pcep_socket_comm_session *)node->data;
130 if (comm_session->socket_fd > max_fd) {
131 max_fd = comm_session->socket_fd;
132 }
133
134 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
135 ready_toRead
136 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
137 FD_SET(comm_session->socket_fd,
138 &socket_comm_handle->read_master_set);
139 FD_SET(comm_session->socket_fd,
140 &socket_comm_handle->except_master_set);
141 node = node->next_node;
142 }
143
144 FD_ZERO(&socket_comm_handle->write_master_set);
145 node = socket_comm_handle->write_list->head;
146 while (node != NULL) {
147 comm_session = (pcep_socket_comm_session *)node->data;
148 if (comm_session->socket_fd > max_fd) {
149 max_fd = comm_session->socket_fd;
150 }
151
152 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
153 ready_toWrite [%d]", __func__, time(NULL),
154 comm_session->socket_fd);*/
155 FD_SET(comm_session->socket_fd,
156 &socket_comm_handle->write_master_set);
157 FD_SET(comm_session->socket_fd,
158 &socket_comm_handle->except_master_set);
159 node = node->next_node;
160 }
161
162 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
163
164 return max_fd + 1;
165 }
166
167
168 void handle_reads(pcep_socket_comm_handle *socket_comm_handle)
169 {
170
171 /*
172 * iterate all the socket_fd's in the read_list. it may be that not
173 * all of them have something to read. dont remove the socket_fd
174 * from the read_list since messages could come at any time.
175 */
176
177 /* Notice: Only locking the mutex when accessing the read_list,
178 * since the read callbacks may end up calling back into the socket
179 * comm module to write messages which could be a deadlock. */
180 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
181 ordered_list_node *node = socket_comm_handle->read_list->head;
182 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
183
184 while (node != NULL) {
185 pcep_socket_comm_session *comm_session =
186 (pcep_socket_comm_session *)node->data;
187
188 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
189 node = node->next_node;
190 if (!comm_session_exists(socket_comm_handle, comm_session)) {
191 /* This comm_session has been deleted, move on to the
192 * next one */
193 pthread_mutex_unlock(
194 &(socket_comm_handle->socket_comm_mutex));
195 continue;
196 }
197
198 int is_set = FD_ISSET(comm_session->socket_fd,
199 &(socket_comm_handle->read_master_set));
200 /* Upon read failure, the comm_session might be free'd, so we
201 * cant store the received_bytes in the comm_session, until we
202 * know the read was successful. */
203 int received_bytes = 0;
204 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
205
206 if (is_set) {
207 FD_CLR(comm_session->socket_fd,
208 &(socket_comm_handle->read_master_set));
209
210 /* either read the message locally, or call the
211 * message_ready_handler to read it */
212 if (comm_session->message_handler != NULL) {
213 received_bytes = read_message(
214 comm_session->socket_fd,
215 comm_session->received_message,
216 MAX_RECVD_MSG_SIZE);
217 if (received_bytes > 0) {
218 /* Send the received message to the
219 * handler */
220 comm_session->received_bytes =
221 received_bytes;
222 comm_session->message_handler(
223 comm_session->session_data,
224 comm_session->received_message,
225 comm_session->received_bytes);
226 }
227 } else {
228 /* Tell the handler a message is ready to be
229 * read. The comm_session may be destroyed in
230 * this call, if
231 * there is an error reading or if the socket is
232 * closed. */
233 received_bytes =
234 comm_session
235 ->message_ready_to_read_handler(
236 comm_session
237 ->session_data,
238 comm_session
239 ->socket_fd);
240 }
241
242 /* handle the read results */
243 if (received_bytes == 0) {
244 if (comm_session_exists_locking(
245 socket_comm_handle, comm_session)) {
246 comm_session->received_bytes = 0;
247 /* the socket was closed */
248 /* TODO should we define a socket except
249 * enum? or will the only time we call
250 * this is when the socket is closed??
251 */
252 if (comm_session->conn_except_notifier
253 != NULL) {
254 comm_session->conn_except_notifier(
255 comm_session
256 ->session_data,
257 comm_session
258 ->socket_fd);
259 }
260
261 /* stop reading from the socket if its
262 * closed */
263 pthread_mutex_lock(
264 &(socket_comm_handle
265 ->socket_comm_mutex));
266 ordered_list_remove_first_node_equals(
267 socket_comm_handle->read_list,
268 comm_session);
269 pthread_mutex_unlock(
270 &(socket_comm_handle
271 ->socket_comm_mutex));
272 }
273 } else if (received_bytes < 0) {
274 /* TODO should we call conn_except_notifier()
275 * here ? */
276 pcep_log(
277 LOG_WARNING,
278 "%s: Error on socket fd [%d] : errno [%d][%s]",
279 __func__, comm_session->socket_fd,
280 errno, strerror(errno));
281 } else {
282 comm_session->received_bytes = received_bytes;
283 }
284 }
285 }
286 }
287
288
289 void handle_writes(pcep_socket_comm_handle *socket_comm_handle)
290 {
291 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
292
293 /*
294 * iterate all the socket_fd's in the write_list. it may be that not
295 * all of them are ready to be written to. only remove the socket_fd
296 * from the list if it is ready to be written to.
297 */
298
299 ordered_list_node *node = socket_comm_handle->write_list->head;
300 pcep_socket_comm_session *comm_session;
301 bool msg_written;
302 while (node != NULL) {
303 comm_session = (pcep_socket_comm_session *)node->data;
304 node = node->next_node;
305 msg_written = false;
306
307 if (!comm_session_exists(socket_comm_handle, comm_session)) {
308 /* This comm_session has been deleted, move on to the
309 * next one */
310 continue;
311 }
312
313 if (FD_ISSET(comm_session->socket_fd,
314 &(socket_comm_handle->write_master_set))) {
315 /* only remove the entry from the list, if it is written
316 * to */
317 ordered_list_remove_first_node_equals(
318 socket_comm_handle->write_list, comm_session);
319 FD_CLR(comm_session->socket_fd,
320 &(socket_comm_handle->write_master_set));
321
322 /* dequeue all the comm_session messages and send them
323 */
324 pcep_socket_comm_queued_message *queued_message =
325 queue_dequeue(comm_session->message_queue);
326 while (queued_message != NULL) {
327 msg_written = true;
328 write_message(comm_session->socket_fd,
329 queued_message->encoded_message,
330 queued_message->msg_length);
331 if (queued_message->free_after_send) {
332 pceplib_free(PCEPLIB_MESSAGES,
333 (void *)queued_message
334 ->encoded_message);
335 }
336 pceplib_free(PCEPLIB_MESSAGES, queued_message);
337 queued_message = queue_dequeue(
338 comm_session->message_queue);
339 }
340 }
341
342 /* check if the socket should be closed after writing */
343 if (comm_session->close_after_write == true) {
344 if (comm_session->message_queue->num_entries == 0) {
345 /* TODO check to make sure modifying the
346 * write_list while iterating it doesnt cause
347 * problems. */
348 pcep_log(
349 LOG_DEBUG,
350 "%s: handle_writes close() socket fd [%d]",
351 __func__, comm_session->socket_fd);
352 ordered_list_remove_first_node_equals(
353 socket_comm_handle->read_list,
354 comm_session);
355 ordered_list_remove_first_node_equals(
356 socket_comm_handle->write_list,
357 comm_session);
358 close(comm_session->socket_fd);
359 comm_session->socket_fd = -1;
360 }
361 }
362
363 if (comm_session->message_sent_handler != NULL
364 && msg_written == true) {
365 /* Unlocking to allow the message_sent_handler to
366 * make calls like destroy_socket_comm_session */
367 pthread_mutex_unlock(
368 &(socket_comm_handle->socket_comm_mutex));
369 comm_session->message_sent_handler(
370 comm_session->session_data,
371 comm_session->socket_fd);
372 pthread_mutex_lock(
373 &(socket_comm_handle->socket_comm_mutex));
374 }
375 }
376
377 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
378 }
379
380
381 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle)
382 {
383 /* TODO finish this */
384 (void)socket_comm_handle;
385 }
386
387
388 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
389 * invoke this method */
390 void *socket_comm_loop(void *data)
391 {
392 if (data == NULL) {
393 pcep_log(
394 LOG_WARNING,
395 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
396 __func__);
397 return NULL;
398 }
399
400 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Starting socket_comm_loop thread",
401 __func__, time(NULL), pthread_self());
402
403 pcep_socket_comm_handle *socket_comm_handle =
404 (pcep_socket_comm_handle *)data;
405 struct timeval timer;
406 int max_fd;
407
408 while (socket_comm_handle->active) {
409 /* check the FD's every 1/4 sec, 250 milliseconds */
410 timer.tv_sec = 0;
411 timer.tv_usec = 250000;
412 max_fd = build_fd_sets(socket_comm_handle);
413
414 if (select(max_fd, &(socket_comm_handle->read_master_set),
415 &(socket_comm_handle->write_master_set),
416 &(socket_comm_handle->except_master_set), &timer)
417 < 0) {
418 /* TODO handle the error */
419 pcep_log(
420 LOG_WARNING,
421 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
422 __func__, errno, strerror(errno));
423 }
424
425 handle_reads(socket_comm_handle);
426 handle_writes(socket_comm_handle);
427 handle_excepts(socket_comm_handle);
428 }
429
430 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Finished socket_comm_loop thread",
431 __func__, time(NULL), pthread_self());
432
433 return NULL;
434 }
435
436 int pceplib_external_socket_read(int fd, void *payload)
437 {
438 pcep_socket_comm_handle *socket_comm_handle =
439 (pcep_socket_comm_handle *)payload;
440 if (socket_comm_handle == NULL) {
441 return -1;
442 }
443
444 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
445 FD_SET(fd, &(socket_comm_handle->read_master_set));
446 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
447
448 handle_reads(socket_comm_handle);
449
450 /* Get the socket_comm_session */
451 pcep_socket_comm_session find_session = {.socket_fd = fd};
452 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
453 ordered_list_node *node =
454 ordered_list_find(socket_comm_handle->read_list, &find_session);
455
456 /* read again */
457 if (node != NULL) {
458 socket_comm_handle->socket_read_func(
459 socket_comm_handle->external_infra_data,
460 &((pcep_socket_comm_session *)node)
461 ->external_socket_data,
462 fd, socket_comm_handle);
463 }
464 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
465
466 return 0;
467 }
468
469 int pceplib_external_socket_write(int fd, void *payload)
470 {
471 pcep_socket_comm_handle *socket_comm_handle =
472 (pcep_socket_comm_handle *)payload;
473 if (socket_comm_handle == NULL) {
474 return -1;
475 }
476
477 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
478 FD_SET(fd, &(socket_comm_handle->write_master_set));
479 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
480
481 handle_writes(socket_comm_handle);
482
483 /* TODO do we need to cancel this FD from writing?? */
484
485 return 0;
486 }