]> git.proxmox.com Git - mirror_frr.git/blob - pceplib/pcep_socket_comm_loop.c
Merge pull request #8078 from idryzhov/fix-zebra-vni
[mirror_frr.git] / pceplib / pcep_socket_comm_loop.c
1 /*
2 * This file is part of the PCEPlib, a PCEP protocol library.
3 *
4 * Copyright (C) 2020 Volta Networks https://voltanet.io/
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 *
19 * Author : Brady Johnson <brady@voltanet.io>
20 *
21 */
22
23
24 #include <errno.h>
25 #include <stdbool.h>
26 #include <stddef.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <assert.h>
30
31 #include "pcep_socket_comm_internals.h"
32 #include "pcep_socket_comm_loop.h"
33 #include "pcep_utils_logging.h"
34 #include "pcep_utils_ordered_list.h"
35 #include "pcep_utils_logging.h"
36 #include "pcep_utils_memory.h"
37
38 void write_message(int socket_fd, const char *message, unsigned int msg_length);
39 unsigned int read_message(int socket_fd, char *received_message,
40 unsigned int max_message_size);
41 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle);
42 void handle_writes(pcep_socket_comm_handle *socket_comm_handle);
43 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle);
44
45 bool comm_session_exists(pcep_socket_comm_handle *socket_comm_handle,
46 pcep_socket_comm_session *socket_comm_session)
47 {
48 if (socket_comm_handle == NULL) {
49 return false;
50 }
51
52 return (ordered_list_find(socket_comm_handle->session_list,
53 socket_comm_session)
54 != NULL);
55 }
56
57
58 bool comm_session_exists_locking(pcep_socket_comm_handle *socket_comm_handle,
59 pcep_socket_comm_session *socket_comm_session)
60 {
61 if (socket_comm_handle == NULL) {
62 return false;
63 }
64
65 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
66 bool exists =
67 comm_session_exists(socket_comm_handle, socket_comm_session);
68 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
69
70 return exists;
71 }
72
73
/*
 * Write a complete message to a socket, continuing after partial writes.
 *
 * socket_fd:  descriptor to write to
 * message:    buffer holding the bytes to send
 * msg_length: number of bytes of message to send
 *
 * write() may accept fewer bytes than requested, so the loop is driven by the
 * running total and each call asks only for the bytes still outstanding.
 * Asking for msg_length again from an advanced pointer would read past the
 * end of the buffer.
 *
 * A call interrupted by a signal (EINTR) is retried. Any other failure,
 * including EAGAIN/EWOULDBLOCK, stops the transfer and is logged. The
 * function returns void, so the caller is not told about a short write.
 */
void write_message(int socket_fd, const char *message, unsigned int msg_length)
{
	unsigned int total_bytes_sent = 0;

	while (total_bytes_sent < msg_length) {
		ssize_t bytes_sent = write(socket_fd, message + total_bytes_sent,
					   msg_length - total_bytes_sent);
		/* Capture errno before the logging call can overwrite it */
		int write_errno = errno;

		pcep_log(
			LOG_INFO,
			"%s: [%ld-%ld] socket_comm writing on socket fd [%d] msg_lenth [%u] bytes sent [%zd]",
			__func__, time(NULL), pthread_self(), socket_fd,
			msg_length, bytes_sent);

		if (bytes_sent < 0) {
			if (write_errno == EINTR) {
				/* Interrupted before any byte was written */
				continue;
			}

			if (write_errno == EAGAIN
			    || write_errno == EWOULDBLOCK) {
				/* Do not spin on a socket that cannot accept
				 * data right now */
				pcep_log(
					LOG_WARNING,
					"%s: send() would block on socket fd [%d], sent [%u] of [%u] bytes",
					__func__, socket_fd, total_bytes_sent,
					msg_length);
				return;
			}

			pcep_log(LOG_WARNING, "%s: send() failure", __func__);
			return;
		}

		if (bytes_sent == 0) {
			/* No progress and no error: stop instead of looping
			 * forever */
			pcep_log(
				LOG_WARNING,
				"%s: send() wrote 0 bytes on socket fd [%d], sent [%u] of [%u] bytes",
				__func__, socket_fd, total_bytes_sent,
				msg_length);
			return;
		}

		total_bytes_sent += (unsigned int)bytes_sent;
	}
}
101
102
/*
 * Read once from a socket into a caller supplied buffer.
 *
 * socket_fd:        descriptor to read from
 * received_message: destination buffer
 * max_message_size: capacity of received_message in bytes
 *
 * Returns the read() result converted to unsigned int: the number of bytes
 * read, 0 at end of stream, or the converted value of -1 on failure.
 * handle_reads() stores the result in an int and tests it for < 0.
 *
 * errno as set by read() is restored before returning, so a caller can still
 * report it after the logging done here.
 */
