718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
896014f4
DL
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
3e41733f 28#include "frrcu.h"
718e3744 29#include "log.h"
e04ab74d 30#include "hash.h"
31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
00dffa8c 36#include "frr_pthread.h"
9ef9495e 37#include "lib_errors.h"
912d45a1 38#include "libfrr_trace.h"
1a9f340b 39#include "libfrr.h"
d6be5fb9 40
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 42DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 43DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 44DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 45
c284542b
DL
46DECLARE_LIST(thread_list, struct thread, threaditem)
47
aea25d1e
MS
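/*
 * A pending cancellation request, queued on thread_master->cancel_req and
 * serviced by do_thread_cancel().  Exactly one of the references below is
 * used per request:
 *   - thread:    synchronous thread_cancel() of one specific task
 *   - eventobj:  cancel every event/ready task whose ->arg matches
 *   - threadref: back-reference used by thread_cancel_async(); checked for
 *                NULL in case the task has already run
 */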
48struct cancel_req {
49 int flags;
50 struct thread *thread;
51 void *eventobj;
52 struct thread **threadref;
53};
54
55/* Flags for task cancellation */
56#define THREAD_CANCEL_FLAG_READY 0x01
57
27d29ced
DL
58static int thread_timer_cmp(const struct thread *a, const struct thread *b)
59{
60 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
61 return -1;
62 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
63 return 1;
64 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
65 return -1;
66 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
67 return 1;
68 return 0;
69}
70
71DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
72 thread_timer_cmp)
73
3b96b781
HT
74#if defined(__APPLE__)
75#include <mach/mach.h>
76#include <mach/mach_time.h>
77#endif
78
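/*
 * Wake a thread_master that may be blocked in poll(): a single byte written
 * to the write end of its io_pipe makes the read end (always present in the
 * pollfd set, see fd_poll()) readable, so the poll call returns immediately.
 */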
d62a17ae 79#define AWAKEN(m) \
80 do { \
2b64873d 81 const unsigned char wakebyte = 0x01; \
d62a17ae 82 write(m->io_pipe[1], &wakebyte, 1); \
83 } while (0);
3bf2673b 84
62f44022 85/* control variable for initializer */
c17faa4b 86static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 87pthread_key_t thread_current;
6b0655a2 88
c17faa4b 89static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
62f44022
QY
90static struct list *masters;
91
6655966d 92static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 93
62f44022 94/* CLI start ---------------------------------------------------------------- */
d8b87afe 95static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
e04ab74d 96{
883cc51d 97 int size = sizeof(a->func);
bd74dc61
DS
98
99 return jhash(&a->func, size, 0);
e04ab74d 100}
101
74df8d6d 102static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 103 const struct cpu_thread_history *b)
e04ab74d 104{
d62a17ae 105 return a->func == b->func;
e04ab74d 106}
107
d62a17ae 108static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 109{
d62a17ae 110 struct cpu_thread_history *new;
111 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
112 new->func = a->func;
113 new->funcname = a->funcname;
114 return new;
e04ab74d 115}
116
d62a17ae 117static void cpu_record_hash_free(void *a)
228da428 118{
d62a17ae 119 struct cpu_thread_history *hist = a;
120
121 XFREE(MTYPE_THREAD_STATS, hist);
228da428
CC
122}
123
f75e802d 124#ifndef EXCLUDE_CPU_TIME
d62a17ae 125static void vty_out_cpu_thread_history(struct vty *vty,
126 struct cpu_thread_history *a)
e04ab74d 127{
9a8a7b0e 128 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
72327cf3
MS
129 a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
130 a->total_calls, (a->cpu.total / a->total_calls), a->cpu.max,
131 (a->real.total / a->total_calls), a->real.max);
84d951d0 132 vty_out(vty, " %c%c%c%c%c %s\n",
d62a17ae 133 a->types & (1 << THREAD_READ) ? 'R' : ' ',
134 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
135 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
136 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
137 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 138}
139
e3b78da8 140static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 141{
d62a17ae 142 struct cpu_thread_history *totals = args[0];
fbcac826 143 struct cpu_thread_history copy;
d62a17ae 144 struct vty *vty = args[1];
fbcac826 145 uint8_t *filter = args[2];
d62a17ae 146
147 struct cpu_thread_history *a = bucket->data;
148
fbcac826
QY
149 copy.total_active =
150 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
151 copy.total_calls =
152 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
153 copy.cpu.total =
154 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
155 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
156 copy.real.total =
157 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
158 copy.real.max =
159 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
160 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
161 copy.funcname = a->funcname;
162
163 if (!(copy.types & *filter))
d62a17ae 164 return;
fbcac826
QY
165
166 vty_out_cpu_thread_history(vty, &copy);
167 totals->total_active += copy.total_active;
168 totals->total_calls += copy.total_calls;
169 totals->real.total += copy.real.total;
170 if (totals->real.max < copy.real.max)
171 totals->real.max = copy.real.max;
172 totals->cpu.total += copy.cpu.total;
173 if (totals->cpu.max < copy.cpu.max)
174 totals->cpu.max = copy.cpu.max;
e04ab74d 175}
176
fbcac826 177static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 178{
d62a17ae 179 struct cpu_thread_history tmp;
180 void *args[3] = {&tmp, vty, &filter};
181 struct thread_master *m;
182 struct listnode *ln;
183
0d6f7fd6 184 memset(&tmp, 0, sizeof(tmp));
d62a17ae 185 tmp.funcname = "TOTAL";
186 tmp.types = filter;
187
00dffa8c 188 frr_with_mutex(&masters_mtx) {
d62a17ae 189 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
190 const char *name = m->name ? m->name : "main";
191
192 char underline[strlen(name) + 1];
193 memset(underline, '-', sizeof(underline));
4f113d60 194 underline[sizeof(underline) - 1] = '\0';
d62a17ae 195
196 vty_out(vty, "\n");
197 vty_out(vty, "Showing statistics for pthread %s\n",
198 name);
199 vty_out(vty, "-------------------------------%s\n",
200 underline);
84d951d0 201 vty_out(vty, "%30s %18s %18s\n", "",
d62a17ae 202 "CPU (user+system):", "Real (wall-clock):");
203 vty_out(vty,
204 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
205 vty_out(vty, " Avg uSec Max uSecs");
206 vty_out(vty, " Type Thread\n");
207
208 if (m->cpu_record->count)
209 hash_iterate(
210 m->cpu_record,
e3b78da8 211 (void (*)(struct hash_bucket *,
d62a17ae 212 void *))cpu_record_hash_print,
213 args);
214 else
215 vty_out(vty, "No data to display yet.\n");
216
217 vty_out(vty, "\n");
218 }
219 }
d62a17ae 220
221 vty_out(vty, "\n");
222 vty_out(vty, "Total thread statistics\n");
223 vty_out(vty, "-------------------------\n");
84d951d0 224 vty_out(vty, "%30s %18s %18s\n", "",
d62a17ae 225 "CPU (user+system):", "Real (wall-clock):");
226 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
227 vty_out(vty, " Avg uSec Max uSecs");
228 vty_out(vty, " Type Thread\n");
229
230 if (tmp.total_calls > 0)
231 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 232}
f75e802d 233#endif
e04ab74d 234
e3b78da8 235static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 236{
fbcac826 237 uint8_t *filter = args[0];
d62a17ae 238 struct hash *cpu_record = args[1];
239
240 struct cpu_thread_history *a = bucket->data;
62f44022 241
d62a17ae 242 if (!(a->types & *filter))
243 return;
f48f65d2 244
d62a17ae 245 hash_release(cpu_record, bucket->data);
e276eb82
PJ
246}
247
fbcac826 248static void cpu_record_clear(uint8_t filter)
e276eb82 249{
fbcac826 250 uint8_t *tmp = &filter;
d62a17ae 251 struct thread_master *m;
252 struct listnode *ln;
253
00dffa8c 254 frr_with_mutex(&masters_mtx) {
d62a17ae 255 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
00dffa8c 256 frr_with_mutex(&m->mtx) {
d62a17ae 257 void *args[2] = {tmp, m->cpu_record};
258 hash_iterate(
259 m->cpu_record,
e3b78da8 260 (void (*)(struct hash_bucket *,
d62a17ae 261 void *))cpu_record_hash_clear,
262 args);
263 }
d62a17ae 264 }
265 }
62f44022
QY
266}
267
fbcac826 268static uint8_t parse_filter(const char *filterstr)
62f44022 269{
d62a17ae 270 int i = 0;
271 int filter = 0;
272
273 while (filterstr[i] != '\0') {
274 switch (filterstr[i]) {
275 case 'r':
276 case 'R':
277 filter |= (1 << THREAD_READ);
278 break;
279 case 'w':
280 case 'W':
281 filter |= (1 << THREAD_WRITE);
282 break;
283 case 't':
284 case 'T':
285 filter |= (1 << THREAD_TIMER);
286 break;
287 case 'e':
288 case 'E':
289 filter |= (1 << THREAD_EVENT);
290 break;
291 case 'x':
292 case 'X':
293 filter |= (1 << THREAD_EXECUTE);
294 break;
295 default:
296 break;
297 }
298 ++i;
299 }
300 return filter;
62f44022
QY
301}
302
f75e802d 303#ifndef EXCLUDE_CPU_TIME
62f44022
QY
304DEFUN (show_thread_cpu,
305 show_thread_cpu_cmd,
306 "show thread cpu [FILTER]",
307 SHOW_STR
308 "Thread information\n"
309 "Thread CPU usage\n"
61fa0b97 310 "Display filter (rwtex)\n")
62f44022 311{
fbcac826 312 uint8_t filter = (uint8_t)-1U;
d62a17ae 313 int idx = 0;
314
315 if (argv_find(argv, argc, "FILTER", &idx)) {
316 filter = parse_filter(argv[idx]->arg);
317 if (!filter) {
318 vty_out(vty,
3efd0893 319 "Invalid filter \"%s\" specified; must contain at leastone of 'RWTEXB'\n",
d62a17ae 320 argv[idx]->arg);
321 return CMD_WARNING;
322 }
323 }
324
325 cpu_record_print(vty, filter);
326 return CMD_SUCCESS;
e276eb82 327}
f75e802d 328#endif
e276eb82 329
8872626b
DS
330static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
331{
332 const char *name = m->name ? m->name : "main";
333 char underline[strlen(name) + 1];
a0b36ae6 334 struct thread *thread;
8872626b
DS
335 uint32_t i;
336
337 memset(underline, '-', sizeof(underline));
338 underline[sizeof(underline) - 1] = '\0';
339
340 vty_out(vty, "\nShowing poll FD's for %s\n", name);
341 vty_out(vty, "----------------------%s\n", underline);
6c19478a
DS
342 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
343 m->fd_limit);
a0b36ae6
DS
344 for (i = 0; i < m->handler.pfdcount; i++) {
345 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
346 m->handler.pfds[i].fd, m->handler.pfds[i].events,
8872626b 347 m->handler.pfds[i].revents);
a0b36ae6
DS
348
349 if (m->handler.pfds[i].events & POLLIN) {
350 thread = m->read[m->handler.pfds[i].fd];
351
352 if (!thread)
353 vty_out(vty, "ERROR ");
354 else
60a3efec 355 vty_out(vty, "%s ", thread->xref->funcname);
a0b36ae6
DS
356 } else
357 vty_out(vty, " ");
358
359 if (m->handler.pfds[i].events & POLLOUT) {
360 thread = m->write[m->handler.pfds[i].fd];
361
362 if (!thread)
363 vty_out(vty, "ERROR\n");
364 else
60a3efec 365 vty_out(vty, "%s\n", thread->xref->funcname);
a0b36ae6
DS
366 } else
367 vty_out(vty, "\n");
368 }
8872626b
DS
369}
370
371DEFUN (show_thread_poll,
372 show_thread_poll_cmd,
373 "show thread poll",
374 SHOW_STR
375 "Thread information\n"
376 "Show poll FD's and information\n")
377{
378 struct listnode *node;
379 struct thread_master *m;
380
00dffa8c 381 frr_with_mutex(&masters_mtx) {
8872626b
DS
382 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
383 show_thread_poll_helper(vty, m);
384 }
385 }
8872626b
DS
386
387 return CMD_SUCCESS;
388}
389
390
49d41a26
DS
391DEFUN (clear_thread_cpu,
392 clear_thread_cpu_cmd,
393 "clear thread cpu [FILTER]",
62f44022 394 "Clear stored data in all pthreads\n"
49d41a26
DS
395 "Thread information\n"
396 "Thread CPU usage\n"
397 "Display filter (rwtexb)\n")
e276eb82 398{
fbcac826 399 uint8_t filter = (uint8_t)-1U;
d62a17ae 400 int idx = 0;
401
402 if (argv_find(argv, argc, "FILTER", &idx)) {
403 filter = parse_filter(argv[idx]->arg);
404 if (!filter) {
405 vty_out(vty,
3efd0893 406 "Invalid filter \"%s\" specified; must contain at leastone of 'RWTEXB'\n",
d62a17ae 407 argv[idx]->arg);
408 return CMD_WARNING;
409 }
410 }
411
412 cpu_record_clear(filter);
413 return CMD_SUCCESS;
e276eb82 414}
6b0655a2 415
d62a17ae 416void thread_cmd_init(void)
0b84f294 417{
f75e802d 418#ifndef EXCLUDE_CPU_TIME
d62a17ae 419 install_element(VIEW_NODE, &show_thread_cpu_cmd);
f75e802d 420#endif
8872626b 421 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 422 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 423}
62f44022
QY
424/* CLI end ------------------------------------------------------------------ */
425
0b84f294 426
d62a17ae 427static void cancelreq_del(void *cr)
63ccb9cb 428{
d62a17ae 429 XFREE(MTYPE_TMP, cr);
63ccb9cb
QY
430}
431
e0bebc7c 432/* initializer, only ever called once */
4d762f26 433static void initializer(void)
e0bebc7c 434{
d62a17ae 435 pthread_key_create(&thread_current, NULL);
e0bebc7c
QY
436}
437
d62a17ae 438struct thread_master *thread_master_create(const char *name)
718e3744 439{
d62a17ae 440 struct thread_master *rv;
441 struct rlimit limit;
442
443 pthread_once(&init_once, &initializer);
444
445 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 446
447 /* Initialize master mutex */
448 pthread_mutex_init(&rv->mtx, NULL);
449 pthread_cond_init(&rv->cancel_cond, NULL);
450
451 /* Set name */
7ffcd8bd
QY
452 name = name ? name : "default";
453 rv->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
d62a17ae 454
455 /* Initialize I/O task data structures */
1a9f340b
MS
456
457 /* Use configured limit if present, ulimit otherwise. */
458 rv->fd_limit = frr_get_fd_limit();
459 if (rv->fd_limit == 0) {
460 getrlimit(RLIMIT_NOFILE, &limit);
461 rv->fd_limit = (int)limit.rlim_cur;
462 }
463
a6f235f3
DS
464 rv->read = XCALLOC(MTYPE_THREAD_POLL,
465 sizeof(struct thread *) * rv->fd_limit);
466
467 rv->write = XCALLOC(MTYPE_THREAD_POLL,
468 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 469
7ffcd8bd
QY
470 char tmhashname[strlen(name) + 32];
471 snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash",
472 name);
bd74dc61 473 rv->cpu_record = hash_create_size(
d8b87afe 474 8, (unsigned int (*)(const void *))cpu_record_hash_key,
74df8d6d 475 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
7ffcd8bd 476 tmhashname);
d62a17ae 477
c284542b
DL
478 thread_list_init(&rv->event);
479 thread_list_init(&rv->ready);
480 thread_list_init(&rv->unuse);
27d29ced 481 thread_timer_list_init(&rv->timer);
d62a17ae 482
483 /* Initialize thread_fetch() settings */
484 rv->spin = true;
485 rv->handle_signals = true;
486
487 /* Set pthread owner, should be updated by actual owner */
488 rv->owner = pthread_self();
489 rv->cancel_req = list_new();
490 rv->cancel_req->del = cancelreq_del;
491 rv->canceled = true;
492
493 /* Initialize pipe poker */
494 pipe(rv->io_pipe);
495 set_nonblocking(rv->io_pipe[0]);
496 set_nonblocking(rv->io_pipe[1]);
497
498 /* Initialize data structures for poll() */
499 rv->handler.pfdsize = rv->fd_limit;
500 rv->handler.pfdcount = 0;
501 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
502 sizeof(struct pollfd) * rv->handler.pfdsize);
503 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
504 sizeof(struct pollfd) * rv->handler.pfdsize);
505
eff09c66 506 /* add to list of threadmasters */
00dffa8c 507 frr_with_mutex(&masters_mtx) {
eff09c66
QY
508 if (!masters)
509 masters = list_new();
510
d62a17ae 511 listnode_add(masters, rv);
512 }
d62a17ae 513
514 return rv;
718e3744 515}
516
d8a8a8de
QY
517void thread_master_set_name(struct thread_master *master, const char *name)
518{
00dffa8c 519 frr_with_mutex(&master->mtx) {
0a22ddfb 520 XFREE(MTYPE_THREAD_MASTER, master->name);
d8a8a8de
QY
521 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
522 }
d8a8a8de
QY
523}
524
6ed04aa2
DS
525#define THREAD_UNUSED_DEPTH 10
526
718e3744 527/* Move thread to unuse list. */
d62a17ae 528static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 529{
6655966d
RZ
530 pthread_mutex_t mtxc = thread->mtx;
531
d62a17ae 532 assert(m != NULL && thread != NULL);
d62a17ae 533
d62a17ae 534 thread->hist->total_active--;
6ed04aa2
DS
535 memset(thread, 0, sizeof(struct thread));
536 thread->type = THREAD_UNUSED;
537
6655966d
RZ
538 /* Restore the thread mutex context. */
539 thread->mtx = mtxc;
540
c284542b
DL
541 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
542 thread_list_add_tail(&m->unuse, thread);
6655966d
RZ
543 return;
544 }
545
546 thread_free(m, thread);
718e3744 547}
548
549/* Free all unused threads. */
c284542b
DL
550static void thread_list_free(struct thread_master *m,
551 struct thread_list_head *list)
718e3744 552{
d62a17ae 553 struct thread *t;
d62a17ae 554
c284542b 555 while ((t = thread_list_pop(list)))
6655966d 556 thread_free(m, t);
718e3744 557}
558
d62a17ae 559static void thread_array_free(struct thread_master *m,
560 struct thread **thread_array)
308d14ae 561{
d62a17ae 562 struct thread *t;
563 int index;
564
565 for (index = 0; index < m->fd_limit; ++index) {
566 t = thread_array[index];
567 if (t) {
568 thread_array[index] = NULL;
6655966d 569 thread_free(m, t);
d62a17ae 570 }
571 }
a6f235f3 572 XFREE(MTYPE_THREAD_POLL, thread_array);
308d14ae
DV
573}
574
495f0b13
DS
575/*
576 * thread_master_free_unused
577 *
578 * As threads are finished with, they are put on the
579 * unuse list for later reuse.
580 * If we are shutting down, free up the unused threads
581 * so we can see if we forgot to shut anything off.
582 */
d62a17ae 583void thread_master_free_unused(struct thread_master *m)
495f0b13 584{
00dffa8c 585 frr_with_mutex(&m->mtx) {
d62a17ae 586 struct thread *t;
c284542b 587 while ((t = thread_list_pop(&m->unuse)))
6655966d 588 thread_free(m, t);
d62a17ae 589 }
495f0b13
DS
590}
591
718e3744 592/* Stop thread scheduler. */
d62a17ae 593void thread_master_free(struct thread_master *m)
718e3744 594{
27d29ced
DL
595 struct thread *t;
596
00dffa8c 597 frr_with_mutex(&masters_mtx) {
d62a17ae 598 listnode_delete(masters, m);
eff09c66 599 if (masters->count == 0) {
6a154c88 600 list_delete(&masters);
eff09c66 601 }
d62a17ae 602 }
d62a17ae 603
604 thread_array_free(m, m->read);
605 thread_array_free(m, m->write);
27d29ced
DL
606 while ((t = thread_timer_list_pop(&m->timer)))
607 thread_free(m, t);
d62a17ae 608 thread_list_free(m, &m->event);
609 thread_list_free(m, &m->ready);
610 thread_list_free(m, &m->unuse);
611 pthread_mutex_destroy(&m->mtx);
33844bbe 612 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 613 close(m->io_pipe[0]);
614 close(m->io_pipe[1]);
6a154c88 615 list_delete(&m->cancel_req);
1a0a92ea 616 m->cancel_req = NULL;
d62a17ae 617
618 hash_clean(m->cpu_record, cpu_record_hash_free);
619 hash_free(m->cpu_record);
620 m->cpu_record = NULL;
621
0a22ddfb 622 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 623 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
624 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
625 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 626}
627
78ca0342
CF
628/* Return remaining time in milliseconds. */
629unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 630{
d62a17ae 631 int64_t remain;
1189d95f 632
00dffa8c 633 frr_with_mutex(&thread->mtx) {
78ca0342 634 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 635 }
1189d95f 636
d62a17ae 637 return remain < 0 ? 0 : remain;
718e3744 638}
639
78ca0342
CF
640/* Return remaining time in seconds. */
641unsigned long thread_timer_remain_second(struct thread *thread)
642{
643 return thread_timer_remain_msec(thread) / 1000LL;
644}
645
d62a17ae 646struct timeval thread_timer_remain(struct thread *thread)
6ac44687 647{
d62a17ae 648 struct timeval remain;
00dffa8c 649 frr_with_mutex(&thread->mtx) {
d62a17ae 650 monotime_until(&thread->u.sands, &remain);
651 }
d62a17ae 652 return remain;
6ac44687
CF
653}
654
0447957e
AK
655static int time_hhmmss(char *buf, int buf_size, long sec)
656{
657 long hh;
658 long mm;
659 int wr;
660
661 zassert(buf_size >= 8);
662
663 hh = sec / 3600;
664 sec %= 3600;
665 mm = sec / 60;
666 sec %= 60;
667
668 wr = snprintf(buf, buf_size, "%02ld:%02ld:%02ld", hh, mm, sec);
669
670 return wr != 8;
671}
672
673char *thread_timer_to_hhmmss(char *buf, int buf_size,
674 struct thread *t_timer)
675{
676 if (t_timer) {
677 time_hhmmss(buf, buf_size,
678 thread_timer_remain_second(t_timer));
679 } else {
680 snprintf(buf, buf_size, "--:--:--");
681 }
682 return buf;
683}
684
718e3744 685/* Get new thread. */
d7c0a89a 686static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 687 int (*func)(struct thread *), void *arg,
60a3efec 688 const struct xref_threadsched *xref)
718e3744 689{
c284542b 690 struct thread *thread = thread_list_pop(&m->unuse);
d62a17ae 691 struct cpu_thread_history tmp;
692
693 if (!thread) {
694 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
695 /* mutex only needs to be initialized at struct creation. */
696 pthread_mutex_init(&thread->mtx, NULL);
697 m->alloc++;
698 }
699
700 thread->type = type;
701 thread->add_type = type;
702 thread->master = m;
703 thread->arg = arg;
d62a17ae 704 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
705 thread->ref = NULL;
706
707 /*
708 * So if the passed in funcname is not what we have
709 * stored that means the thread->hist needs to be
710 * updated. We keep the last one around in unused
711 * under the assumption that we are probably
712 * going to immediately allocate the same
713 * type of thread.
714 * This hopefully saves us some serious
715 * hash_get lookups.
716 */
60a3efec
DL
717 if ((thread->xref && thread->xref->funcname != xref->funcname)
718 || thread->func != func) {
d62a17ae 719 tmp.func = func;
60a3efec 720 tmp.funcname = xref->funcname;
d62a17ae 721 thread->hist =
722 hash_get(m->cpu_record, &tmp,
723 (void *(*)(void *))cpu_record_hash_alloc);
724 }
725 thread->hist->total_active++;
726 thread->func = func;
60a3efec 727 thread->xref = xref;
d62a17ae 728
729 return thread;
718e3744 730}
731
6655966d
RZ
732static void thread_free(struct thread_master *master, struct thread *thread)
733{
734 /* Update statistics. */
735 assert(master->alloc > 0);
736 master->alloc--;
737
738 /* Free allocated resources. */
739 pthread_mutex_destroy(&thread->mtx);
740 XFREE(MTYPE_THREAD, thread);
741}
742
d81ca9a3
MS
743static int fd_poll(struct thread_master *m, const struct timeval *timer_wait,
744 bool *eintr_p)
209a72a6 745{
d81ca9a3
MS
746 sigset_t origsigs;
747 unsigned char trash[64];
748 nfds_t count = m->handler.copycount;
749
d279ef57
DS
750 /*
751 * If timer_wait is null here, that means poll() should block
752 * indefinitely, unless the thread_master has overridden it by setting
d62a17ae 753 * ->selectpoll_timeout.
d279ef57 754 *
d62a17ae 755 * If the value is positive, it specifies the maximum number of
d279ef57
DS
756 * milliseconds to wait. If the timeout is -1, it specifies that
757 * we should never wait and always return immediately even if no
758 * event is detected. If the value is zero, the behavior is default.
759 */
d62a17ae 760 int timeout = -1;
761
762 /* number of file descriptors with events */
763 int num;
764
765 if (timer_wait != NULL
766 && m->selectpoll_timeout == 0) // use the default value
767 timeout = (timer_wait->tv_sec * 1000)
768 + (timer_wait->tv_usec / 1000);
769 else if (m->selectpoll_timeout > 0) // use the user's timeout
770 timeout = m->selectpoll_timeout;
771 else if (m->selectpoll_timeout
772 < 0) // effect a poll (return immediately)
773 timeout = 0;
774
0bdeb5e5 775 zlog_tls_buffer_flush();
3e41733f
DL
776 rcu_read_unlock();
777 rcu_assert_read_unlocked();
778
d62a17ae 779 /* add poll pipe poker */
d81ca9a3
MS
780 assert(count + 1 < m->handler.pfdsize);
781 m->handler.copy[count].fd = m->io_pipe[0];
782 m->handler.copy[count].events = POLLIN;
783 m->handler.copy[count].revents = 0x00;
784
785 /* We need to deal with a signal-handling race here: we
786 * don't want to miss a crucial signal, such as SIGTERM or SIGINT,
787 * that may arrive just before we enter poll(). We will block the
788 * key signals, then check whether any have arrived - if so, we return
789 * before calling poll(). If not, we'll re-enable the signals
790 * in the ppoll() call.
791 */
792
793 sigemptyset(&origsigs);
794 if (m->handle_signals) {
795 /* Main pthread that handles the app signals */
796 if (frr_sigevent_check(&origsigs)) {
797 /* Signal to process - restore signal mask and return */
798 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
799 num = -1;
800 *eintr_p = true;
801 goto done;
802 }
803 } else {
804 /* Don't make any changes for the non-main pthreads */
805 pthread_sigmask(SIG_SETMASK, NULL, &origsigs);
806 }
d62a17ae 807
d81ca9a3
MS
808#if defined(HAVE_PPOLL)
809 struct timespec ts, *tsp;
810
811 if (timeout >= 0) {
812 ts.tv_sec = timeout / 1000;
813 ts.tv_nsec = (timeout % 1000) * 1000000;
814 tsp = &ts;
815 } else
816 tsp = NULL;
817
818 num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
819 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
820#else
821 /* Not ideal - there is a race after we restore the signal mask */
822 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
823 num = poll(m->handler.copy, count + 1, timeout);
824#endif
d62a17ae 825
d81ca9a3
MS
826done:
827
828 if (num < 0 && errno == EINTR)
829 *eintr_p = true;
830
831 if (num > 0 && m->handler.copy[count].revents != 0 && num--)
d62a17ae 832 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
833 ;
834
3e41733f
DL
835 rcu_read_lock();
836
d62a17ae 837 return num;
209a72a6
DS
838}
839
718e3744 840/* Add new read thread. */
60a3efec
DL
841struct thread *_thread_add_read_write(const struct xref_threadsched *xref,
842 struct thread_master *m,
843 int (*func)(struct thread *),
844 void *arg, int fd, struct thread **t_ptr)
718e3744 845{
60a3efec 846 int dir = xref->thread_type;
d62a17ae 847 struct thread *thread = NULL;
1ef14bee 848 struct thread **thread_array;
d62a17ae 849
abf96a87 850 if (dir == THREAD_READ)
6c3aa850
DL
851 frrtrace(9, frr_libfrr, schedule_read, m,
852 xref->funcname, xref->xref.file, xref->xref.line,
853 t_ptr, fd, 0, arg, 0);
abf96a87 854 else
6c3aa850
DL
855 frrtrace(9, frr_libfrr, schedule_write, m,
856 xref->funcname, xref->xref.file, xref->xref.line,
857 t_ptr, fd, 0, arg, 0);
abf96a87 858
9b864cd3 859 assert(fd >= 0 && fd < m->fd_limit);
00dffa8c
DL
860 frr_with_mutex(&m->mtx) {
861 if (t_ptr && *t_ptr)
862 // thread is already scheduled; don't reschedule
863 break;
d62a17ae 864
865 /* default to a new pollfd */
866 nfds_t queuepos = m->handler.pfdcount;
867
1ef14bee
DS
868 if (dir == THREAD_READ)
869 thread_array = m->read;
870 else
871 thread_array = m->write;
872
d62a17ae 873 /* if we already have a pollfd for our file descriptor, find and
874 * use it */
875 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
876 if (m->handler.pfds[i].fd == fd) {
877 queuepos = i;
1ef14bee
DS
878
879#ifdef DEV_BUILD
880 /*
881 * What happens if we have a thread already
882 * created for this event?
883 */
884 if (thread_array[fd])
885 assert(!"Thread already scheduled for file descriptor");
886#endif
d62a17ae 887 break;
888 }
889
890 /* make sure we have room for this fd + pipe poker fd */
891 assert(queuepos + 1 < m->handler.pfdsize);
892
60a3efec 893 thread = thread_get(m, dir, func, arg, xref);
d62a17ae 894
895 m->handler.pfds[queuepos].fd = fd;
896 m->handler.pfds[queuepos].events |=
897 (dir == THREAD_READ ? POLLIN : POLLOUT);
898
899 if (queuepos == m->handler.pfdcount)
900 m->handler.pfdcount++;
901
902 if (thread) {
00dffa8c 903 frr_with_mutex(&thread->mtx) {
d62a17ae 904 thread->u.fd = fd;
1ef14bee 905 thread_array[thread->u.fd] = thread;
d62a17ae 906 }
d62a17ae 907
908 if (t_ptr) {
909 *t_ptr = thread;
910 thread->ref = t_ptr;
911 }
912 }
913
914 AWAKEN(m);
915 }
d62a17ae 916
917 return thread;
718e3744 918}
919
56a94b36 920static struct thread *
60a3efec
DL
921_thread_add_timer_timeval(const struct xref_threadsched *xref,
922 struct thread_master *m, int (*func)(struct thread *),
923 int type, void *arg, struct timeval *time_relative,
924 struct thread **t_ptr)
718e3744 925{
d62a17ae 926 struct thread *thread;
d62a17ae 927
928 assert(m != NULL);
929
930 assert(type == THREAD_TIMER);
931 assert(time_relative);
932
6c3aa850
DL
933 frrtrace(9, frr_libfrr, schedule_timer, m,
934 xref->funcname, xref->xref.file, xref->xref.line,
c7bb4f00 935 t_ptr, 0, 0, arg, (long)time_relative->tv_sec);
abf96a87 936
00dffa8c
DL
937 frr_with_mutex(&m->mtx) {
938 if (t_ptr && *t_ptr)
d279ef57 939 /* thread is already scheduled; don't reschedule */
d62a17ae 940 return NULL;
d62a17ae 941
60a3efec 942 thread = thread_get(m, type, func, arg, xref);
d62a17ae 943
00dffa8c 944 frr_with_mutex(&thread->mtx) {
d62a17ae 945 monotime(&thread->u.sands);
946 timeradd(&thread->u.sands, time_relative,
947 &thread->u.sands);
27d29ced 948 thread_timer_list_add(&m->timer, thread);
d62a17ae 949 if (t_ptr) {
950 *t_ptr = thread;
951 thread->ref = t_ptr;
952 }
953 }
d62a17ae 954
955 AWAKEN(m);
956 }
d62a17ae 957
958 return thread;
9e867fe6 959}
960
98c91ac6 961
962/* Add timer event thread. */
60a3efec
DL
963struct thread *_thread_add_timer(const struct xref_threadsched *xref,
964 struct thread_master *m,
965 int (*func)(struct thread *),
966 void *arg, long timer, struct thread **t_ptr)
9e867fe6 967{
d62a17ae 968 struct timeval trel;
9e867fe6 969
d62a17ae 970 assert(m != NULL);
9e867fe6 971
d62a17ae 972 trel.tv_sec = timer;
973 trel.tv_usec = 0;
9e867fe6 974
60a3efec
DL
975 return _thread_add_timer_timeval(xref, m, func, THREAD_TIMER, arg,
976 &trel, t_ptr);
98c91ac6 977}
9e867fe6 978
98c91ac6 979/* Add timer event thread with "millisecond" resolution */
60a3efec
DL
980struct thread *_thread_add_timer_msec(const struct xref_threadsched *xref,
981 struct thread_master *m,
982 int (*func)(struct thread *),
983 void *arg, long timer,
984 struct thread **t_ptr)
98c91ac6 985{
d62a17ae 986 struct timeval trel;
9e867fe6 987
d62a17ae 988 assert(m != NULL);
718e3744 989
d62a17ae 990 trel.tv_sec = timer / 1000;
991 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 992
60a3efec
DL
993 return _thread_add_timer_timeval(xref, m, func, THREAD_TIMER, arg,
994 &trel, t_ptr);
a48b4e6d 995}
996
d03c4cbd 997/* Add timer event thread with "timeval" resolution */
60a3efec
DL
998struct thread *_thread_add_timer_tv(const struct xref_threadsched *xref,
999 struct thread_master *m,
1000 int (*func)(struct thread *),
1001 void *arg, struct timeval *tv,
1002 struct thread **t_ptr)
d03c4cbd 1003{
60a3efec
DL
1004 return _thread_add_timer_timeval(xref, m, func, THREAD_TIMER, arg, tv,
1005 t_ptr);
d03c4cbd
DL
1006}
1007
718e3744 1008/* Add simple event thread. */
60a3efec
DL
1009struct thread *_thread_add_event(const struct xref_threadsched *xref,
1010 struct thread_master *m,
1011 int (*func)(struct thread *),
1012 void *arg, int val, struct thread **t_ptr)
718e3744 1013{
00dffa8c 1014 struct thread *thread = NULL;
d62a17ae 1015
6c3aa850
DL
1016 frrtrace(9, frr_libfrr, schedule_event, m,
1017 xref->funcname, xref->xref.file, xref->xref.line,
c7bb4f00 1018 t_ptr, 0, val, arg, 0);
abf96a87 1019
d62a17ae 1020 assert(m != NULL);
1021
00dffa8c
DL
1022 frr_with_mutex(&m->mtx) {
1023 if (t_ptr && *t_ptr)
d279ef57 1024 /* thread is already scheduled; don't reschedule */
00dffa8c 1025 break;
d62a17ae 1026
60a3efec 1027 thread = thread_get(m, THREAD_EVENT, func, arg, xref);
00dffa8c 1028 frr_with_mutex(&thread->mtx) {
d62a17ae 1029 thread->u.val = val;
c284542b 1030 thread_list_add_tail(&m->event, thread);
d62a17ae 1031 }
d62a17ae 1032
1033 if (t_ptr) {
1034 *t_ptr = thread;
1035 thread->ref = t_ptr;
1036 }
1037
1038 AWAKEN(m);
1039 }
d62a17ae 1040
1041 return thread;
718e3744 1042}
1043
63ccb9cb
QY
1044/* Thread cancellation ------------------------------------------------------ */
1045
8797240e
QY
1046/**
1047 * NOT's out the .events field of pollfd corresponding to the given file
1048 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
1049 *
1050 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1051 * implementation for details.
1052 *
1053 * @param master
1054 * @param fd
1055 * @param state the event to cancel. One or more (OR'd together) of the
1056 * following:
1057 * - POLLIN
1058 * - POLLOUT
1059 */
d62a17ae 1060static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 1061{
42d74538
QY
1062 bool found = false;
1063
d62a17ae 1064 /* Cancel POLLHUP too just in case some bozo set it */
1065 state |= POLLHUP;
1066
1067 /* find the index of corresponding pollfd */
1068 nfds_t i;
1069
1070 for (i = 0; i < master->handler.pfdcount; i++)
42d74538
QY
1071 if (master->handler.pfds[i].fd == fd) {
1072 found = true;
d62a17ae 1073 break;
42d74538
QY
1074 }
1075
1076 if (!found) {
1077 zlog_debug(
1078 "[!] Received cancellation request for nonexistent rw job");
1079 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 1080 master->name ? master->name : "", fd);
42d74538
QY
1081 return;
1082 }
d62a17ae 1083
1084 /* NOT out event. */
1085 master->handler.pfds[i].events &= ~(state);
1086
1087 /* If all events are canceled, delete / resize the pollfd array. */
1088 if (master->handler.pfds[i].events == 0) {
1089 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1090 (master->handler.pfdcount - i - 1)
1091 * sizeof(struct pollfd));
1092 master->handler.pfdcount--;
e985cda0
S
1093 master->handler.pfds[master->handler.pfdcount].fd = 0;
1094 master->handler.pfds[master->handler.pfdcount].events = 0;
d62a17ae 1095 }
1096
1097 /* If we have the same pollfd in the copy, perform the same operations,
1098 * otherwise return. */
1099 if (i >= master->handler.copycount)
1100 return;
1101
1102 master->handler.copy[i].events &= ~(state);
1103
1104 if (master->handler.copy[i].events == 0) {
1105 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1106 (master->handler.copycount - i - 1)
1107 * sizeof(struct pollfd));
1108 master->handler.copycount--;
e985cda0
S
1109 master->handler.copy[master->handler.copycount].fd = 0;
1110 master->handler.copy[master->handler.copycount].events = 0;
d62a17ae 1111 }
0a95a0d0
DS
1112}
1113
1189d95f 1114/**
63ccb9cb 1115 * Process cancellation requests.
1189d95f 1116 *
63ccb9cb
QY
1117 * This may only be run from the pthread which owns the thread_master.
1118 *
1119 * @param master the thread master to process
1120 * @REQUIRE master->mtx
1189d95f 1121 */
d62a17ae 1122static void do_thread_cancel(struct thread_master *master)
718e3744 1123{
c284542b 1124 struct thread_list_head *list = NULL;
d62a17ae 1125 struct thread **thread_array = NULL;
1126 struct thread *thread;
1127
1128 struct cancel_req *cr;
1129 struct listnode *ln;
1130 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
d279ef57
DS
1131 /*
1132 * If this is an event object cancellation, linear search
1133 * through event list deleting any events which have the
1134 * specified argument. We also need to check every thread
1135 * in the ready queue.
1136 */
d62a17ae 1137 if (cr->eventobj) {
1138 struct thread *t;
c284542b 1139
81fddbe7 1140 frr_each_safe(thread_list, &master->event, t) {
c284542b
DL
1141 if (t->arg != cr->eventobj)
1142 continue;
1143 thread_list_del(&master->event, t);
1144 if (t->ref)
1145 *t->ref = NULL;
1146 thread_add_unuse(master, t);
d62a17ae 1147 }
1148
81fddbe7 1149 frr_each_safe(thread_list, &master->ready, t) {
c284542b
DL
1150 if (t->arg != cr->eventobj)
1151 continue;
1152 thread_list_del(&master->ready, t);
1153 if (t->ref)
1154 *t->ref = NULL;
1155 thread_add_unuse(master, t);
d62a17ae 1156 }
1157 continue;
1158 }
1159
d279ef57
DS
1160 /*
1161 * The pointer varies depending on whether the cancellation
1162 * request was made asynchronously or not. If it was, we
1163 * need to check whether the thread even exists anymore
1164 * before cancelling it.
1165 */
d62a17ae 1166 thread = (cr->thread) ? cr->thread : *cr->threadref;
1167
1168 if (!thread)
1169 continue;
1170
1171 /* Determine the appropriate queue to cancel the thread from */
1172 switch (thread->type) {
1173 case THREAD_READ:
1174 thread_cancel_rw(master, thread->u.fd, POLLIN);
1175 thread_array = master->read;
1176 break;
1177 case THREAD_WRITE:
1178 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1179 thread_array = master->write;
1180 break;
1181 case THREAD_TIMER:
27d29ced 1182 thread_timer_list_del(&master->timer, thread);
d62a17ae 1183 break;
1184 case THREAD_EVENT:
1185 list = &master->event;
1186 break;
1187 case THREAD_READY:
1188 list = &master->ready;
1189 break;
1190 default:
1191 continue;
1192 break;
1193 }
1194
27d29ced 1195 if (list) {
c284542b 1196 thread_list_del(list, thread);
d62a17ae 1197 } else if (thread_array) {
1198 thread_array[thread->u.fd] = NULL;
d62a17ae 1199 }
1200
1201 if (thread->ref)
1202 *thread->ref = NULL;
1203
1204 thread_add_unuse(thread->master, thread);
1205 }
1206
1207 /* Delete and free all cancellation requests */
41b21bfa
MS
1208 if (master->cancel_req)
1209 list_delete_all_node(master->cancel_req);
d62a17ae 1210
1211 /* Wake up any threads which may be blocked in thread_cancel_async() */
1212 master->canceled = true;
1213 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1214}
1215
63ccb9cb
QY
1216/**
1217 * Cancel any events which have the specified argument.
1218 *
1219 * MT-Unsafe
1220 *
1221 * @param master the thread_master to cancel from
1222 * @param arg the argument passed when creating the event
1223 */
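/*
 * A minimal usage sketch (the thread_add_event() wrapper from thread.h and
 * the 'peer' object are illustrative assumptions, not part of this file):
 *
 *     thread_add_event(master, process_peer, peer, 0, NULL);
 *     ...
 *     thread_cancel_event(master, peer);  // drops every task with arg == peer
 */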
d62a17ae 1224void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1225{
d62a17ae 1226 assert(master->owner == pthread_self());
1227
00dffa8c 1228 frr_with_mutex(&master->mtx) {
d62a17ae 1229 struct cancel_req *cr =
1230 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1231 cr->eventobj = arg;
1232 listnode_add(master->cancel_req, cr);
1233 do_thread_cancel(master);
1234 }
63ccb9cb 1235}
1189d95f 1236
63ccb9cb
QY
1237/**
1238 * Cancel a specific task.
1239 *
1240 * MT-Unsafe
1241 *
1242 * @param thread task to cancel
1243 */
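/*
 * Sketch of typical use, assuming the task was scheduled with a
 * back-reference (e.g. thread_add_read(..., &p->t_read) from thread.h;
 * 'p->t_read' is illustrative only):
 *
 *     thread_cancel(&p->t_read);  // cancels the task and sets p->t_read = NULL
 */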
b3d6bc6e 1244void thread_cancel(struct thread **thread)
63ccb9cb 1245{
b3d6bc6e
MS
1246 struct thread_master *master;
1247
1248 if (thread == NULL || *thread == NULL)
1249 return;
1250
1251 master = (*thread)->master;
d62a17ae 1252
6c3aa850
DL
1253 frrtrace(9, frr_libfrr, thread_cancel, master,
1254 (*thread)->xref->funcname, (*thread)->xref->xref.file,
1255 (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
b4d6e855 1256 (*thread)->u.val, (*thread)->arg, (*thread)->u.sands.tv_sec);
abf96a87 1257
6ed04aa2
DS
1258 assert(master->owner == pthread_self());
1259
00dffa8c 1260 frr_with_mutex(&master->mtx) {
d62a17ae 1261 struct cancel_req *cr =
1262 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
b3d6bc6e 1263 cr->thread = *thread;
6ed04aa2
DS
1264 listnode_add(master->cancel_req, cr);
1265 do_thread_cancel(master);
d62a17ae 1266 }
b3d6bc6e
MS
1267
1268 *thread = NULL;
63ccb9cb 1269}
1189d95f 1270
63ccb9cb
QY
1271/**
1272 * Asynchronous cancellation.
1273 *
8797240e
QY
1274 * Called with either a struct thread ** or void * to an event argument,
1275 * this function posts the correct cancellation request and blocks until it is
1276 * serviced.
63ccb9cb
QY
1277 *
1278 * If the thread is currently running, execution blocks until it completes.
1279 *
8797240e
QY
1280 * The last two parameters are mutually exclusive, i.e. if you pass one the
1281 * other must be NULL.
1282 *
1283 * When the cancellation procedure executes on the target thread_master, the
1284 * thread * provided is checked for nullity. If it is null, the thread is
1285 * assumed to no longer exist and the cancellation request is a no-op. Thus
1286 * users of this API must pass a back-reference when scheduling the original
1287 * task.
1288 *
63ccb9cb
QY
1289 * MT-Safe
1290 *
8797240e
QY
1291 * @param master the thread master with the relevant event / task
1292 * @param thread pointer to thread to cancel
1293 * @param eventobj the event
63ccb9cb 1294 */
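/*
 * Hedged usage sketch; 'do_work' and 't_work' are illustrative only.  The
 * scheduling side keeps the back-reference so the asynchronous cancel can
 * tell whether the task already ran:
 *
 *     static struct thread *t_work;
 *     thread_add_timer(master, do_work, arg, 5, &t_work);
 *
 *     // from a different pthread:
 *     thread_cancel_async(master, &t_work, NULL);  // blocks until serviced
 */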
d62a17ae 1295void thread_cancel_async(struct thread_master *master, struct thread **thread,
1296 void *eventobj)
63ccb9cb 1297{
d62a17ae 1298 assert(!(thread && eventobj) && (thread || eventobj));
abf96a87
QY
1299
1300 if (thread && *thread)
c7bb4f00 1301 frrtrace(9, frr_libfrr, thread_cancel_async, master,
6c3aa850
DL
1302 (*thread)->xref->funcname, (*thread)->xref->xref.file,
1303 (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
c7bb4f00
QY
1304 (*thread)->u.val, (*thread)->arg,
1305 (*thread)->u.sands.tv_sec);
abf96a87 1306 else
c7bb4f00
QY
1307 frrtrace(9, frr_libfrr, thread_cancel_async, master, NULL, NULL,
1308 0, NULL, 0, 0, eventobj, 0);
abf96a87 1309
d62a17ae 1310 assert(master->owner != pthread_self());
1311
00dffa8c 1312 frr_with_mutex(&master->mtx) {
d62a17ae 1313 master->canceled = false;
1314
1315 if (thread) {
1316 struct cancel_req *cr =
1317 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1318 cr->threadref = thread;
1319 listnode_add(master->cancel_req, cr);
1320 } else if (eventobj) {
1321 struct cancel_req *cr =
1322 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1323 cr->eventobj = eventobj;
1324 listnode_add(master->cancel_req, cr);
1325 }
1326 AWAKEN(master);
1327
1328 while (!master->canceled)
1329 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1330 }
50478845
MS
1331
1332 if (thread)
1333 *thread = NULL;
718e3744 1334}
63ccb9cb 1335/* ------------------------------------------------------------------------- */
718e3744 1336
27d29ced 1337static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
d62a17ae 1338 struct timeval *timer_val)
718e3744 1339{
27d29ced
DL
1340 if (!thread_timer_list_count(timers))
1341 return NULL;
1342
1343 struct thread *next_timer = thread_timer_list_first(timers);
1344 monotime_until(&next_timer->u.sands, timer_val);
1345 return timer_val;
718e3744 1346}
718e3744 1347
d62a17ae 1348static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1349 struct thread *fetch)
718e3744 1350{
d62a17ae 1351 *fetch = *thread;
1352 thread_add_unuse(m, thread);
1353 return fetch;
718e3744 1354}
1355
d62a17ae 1356static int thread_process_io_helper(struct thread_master *m,
45f3d590
DS
1357 struct thread *thread, short state,
1358 short actual_state, int pos)
5d4ccd4e 1359{
d62a17ae 1360 struct thread **thread_array;
1361
45f3d590
DS
1362 /*
1363 * poll() clears the .events field, but the pollfd array we
1364 * pass to poll() is a copy of the one used to schedule threads.
1365 * We need to synchronize state between the two here by applying
1366 * the same changes poll() made on the copy of the "real" pollfd
1367 * array.
1368 *
1369 * This cleans up a possible infinite loop where we refuse
1370 * to respond to a poll event but poll is insistent that
1371 * we should.
1372 */
1373 m->handler.pfds[pos].events &= ~(state);
1374
1375 if (!thread) {
1376 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1377 flog_err(EC_LIB_NO_THREAD,
1378 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1379 m->handler.pfds[pos].fd, actual_state);
d62a17ae 1380 return 0;
45f3d590 1381 }
d62a17ae 1382
1383 if (thread->type == THREAD_READ)
1384 thread_array = m->read;
1385 else
1386 thread_array = m->write;
1387
1388 thread_array[thread->u.fd] = NULL;
c284542b 1389 thread_list_add_tail(&m->ready, thread);
d62a17ae 1390 thread->type = THREAD_READY;
45f3d590 1391
d62a17ae 1392 return 1;
5d4ccd4e
DS
1393}
1394
8797240e
QY
1395/**
1396 * Process I/O events.
1397 *
1398 * Walks through file descriptor array looking for those pollfds whose .revents
1399 * field has something interesting. Deletes any invalid file descriptors.
1400 *
1401 * @param m the thread master
1402 * @param num the number of active file descriptors (return value of poll())
1403 */
d62a17ae 1404static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1405{
d62a17ae 1406 unsigned int ready = 0;
1407 struct pollfd *pfds = m->handler.copy;
1408
1409 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1410 /* no event for current fd? immediately continue */
1411 if (pfds[i].revents == 0)
1412 continue;
1413
1414 ready++;
1415
d279ef57
DS
1416 /*
1417 * Unless someone has called thread_cancel from another
1418 * pthread, the only thing that could have changed in
1419 * m->handler.pfds while we were asleep is the .events
1420 * field in a given pollfd. Barring thread_cancel() that
1421 * value should be a superset of the values we have in our
1422 * copy, so there's no need to update it. Similarly,
1423 * barring deletion, the fd should still be a valid index
1424 * into the master's pfds.
d142453d
DS
1425 *
1426 * We are including POLLERR here to do a READ event
1427 * because the read should fail and the read function
1428 * should handle it appropriately
d279ef57 1429 */
d142453d 1430 if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
d62a17ae 1431 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
45f3d590
DS
1432 pfds[i].revents, i);
1433 }
d62a17ae 1434 if (pfds[i].revents & POLLOUT)
1435 thread_process_io_helper(m, m->write[pfds[i].fd],
45f3d590 1436 POLLOUT, pfds[i].revents, i);
d62a17ae 1437
1438 /* if one of our file descriptors is garbage, remove the same
1439 * from both pfds + update sizes and index */
1441 if (pfds[i].revents & POLLNVAL) {
1442 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1443 (m->handler.pfdcount - i - 1)
1444 * sizeof(struct pollfd));
1445 m->handler.pfdcount--;
e985cda0
S
1446 m->handler.pfds[m->handler.pfdcount].fd = 0;
1447 m->handler.pfds[m->handler.pfdcount].events = 0;
d62a17ae 1448
1449 memmove(pfds + i, pfds + i + 1,
1450 (m->handler.copycount - i - 1)
1451 * sizeof(struct pollfd));
1452 m->handler.copycount--;
e985cda0
S
1453 m->handler.copy[m->handler.copycount].fd = 0;
1454 m->handler.copy[m->handler.copycount].events = 0;
d62a17ae 1455
1456 i--;
1457 }
1458 }
718e3744 1459}
1460
8b70d0b0 1461/* Add all timers that have popped to the ready list. */
27d29ced 1462static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
d62a17ae 1463 struct timeval *timenow)
a48b4e6d 1464{
d62a17ae 1465 struct thread *thread;
1466 unsigned int ready = 0;
1467
27d29ced 1468 while ((thread = thread_timer_list_first(timers))) {
d62a17ae 1469 if (timercmp(timenow, &thread->u.sands, <))
1470 return ready;
27d29ced 1471 thread_timer_list_pop(timers);
d62a17ae 1472 thread->type = THREAD_READY;
c284542b 1473 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1474 ready++;
1475 }
1476 return ready;
a48b4e6d 1477}
1478
2613abe6 1479/* process a list en masse, e.g. for event thread lists */
c284542b 1480static unsigned int thread_process(struct thread_list_head *list)
2613abe6 1481{
d62a17ae 1482 struct thread *thread;
d62a17ae 1483 unsigned int ready = 0;
1484
c284542b 1485 while ((thread = thread_list_pop(list))) {
d62a17ae 1486 thread->type = THREAD_READY;
c284542b 1487 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1488 ready++;
1489 }
1490 return ready;
2613abe6
PJ
1491}
1492
1493
718e3744 1494/* Fetch next ready thread. */
d62a17ae 1495struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1496{
d62a17ae 1497 struct thread *thread = NULL;
1498 struct timeval now;
1499 struct timeval zerotime = {0, 0};
1500 struct timeval tv;
1501 struct timeval *tw = NULL;
d81ca9a3 1502 bool eintr_p = false;
d62a17ae 1503 int num = 0;
1504
1505 do {
1506 /* Handle signals if any */
1507 if (m->handle_signals)
1508 quagga_sigevent_process();
1509
1510 pthread_mutex_lock(&m->mtx);
1511
1512 /* Process any pending cancellation requests */
1513 do_thread_cancel(m);
1514
e3c9529e
QY
1515 /*
1516 * Attempt to flush ready queue before going into poll().
1517 * This is performance-critical. Think twice before modifying.
1518 */
c284542b 1519 if ((thread = thread_list_pop(&m->ready))) {
e3c9529e
QY
1520 fetch = thread_run(m, thread, fetch);
1521 if (fetch->ref)
1522 *fetch->ref = NULL;
1523 pthread_mutex_unlock(&m->mtx);
1524 break;
1525 }
1526
1527 /* otherwise, tick through scheduling sequence */
1528
bca37d17
QY
1529 /*
1530 * Post events to ready queue. This must come before the
1531 * following block since events should occur immediately
1532 */
d62a17ae 1533 thread_process(&m->event);
1534
bca37d17
QY
1535 /*
1536 * If there are no tasks on the ready queue, we will poll()
1537 * until a timer expires or we receive I/O, whichever comes
1538 * first. The strategy for doing this is:
d62a17ae 1539 *
1540 * - If there are events pending, set the poll() timeout to zero
1541 * - If there are no events pending, but there are timers
d279ef57
DS
1542 * pending, set the timeout to the smallest remaining time on
1543 * any timer.
d62a17ae 1544 * - If there are neither timers nor events pending, but there
d279ef57 1545 * are file descriptors pending, block indefinitely in poll()
d62a17ae 1546 * - If nothing is pending, it's time for the application to die
1547 *
1548 * In every case except the last, we need to hit poll() at least
bca37d17
QY
1549 * once per loop to avoid starvation by events
1550 */
c284542b 1551 if (!thread_list_count(&m->ready))
27d29ced 1552 tw = thread_timer_wait(&m->timer, &tv);
d62a17ae 1553
c284542b
DL
1554 if (thread_list_count(&m->ready) ||
1555 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1556 tw = &zerotime;
1557
1558 if (!tw && m->handler.pfdcount == 0) { /* die */
1559 pthread_mutex_unlock(&m->mtx);
1560 fetch = NULL;
1561 break;
1562 }
1563
bca37d17
QY
1564 /*
1565 * Copy pollfd array + # active pollfds in it. Not necessary to
1566 * copy the array size as this is fixed.
1567 */
d62a17ae 1568 m->handler.copycount = m->handler.pfdcount;
1569 memcpy(m->handler.copy, m->handler.pfds,
1570 m->handler.copycount * sizeof(struct pollfd));
1571
e3c9529e
QY
1572 pthread_mutex_unlock(&m->mtx);
1573 {
d81ca9a3
MS
1574 eintr_p = false;
1575 num = fd_poll(m, tw, &eintr_p);
e3c9529e
QY
1576 }
1577 pthread_mutex_lock(&m->mtx);
d764d2cc 1578
e3c9529e
QY
1579 /* Handle any errors received in poll() */
1580 if (num < 0) {
d81ca9a3 1581 if (eintr_p) {
d62a17ae 1582 pthread_mutex_unlock(&m->mtx);
e3c9529e
QY
1583 /* loop around to signal handler */
1584 continue;
d62a17ae 1585 }
1586
e3c9529e 1587 /* else die */
450971aa 1588 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1589 safe_strerror(errno));
e3c9529e
QY
1590 pthread_mutex_unlock(&m->mtx);
1591 fetch = NULL;
1592 break;
bca37d17 1593 }
d62a17ae 1594
1595 /* Post timers to ready queue. */
1596 monotime(&now);
27d29ced 1597 thread_process_timers(&m->timer, &now);
d62a17ae 1598
1599 /* Post I/O to ready queue. */
1600 if (num > 0)
1601 thread_process_io(m, num);
1602
d62a17ae 1603 pthread_mutex_unlock(&m->mtx);
1604
1605 } while (!thread && m->spin);
1606
1607 return fetch;
718e3744 1608}
1609
d62a17ae 1610static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1611{
d62a17ae 1612 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1613 + (a.tv_usec - b.tv_usec));
62f44022
QY
1614}
1615
d62a17ae 1616unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1617 unsigned long *cputime)
718e3744 1618{
d62a17ae 1619 /* This is 'user + sys' time. */
1620 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1621 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1622 return timeval_elapsed(now->real, start->real);
8b70d0b0 1623}
1624
50596be0
DS
1625/* We should aim to yield after 'yield' milliseconds, which defaults
1626 to THREAD_YIELD_TIME_SLOT.
8b70d0b0 1627 Note: we are using real (wall clock) time for this calculation.
1628 It could be argued that CPU time may make more sense in certain
1629 contexts. The things to consider are whether the thread may have
1630 blocked (in which case wall time increases, but CPU time does not),
1631 or whether the system is heavily loaded with other processes competing
d62a17ae 1632 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1633 Plus it has the added benefit that gettimeofday should be faster
1634 than calling getrusage. */
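/*
 * Typical cooperative pattern (a sketch; work_remains()/do_one_unit() are
 * placeholders): a long-running task checks thread_should_yield() and, when
 * its slot is used up, reschedules itself instead of hogging the pthread.
 *
 *     static int bulk_job(struct thread *t)
 *     {
 *         struct work *w = THREAD_ARG(t);
 *
 *         while (work_remains(w)) {
 *             do_one_unit(w);
 *             if (thread_should_yield(t)) {
 *                 thread_add_event(t->master, bulk_job, w, 0, NULL);
 *                 return 0;
 *             }
 *         }
 *         return 0;
 *     }
 */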
d62a17ae 1635int thread_should_yield(struct thread *thread)
718e3744 1636{
d62a17ae 1637 int result;
00dffa8c 1638 frr_with_mutex(&thread->mtx) {
d62a17ae 1639 result = monotime_since(&thread->real, NULL)
1640 > (int64_t)thread->yield;
1641 }
d62a17ae 1642 return result;
50596be0
DS
1643}
1644
d62a17ae 1645void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1646{
00dffa8c 1647 frr_with_mutex(&thread->mtx) {
d62a17ae 1648 thread->yield = yield_time;
1649 }
718e3744 1650}
1651
d62a17ae 1652void thread_getrusage(RUSAGE_T *r)
db9c0df9 1653{
231db9a6
DS
1654#if defined RUSAGE_THREAD
1655#define FRR_RUSAGE RUSAGE_THREAD
1656#else
1657#define FRR_RUSAGE RUSAGE_SELF
1658#endif
d62a17ae 1659 monotime(&r->real);
f75e802d 1660#ifndef EXCLUDE_CPU_TIME
231db9a6 1661 getrusage(FRR_RUSAGE, &(r->cpu));
f75e802d 1662#endif
db9c0df9
PJ
1663}
1664
fbcac826
QY
1665/*
1666 * Call a thread.
1667 *
1668 * This function will atomically update the thread's usage history. At present
1669 * this is the only spot where usage history is written. Nevertheless the code
1670 * has been written such that the introduction of writers in the future should
1671 * not need to update it provided the writers atomically perform only the
1672 * operations done here, i.e. updating the total and maximum times. In
1673 * particular, the maximum real and cpu times must be monotonically increasing
1674 * or this code is not correct.
1675 */
d62a17ae 1676void thread_call(struct thread *thread)
718e3744 1677{
f75e802d 1678#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1679 _Atomic unsigned long realtime, cputime;
1680 unsigned long exp;
1681 unsigned long helper;
f75e802d 1682#endif
d62a17ae 1683 RUSAGE_T before, after;
cc8b13a0 1684
d62a17ae 1685 GETRUSAGE(&before);
1686 thread->real = before.real;
718e3744 1687
6c3aa850
DL
1688 frrtrace(9, frr_libfrr, thread_call, thread->master,
1689 thread->xref->funcname, thread->xref->xref.file,
1690 thread->xref->xref.line, NULL, thread->u.fd,
c7bb4f00 1691 thread->u.val, thread->arg, thread->u.sands.tv_sec);
abf96a87 1692
d62a17ae 1693 pthread_setspecific(thread_current, thread);
1694 (*thread->func)(thread);
1695 pthread_setspecific(thread_current, NULL);
718e3744 1696
d62a17ae 1697 GETRUSAGE(&after);
718e3744 1698
f75e802d 1699#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1700 realtime = thread_consumed_time(&after, &before, &helper);
1701 cputime = helper;
1702
1703 /* update realtime */
1704 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1705 memory_order_seq_cst);
1706 exp = atomic_load_explicit(&thread->hist->real.max,
1707 memory_order_seq_cst);
1708 while (exp < realtime
1709 && !atomic_compare_exchange_weak_explicit(
1710 &thread->hist->real.max, &exp, realtime,
1711 memory_order_seq_cst, memory_order_seq_cst))
1712 ;
1713
1714 /* update cputime */
1715 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1716 memory_order_seq_cst);
1717 exp = atomic_load_explicit(&thread->hist->cpu.max,
1718 memory_order_seq_cst);
1719 while (exp < cputime
1720 && !atomic_compare_exchange_weak_explicit(
1721 &thread->hist->cpu.max, &exp, cputime,
1722 memory_order_seq_cst, memory_order_seq_cst))
1723 ;
1724
1725 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1726 memory_order_seq_cst);
1727 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1728 memory_order_seq_cst);
718e3744 1729
924b9229 1730#ifdef CONSUMED_TIME_CHECK
d62a17ae 1731 if (realtime > CONSUMED_TIME_CHECK) {
1732 /*
1733 * We have a CPU Hog on our hands.
1734 * Whinge about it now, so we're aware this is yet another task
1735 * to fix.
1736 */
9ef9495e 1737 flog_warn(
450971aa 1738 EC_LIB_SLOW_THREAD,
d62a17ae 1739 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
60a3efec 1740 thread->xref->funcname, (unsigned long)thread->func,
d62a17ae 1741 realtime / 1000, cputime / 1000);
1742 }
924b9229 1743#endif /* CONSUMED_TIME_CHECK */
f75e802d 1744#endif /* Exclude CPU Time */
718e3744 1745}
1746
1747/* Execute thread */
60a3efec
DL
1748void _thread_execute(const struct xref_threadsched *xref,
1749 struct thread_master *m, int (*func)(struct thread *),
1750 void *arg, int val)
718e3744 1751{
c4345fbf 1752 struct thread *thread;
718e3744 1753
c4345fbf 1754 /* Get or allocate new thread to execute. */
00dffa8c 1755 frr_with_mutex(&m->mtx) {
60a3efec 1756 thread = thread_get(m, THREAD_EVENT, func, arg, xref);
9c7753e4 1757
c4345fbf 1758 /* Set its event value. */
00dffa8c 1759 frr_with_mutex(&thread->mtx) {
c4345fbf
RZ
1760 thread->add_type = THREAD_EXECUTE;
1761 thread->u.val = val;
1762 thread->ref = &thread;
1763 }
c4345fbf 1764 }
f7c62e11 1765
c4345fbf
RZ
1766 /* Execute thread doing all accounting. */
1767 thread_call(thread);
9c7753e4 1768
c4345fbf
RZ
1769 /* Give back or free thread. */
1770 thread_add_unuse(m, thread);
718e3744 1771}
1543c387
MS
1772
1773/* Debug signal mask - if 'sigs' is NULL, use current effective mask. */
1774void debug_signals(const sigset_t *sigs)
1775{
1776 int i, found;
1777 sigset_t tmpsigs;
1778 char buf[300];
1779
1780 /*
1781 * We're only looking at the non-realtime signals here, so we need
1782 * some limit value. Platform differences mean at some point we just
1783 * need to pick a reasonable value.
1784 */
1785#if defined SIGRTMIN
1786# define LAST_SIGNAL SIGRTMIN
1787#else
1788# define LAST_SIGNAL 32
1789#endif
1790
1791
1792 if (sigs == NULL) {
1793 sigemptyset(&tmpsigs);
1794 pthread_sigmask(SIG_BLOCK, NULL, &tmpsigs);
1795 sigs = &tmpsigs;
1796 }
1797
1798 found = 0;
1799 buf[0] = '\0';
1800
1801 for (i = 0; i < LAST_SIGNAL; i++) {
1802 char tmp[20];
1803
1804 if (sigismember(sigs, i) > 0) {
1805 if (found > 0)
1806 strlcat(buf, ",", sizeof(buf));
1807 snprintf(tmp, sizeof(tmp), "%d", i);
1808 strlcat(buf, tmp, sizeof(buf));
1809 found++;
1810 }
1811 }
1812
1813 if (found == 0)
1814 snprintf(buf, sizeof(buf), "<none>");
1815
1816 zlog_debug("%s: %s", __func__, buf);
1817}