2 * This file is part of the PCEPlib, a PCEP protocol library.
4 * Copyright (C) 2020 Volta Networks https://voltanet.io/
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 * Author : Brady Johnson <brady@voltanet.io>
35 #include "pcep_socket_comm_internals.h"
36 #include "pcep_socket_comm_loop.h"
37 #include "pcep_utils_logging.h"
38 #include "pcep_utils_ordered_list.h"
39 #include "pcep_utils_logging.h"
40 #include "pcep_utils_memory.h"
42 void write_message(int socket_fd
, const char *message
, unsigned int msg_length
);
43 unsigned int read_message(int socket_fd
, char *received_message
,
44 unsigned int max_message_size
);
45 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
);
46 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
);
47 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
);
49 bool comm_session_exists(pcep_socket_comm_handle
*socket_comm_handle
,
50 pcep_socket_comm_session
*socket_comm_session
)
52 if (socket_comm_handle
== NULL
) {
56 return (ordered_list_find(socket_comm_handle
->session_list
,
62 bool comm_session_exists_locking(pcep_socket_comm_handle
*socket_comm_handle
,
63 pcep_socket_comm_session
*socket_comm_session
)
65 if (socket_comm_handle
== NULL
) {
69 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
71 comm_session_exists(socket_comm_handle
, socket_comm_session
);
72 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
78 void write_message(int socket_fd
, const char *message
, unsigned int msg_length
)
80 ssize_t bytes_sent
= 0;
81 unsigned int total_bytes_sent
= 0;
83 while ((uint32_t)bytes_sent
< msg_length
) {
84 bytes_sent
= write(socket_fd
, message
+ total_bytes_sent
,
89 "%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%d]",
90 __func__
, time(NULL
), pthread_self(), socket_fd
,
91 msg_length
, bytes_sent
);
94 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
95 pcep_log(LOG_WARNING
, "%s: send() failure",
101 total_bytes_sent
+= bytes_sent
;
107 unsigned int read_message(int socket_fd
, char *received_message
,
108 unsigned int max_message_size
)
110 /* TODO what if bytes_read == max_message_size? there could be more to
112 unsigned int bytes_read
=
113 read(socket_fd
, received_message
, max_message_size
);
116 "%s: [%ld-%ld] socket_comm read message bytes_read [%u] on socket fd [%d]",
117 __func__
, time(NULL
), pthread_self(), bytes_read
, socket_fd
);
123 int build_fd_sets(pcep_socket_comm_handle
*socket_comm_handle
)
127 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
129 FD_ZERO(&socket_comm_handle
->except_master_set
);
130 FD_ZERO(&socket_comm_handle
->read_master_set
);
131 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
132 pcep_socket_comm_session
*comm_session
;
133 while (node
!= NULL
) {
134 comm_session
= (pcep_socket_comm_session
*)node
->data
;
135 if (comm_session
->socket_fd
> max_fd
) {
136 max_fd
= comm_session
->socket_fd
;
137 } else if (comm_session
->socket_fd
< 0) {
138 pcep_log(LOG_ERR
, "%s: Negative fd", __func__
);
139 assert(comm_session
->socket_fd
> 0);
142 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
144 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
145 FD_SET(comm_session
->socket_fd
,
146 &socket_comm_handle
->read_master_set
);
147 FD_SET(comm_session
->socket_fd
,
148 &socket_comm_handle
->except_master_set
);
149 node
= node
->next_node
;
152 FD_ZERO(&socket_comm_handle
->write_master_set
);
153 node
= socket_comm_handle
->write_list
->head
;
154 while (node
!= NULL
) {
155 comm_session
= (pcep_socket_comm_session
*)node
->data
;
156 if (comm_session
->socket_fd
> max_fd
) {
157 max_fd
= comm_session
->socket_fd
;
158 } else if (comm_session
->socket_fd
< 0) {
159 pcep_log(LOG_ERR
, "%s: Negative fd", __func__
);
160 assert(comm_session
->socket_fd
> 0);
163 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
164 ready_toWrite [%d]", __func__, time(NULL),
165 comm_session->socket_fd);*/
166 FD_SET(comm_session
->socket_fd
,
167 &socket_comm_handle
->write_master_set
);
168 FD_SET(comm_session
->socket_fd
,
169 &socket_comm_handle
->except_master_set
);
170 node
= node
->next_node
;
173 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
179 void handle_reads(pcep_socket_comm_handle
*socket_comm_handle
)
183 * iterate all the socket_fd's in the read_list. it may be that not
184 * all of them have something to read. dont remove the socket_fd
185 * from the read_list since messages could come at any time.
188 /* Notice: Only locking the mutex when accessing the read_list,
189 * since the read callbacks may end up calling back into the socket
190 * comm module to write messages which could be a deadlock. */
191 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
192 ordered_list_node
*node
= socket_comm_handle
->read_list
->head
;
193 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
195 while (node
!= NULL
) {
196 pcep_socket_comm_session
*comm_session
=
197 (pcep_socket_comm_session
*)node
->data
;
199 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
200 node
= node
->next_node
;
201 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
202 /* This comm_session has been deleted, move on to the
204 pthread_mutex_unlock(
205 &(socket_comm_handle
->socket_comm_mutex
));
209 int is_set
= FD_ISSET(comm_session
->socket_fd
,
210 &(socket_comm_handle
->read_master_set
));
211 /* Upon read failure, the comm_session might be free'd, so we
212 * cant store the received_bytes in the comm_session, until we
213 * know the read was successful. */
214 int received_bytes
= 0;
215 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
218 FD_CLR(comm_session
->socket_fd
,
219 &(socket_comm_handle
->read_master_set
));
221 /* either read the message locally, or call the
222 * message_ready_handler to read it */
223 if (comm_session
->message_handler
!= NULL
) {
224 received_bytes
= read_message(
225 comm_session
->socket_fd
,
226 comm_session
->received_message
,
228 if (received_bytes
> 0) {
229 /* Send the received message to the
231 comm_session
->received_bytes
=
233 comm_session
->message_handler(
234 comm_session
->session_data
,
235 comm_session
->received_message
,
236 comm_session
->received_bytes
);
239 /* Tell the handler a message is ready to be
240 * read. The comm_session may be destroyed in
242 * there is an error reading or if the socket is
246 ->message_ready_to_read_handler(
253 /* handle the read results */
254 if (received_bytes
== 0) {
255 if (comm_session_exists_locking(
256 socket_comm_handle
, comm_session
)) {
257 comm_session
->received_bytes
= 0;
258 /* the socket was closed */
259 /* TODO should we define a socket except
260 * enum? or will the only time we call
261 * this is when the socket is closed??
263 if (comm_session
->conn_except_notifier
265 comm_session
->conn_except_notifier(
272 /* stop reading from the socket if its
276 ->socket_comm_mutex
));
277 ordered_list_remove_first_node_equals(
278 socket_comm_handle
->read_list
,
280 pthread_mutex_unlock(
282 ->socket_comm_mutex
));
284 } else if (received_bytes
< 0) {
285 /* TODO should we call conn_except_notifier()
289 "%s: Error on socket fd [%d] : errno [%d][%s]",
290 __func__
, comm_session
->socket_fd
,
291 errno
, strerror(errno
));
293 comm_session
->received_bytes
= received_bytes
;
300 void handle_writes(pcep_socket_comm_handle
*socket_comm_handle
)
302 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
305 * iterate all the socket_fd's in the write_list. it may be that not
306 * all of them are ready to be written to. only remove the socket_fd
307 * from the list if it is ready to be written to.
310 ordered_list_node
*node
= socket_comm_handle
->write_list
->head
;
311 pcep_socket_comm_session
*comm_session
;
313 while (node
!= NULL
) {
314 comm_session
= (pcep_socket_comm_session
*)node
->data
;
315 node
= node
->next_node
;
318 if (!comm_session_exists(socket_comm_handle
, comm_session
)) {
319 /* This comm_session has been deleted, move on to the
324 if (FD_ISSET(comm_session
->socket_fd
,
325 &(socket_comm_handle
->write_master_set
))) {
326 /* only remove the entry from the list, if it is written
328 ordered_list_remove_first_node_equals(
329 socket_comm_handle
->write_list
, comm_session
);
330 FD_CLR(comm_session
->socket_fd
,
331 &(socket_comm_handle
->write_master_set
));
333 /* dequeue all the comm_session messages and send them
335 pcep_socket_comm_queued_message
*queued_message
=
336 queue_dequeue(comm_session
->message_queue
);
337 while (queued_message
!= NULL
) {
339 write_message(comm_session
->socket_fd
,
340 queued_message
->encoded_message
,
341 queued_message
->msg_length
);
342 if (queued_message
->free_after_send
) {
343 pceplib_free(PCEPLIB_MESSAGES
,
344 (void *)queued_message
347 pceplib_free(PCEPLIB_MESSAGES
, queued_message
);
348 queued_message
= queue_dequeue(
349 comm_session
->message_queue
);
353 /* check if the socket should be closed after writing */
354 if (comm_session
->close_after_write
== true) {
355 if (comm_session
->message_queue
->num_entries
== 0) {
356 /* TODO check to make sure modifying the
357 * write_list while iterating it doesn't cause
361 "%s: handle_writes close() socket fd [%d]",
362 __func__
, comm_session
->socket_fd
);
363 ordered_list_remove_first_node_equals(
364 socket_comm_handle
->read_list
,
366 ordered_list_remove_first_node_equals(
367 socket_comm_handle
->write_list
,
369 close(comm_session
->socket_fd
);
370 comm_session
->socket_fd
= -1;
374 if (comm_session
->message_sent_handler
!= NULL
375 && msg_written
== true) {
376 /* Unlocking to allow the message_sent_handler to
377 * make calls like destroy_socket_comm_session */
378 pthread_mutex_unlock(
379 &(socket_comm_handle
->socket_comm_mutex
));
380 comm_session
->message_sent_handler(
381 comm_session
->session_data
,
382 comm_session
->socket_fd
);
384 &(socket_comm_handle
->socket_comm_mutex
));
388 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
392 void handle_excepts(pcep_socket_comm_handle
*socket_comm_handle
)
394 /* TODO finish this */
395 (void)socket_comm_handle
;
399 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
400 * invoke this method */
401 void *socket_comm_loop(void *data
)
406 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
411 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Starting socket_comm_loop thread",
412 __func__
, time(NULL
), pthread_self());
414 pcep_socket_comm_handle
*socket_comm_handle
=
415 (pcep_socket_comm_handle
*)data
;
416 struct timeval timer
;
419 while (socket_comm_handle
->active
) {
420 /* check the FD's every 1/4 sec, 250 milliseconds */
422 timer
.tv_usec
= 250000;
423 max_fd
= build_fd_sets(socket_comm_handle
);
425 if (select(max_fd
, &(socket_comm_handle
->read_master_set
),
426 &(socket_comm_handle
->write_master_set
),
427 &(socket_comm_handle
->except_master_set
), &timer
)
429 /* TODO handle the error */
432 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
433 __func__
, errno
, strerror(errno
));
436 handle_reads(socket_comm_handle
);
437 handle_writes(socket_comm_handle
);
438 handle_excepts(socket_comm_handle
);
441 pcep_log(LOG_NOTICE
, "%s: [%ld-%ld] Finished socket_comm_loop thread",
442 __func__
, time(NULL
), pthread_self());
447 int pceplib_external_socket_read(int fd
, void *payload
)
449 pcep_socket_comm_handle
*socket_comm_handle
=
450 (pcep_socket_comm_handle
*)payload
;
451 if (socket_comm_handle
== NULL
) {
455 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
456 FD_SET(fd
, &(socket_comm_handle
->read_master_set
));
457 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
459 handle_reads(socket_comm_handle
);
461 /* Get the socket_comm_session */
462 pcep_socket_comm_session find_session
= {.socket_fd
= fd
};
463 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
464 ordered_list_node
*node
=
465 ordered_list_find(socket_comm_handle
->read_list
, &find_session
);
469 socket_comm_handle
->socket_read_func(
470 socket_comm_handle
->external_infra_data
,
471 &((pcep_socket_comm_session
*)node
)
472 ->external_socket_data
,
473 fd
, socket_comm_handle
);
475 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
480 int pceplib_external_socket_write(int fd
, void *payload
)
482 pcep_socket_comm_handle
*socket_comm_handle
=
483 (pcep_socket_comm_handle
*)payload
;
484 if (socket_comm_handle
== NULL
) {
488 pthread_mutex_lock(&(socket_comm_handle
->socket_comm_mutex
));
489 FD_SET(fd
, &(socket_comm_handle
->write_master_set
));
490 pthread_mutex_unlock(&(socket_comm_handle
->socket_comm_mutex
));
492 handle_writes(socket_comm_handle
);
494 /* TODO do we need to cancel this FD from writing?? */