unsigned int read_message(int socket_fd, char *received_message,
			  unsigned int max_message_size)
{
	/* TODO what if bytes_read == max_message_size? there could be more to
	 * read */
	ssize_t bytes_read =
		read(socket_fd, received_message, max_message_size);
	int read_errno = errno;

	/* Log the signed result so a failure shows up as -1 */
	pcep_log(
		LOG_INFO,
		"%s: [%ld-%ld] socket_comm read message bytes_read [%zd] on socket fd [%d]",
		__func__, time(NULL), pthread_self(), bytes_read, socket_fd);

	errno = read_errno;

	return (unsigned int)bytes_read;
}
117
118
119 int build_fd_sets(pcep_socket_comm_handle *socket_comm_handle)
120 {
121 int max_fd = 0;
122
123 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
124
125 FD_ZERO(&socket_comm_handle->except_master_set);
126 FD_ZERO(&socket_comm_handle->read_master_set);
127 ordered_list_node *node = socket_comm_handle->read_list->head;
128 pcep_socket_comm_session *comm_session;
129 while (node != NULL) {
130 comm_session = (pcep_socket_comm_session *)node->data;
131 if (comm_session->socket_fd > max_fd) {
132 max_fd = comm_session->socket_fd;
133 } else if (comm_session->socket_fd < 0) {
134 pcep_log(LOG_ERR, "%s: Negative fd", __func__);
135 assert(comm_session->socket_fd > 0);
136 }
137
138 /*pcep_log(LOG_DEBUG, ld] socket_comm::build_fdSets set
139 ready_toRead
140 [%d]", __func__, time(NULL), comm_session->socket_fd);*/
141 FD_SET(comm_session->socket_fd,
142 &socket_comm_handle->read_master_set);
143 FD_SET(comm_session->socket_fd,
144 &socket_comm_handle->except_master_set);
145 node = node->next_node;
146 }
147
148 FD_ZERO(&socket_comm_handle->write_master_set);
149 node = socket_comm_handle->write_list->head;
150 while (node != NULL) {
151 comm_session = (pcep_socket_comm_session *)node->data;
152 if (comm_session->socket_fd > max_fd) {
153 max_fd = comm_session->socket_fd;
154 } else if (comm_session->socket_fd < 0) {
155 pcep_log(LOG_ERR, "%s: Negative fd", __func__);
156 assert(comm_session->socket_fd > 0);
157 }
158
159 /*pcep_log(LOG_DEBUG, "%s: [%ld] socket_comm::build_fdSets set
160 ready_toWrite [%d]", __func__, time(NULL),
161 comm_session->socket_fd);*/
162 FD_SET(comm_session->socket_fd,
163 &socket_comm_handle->write_master_set);
164 FD_SET(comm_session->socket_fd,
165 &socket_comm_handle->except_master_set);
166 node = node->next_node;
167 }
168
169 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
170
171 return max_fd + 1;
172 }
173
174
175 void handle_reads(pcep_socket_comm_handle *socket_comm_handle)
176 {
177
178 /*
179 * iterate all the socket_fd's in the read_list. it may be that not
180 * all of them have something to read. dont remove the socket_fd
181 * from the read_list since messages could come at any time.
182 */
183
184 /* Notice: Only locking the mutex when accessing the read_list,
185 * since the read callbacks may end up calling back into the socket
186 * comm module to write messages which could be a deadlock. */
187 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
188 ordered_list_node *node = socket_comm_handle->read_list->head;
189 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
190
191 while (node != NULL) {
192 pcep_socket_comm_session *comm_session =
193 (pcep_socket_comm_session *)node->data;
194
195 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
196 node = node->next_node;
197 if (!comm_session_exists(socket_comm_handle, comm_session)) {
198 /* This comm_session has been deleted, move on to the
199 * next one */
200 pthread_mutex_unlock(
201 &(socket_comm_handle->socket_comm_mutex));
202 continue;
203 }
204
205 int is_set = FD_ISSET(comm_session->socket_fd,
206 &(socket_comm_handle->read_master_set));
207 /* Upon read failure, the comm_session might be free'd, so we
208 * cant store the received_bytes in the comm_session, until we
209 * know the read was successful. */
210 int received_bytes = 0;
211 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
212
213 if (is_set) {
214 FD_CLR(comm_session->socket_fd,
215 &(socket_comm_handle->read_master_set));
216
217 /* either read the message locally, or call the
218 * message_ready_handler to read it */
219 if (comm_session->message_handler != NULL) {
220 received_bytes = read_message(
221 comm_session->socket_fd,
222 comm_session->received_message,
223 MAX_RECVD_MSG_SIZE);
224 if (received_bytes > 0) {
225 /* Send the received message to the
226 * handler */
227 comm_session->received_bytes =
228 received_bytes;
229 comm_session->message_handler(
230 comm_session->session_data,
231 comm_session->received_message,
232 comm_session->received_bytes);
233 }
234 } else {
235 /* Tell the handler a message is ready to be
236 * read. The comm_session may be destroyed in
237 * this call, if
238 * there is an error reading or if the socket is
239 * closed. */
240 received_bytes =
241 comm_session
242 ->message_ready_to_read_handler(
243 comm_session
244 ->session_data,
245 comm_session
246 ->socket_fd);
247 }
248
249 /* handle the read results */
250 if (received_bytes == 0) {
251 if (comm_session_exists_locking(
252 socket_comm_handle, comm_session)) {
253 comm_session->received_bytes = 0;
254 /* the socket was closed */
255 /* TODO should we define a socket except
256 * enum? or will the only time we call
257 * this is when the socket is closed??
258 */
259 if (comm_session->conn_except_notifier
260 != NULL) {
261 comm_session->conn_except_notifier(
262 comm_session
263 ->session_data,
264 comm_session
265 ->socket_fd);
266 }
267
268 /* stop reading from the socket if its
269 * closed */
270 pthread_mutex_lock(
271 &(socket_comm_handle
272 ->socket_comm_mutex));
273 ordered_list_remove_first_node_equals(
274 socket_comm_handle->read_list,
275 comm_session);
276 pthread_mutex_unlock(
277 &(socket_comm_handle
278 ->socket_comm_mutex));
279 }
280 } else if (received_bytes < 0) {
281 /* TODO should we call conn_except_notifier()
282 * here ? */
283 pcep_log(
284 LOG_WARNING,
285 "%s: Error on socket fd [%d] : errno [%d][%s]",
286 __func__, comm_session->socket_fd,
287 errno, strerror(errno));
288 } else {
289 comm_session->received_bytes = received_bytes;
290 }
291 }
292 }
293 }
294
295
296 void handle_writes(pcep_socket_comm_handle *socket_comm_handle)
297 {
298 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
299
300 /*
301 * iterate all the socket_fd's in the write_list. it may be that not
302 * all of them are ready to be written to. only remove the socket_fd
303 * from the list if it is ready to be written to.
304 */
305
306 ordered_list_node *node = socket_comm_handle->write_list->head;
307 pcep_socket_comm_session *comm_session;
308 bool msg_written;
309 while (node != NULL) {
310 comm_session = (pcep_socket_comm_session *)node->data;
311 node = node->next_node;
312 msg_written = false;
313
314 if (!comm_session_exists(socket_comm_handle, comm_session)) {
315 /* This comm_session has been deleted, move on to the
316 * next one */
317 continue;
318 }
319
320 if (FD_ISSET(comm_session->socket_fd,
321 &(socket_comm_handle->write_master_set))) {
322 /* only remove the entry from the list, if it is written
323 * to */
324 ordered_list_remove_first_node_equals(
325 socket_comm_handle->write_list, comm_session);
326 FD_CLR(comm_session->socket_fd,
327 &(socket_comm_handle->write_master_set));
328
329 /* dequeue all the comm_session messages and send them
330 */
331 pcep_socket_comm_queued_message *queued_message =
332 queue_dequeue(comm_session->message_queue);
333 while (queued_message != NULL) {
334 msg_written = true;
335 write_message(comm_session->socket_fd,
336 queued_message->encoded_message,
337 queued_message->msg_length);
338 if (queued_message->free_after_send) {
339 pceplib_free(PCEPLIB_MESSAGES,
340 (void *)queued_message
341 ->encoded_message);
342 }
343 pceplib_free(PCEPLIB_MESSAGES, queued_message);
344 queued_message = queue_dequeue(
345 comm_session->message_queue);
346 }
347 }
348
349 /* check if the socket should be closed after writing */
350 if (comm_session->close_after_write == true) {
351 if (comm_session->message_queue->num_entries == 0) {
352 /* TODO check to make sure modifying the
353 * write_list while iterating it doesnt cause
354 * problems. */
355 pcep_log(
356 LOG_DEBUG,
357 "%s: handle_writes close() socket fd [%d]",
358 __func__, comm_session->socket_fd);
359 ordered_list_remove_first_node_equals(
360 socket_comm_handle->read_list,
361 comm_session);
362 ordered_list_remove_first_node_equals(
363 socket_comm_handle->write_list,
364 comm_session);
365 close(comm_session->socket_fd);
366 comm_session->socket_fd = -1;
367 }
368 }
369
370 if (comm_session->message_sent_handler != NULL
371 && msg_written == true) {
372 /* Unlocking to allow the message_sent_handler to
373 * make calls like destroy_socket_comm_session */
374 pthread_mutex_unlock(
375 &(socket_comm_handle->socket_comm_mutex));
376 comm_session->message_sent_handler(
377 comm_session->session_data,
378 comm_session->socket_fd);
379 pthread_mutex_lock(
380 &(socket_comm_handle->socket_comm_mutex));
381 }
382 }
383
384 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
385 }
386
387
388 void handle_excepts(pcep_socket_comm_handle *socket_comm_handle)
389 {
390 /* TODO finish this */
391 (void)socket_comm_handle;
392 }
393
394
395 /* pcep_socket_comm::initialize_socket_comm_loop() will create a thread and
396 * invoke this method */
397 void *socket_comm_loop(void *data)
398 {
399 if (data == NULL) {
400 pcep_log(
401 LOG_WARNING,
402 "%s: Cannot start socket_comm_loop with NULL pcep_socketcomm_handle",
403 __func__);
404 return NULL;
405 }
406
407 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Starting socket_comm_loop thread",
408 __func__, time(NULL), pthread_self());
409
410 pcep_socket_comm_handle *socket_comm_handle =
411 (pcep_socket_comm_handle *)data;
412 struct timeval timer;
413 int max_fd;
414
415 while (socket_comm_handle->active) {
416 /* check the FD's every 1/4 sec, 250 milliseconds */
417 timer.tv_sec = 0;
418 timer.tv_usec = 250000;
419 max_fd = build_fd_sets(socket_comm_handle);
420
421 if (select(max_fd, &(socket_comm_handle->read_master_set),
422 &(socket_comm_handle->write_master_set),
423 &(socket_comm_handle->except_master_set), &timer)
424 < 0) {
425 /* TODO handle the error */
426 pcep_log(
427 LOG_WARNING,
428 "%s: ERROR socket_comm_loop on select : errno [%d][%s]",
429 __func__, errno, strerror(errno));
430 }
431
432 handle_reads(socket_comm_handle);
433 handle_writes(socket_comm_handle);
434 handle_excepts(socket_comm_handle);
435 }
436
437 pcep_log(LOG_NOTICE, "%s: [%ld-%ld] Finished socket_comm_loop thread",
438 __func__, time(NULL), pthread_self());
439
440 return NULL;
441 }
442
443 int pceplib_external_socket_read(int fd, void *payload)
444 {
445 pcep_socket_comm_handle *socket_comm_handle =
446 (pcep_socket_comm_handle *)payload;
447 if (socket_comm_handle == NULL) {
448 return -1;
449 }
450
451 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
452 FD_SET(fd, &(socket_comm_handle->read_master_set));
453 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
454
455 handle_reads(socket_comm_handle);
456
457 /* Get the socket_comm_session */
458 pcep_socket_comm_session find_session = {.socket_fd = fd};
459 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
460 ordered_list_node *node =
461 ordered_list_find(socket_comm_handle->read_list, &find_session);
462
463 /* read again */
464 if (node != NULL) {
465 socket_comm_handle->socket_read_func(
466 socket_comm_handle->external_infra_data,
467 &((pcep_socket_comm_session *)node)
468 ->external_socket_data,
469 fd, socket_comm_handle);
470 }
471 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
472
473 return 0;
474 }
475
476 int pceplib_external_socket_write(int fd, void *payload)
477 {
478 pcep_socket_comm_handle *socket_comm_handle =
479 (pcep_socket_comm_handle *)payload;
480 if (socket_comm_handle == NULL) {
481 return -1;
482 }
483
484 pthread_mutex_lock(&(socket_comm_handle->socket_comm_mutex));
485 FD_SET(fd, &(socket_comm_handle->write_master_set));
486 pthread_mutex_unlock(&(socket_comm_handle->socket_comm_mutex));
487
488 handle_writes(socket_comm_handle);
489
490 /* TODO do we need to cancel this FD from writing?? */
491
492 return 0;
493 }