/*
 * This file is part of the PCEPlib, a PCEP protocol library.
 *
 * Copyright (C) 2020 Volta Networks https://voltanet.io/
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
 * Author : Brady Johnson <brady@voltanet.io>
 */
30 #include "pcep_socket_comm_internals.h"
31 #include "pcep_socket_comm_loop.h"
32 #include "pcep_utils_logging.h"
33 #include "pcep_utils_ordered_list.h"
34 #include "pcep_utils_logging.h"
35 #include "pcep_utils_memory.h"
37 void write_message(int socket_fd
, const char *message
, unsigned int msg_length
);
38 unsigned int read_message(int socket_fd
, char *received_message
,
39 unsigned int max_message_size
);
40 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
);
41 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
);
42 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
);
44 bool comm_session_exists(pcep_socket_comm_handle
*socket_comm_handle
,
45 pcep_socket_comm_session
*socket_comm_session
)
47 if (socket_comm_handle
== NULL
) {
51 return (ordered_list_find(socket_comm_handle
->session_list
,
57 bool comm_session_exists_locking(pcep_socket_comm_handle
*socket_comm_handle
,
58 pcep_socket_comm_session
*socket_comm_session
)
60 if (socket_comm_handle
== NULL
) {
64 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
66 comm_session_exists(socket_comm_handle
, socket_comm_session
);
67 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
/*
 * Write the whole of message (msg_length bytes) to socket_fd.
 *
 * write() may accept fewer bytes than requested, so the loop is driven by
 * the running total and every call asks only for the bytes still
 * outstanding, starting at the current offset. The previous version tested
 * the result of the last write() instead of the total, and always requested
 * msg_length bytes, which could read past the end of message after a short
 * write and gave up silently on EAGAIN.
 *
 * EAGAIN / EWOULDBLOCK / EINTR are treated as transient and retried; any
 * other error is logged and the remainder of the message is dropped.
 *
 * NOTE(review): retrying on EAGAIN spins until the socket accepts data.
 * NOTE(review): the log level of the per-write message was not visible in
 * the listing this was restored from; LOG_INFO is assumed.
 *
 * @param socket_fd  descriptor to write to
 * @param message    buffer holding at least msg_length bytes
 * @param msg_length number of bytes to send
 */
void write_message(int socket_fd, const char *message, unsigned int msg_length)
{
	unsigned int total_bytes_sent = 0;

	while (total_bytes_sent < msg_length) {
		ssize_t bytes_sent =
			write(socket_fd, message + total_bytes_sent,
			      msg_length - total_bytes_sent);
		/* Save errno now: the logging call below may overwrite it. */
		int write_errno = errno;

		pcep_log(
			LOG_INFO,
			"%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%d]",
			__func__, (long)time(NULL), pthread_self(), socket_fd,
			msg_length, (int)bytes_sent);

		if (bytes_sent < 0) {
			if (write_errno != EAGAIN && write_errno != EWOULDBLOCK
			    && write_errno != EINTR) {
				pcep_log(LOG_WARNING, "%s: send() failure",
					 __func__);
				return;
			}

			/* Transient condition: retry from the same offset. */
			continue;
		}

		total_bytes_sent += (unsigned int)bytes_sent;
	}
}
/*
 * Perform a single read() of up to max_message_size bytes from socket_fd
 * into received_message and log the outcome.
 *
 * NOTE(review): read() returns a signed ssize_t, but the result is kept in
 * an unsigned int, so a -1 failure becomes a very large unsigned value.
 * handle_reads() assigns the return value to an int and tests it for
 * "< 0", which relies on the conversion back to int yielding -1.
 * NOTE(review): the log level and the return statement were not visible in
 * the listing this was restored from; LOG_INFO and "return bytes_read" are
 * assumed.
 *
 * @param socket_fd        descriptor to read from
 * @param received_message destination buffer
 * @param max_message_size capacity of received_message in bytes
 * @return the value returned by read(), converted to unsigned int
 */
unsigned int read_message(int socket_fd, char *received_message,
			  unsigned int max_message_size)
{
	/* TODO what if bytes_read == max_message_size? there could be more to
	 * read */
	unsigned int bytes_read;

	bytes_read = read(socket_fd, received_message, max_message_size);

	pcep_log(
		LOG_INFO,
		"%s: [%ld-%ld] socket_comm read message bytes_read [%u] on socket fd [%d]",
		__func__, time(NULL), pthread_self(), bytes_read, socket_fd);

	return bytes_read;
}
118 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
)
122 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
124 FD_ZERO(&socket_comm_handle
->except_master_set
);
125 FD_ZERO(&socket_comm_handle
->read_master_set
);
126 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
127 pcep_socket_comm_session
*comm_session
;
128 while (node
!= NULL
) {
129 comm_session
= (pcep_socket_comm_session
*)node
->data
;
130 if (comm_session
->socket_fd
> max_fd
) {
131 max_fd
= comm_session
->socket_fd
;
134 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
136 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
137 FD_SET(comm_session
->socket_fd
,
138 &socket_comm_handle
->read_master_set
);
139 FD_SET(comm_session
->socket_fd
,
140 &socket_comm_handle
->except_master_set
);
141 node
= node
->next_node
;
144 FD_ZERO(&socket_comm_handle
->write_master_set
);
145 node
= socket_comm_handle
->write_list
->head
;
146 while (node
!= NULL
) {
147 comm_session
= (pcep_socket_comm_session
*)node
->data
;
148 if (comm_session
->socket_fd
> max_fd
) {
149 max_fd
= comm_session
->socket_fd
;
152 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
153 ready_toWrite [%d]", __func__, time(NULL),
154 comm_session->socket_fd);*/
155 FD_SET(comm_session
->socket_fd
,
156 &socket_comm_handle
->write_master_set
);
157 FD_SET(comm_session
->socket_fd
,
158 &socket_comm_handle
->except_master_set
);
159 node
= node
->next_node
;
162 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
/*
 * handle_reads: service every session in the handle's read_list whose
 * socket_fd is flagged in read_master_set.
 *
 * What the visible code does:
 *  - socket_comm_mutex is held only while the list head is fetched, while
 *    the iterator is advanced, and while the session's existence and
 *    FD_ISSET state are sampled; it is released before any callback runs.
 *  - The descriptor is cleared from read_master_set, then, when the session
 *    has a message_handler, read_message() fills received_message and a
 *    positive byte count is handed to message_handler(). A call to
 *    message_ready_to_read_handler is also present.
 *  - A result of 0 is treated as "socket closed": if the session still
 *    exists, received_bytes is zeroed, conn_except_notifier is invoked and
 *    the session is removed from read_list under the mutex.
 *  - A negative result is logged together with errno.
 *
 * NOTE(review): this listing has dropped lines (branch heads, several call
 * arguments, comment terminators). The conditions selecting the
 * message_ready_to_read_handler call and the final received_bytes store are
 * not visible here - presumably "else" branches; confirm against upstream.
 */
168 void handle_reads(pcep_socket_comm_handle
*socket_comm_handle
)
172 * iterate all the socket_fd's in the read_list. it may be that not
173 * all of them have something to read. dont remove the socket_fd
174 * from the read_list since messages could come at any time.
177 /* Notice: Only locking the mutex when accessing the read_list,
178 * since the read callbacks may end up calling back into the socket
179 * comm module to write messages which could be a deadlock. */
180 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
181 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
182 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
184 while (node
!= NULL
) {
185 pcep_socket_comm_session
*comm_session
=
186 (pcep_socket_comm_session
*)node
->data
;
188 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
189 node
= node
->next_node
;
190 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
191 /* This comm_session has been deleted, move on to the
193 pthread_mutex_unlock(
194 &(socket_comm_handle
->socket_comm_mutex
));
198 int is_set
= FD_ISSET(comm_session
->socket_fd
,
199 &(socket_comm_handle
->read_master_set
));
200 /* Upon read failure, the comm_session might be free'd, so we
201 * cant store the received_bytes in the comm_session, until we
202 * know the read was successful. */
203 int received_bytes
= 0;
204 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
207 FD_CLR(comm_session
->socket_fd
,
208 &(socket_comm_handle
->read_master_set
));
210 /* either read the message locally, or call the
211 * message_ready_handler to read it */
212 if (comm_session
->message_handler
!= NULL
) {
213 received_bytes
= read_message(
214 comm_session
->socket_fd
,
215 comm_session
->received_message
,
217 if (received_bytes
> 0) {
218 /* Send the received message to the
220 comm_session
->received_bytes
=
222 comm_session
->message_handler(
223 comm_session
->session_data
,
224 comm_session
->received_message
,
225 comm_session
->received_bytes
);
228 /* Tell the handler a message is ready to be
229 * read. The comm_session may be destroyed in
231 * there is an error reading or if the socket is
235 ->message_ready_to_read_handler(
242 /* handle the read results */
243 if (received_bytes
== 0) {
244 if (comm_session_exists_locking(
245 socket_comm_handle
, comm_session
)) {
246 comm_session
->received_bytes
= 0;
247 /* the socket was closed */
248 /* TODO should we define a socket except
249 * enum? or will the only time we call
250 * this is when the socket is closed??
252 if (comm_session
->conn_except_notifier
254 comm_session
->conn_except_notifier(
261 /* stop reading from the socket if its
265 ->socket_comm_mutex
));
266 ordered_list_remove_first_node_equals(
267 socket_comm_handle
->read_list
,
269 pthread_mutex_unlock(
271 ->socket_comm_mutex
));
273 } else if (received_bytes
< 0) {
274 /* TODO should we call conn_except_notifier()
278 "%s: Error on socket fd [%d] : errno [%d][%s]",
279 __func__
, comm_session
->socket_fd
,
280 errno
, strerror(errno
));
282 comm_session
->received_bytes
= received_bytes
;
/*
 * handle_writes: flush queued messages for every session in the handle's
 * write_list whose socket_fd is flagged in write_master_set.
 *
 * What the visible code does:
 *  - socket_comm_mutex is taken on entry and released on exit.
 *  - The iterator is advanced to next_node before the current session is
 *    processed, and sessions that comm_session_exists() no longer finds are
 *    skipped.
 *  - For a writable session: it is removed from write_list, its descriptor
 *    is cleared from write_master_set, and every entry of message_queue is
 *    dequeued and sent with write_message(). Each queued_message is
 *    released with pceplib_free(); an additional pceplib_free() is made
 *    when free_after_send is set (its argument is truncated in this
 *    listing).
 *  - When close_after_write is set and the queue is empty, the session is
 *    removed from read_list and write_list, the socket is closed and
 *    socket_fd is set to -1.
 *  - When message_sent_handler is set and msg_written is true, the mutex is
 *    released around the message_sent_handler(session_data, socket_fd)
 *    call.
 *
 * NOTE(review): this listing has dropped lines; the declaration and update
 * of msg_written and the call that re-acquires the mutex after the handler
 * are not visible here - confirm against upstream.
 * NOTE(review): the mutex is released while the saved next node is still
 * held by the loop; verify the handler cannot invalidate that node.
 */
289 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
)
291 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
294 * iterate all the socket_fd's in the write_list. it may be that not
295 * all of them are ready to be written to. only remove the socket_fd
296 * from the list if it is ready to be written to.
299 ordered_list_node
*node
= socket_comm_handle
->write_list
->head
;
300 pcep_socket_comm_session
*comm_session
;
302 while (node
!= NULL
) {
303 comm_session
= (pcep_socket_comm_session
*)node
->data
;
304 node
= node
->next_node
;
307 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
308 /* This comm_session has been deleted, move on to the
313 if (FD_ISSET(comm_session
->socket_fd
,
314 &(socket_comm_handle
->write_master_set
))) {
315 /* only remove the entry from the list, if it is written
317 ordered_list_remove_first_node_equals(
318 socket_comm_handle
->write_list
, comm_session
);
319 FD_CLR(comm_session
->socket_fd
,
320 &(socket_comm_handle
->write_master_set
));
322 /* dequeue all the comm_session messages and send them
324 pcep_socket_comm_queued_message
*queued_message
=
325 queue_dequeue(comm_session
->message_queue
);
326 while (queued_message
!= NULL
) {
328 write_message(comm_session
->socket_fd
,
329 queued_message
->encoded_message
,
330 queued_message
->msg_length
);
331 if (queued_message
->free_after_send
) {
332 pceplib_free(PCEPLIB_MESSAGES
,
333 (void *)queued_message
336 pceplib_free(PCEPLIB_MESSAGES
, queued_message
);
337 queued_message
= queue_dequeue(
338 comm_session
->message_queue
);
342 /* check if the socket should be closed after writing */
343 if (comm_session
->close_after_write
== true) {
344 if (comm_session
->message_queue
->num_entries
== 0) {
345 /* TODO check to make sure modifying the
346 * write_list while iterating it doesnt cause
350 "%s: handle_writes close() socket fd [%d]",
351 __func__
, comm_session
->socket_fd
);
352 ordered_list_remove_first_node_equals(
353 socket_comm_handle
->read_list
,
355 ordered_list_remove_first_node_equals(
356 socket_comm_handle
->write_list
,
358 close(comm_session
->socket_fd
);
359 comm_session
->socket_fd
= -1;
363 if (comm_session
->message_sent_handler
!= NULL
364 && msg_written
== true) {
365 /* Unlocking to allow the message_sent_handler to
366 * make calls like destroy_socket_comm_session */
367 pthread_mutex_unlock(
368 &(socket_comm_handle
->socket_comm_mutex
));
369 comm_session
->message_sent_handler(
370 comm_session
->session_data
,
371 comm_session
->socket_fd
);
373 &(socket_comm_handle
->socket_comm_mutex
));
377 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
381 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
)
383 /* TODO finish this */
384 (void)socket_comm_handle
;
/*
 * socket_comm_loop: thread entry point that polls the session sockets.
 *
 * data is cast to a pcep_socket_comm_handle. While the handle's "active"
 * flag is set, each iteration:
 *  - sets the select() timeout microseconds to 250000 (a quarter second),
 *  - rebuilds the fd sets with build_fd_sets() and passes its return value
 *    unchanged as the first argument of select(),
 *  - logs errno when select() reports an error,
 *  - runs handle_reads(), handle_writes() and handle_excepts().
 * Start and finish of the thread are logged at LOG_NOTICE, and a message is
 * logged when the loop is started with a NULL handle.
 *
 * NOTE(review): this listing has dropped lines; the NULL-handle branch, the
 * declaration of max_fd, the tv_sec assignment, the select() comparison and
 * what follows the select error log, and the return value are not visible
 * here - confirm against upstream.
 * NOTE(review): "active" is read in the loop condition without taking
 * socket_comm_mutex; confirm how it is synchronised with the thread that
 * clears it.
 */
388 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
389 * invoke this method */
390 void *socket_comm_loop(void *data
)
395 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
400 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Starting socket_comm_loop thread",
401 __func__
, time(NULL
), pthread_self());
403 pcep_socket_comm_handle
*socket_comm_handle
=
404 (pcep_socket_comm_handle
*)data
;
405 struct timeval timer
;
408 while (socket_comm_handle
->active
) {
409 /* check the FD's every 1/4 sec, 250 milliseconds */
411 timer
.tv_usec
= 250000;
412 max_fd
= build_fd_sets(socket_comm_handle
);
414 if (select(max_fd
, &(socket_comm_handle
->read_master_set
),
415 &(socket_comm_handle
->write_master_set
),
416 &(socket_comm_handle
->except_master_set
), &timer
)
418 /* TODO handle the error */
421 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
422 __func__
, errno
, strerror(errno
));
425 handle_reads(socket_comm_handle
);
426 handle_writes(socket_comm_handle
);
427 handle_excepts(socket_comm_handle
);
430 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Finished socket_comm_loop thread",
431 __func__
, time(NULL
), pthread_self());
436 int pceplib_external_socket_read(int fd
, void *payload
)
438 pcep_socket_comm_handle
*socket_comm_handle
=
439 (pcep_socket_comm_handle
*)payload
;
440 if (socket_comm_handle
== NULL
) {
444 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
445 FD_SET(fd
, &(socket_comm_handle
->read_master_set
));
446 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
448 handle_reads(socket_comm_handle
);
450 /* Get the socket_comm_session */
451 pcep_socket_comm_session find_session
= {.socket_fd
= fd
};
452 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
453 ordered_list_node
*node
=
454 ordered_list_find(socket_comm_handle
->read_list
, &find_session
);
458 socket_comm_handle
->socket_read_func(
459 socket_comm_handle
->external_infra_data
,
460 &((pcep_socket_comm_session
*)node
)
461 ->external_socket_data
,
462 fd
, socket_comm_handle
);
464 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
469 int pceplib_external_socket_write(int fd
, void *payload
)
471 pcep_socket_comm_handle
*socket_comm_handle
=
472 (pcep_socket_comm_handle
*)payload
;
473 if (socket_comm_handle
== NULL
) {
477 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
478 FD_SET(fd
, &(socket_comm_handle
->write_master_set
));
479 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
481 handle_writes(socket_comm_handle
);
483 /* TODO do we need to cancel this FD from writing?? */