1 // SPDX-License-Identifier: LGPL-2.1-or-later
3 * This file is part of the PCEPlib, a PCEP protocol library.
5 * Copyright (C) 2020 Volta Networks https://voltanet.io/
7 * Author : Brady Johnson <brady@voltanet.io>
23 #include "pcep_socket_comm_internals.h"
24 #include "pcep_socket_comm_loop.h"
25 #include "pcep_utils_logging.h"
26 #include "pcep_utils_ordered_list.h"
27 #include "pcep_utils_logging.h"
28 #include "pcep_utils_memory.h"
30 void write_message(int socket_fd
, const char *message
, unsigned int msg_length
);
31 unsigned int read_message(int socket_fd
, char *received_message
,
32 unsigned int max_message_size
);
33 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
);
34 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
);
35 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
);
/* Looks up socket_comm_session in socket_comm_handle->session_list with
 * ordered_list_find(). A NULL socket_comm_handle is tested for first.
 * No locking is visible in this function; the callers in this file invoke
 * it with socket_comm_mutex already held.
 * NOTE(review): the body of the NULL branch and the tail of the return
 * expression are not visible in this listing - confirm against the full
 * source. */
37 bool comm_session_exists(pcep_socket_comm_handle
*socket_comm_handle
,
38 pcep_socket_comm_session
*socket_comm_session
)
40 if (socket_comm_handle
== NULL
) {
44 return (ordered_list_find(socket_comm_handle
->session_list
,
/* Variant of comm_session_exists() that takes
 * socket_comm_handle->socket_comm_mutex around the lookup and releases it
 * afterwards. A NULL socket_comm_handle is tested for before the mutex is
 * touched.
 * NOTE(review): how the lookup result is stored and returned is not visible
 * in this listing - confirm against the full source. */
50 bool comm_session_exists_locking(pcep_socket_comm_handle
*socket_comm_handle
,
51 pcep_socket_comm_session
*socket_comm_session
)
53 if (socket_comm_handle
== NULL
) {
57 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
59 comm_session_exists(socket_comm_handle
, socket_comm_session
);
60 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
/*
 * Write a complete encoded message to a socket.
 *
 * write() may accept fewer bytes than requested, so the call is repeated
 * until all msg_length bytes of message have been handed to socket_fd. Each
 * retry resumes right after the bytes already sent and only asks for the
 * remainder, so the source buffer is never read past msg_length.
 *
 * socket_fd:  descriptor to write to
 * message:    buffer holding at least msg_length bytes
 * msg_length: number of bytes to send
 *
 * EAGAIN / EWOULDBLOCK are retried immediately. Any other write() error is
 * logged and the rest of the message is abandoned. The function has no
 * return value, so callers cannot observe a failed or partial send.
 */
void write_message(int socket_fd, const char *message, unsigned int msg_length)
{
	unsigned int total_bytes_sent = 0;

	/* Loop on the cumulative count, not on the result of the last
	 * write(): a short write must not end the loop early, and a -1
	 * result must not be mistaken for a huge unsigned byte count. */
	while (total_bytes_sent < msg_length) {
		ssize_t bytes_sent =
			write(socket_fd, message + total_bytes_sent,
			      msg_length - total_bytes_sent);
		/* Capture errno before logging, which may overwrite it. */
		int write_errno = errno;

		pcep_log(
			LOG_INFO,
			"%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%d]",
			__func__, time(NULL), pthread_self(), socket_fd,
			msg_length, (int)bytes_sent);

		if (bytes_sent < 0) {
			if (write_errno != EAGAIN
			    && write_errno != EWOULDBLOCK) {
				pcep_log(LOG_WARNING, "%s: send() failure",
					 __func__);

				return;
			}
			/* Socket not ready yet: try the same range again. */
		} else {
			total_bytes_sent += (unsigned int)bytes_sent;
		}
	}
}
/* Performs one read() of at most max_message_size bytes from socket_fd into
 * received_message, then logs the byte count together with the fd.
 * NOTE(review): read() returns ssize_t but the result is stored in an
 * unsigned int, so a -1 error result does not stay negative in bytes_read
 * and is printed through the %u conversion. handle_reads() assigns the
 * return value of this function to an int and tests it for less than zero -
 * verify the error path behaves as intended.
 * NOTE(review): the head of the log call and the return statement are not
 * visible in this listing; presumably bytes_read is returned - confirm. */
95 unsigned int read_message(int socket_fd
, char *received_message
,
96 unsigned int max_message_size
)
98 /* TODO what if bytes_read == max_message_size? there could be more to
100 unsigned int bytes_read
=
101 read(socket_fd
, received_message
, max_message_size
);
104 "%s: [%ld-%ld] socket_comm read message bytes_read [%u] on socket fd [%d]",
105 __func__
, time(NULL
), pthread_self(), bytes_read
, socket_fd
);
/* Rebuilds the three fd_sets stored in socket_comm_handle while holding
 * socket_comm_mutex.
 * - except_master_set and read_master_set are cleared, then every session
 *   fd found in read_list is added to both of them.
 * - write_master_set is cleared, then every session fd found in write_list
 *   is added to write_master_set and to except_master_set.
 * While walking both lists the largest socket_fd seen is kept in max_fd.
 * A negative socket_fd is logged at LOG_ERR and is followed by an assert
 * that requires socket_fd to be greater than zero.
 * NOTE(review): the declaration of max_fd and the return statement are not
 * visible in this listing. socket_comm_loop() passes the return value as the
 * first argument of select(), which expects highest fd plus one - confirm.
 * NOTE(review): FD_SET is only defined for descriptors below FD_SETSIZE and
 * no such bound check is visible here. */
111 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
)
115 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
117 FD_ZERO(&socket_comm_handle
->except_master_set
);
118 FD_ZERO(&socket_comm_handle
->read_master_set
);
119 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
120 pcep_socket_comm_session
*comm_session
;
121 while (node
!= NULL
) {
122 comm_session
= (pcep_socket_comm_session
*)node
->data
;
123 if (comm_session
->socket_fd
> max_fd
) {
124 max_fd
= comm_session
->socket_fd
;
125 } else if (comm_session
->socket_fd
< 0) {
126 pcep_log(LOG_ERR
, "%s: Negative fd", __func__
);
127 assert(comm_session
->socket_fd
> 0);
130 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
132 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
133 FD_SET(comm_session
->socket_fd
,
134 &socket_comm_handle
->read_master_set
);
135 FD_SET(comm_session
->socket_fd
,
136 &socket_comm_handle
->except_master_set
);
137 node
= node
->next_node
;
140 FD_ZERO(&socket_comm_handle
->write_master_set
);
141 node
= socket_comm_handle
->write_list
->head
;
142 while (node
!= NULL
) {
143 comm_session
= (pcep_socket_comm_session
*)node
->data
;
144 if (comm_session
->socket_fd
> max_fd
) {
145 max_fd
= comm_session
->socket_fd
;
146 } else if (comm_session
->socket_fd
< 0) {
147 pcep_log(LOG_ERR
, "%s: Negative fd", __func__
);
148 assert(comm_session
->socket_fd
> 0);
151 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
152 ready_toWrite [%d]", __func__, time(NULL),
153 comm_session->socket_fd);*/
154 FD_SET(comm_session
->socket_fd
,
155 &socket_comm_handle
->write_master_set
);
156 FD_SET(comm_session
->socket_fd
,
157 &socket_comm_handle
->except_master_set
);
158 node
= node
->next_node
;
161 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
/* Walks socket_comm_handle->read_list and services each session on it.
 * Locking: socket_comm_mutex is taken only around list access, the
 * comm_session_exists() re-check and the FD_ISSET test. It is released
 * before any handler is called, for the deadlock reason given in the Notice
 * comment below.
 * Per node:
 * - the next node is fetched first, then the session is re-validated with
 *   comm_session_exists() because it may have been deleted meanwhile;
 * - the fd is cleared from read_master_set;
 * - if the session has a message_handler, the data is read here with
 *   read_message() into comm_session->received_message and, when more than
 *   zero bytes arrived, passed to message_handler together with
 *   session_data; otherwise message_ready_to_read_handler is called
 *   (presumably to let the owner read the socket - that branch is only
 *   partly visible);
 * - received_bytes equal to zero is treated as a closed socket: if the
 *   session still exists, conn_except_notifier is called when set and the
 *   session is removed from read_list under the mutex;
 * - received_bytes below zero is logged with errno and strerror(errno);
 * - otherwise received_bytes is stored in comm_session->received_bytes.
 * NOTE(review): many lines of this function, including the use of is_set
 * and several call arguments, are not visible in this listing - confirm the
 * details against the full source. */
167 void handle_reads(pcep_socket_comm_handle
*socket_comm_handle
)
171 * iterate all the socket_fd's in the read_list. it may be that not
172 * all of them have something to read. dont remove the socket_fd
173 * from the read_list since messages could come at any time.
176 /* Notice: Only locking the mutex when accessing the read_list,
177 * since the read callbacks may end up calling back into the socket
178 * comm module to write messages which could be a deadlock. */
179 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
180 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
181 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
183 while (node
!= NULL
) {
184 pcep_socket_comm_session
*comm_session
=
185 (pcep_socket_comm_session
*)node
->data
;
187 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
188 node
= node
->next_node
;
189 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
190 /* This comm_session has been deleted, move on to the
192 pthread_mutex_unlock(
193 &(socket_comm_handle
->socket_comm_mutex
));
197 int is_set
= FD_ISSET(comm_session
->socket_fd
,
198 &(socket_comm_handle
->read_master_set
));
199 /* Upon read failure, the comm_session might be free'd, so we
200 * cant store the received_bytes in the comm_session, until we
201 * know the read was successful. */
202 int received_bytes
= 0;
203 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
206 FD_CLR(comm_session
->socket_fd
,
207 &(socket_comm_handle
->read_master_set
));
209 /* either read the message locally, or call the
210 * message_ready_handler to read it */
211 if (comm_session
->message_handler
!= NULL
) {
212 received_bytes
= read_message(
213 comm_session
->socket_fd
,
214 comm_session
->received_message
,
216 if (received_bytes
> 0) {
217 /* Send the received message to the
219 comm_session
->received_bytes
=
221 comm_session
->message_handler(
222 comm_session
->session_data
,
223 comm_session
->received_message
,
224 comm_session
->received_bytes
);
227 /* Tell the handler a message is ready to be
228 * read. The comm_session may be destroyed in
230 * there is an error reading or if the socket is
234 ->message_ready_to_read_handler(
241 /* handle the read results */
242 if (received_bytes
== 0) {
243 if (comm_session_exists_locking(
244 socket_comm_handle
, comm_session
)) {
245 comm_session
->received_bytes
= 0;
246 /* the socket was closed */
247 /* TODO should we define a socket except
248 * enum? or will the only time we call
249 * this is when the socket is closed??
251 if (comm_session
->conn_except_notifier
253 comm_session
->conn_except_notifier(
260 /* stop reading from the socket if its
264 ->socket_comm_mutex
));
265 ordered_list_remove_first_node_equals(
266 socket_comm_handle
->read_list
,
268 pthread_mutex_unlock(
270 ->socket_comm_mutex
));
272 } else if (received_bytes
< 0) {
273 /* TODO should we call conn_except_notifier()
277 "%s: Error on socket fd [%d] : errno [%d][%s]",
278 __func__
, comm_session
->socket_fd
,
279 errno
, strerror(errno
));
281 comm_session
->received_bytes
= received_bytes
;
/* Walks socket_comm_handle->write_list with socket_comm_mutex held and
 * flushes the queued messages of every session whose fd is set in
 * write_master_set.
 * Per node:
 * - the next node is fetched first, then the session is re-validated with
 *   comm_session_exists();
 * - if the fd is set in write_master_set, the session is removed from
 *   write_list, the fd is cleared from the set, and every entry of
 *   comm_session->message_queue is dequeued and sent with write_message().
 *   After sending, a pceplib_free() call guarded by free_after_send is made
 *   (presumably for the encoded message - its argument is cut off in this
 *   listing) and the queue entry itself is freed;
 * - if close_after_write is true and the queue is empty, the session is
 *   removed from read_list and write_list, the fd is closed and socket_fd is
 *   set to -1;
 * - if message_sent_handler is set and msg_written is true, the mutex is
 *   released around the handler call so that the handler may call back into
 *   this module.
 * NOTE(review): message_sent_handler receives comm_session->socket_fd, which
 * may already have been set to -1 by the close_after_write branch above.
 * NOTE(review): the declaration and assignment of msg_written, the re-lock
 * after the handler call and several call arguments are not visible in this
 * listing - confirm against the full source. */
288 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
)
290 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
293 * iterate all the socket_fd's in the write_list. it may be that not
294 * all of them are ready to be written to. only remove the socket_fd
295 * from the list if it is ready to be written to.
298 ordered_list_node
*node
= socket_comm_handle
->write_list
->head
;
299 pcep_socket_comm_session
*comm_session
;
301 while (node
!= NULL
) {
302 comm_session
= (pcep_socket_comm_session
*)node
->data
;
303 node
= node
->next_node
;
306 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
307 /* This comm_session has been deleted, move on to the
312 if (FD_ISSET(comm_session
->socket_fd
,
313 &(socket_comm_handle
->write_master_set
))) {
314 /* only remove the entry from the list, if it is written
316 ordered_list_remove_first_node_equals(
317 socket_comm_handle
->write_list
, comm_session
);
318 FD_CLR(comm_session
->socket_fd
,
319 &(socket_comm_handle
->write_master_set
));
321 /* dequeue all the comm_session messages and send them
323 pcep_socket_comm_queued_message
*queued_message
=
324 queue_dequeue(comm_session
->message_queue
);
325 while (queued_message
!= NULL
) {
327 write_message(comm_session
->socket_fd
,
328 queued_message
->encoded_message
,
329 queued_message
->msg_length
);
330 if (queued_message
->free_after_send
) {
331 pceplib_free(PCEPLIB_MESSAGES
,
332 (void *)queued_message
335 pceplib_free(PCEPLIB_MESSAGES
, queued_message
);
336 queued_message
= queue_dequeue(
337 comm_session
->message_queue
);
341 /* check if the socket should be closed after writing */
342 if (comm_session
->close_after_write
== true) {
343 if (comm_session
->message_queue
->num_entries
== 0) {
344 /* TODO check to make sure modifying the
345 * write_list while iterating it doesn't cause
349 "%s: handle_writes close() socket fd [%d]",
350 __func__
, comm_session
->socket_fd
);
351 ordered_list_remove_first_node_equals(
352 socket_comm_handle
->read_list
,
354 ordered_list_remove_first_node_equals(
355 socket_comm_handle
->write_list
,
357 close(comm_session
->socket_fd
);
358 comm_session
->socket_fd
= -1;
362 if (comm_session
->message_sent_handler
!= NULL
363 && msg_written
== true) {
364 /* Unlocking to allow the message_sent_handler to
365 * make calls like destroy_socket_comm_session */
366 pthread_mutex_unlock(
367 &(socket_comm_handle
->socket_comm_mutex
));
368 comm_session
->message_sent_handler(
369 comm_session
->session_data
,
370 comm_session
->socket_fd
);
372 &(socket_comm_handle
->socket_comm_mutex
));
376 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
380 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
)
382 /* TODO finish this */
383 (void)socket_comm_handle
;
/* Thread body of the socket comm loop. data is cast to a
 * pcep_socket_comm_handle. While socket_comm_handle->active is set, each
 * iteration:
 * - sets the select timeout (tv_usec of 250000, i.e. a quarter second);
 * - rebuilds the fd_sets with build_fd_sets() and uses its return value as
 *   the first argument of select();
 * - calls select() on read_master_set, write_master_set and
 *   except_master_set;
 * - logs errno and strerror(errno) on the select error path;
 * - runs handle_reads(), handle_writes() and handle_excepts().
 * A LOG_NOTICE line is written when the thread starts and when it finishes.
 * NOTE(review): the NULL check on data, the tv_sec assignment, the test on
 * the select() result and the return statement are not visible in this
 * listing - confirm against the full source. */
387 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
388 * invoke this method */
389 void *socket_comm_loop(void *data
)
394 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
399 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Starting socket_comm_loop thread",
400 __func__
, time(NULL
), pthread_self());
402 pcep_socket_comm_handle
*socket_comm_handle
=
403 (pcep_socket_comm_handle
*)data
;
404 struct timeval timer
;
407 while (socket_comm_handle
->active
) {
408 /* check the FD's every 1/4 sec, 250 milliseconds */
410 timer
.tv_usec
= 250000;
411 max_fd
= build_fd_sets(socket_comm_handle
);
413 if (select(max_fd
, &(socket_comm_handle
->read_master_set
),
414 &(socket_comm_handle
->write_master_set
),
415 &(socket_comm_handle
->except_master_set
), &timer
)
417 /* TODO handle the error */
420 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
421 __func__
, errno
, strerror(errno
));
424 handle_reads(socket_comm_handle
);
425 handle_writes(socket_comm_handle
);
426 handle_excepts(socket_comm_handle
);
429 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Finished socket_comm_loop thread",
430 __func__
, time(NULL
), pthread_self());
435 int pceplib_external_socket_read(int fd
, void *payload
)
437 pcep_socket_comm_handle
*socket_comm_handle
=
438 (pcep_socket_comm_handle
*)payload
;
439 if (socket_comm_handle
== NULL
) {
443 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
444 FD_SET(fd
, &(socket_comm_handle
->read_master_set
));
445 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
447 handle_reads(socket_comm_handle
);
449 /* Get the socket_comm_session */
450 pcep_socket_comm_session find_session
= {.socket_fd
= fd
};
451 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
452 ordered_list_node
*node
=
453 ordered_list_find(socket_comm_handle
->read_list
, &find_session
);
457 socket_comm_handle
->socket_read_func(
458 socket_comm_handle
->external_infra_data
,
459 &((pcep_socket_comm_session
*)node
)
460 ->external_socket_data
,
461 fd
, socket_comm_handle
);
463 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
/* Write-side counterpart of pceplib_external_socket_read(). payload is cast
 * to a pcep_socket_comm_handle and tested for NULL. fd is then set in
 * write_master_set under socket_comm_mutex and handle_writes() is run to
 * flush whatever is queued for it.
 * NOTE(review): the return statements are not visible in this listing -
 * confirm the return values against the full source. */
468 int pceplib_external_socket_write(int fd
, void *payload
)
470 pcep_socket_comm_handle
*socket_comm_handle
=
471 (pcep_socket_comm_handle
*)payload
;
472 if (socket_comm_handle
== NULL
) {
476 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
477 FD_SET(fd
, &(socket_comm_handle
->write_master_set
));
478 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
480 handle_writes(socket_comm_handle
);
482 /* TODO do we need to cancel this FD from writing?? */