2 * This file is part of the PCEPlib, a PCEP protocol library.
4 * Copyright (C) 2020 Volta Networks https://voltanet.io/
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 * Author : Brady Johnson <brady@voltanet.io>
31 #include "pcep_socket_comm_internals.h"
32 #include "pcep_socket_comm_loop.h"
33 #include "pcep_utils_logging.h"
34 #include "pcep_utils_ordered_list.h"
35 #include "pcep_utils_logging.h"
36 #include "pcep_utils_memory.h"
38 void write_message(int socket_fd
, const char *message
, unsigned int msg_length
);
39 unsigned int read_message(int socket_fd
, char *received_message
,
40 unsigned int max_message_size
);
41 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
);
42 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
);
43 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
);
45 bool comm_session_exists(pcep_socket_comm_handle
*socket_comm_handle
,
46 pcep_socket_comm_session
*socket_comm_session
)
48 if (socket_comm_handle
== NULL
) {
52 return (ordered_list_find(socket_comm_handle
->session_list
,
58 bool comm_session_exists_locking(pcep_socket_comm_handle
*socket_comm_handle
,
59 pcep_socket_comm_session
*socket_comm_session
)
61 if (socket_comm_handle
== NULL
) {
65 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
67 comm_session_exists(socket_comm_handle
, socket_comm_session
);
68 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
/*
 * Write the whole of message (msg_length bytes) to socket_fd.
 *
 * write() may accept fewer bytes than requested, so the loop runs until the
 * running total reaches msg_length. EAGAIN / EWOULDBLOCK are retried; any
 * other write() error is logged and the remainder of the message is dropped
 * (the function returns void, so the caller is not informed).
 */
void write_message(int socket_fd, const char *message, unsigned int msg_length)
{
	unsigned int total_bytes_sent = 0;

	while (total_bytes_sent < msg_length) {
		/* Only request the bytes still pending: asking for msg_length
		 * again after a partial write would read past the end of the
		 * message buffer. */
		ssize_t bytes_sent =
			write(socket_fd, message + total_bytes_sent,
			      msg_length - total_bytes_sent);
		/* Capture errno before logging, which may overwrite it. */
		int write_errno = errno;

		pcep_log(
			LOG_INFO,
			"%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%zd]",
			__func__, time(NULL), pthread_self(), socket_fd,
			msg_length, bytes_sent);

		if (bytes_sent < 0) {
			if (write_errno != EAGAIN
			    && write_errno != EWOULDBLOCK) {
				pcep_log(LOG_WARNING, "%s: send() failure",
					 __func__);
				return;
			}
			/* The socket is temporarily not writable: try again. */
		} else {
			total_bytes_sent += (unsigned int)bytes_sent;
		}
	}
}
/*
 * Read at most max_message_size bytes from socket_fd into received_message.
 *
 * Returns the value returned by read(), converted to unsigned int to keep
 * the existing signature: a read() failure (-1) is therefore handed back as
 * (unsigned int)-1. errno as set by read() is preserved across the log call
 * so that the caller can still report it.
 */
unsigned int read_message(int socket_fd, char *received_message,
			  unsigned int max_message_size)
{
	/* TODO what if bytes_read == max_message_size? there could be more to
	 * read */
	ssize_t bytes_read =
		read(socket_fd, received_message, max_message_size);
	int read_errno = errno;

	pcep_log(
		LOG_INFO,
		"%s: [%ld-%ld] socket_comm read message bytes_read [%zd] on socket fd [%d]",
		__func__, time(NULL), pthread_self(), bytes_read, socket_fd);

	/* Logging may have modified errno; restore the value from read(). */
	errno = read_errno;

	return (unsigned int)bytes_read;
}
119 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
)
123 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
125 FD_ZERO(&socket_comm_handle
->except_master_set
);
126 FD_ZERO(&socket_comm_handle
->read_master_set
);
127 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
128 pcep_socket_comm_session
*comm_session
;
129 while (node
!= NULL
) {
130 comm_session
= (pcep_socket_comm_session
*)node
->data
;
131 if (comm_session
->socket_fd
> max_fd
) {
132 max_fd
= comm_session
->socket_fd
;
133 } else if (comm_session
->socket_fd
< 0) {
134 pcep_log(LOG_ERR
, "%s: Negative fd", __func__
);
135 assert(comm_session
->socket_fd
> 0);
138 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
140 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
141 FD_SET(comm_session
->socket_fd
,
142 &socket_comm_handle
->read_master_set
);
143 FD_SET(comm_session
->socket_fd
,
144 &socket_comm_handle
->except_master_set
);
145 node
= node
->next_node
;
148 FD_ZERO(&socket_comm_handle
->write_master_set
);
149 node
= socket_comm_handle
->write_list
->head
;
150 while (node
!= NULL
) {
151 comm_session
= (pcep_socket_comm_session
*)node
->data
;
152 if (comm_session
->socket_fd
> max_fd
) {
153 max_fd
= comm_session
->socket_fd
;
154 } else if (comm_session
->socket_fd
< 0) {
155 pcep_log(LOG_ERR
, "%s: Negative fd", __func__
);
156 assert(comm_session
->socket_fd
> 0);
159 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
160 ready_toWrite [%d]", __func__, time(NULL),
161 comm_session->socket_fd);*/
162 FD_SET(comm_session
->socket_fd
,
163 &socket_comm_handle
->write_master_set
);
164 FD_SET(comm_session
->socket_fd
,
165 &socket_comm_handle
->except_master_set
);
166 node
= node
->next_node
;
169 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
175 void handle_reads(pcep_socket_comm_handle
*socket_comm_handle
)
179 * iterate all the socket_fd's in the read_list. it may be that not
180 * all of them have something to read. dont remove the socket_fd
181 * from the read_list since messages could come at any time.
184 /* Notice: Only locking the mutex when accessing the read_list,
185 * since the read callbacks may end up calling back into the socket
186 * comm module to write messages which could be a deadlock. */
187 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
188 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
189 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
191 while (node
!= NULL
) {
192 pcep_socket_comm_session
*comm_session
=
193 (pcep_socket_comm_session
*)node
->data
;
195 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
196 node
= node
->next_node
;
197 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
198 /* This comm_session has been deleted, move on to the
200 pthread_mutex_unlock(
201 &(socket_comm_handle
->socket_comm_mutex
));
205 int is_set
= FD_ISSET(comm_session
->socket_fd
,
206 &(socket_comm_handle
->read_master_set
));
207 /* Upon read failure, the comm_session might be free'd, so we
208 * cant store the received_bytes in the comm_session, until we
209 * know the read was successful. */
210 int received_bytes
= 0;
211 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
214 FD_CLR(comm_session
->socket_fd
,
215 &(socket_comm_handle
->read_master_set
));
217 /* either read the message locally, or call the
218 * message_ready_handler to read it */
219 if (comm_session
->message_handler
!= NULL
) {
220 received_bytes
= read_message(
221 comm_session
->socket_fd
,
222 comm_session
->received_message
,
224 if (received_bytes
> 0) {
225 /* Send the received message to the
227 comm_session
->received_bytes
=
229 comm_session
->message_handler(
230 comm_session
->session_data
,
231 comm_session
->received_message
,
232 comm_session
->received_bytes
);
235 /* Tell the handler a message is ready to be
236 * read. The comm_session may be destroyed in
238 * there is an error reading or if the socket is
242 ->message_ready_to_read_handler(
249 /* handle the read results */
250 if (received_bytes
== 0) {
251 if (comm_session_exists_locking(
252 socket_comm_handle
, comm_session
)) {
253 comm_session
->received_bytes
= 0;
254 /* the socket was closed */
255 /* TODO should we define a socket except
256 * enum? or will the only time we call
257 * this is when the socket is closed??
259 if (comm_session
->conn_except_notifier
261 comm_session
->conn_except_notifier(
268 /* stop reading from the socket if its
272 ->socket_comm_mutex
));
273 ordered_list_remove_first_node_equals(
274 socket_comm_handle
->read_list
,
276 pthread_mutex_unlock(
278 ->socket_comm_mutex
));
280 } else if (received_bytes
< 0) {
281 /* TODO should we call conn_except_notifier()
285 "%s: Error on socket fd [%d] : errno [%d][%s]",
286 __func__
, comm_session
->socket_fd
,
287 errno
, strerror(errno
));
289 comm_session
->received_bytes
= received_bytes
;
296 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
)
298 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
301 * iterate all the socket_fd's in the write_list. it may be that not
302 * all of them are ready to be written to. only remove the socket_fd
303 * from the list if it is ready to be written to.
306 ordered_list_node
*node
= socket_comm_handle
->write_list
->head
;
307 pcep_socket_comm_session
*comm_session
;
309 while (node
!= NULL
) {
310 comm_session
= (pcep_socket_comm_session
*)node
->data
;
311 node
= node
->next_node
;
314 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
315 /* This comm_session has been deleted, move on to the
320 if (FD_ISSET(comm_session
->socket_fd
,
321 &(socket_comm_handle
->write_master_set
))) {
322 /* only remove the entry from the list, if it is written
324 ordered_list_remove_first_node_equals(
325 socket_comm_handle
->write_list
, comm_session
);
326 FD_CLR(comm_session
->socket_fd
,
327 &(socket_comm_handle
->write_master_set
));
329 /* dequeue all the comm_session messages and send them
331 pcep_socket_comm_queued_message
*queued_message
=
332 queue_dequeue(comm_session
->message_queue
);
333 while (queued_message
!= NULL
) {
335 write_message(comm_session
->socket_fd
,
336 queued_message
->encoded_message
,
337 queued_message
->msg_length
);
338 if (queued_message
->free_after_send
) {
339 pceplib_free(PCEPLIB_MESSAGES
,
340 (void *)queued_message
343 pceplib_free(PCEPLIB_MESSAGES
, queued_message
);
344 queued_message
= queue_dequeue(
345 comm_session
->message_queue
);
349 /* check if the socket should be closed after writing */
350 if (comm_session
->close_after_write
== true) {
351 if (comm_session
->message_queue
->num_entries
== 0) {
352 /* TODO check to make sure modifying the
353 * write_list while iterating it doesnt cause
357 "%s: handle_writes close() socket fd [%d]",
358 __func__
, comm_session
->socket_fd
);
359 ordered_list_remove_first_node_equals(
360 socket_comm_handle
->read_list
,
362 ordered_list_remove_first_node_equals(
363 socket_comm_handle
->write_list
,
365 close(comm_session
->socket_fd
);
366 comm_session
->socket_fd
= -1;
370 if (comm_session
->message_sent_handler
!= NULL
371 && msg_written
== true) {
372 /* Unlocking to allow the message_sent_handler to
373 * make calls like destroy_socket_comm_session */
374 pthread_mutex_unlock(
375 &(socket_comm_handle
->socket_comm_mutex
));
376 comm_session
->message_sent_handler(
377 comm_session
->session_data
,
378 comm_session
->socket_fd
);
380 &(socket_comm_handle
->socket_comm_mutex
));
384 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
388 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
)
390 /* TODO finish this */
391 (void)socket_comm_handle
;
395 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
396 * invoke this method */
397 void *socket_comm_loop(void *data
)
402 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
407 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Starting socket_comm_loop thread",
408 __func__
, time(NULL
), pthread_self());
410 pcep_socket_comm_handle
*socket_comm_handle
=
411 (pcep_socket_comm_handle
*)data
;
412 struct timeval timer
;
415 while (socket_comm_handle
->active
) {
416 /* check the FD's every 1/4 sec, 250 milliseconds */
418 timer
.tv_usec
= 250000;
419 max_fd
= build_fd_sets(socket_comm_handle
);
421 if (select(max_fd
, &(socket_comm_handle
->read_master_set
),
422 &(socket_comm_handle
->write_master_set
),
423 &(socket_comm_handle
->except_master_set
), &timer
)
425 /* TODO handle the error */
428 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
429 __func__
, errno
, strerror(errno
));
432 handle_reads(socket_comm_handle
);
433 handle_writes(socket_comm_handle
);
434 handle_excepts(socket_comm_handle
);
437 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Finished socket_comm_loop thread",
438 __func__
, time(NULL
), pthread_self());
443 int pceplib_external_socket_read(int fd
, void *payload
)
445 pcep_socket_comm_handle
*socket_comm_handle
=
446 (pcep_socket_comm_handle
*)payload
;
447 if (socket_comm_handle
== NULL
) {
451 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
452 FD_SET(fd
, &(socket_comm_handle
->read_master_set
));
453 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
455 handle_reads(socket_comm_handle
);
457 /* Get the socket_comm_session */
458 pcep_socket_comm_session find_session
= {.socket_fd
= fd
};
459 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
460 ordered_list_node
*node
=
461 ordered_list_find(socket_comm_handle
->read_list
, &find_session
);
465 socket_comm_handle
->socket_read_func(
466 socket_comm_handle
->external_infra_data
,
467 &((pcep_socket_comm_session
*)node
)
468 ->external_socket_data
,
469 fd
, socket_comm_handle
);
471 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
476 int pceplib_external_socket_write(int fd
, void *payload
)
478 pcep_socket_comm_handle
*socket_comm_handle
=
479 (pcep_socket_comm_handle
*)payload
;
480 if (socket_comm_handle
== NULL
) {
484 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
485 FD_SET(fd
, &(socket_comm_handle
->write_master_set
));
486 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
488 handle_writes(socket_comm_handle
);
490 /* TODO do we need to cancel this FD from writing?? */