718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
896014f4
DL
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
3e41733f 28#include "frrcu.h"
718e3744 29#include "log.h"
e04ab74d 30#include "hash.h"
31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
00dffa8c 36#include "frr_pthread.h"
9ef9495e 37#include "lib_errors.h"
912d45a1 38#include "libfrr_trace.h"
1a9f340b 39#include "libfrr.h"
d6be5fb9 40
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 42DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 43DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 44DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 45
c284542b
DL
46DECLARE_LIST(thread_list, struct thread, threaditem)
47
27d29ced
DL
48static int thread_timer_cmp(const struct thread *a, const struct thread *b)
49{
50 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
51 return -1;
52 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
53 return 1;
54 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
55 return -1;
56 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
57 return 1;
58 return 0;
59}
60
a5824290 61DECLARE_HEAP(thread_timer_list, struct thread, timeritem, thread_timer_cmp)
27d29ced 62
3b96b781
HT
63#if defined(__APPLE__)
64#include <mach/mach.h>
65#include <mach/mach_time.h>
66#endif
67
d62a17ae 68#define AWAKEN(m) \
69 do { \
2b64873d 70 const unsigned char wakebyte = 0x01; \
d62a17ae 71 write(m->io_pipe[1], &wakebyte, 1); \
72 } while (0);
3bf2673b 73
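/*
 * AWAKEN() implements the classic self-pipe wakeup: writing one byte to
 * io_pipe[1] makes io_pipe[0] readable, so a thread_master blocked in
 * poll()/ppoll() inside fd_poll() returns immediately and re-evaluates
 * its timer and event queues; fd_poll() then drains the pipe before
 * returning. An illustrative sketch of the two halves (not a verbatim
 * copy of the code below):
 *
 *     AWAKEN(m);                                    // producer side
 *     ...
 *     while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
 *             ;                                     // consumer side, in fd_poll()
 */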
62f44022 74/* control variable for initializer */
c17faa4b 75static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 76pthread_key_t thread_current;
6b0655a2 77
c17faa4b 78static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
62f44022
QY
79static struct list *masters;
80
6655966d 81static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 82
62f44022 83/* CLI start ---------------------------------------------------------------- */
d8b87afe 84static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
e04ab74d 85{
883cc51d 86 int size = sizeof(a->func);
bd74dc61
DS
87
88 return jhash(&a->func, size, 0);
e04ab74d 89}
90
74df8d6d 91static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 92 const struct cpu_thread_history *b)
e04ab74d 93{
d62a17ae 94 return a->func == b->func;
e04ab74d 95}
96
d62a17ae 97static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 98{
d62a17ae 99 struct cpu_thread_history *new;
100 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
101 new->func = a->func;
102 new->funcname = a->funcname;
103 return new;
e04ab74d 104}
105
d62a17ae 106static void cpu_record_hash_free(void *a)
228da428 107{
d62a17ae 108 struct cpu_thread_history *hist = a;
109
110 XFREE(MTYPE_THREAD_STATS, hist);
228da428
CC
111}
112
f75e802d 113#ifndef EXCLUDE_CPU_TIME
d62a17ae 114static void vty_out_cpu_thread_history(struct vty *vty,
115 struct cpu_thread_history *a)
e04ab74d 116{
9a8a7b0e 117 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
72327cf3
MS
118 a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
119 a->total_calls, (a->cpu.total / a->total_calls), a->cpu.max,
120 (a->real.total / a->total_calls), a->real.max);
84d951d0 121 vty_out(vty, " %c%c%c%c%c %s\n",
d62a17ae 122 a->types & (1 << THREAD_READ) ? 'R' : ' ',
123 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
124 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
125 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
126 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 127}
128
e3b78da8 129static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 130{
d62a17ae 131 struct cpu_thread_history *totals = args[0];
fbcac826 132 struct cpu_thread_history copy;
d62a17ae 133 struct vty *vty = args[1];
fbcac826 134 uint8_t *filter = args[2];
d62a17ae 135
136 struct cpu_thread_history *a = bucket->data;
137
fbcac826
QY
138 copy.total_active =
139 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
140 copy.total_calls =
141 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
142 copy.cpu.total =
143 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
144 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
145 copy.real.total =
146 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
147 copy.real.max =
148 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
149 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
150 copy.funcname = a->funcname;
151
152 if (!(copy.types & *filter))
d62a17ae 153 return;
fbcac826
QY
154
155 vty_out_cpu_thread_history(vty, &copy);
156 totals->total_active += copy.total_active;
157 totals->total_calls += copy.total_calls;
158 totals->real.total += copy.real.total;
159 if (totals->real.max < copy.real.max)
160 totals->real.max = copy.real.max;
161 totals->cpu.total += copy.cpu.total;
162 if (totals->cpu.max < copy.cpu.max)
163 totals->cpu.max = copy.cpu.max;
e04ab74d 164}
165
fbcac826 166static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 167{
d62a17ae 168 struct cpu_thread_history tmp;
169 void *args[3] = {&tmp, vty, &filter};
170 struct thread_master *m;
171 struct listnode *ln;
172
0d6f7fd6 173 memset(&tmp, 0, sizeof(tmp));
d62a17ae 174 tmp.funcname = "TOTAL";
175 tmp.types = filter;
176
00dffa8c 177 frr_with_mutex(&masters_mtx) {
d62a17ae 178 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
179 const char *name = m->name ? m->name : "main";
180
181 char underline[strlen(name) + 1];
182 memset(underline, '-', sizeof(underline));
4f113d60 183 underline[sizeof(underline) - 1] = '\0';
d62a17ae 184
185 vty_out(vty, "\n");
186 vty_out(vty, "Showing statistics for pthread %s\n",
187 name);
188 vty_out(vty, "-------------------------------%s\n",
189 underline);
84d951d0 190 vty_out(vty, "%30s %18s %18s\n", "",
d62a17ae 191 "CPU (user+system):", "Real (wall-clock):");
192 vty_out(vty,
193 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (m->cpu_record->count)
198 hash_iterate(
199 m->cpu_record,
e3b78da8 200 (void (*)(struct hash_bucket *,
d62a17ae 201 void *))cpu_record_hash_print,
202 args);
203 else
204 vty_out(vty, "No data to display yet.\n");
205
206 vty_out(vty, "\n");
207 }
208 }
d62a17ae 209
210 vty_out(vty, "\n");
211 vty_out(vty, "Total thread statistics\n");
212 vty_out(vty, "-------------------------\n");
84d951d0 213 vty_out(vty, "%30s %18s %18s\n", "",
d62a17ae 214 "CPU (user+system):", "Real (wall-clock):");
215 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
216 vty_out(vty, " Avg uSec Max uSecs");
217 vty_out(vty, " Type Thread\n");
218
219 if (tmp.total_calls > 0)
220 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 221}
f75e802d 222#endif
e04ab74d 223
e3b78da8 224static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 225{
fbcac826 226 uint8_t *filter = args[0];
d62a17ae 227 struct hash *cpu_record = args[1];
228
229 struct cpu_thread_history *a = bucket->data;
62f44022 230
d62a17ae 231 if (!(a->types & *filter))
232 return;
f48f65d2 233
d62a17ae 234 hash_release(cpu_record, bucket->data);
e276eb82
PJ
235}
236
fbcac826 237static void cpu_record_clear(uint8_t filter)
e276eb82 238{
fbcac826 239 uint8_t *tmp = &filter;
d62a17ae 240 struct thread_master *m;
241 struct listnode *ln;
242
00dffa8c 243 frr_with_mutex(&masters_mtx) {
d62a17ae 244 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
00dffa8c 245 frr_with_mutex(&m->mtx) {
d62a17ae 246 void *args[2] = {tmp, m->cpu_record};
247 hash_iterate(
248 m->cpu_record,
e3b78da8 249 (void (*)(struct hash_bucket *,
d62a17ae 250 void *))cpu_record_hash_clear,
251 args);
252 }
d62a17ae 253 }
254 }
62f44022
QY
255}
256
fbcac826 257static uint8_t parse_filter(const char *filterstr)
62f44022 258{
d62a17ae 259 int i = 0;
260 int filter = 0;
261
262 while (filterstr[i] != '\0') {
263 switch (filterstr[i]) {
264 case 'r':
265 case 'R':
266 filter |= (1 << THREAD_READ);
267 break;
268 case 'w':
269 case 'W':
270 filter |= (1 << THREAD_WRITE);
271 break;
272 case 't':
273 case 'T':
274 filter |= (1 << THREAD_TIMER);
275 break;
276 case 'e':
277 case 'E':
278 filter |= (1 << THREAD_EVENT);
279 break;
280 case 'x':
281 case 'X':
282 filter |= (1 << THREAD_EXECUTE);
283 break;
284 default:
285 break;
286 }
287 ++i;
288 }
289 return filter;
62f44022
QY
290}
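/*
 * For example, parse_filter("rw") returns
 * (1 << THREAD_READ) | (1 << THREAD_WRITE), while an empty or entirely
 * unrecognized string yields 0, which the callers below treat as an
 * invalid filter.
 */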
291
f75e802d 292#ifndef EXCLUDE_CPU_TIME
62f44022
QY
293DEFUN (show_thread_cpu,
294 show_thread_cpu_cmd,
295 "show thread cpu [FILTER]",
296 SHOW_STR
297 "Thread information\n"
298 "Thread CPU usage\n"
61fa0b97 299 "Display filter (rwtex)\n")
62f44022 300{
fbcac826 301 uint8_t filter = (uint8_t)-1U;
d62a17ae 302 int idx = 0;
303
304 if (argv_find(argv, argc, "FILTER", &idx)) {
305 filter = parse_filter(argv[idx]->arg);
306 if (!filter) {
307 vty_out(vty,
3efd0893 308 "Invalid filter \"%s\" specified; must contain at least one of 'RWTEX'\n",
d62a17ae 309 argv[idx]->arg);
310 return CMD_WARNING;
311 }
312 }
313
314 cpu_record_print(vty, filter);
315 return CMD_SUCCESS;
e276eb82 316}
f75e802d 317#endif
e276eb82 318
8872626b
DS
319static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
320{
321 const char *name = m->name ? m->name : "main";
322 char underline[strlen(name) + 1];
a0b36ae6 323 struct thread *thread;
8872626b
DS
324 uint32_t i;
325
326 memset(underline, '-', sizeof(underline));
327 underline[sizeof(underline) - 1] = '\0';
328
329 vty_out(vty, "\nShowing poll FD's for %s\n", name);
330 vty_out(vty, "----------------------%s\n", underline);
6c19478a
DS
331 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
332 m->fd_limit);
a0b36ae6
DS
333 for (i = 0; i < m->handler.pfdcount; i++) {
334 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
335 m->handler.pfds[i].fd, m->handler.pfds[i].events,
8872626b 336 m->handler.pfds[i].revents);
a0b36ae6
DS
337
338 if (m->handler.pfds[i].events & POLLIN) {
339 thread = m->read[m->handler.pfds[i].fd];
340
341 if (!thread)
342 vty_out(vty, "ERROR ");
343 else
60a3efec 344 vty_out(vty, "%s ", thread->xref->funcname);
a0b36ae6
DS
345 } else
346 vty_out(vty, " ");
347
348 if (m->handler.pfds[i].events & POLLOUT) {
349 thread = m->write[m->handler.pfds[i].fd];
350
351 if (!thread)
352 vty_out(vty, "ERROR\n");
353 else
60a3efec 354 vty_out(vty, "%s\n", thread->xref->funcname);
a0b36ae6
DS
355 } else
356 vty_out(vty, "\n");
357 }
8872626b
DS
358}
359
360DEFUN (show_thread_poll,
361 show_thread_poll_cmd,
362 "show thread poll",
363 SHOW_STR
364 "Thread information\n"
365 "Show poll FD's and information\n")
366{
367 struct listnode *node;
368 struct thread_master *m;
369
00dffa8c 370 frr_with_mutex(&masters_mtx) {
8872626b
DS
371 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
372 show_thread_poll_helper(vty, m);
373 }
374 }
8872626b
DS
375
376 return CMD_SUCCESS;
377}
378
379
49d41a26
DS
380DEFUN (clear_thread_cpu,
381 clear_thread_cpu_cmd,
382 "clear thread cpu [FILTER]",
62f44022 383 "Clear stored data in all pthreads\n"
49d41a26
DS
384 "Thread information\n"
385 "Thread CPU usage\n"
386 "Display filter (rwtexb)\n")
e276eb82 387{
fbcac826 388 uint8_t filter = (uint8_t)-1U;
d62a17ae 389 int idx = 0;
390
391 if (argv_find(argv, argc, "FILTER", &idx)) {
392 filter = parse_filter(argv[idx]->arg);
393 if (!filter) {
394 vty_out(vty,
3efd0893 395 "Invalid filter \"%s\" specified; must contain at least one of 'RWTEX'\n",
d62a17ae 396 argv[idx]->arg);
397 return CMD_WARNING;
398 }
399 }
400
401 cpu_record_clear(filter);
402 return CMD_SUCCESS;
e276eb82 403}
6b0655a2 404
d62a17ae 405void thread_cmd_init(void)
0b84f294 406{
f75e802d 407#ifndef EXCLUDE_CPU_TIME
d62a17ae 408 install_element(VIEW_NODE, &show_thread_cpu_cmd);
f75e802d 409#endif
8872626b 410 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 411 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 412}
62f44022
QY
413/* CLI end ------------------------------------------------------------------ */
414
0b84f294 415
d62a17ae 416static void cancelreq_del(void *cr)
63ccb9cb 417{
d62a17ae 418 XFREE(MTYPE_TMP, cr);
63ccb9cb
QY
419}
420
e0bebc7c 421/* initializer, only ever called once */
4d762f26 422static void initializer(void)
e0bebc7c 423{
d62a17ae 424 pthread_key_create(&thread_current, NULL);
e0bebc7c
QY
425}
426
d62a17ae 427struct thread_master *thread_master_create(const char *name)
718e3744 428{
d62a17ae 429 struct thread_master *rv;
430 struct rlimit limit;
431
432 pthread_once(&init_once, &initializer);
433
434 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 435
436 /* Initialize master mutex */
437 pthread_mutex_init(&rv->mtx, NULL);
438 pthread_cond_init(&rv->cancel_cond, NULL);
439
440 /* Set name */
7ffcd8bd
QY
441 name = name ? name : "default";
442 rv->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
d62a17ae 443
444 /* Initialize I/O task data structures */
1a9f340b
MS
445
446 /* Use configured limit if present, ulimit otherwise. */
447 rv->fd_limit = frr_get_fd_limit();
448 if (rv->fd_limit == 0) {
449 getrlimit(RLIMIT_NOFILE, &limit);
450 rv->fd_limit = (int)limit.rlim_cur;
451 }
452
a6f235f3
DS
453 rv->read = XCALLOC(MTYPE_THREAD_POLL,
454 sizeof(struct thread *) * rv->fd_limit);
455
456 rv->write = XCALLOC(MTYPE_THREAD_POLL,
457 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 458
7ffcd8bd
QY
459 char tmhashname[strlen(name) + 32];
460 snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash",
461 name);
bd74dc61 462 rv->cpu_record = hash_create_size(
d8b87afe 463 8, (unsigned int (*)(const void *))cpu_record_hash_key,
74df8d6d 464 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
7ffcd8bd 465 tmhashname);
d62a17ae 466
c284542b
DL
467 thread_list_init(&rv->event);
468 thread_list_init(&rv->ready);
469 thread_list_init(&rv->unuse);
27d29ced 470 thread_timer_list_init(&rv->timer);
d62a17ae 471
472 /* Initialize thread_fetch() settings */
473 rv->spin = true;
474 rv->handle_signals = true;
475
476 /* Set pthread owner, should be updated by actual owner */
477 rv->owner = pthread_self();
478 rv->cancel_req = list_new();
479 rv->cancel_req->del = cancelreq_del;
480 rv->canceled = true;
481
482 /* Initialize pipe poker */
483 pipe(rv->io_pipe);
484 set_nonblocking(rv->io_pipe[0]);
485 set_nonblocking(rv->io_pipe[1]);
486
487 /* Initialize data structures for poll() */
488 rv->handler.pfdsize = rv->fd_limit;
489 rv->handler.pfdcount = 0;
490 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
491 sizeof(struct pollfd) * rv->handler.pfdsize);
492 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
493 sizeof(struct pollfd) * rv->handler.pfdsize);
494
eff09c66 495 /* add to list of threadmasters */
00dffa8c 496 frr_with_mutex(&masters_mtx) {
eff09c66
QY
497 if (!masters)
498 masters = list_new();
499
d62a17ae 500 listnode_add(masters, rv);
501 }
d62a17ae 502
503 return rv;
718e3744 504}
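/*
 * Illustrative sketch of the usual thread_master lifecycle as driven by a
 * daemon's owning pthread (a minimal sketch; the real entry point for the
 * main pthread is libfrr's frr_run()):
 *
 *     struct thread_master *master = thread_master_create("example");
 *     struct thread thread;
 *
 *     // schedule initial tasks here with thread_add_read()/thread_add_timer()/...
 *
 *     while (thread_fetch(master, &thread))
 *             thread_call(&thread);
 *
 *     thread_master_free(master);
 */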
505
d8a8a8de
QY
506void thread_master_set_name(struct thread_master *master, const char *name)
507{
00dffa8c 508 frr_with_mutex(&master->mtx) {
0a22ddfb 509 XFREE(MTYPE_THREAD_MASTER, master->name);
d8a8a8de
QY
510 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
511 }
d8a8a8de
QY
512}
513
6ed04aa2
DS
514#define THREAD_UNUSED_DEPTH 10
515
718e3744 516/* Move thread to unuse list. */
d62a17ae 517static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 518{
6655966d
RZ
519 pthread_mutex_t mtxc = thread->mtx;
520
d62a17ae 521 assert(m != NULL && thread != NULL);
d62a17ae 522
d62a17ae 523 thread->hist->total_active--;
6ed04aa2
DS
524 memset(thread, 0, sizeof(struct thread));
525 thread->type = THREAD_UNUSED;
526
6655966d
RZ
527 /* Restore the thread mutex context. */
528 thread->mtx = mtxc;
529
c284542b
DL
530 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
531 thread_list_add_tail(&m->unuse, thread);
6655966d
RZ
532 return;
533 }
534
535 thread_free(m, thread);
718e3744 536}
537
538/* Free all unused threads. */
c284542b
DL
539static void thread_list_free(struct thread_master *m,
540 struct thread_list_head *list)
718e3744 541{
d62a17ae 542 struct thread *t;
d62a17ae 543
c284542b 544 while ((t = thread_list_pop(list)))
6655966d 545 thread_free(m, t);
718e3744 546}
547
d62a17ae 548static void thread_array_free(struct thread_master *m,
549 struct thread **thread_array)
308d14ae 550{
d62a17ae 551 struct thread *t;
552 int index;
553
554 for (index = 0; index < m->fd_limit; ++index) {
555 t = thread_array[index];
556 if (t) {
557 thread_array[index] = NULL;
6655966d 558 thread_free(m, t);
d62a17ae 559 }
560 }
a6f235f3 561 XFREE(MTYPE_THREAD_POLL, thread_array);
308d14ae
DV
562}
563
495f0b13
DS
564/*
565 * thread_master_free_unused
566 *
567 * As threads are finished with, they are put on the
568 * unuse list for later reuse.
569 * If we are shutting down, free up the unused threads
570 * so we can see if we forgot to shut anything off.
571 */
d62a17ae 572void thread_master_free_unused(struct thread_master *m)
495f0b13 573{
00dffa8c 574 frr_with_mutex(&m->mtx) {
d62a17ae 575 struct thread *t;
c284542b 576 while ((t = thread_list_pop(&m->unuse)))
6655966d 577 thread_free(m, t);
d62a17ae 578 }
495f0b13
DS
579}
580
718e3744 581/* Stop thread scheduler. */
d62a17ae 582void thread_master_free(struct thread_master *m)
718e3744 583{
27d29ced
DL
584 struct thread *t;
585
00dffa8c 586 frr_with_mutex(&masters_mtx) {
d62a17ae 587 listnode_delete(masters, m);
eff09c66 588 if (masters->count == 0) {
6a154c88 589 list_delete(&masters);
eff09c66 590 }
d62a17ae 591 }
d62a17ae 592
593 thread_array_free(m, m->read);
594 thread_array_free(m, m->write);
27d29ced
DL
595 while ((t = thread_timer_list_pop(&m->timer)))
596 thread_free(m, t);
d62a17ae 597 thread_list_free(m, &m->event);
598 thread_list_free(m, &m->ready);
599 thread_list_free(m, &m->unuse);
600 pthread_mutex_destroy(&m->mtx);
33844bbe 601 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 602 close(m->io_pipe[0]);
603 close(m->io_pipe[1]);
6a154c88 604 list_delete(&m->cancel_req);
1a0a92ea 605 m->cancel_req = NULL;
d62a17ae 606
607 hash_clean(m->cpu_record, cpu_record_hash_free);
608 hash_free(m->cpu_record);
609 m->cpu_record = NULL;
610
0a22ddfb 611 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 612 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
613 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
614 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 615}
616
78ca0342
CF
617/* Return remaining time in milliseconds. */
618unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 619{
d62a17ae 620 int64_t remain;
1189d95f 621
00dffa8c 622 frr_with_mutex(&thread->mtx) {
78ca0342 623 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 624 }
1189d95f 625
d62a17ae 626 return remain < 0 ? 0 : remain;
718e3744 627}
628
78ca0342
CF
629/* Return remaining time in seconds. */
630unsigned long thread_timer_remain_second(struct thread *thread)
631{
632 return thread_timer_remain_msec(thread) / 1000LL;
633}
634
d62a17ae 635struct timeval thread_timer_remain(struct thread *thread)
6ac44687 636{
d62a17ae 637 struct timeval remain;
00dffa8c 638 frr_with_mutex(&thread->mtx) {
d62a17ae 639 monotime_until(&thread->u.sands, &remain);
640 }
d62a17ae 641 return remain;
6ac44687
CF
642}
643
0447957e
AK
644static int time_hhmmss(char *buf, int buf_size, long sec)
645{
646 long hh;
647 long mm;
648 int wr;
649
650 zassert(buf_size >= 8);
651
652 hh = sec / 3600;
653 sec %= 3600;
654 mm = sec / 60;
655 sec %= 60;
656
657 wr = snprintf(buf, buf_size, "%02ld:%02ld:%02ld", hh, mm, sec);
658
659 return wr != 8;
660}
661
662char *thread_timer_to_hhmmss(char *buf, int buf_size,
663 struct thread *t_timer)
664{
665 if (t_timer) {
666 time_hhmmss(buf, buf_size,
667 thread_timer_remain_second(t_timer));
668 } else {
669 snprintf(buf, buf_size, "--:--:--");
670 }
671 return buf;
672}
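/*
 * Illustrative use of thread_timer_to_hhmmss(): render the time left on a
 * scheduled timer (or "--:--:--" when none is pending) for CLI output.
 * The buffer must hold at least "HH:MM:SS" plus the terminating NUL.
 * (t_update is a hypothetical struct thread * back-reference held by the
 * caller.)
 *
 *     char remain[32];
 *     vty_out(vty, "Next update in %s\n",
 *             thread_timer_to_hhmmss(remain, sizeof(remain), t_update));
 */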
673
718e3744 674/* Get new thread. */
d7c0a89a 675static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 676 int (*func)(struct thread *), void *arg,
60a3efec 677 const struct xref_threadsched *xref)
718e3744 678{
c284542b 679 struct thread *thread = thread_list_pop(&m->unuse);
d62a17ae 680 struct cpu_thread_history tmp;
681
682 if (!thread) {
683 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
684 /* mutex only needs to be initialized at struct creation. */
685 pthread_mutex_init(&thread->mtx, NULL);
686 m->alloc++;
687 }
688
689 thread->type = type;
690 thread->add_type = type;
691 thread->master = m;
692 thread->arg = arg;
d62a17ae 693 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
694 thread->ref = NULL;
695
696 /*
697 * If the passed-in funcname is not the one we have
698 * stored, then thread->hist needs to be updated.
699 * We keep the last history entry around on the
700 * unused thread on the assumption that the same
701 * type of thread is probably about to be
702 * allocated again immediately.
703 * This hopefully saves us some serious
704 * hash_get lookups.
705 */
60a3efec
DL
706 if ((thread->xref && thread->xref->funcname != xref->funcname)
707 || thread->func != func) {
d62a17ae 708 tmp.func = func;
60a3efec 709 tmp.funcname = xref->funcname;
d62a17ae 710 thread->hist =
711 hash_get(m->cpu_record, &tmp,
712 (void *(*)(void *))cpu_record_hash_alloc);
713 }
714 thread->hist->total_active++;
715 thread->func = func;
60a3efec 716 thread->xref = xref;
d62a17ae 717
718 return thread;
718e3744 719}
720
6655966d
RZ
721static void thread_free(struct thread_master *master, struct thread *thread)
722{
723 /* Update statistics. */
724 assert(master->alloc > 0);
725 master->alloc--;
726
727 /* Free allocated resources. */
728 pthread_mutex_destroy(&thread->mtx);
729 XFREE(MTYPE_THREAD, thread);
730}
731
d81ca9a3
MS
732static int fd_poll(struct thread_master *m, const struct timeval *timer_wait,
733 bool *eintr_p)
209a72a6 734{
d81ca9a3
MS
735 sigset_t origsigs;
736 unsigned char trash[64];
737 nfds_t count = m->handler.copycount;
738
d279ef57
DS
739 /*
740 * If timer_wait is null here, that means poll() should block
741 * indefinitely, unless the thread_master has overridden it by setting
d62a17ae 742 * ->selectpoll_timeout.
d279ef57 743 *
d62a17ae 744 * If the value is positive, it specifies the maximum number of
d279ef57
DS
745 * milliseconds to wait. If the timeout is -1, it specifies that
746 * we should never wait and always return immediately even if no
747 * event is detected. If the value is zero, the behavior is default.
748 */
d62a17ae 749 int timeout = -1;
750
751 /* number of file descriptors with events */
752 int num;
753
754 if (timer_wait != NULL
755 && m->selectpoll_timeout == 0) // use the default value
756 timeout = (timer_wait->tv_sec * 1000)
757 + (timer_wait->tv_usec / 1000);
758 else if (m->selectpoll_timeout > 0) // use the user's timeout
759 timeout = m->selectpoll_timeout;
760 else if (m->selectpoll_timeout
761 < 0) // effect a poll (return immediately)
762 timeout = 0;
763
0bdeb5e5 764 zlog_tls_buffer_flush();
3e41733f
DL
765 rcu_read_unlock();
766 rcu_assert_read_unlocked();
767
d62a17ae 768 /* add poll pipe poker */
d81ca9a3
MS
769 assert(count + 1 < m->handler.pfdsize);
770 m->handler.copy[count].fd = m->io_pipe[0];
771 m->handler.copy[count].events = POLLIN;
772 m->handler.copy[count].revents = 0x00;
773
774 /* We need to deal with a signal-handling race here: we
775 * don't want to miss a crucial signal, such as SIGTERM or SIGINT,
776 * that may arrive just before we enter poll(). We will block the
777 * key signals, then check whether any have arrived - if so, we return
778 * before calling poll(). If not, we'll re-enable the signals
779 * in the ppoll() call.
780 */
781
782 sigemptyset(&origsigs);
783 if (m->handle_signals) {
784 /* Main pthread that handles the app signals */
785 if (frr_sigevent_check(&origsigs)) {
786 /* Signal to process - restore signal mask and return */
787 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
788 num = -1;
789 *eintr_p = true;
790 goto done;
791 }
792 } else {
793 /* Don't make any changes for the non-main pthreads */
794 pthread_sigmask(SIG_SETMASK, NULL, &origsigs);
795 }
d62a17ae 796
d81ca9a3
MS
797#if defined(HAVE_PPOLL)
798 struct timespec ts, *tsp;
799
800 if (timeout >= 0) {
801 ts.tv_sec = timeout / 1000;
802 ts.tv_nsec = (timeout % 1000) * 1000000;
803 tsp = &ts;
804 } else
805 tsp = NULL;
806
807 num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
808 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
809#else
810 /* Not ideal - there is a race after we restore the signal mask */
811 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
812 num = poll(m->handler.copy, count + 1, timeout);
813#endif
d62a17ae 814
d81ca9a3
MS
815done:
816
817 if (num < 0 && errno == EINTR)
818 *eintr_p = true;
819
820 if (num > 0 && m->handler.copy[count].revents != 0 && num--)
d62a17ae 821 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
822 ;
823
3e41733f
DL
824 rcu_read_lock();
825
d62a17ae 826 return num;
209a72a6
DS
827}
828
718e3744 829/* Add new read thread. */
60a3efec
DL
830struct thread *_thread_add_read_write(const struct xref_threadsched *xref,
831 struct thread_master *m,
832 int (*func)(struct thread *),
833 void *arg, int fd, struct thread **t_ptr)
718e3744 834{
60a3efec 835 int dir = xref->thread_type;
d62a17ae 836 struct thread *thread = NULL;
1ef14bee 837 struct thread **thread_array;
d62a17ae 838
abf96a87 839 if (dir == THREAD_READ)
6c3aa850
DL
840 frrtrace(9, frr_libfrr, schedule_read, m,
841 xref->funcname, xref->xref.file, xref->xref.line,
842 t_ptr, fd, 0, arg, 0);
abf96a87 843 else
6c3aa850
DL
844 frrtrace(9, frr_libfrr, schedule_write, m,
845 xref->funcname, xref->xref.file, xref->xref.line,
846 t_ptr, fd, 0, arg, 0);
abf96a87 847
9b864cd3 848 assert(fd >= 0 && fd < m->fd_limit);
00dffa8c
DL
849 frr_with_mutex(&m->mtx) {
850 if (t_ptr && *t_ptr)
851 // thread is already scheduled; don't reschedule
852 break;
d62a17ae 853
854 /* default to a new pollfd */
855 nfds_t queuepos = m->handler.pfdcount;
856
1ef14bee
DS
857 if (dir == THREAD_READ)
858 thread_array = m->read;
859 else
860 thread_array = m->write;
861
d62a17ae 862 /* if we already have a pollfd for our file descriptor, find and
863 * use it */
864 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
865 if (m->handler.pfds[i].fd == fd) {
866 queuepos = i;
1ef14bee
DS
867
868#ifdef DEV_BUILD
869 /*
870 * What happens if we have a thread already
871 * created for this event?
872 */
873 if (thread_array[fd])
874 assert(!"Thread already scheduled for file descriptor");
875#endif
d62a17ae 876 break;
877 }
878
879 /* make sure we have room for this fd + pipe poker fd */
880 assert(queuepos + 1 < m->handler.pfdsize);
881
60a3efec 882 thread = thread_get(m, dir, func, arg, xref);
d62a17ae 883
884 m->handler.pfds[queuepos].fd = fd;
885 m->handler.pfds[queuepos].events |=
886 (dir == THREAD_READ ? POLLIN : POLLOUT);
887
888 if (queuepos == m->handler.pfdcount)
889 m->handler.pfdcount++;
890
891 if (thread) {
00dffa8c 892 frr_with_mutex(&thread->mtx) {
d62a17ae 893 thread->u.fd = fd;
1ef14bee 894 thread_array[thread->u.fd] = thread;
d62a17ae 895 }
d62a17ae 896
897 if (t_ptr) {
898 *t_ptr = thread;
899 thread->ref = t_ptr;
900 }
901 }
902
903 AWAKEN(m);
904 }
d62a17ae 905
906 return thread;
718e3744 907}
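/*
 * Callers do not invoke _thread_add_read_write() directly; they go through
 * the thread_add_read()/thread_add_write() macros in thread.h, which
 * supply the xref. A minimal sketch with a hypothetical handler and
 * back-reference (struct peer and peer->t_read are illustrative only):
 *
 *     static int peer_readable(struct thread *thread)
 *     {
 *             struct peer *peer = THREAD_ARG(thread);
 *             int fd = THREAD_FD(thread);
 *
 *             // ... consume data from fd ...
 *
 *             // read events are one-shot: re-arm for the next wakeup
 *             thread_add_read(thread->master, peer_readable, peer, fd,
 *                             &peer->t_read);
 *             return 0;
 *     }
 *
 *     thread_add_read(master, peer_readable, peer, fd, &peer->t_read);
 */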
908
56a94b36 909static struct thread *
60a3efec
DL
910_thread_add_timer_timeval(const struct xref_threadsched *xref,
911 struct thread_master *m, int (*func)(struct thread *),
4322dea7 912 void *arg, struct timeval *time_relative,
60a3efec 913 struct thread **t_ptr)
718e3744 914{
d62a17ae 915 struct thread *thread;
96fe578a 916 struct timeval t;
d62a17ae 917
918 assert(m != NULL);
919
d62a17ae 920 assert(time_relative);
921
6c3aa850
DL
922 frrtrace(9, frr_libfrr, schedule_timer, m,
923 xref->funcname, xref->xref.file, xref->xref.line,
c7bb4f00 924 t_ptr, 0, 0, arg, (long)time_relative->tv_sec);
abf96a87 925
96fe578a
MS
926 /* Compute expiration/deadline time. */
927 monotime(&t);
928 timeradd(&t, time_relative, &t);
929
00dffa8c
DL
930 frr_with_mutex(&m->mtx) {
931 if (t_ptr && *t_ptr)
d279ef57 932 /* thread is already scheduled; don't reschedule */
d62a17ae 933 return NULL;
d62a17ae 934
4322dea7 935 thread = thread_get(m, THREAD_TIMER, func, arg, xref);
d62a17ae 936
00dffa8c 937 frr_with_mutex(&thread->mtx) {
96fe578a 938 thread->u.sands = t;
27d29ced 939 thread_timer_list_add(&m->timer, thread);
d62a17ae 940 if (t_ptr) {
941 *t_ptr = thread;
942 thread->ref = t_ptr;
943 }
944 }
d62a17ae 945
96fe578a
MS
946 /* The timer list is sorted - if this new timer
947 * might change the time we'll wait for, give the pthread
948 * a chance to re-compute.
949 */
950 if (thread_timer_list_first(&m->timer) == thread)
951 AWAKEN(m);
d62a17ae 952 }
d62a17ae 953
954 return thread;
9e867fe6 955}
956
98c91ac6 957
958/* Add timer event thread. */
60a3efec
DL
959struct thread *_thread_add_timer(const struct xref_threadsched *xref,
960 struct thread_master *m,
961 int (*func)(struct thread *),
962 void *arg, long timer, struct thread **t_ptr)
9e867fe6 963{
d62a17ae 964 struct timeval trel;
9e867fe6 965
d62a17ae 966 assert(m != NULL);
9e867fe6 967
d62a17ae 968 trel.tv_sec = timer;
969 trel.tv_usec = 0;
9e867fe6 970
4322dea7 971 return _thread_add_timer_timeval(xref, m, func, arg, &trel, t_ptr);
98c91ac6 972}
9e867fe6 973
98c91ac6 974/* Add timer event thread with "millisecond" resolution */
60a3efec
DL
975struct thread *_thread_add_timer_msec(const struct xref_threadsched *xref,
976 struct thread_master *m,
977 int (*func)(struct thread *),
978 void *arg, long timer,
979 struct thread **t_ptr)
98c91ac6 980{
d62a17ae 981 struct timeval trel;
9e867fe6 982
d62a17ae 983 assert(m != NULL);
718e3744 984
d62a17ae 985 trel.tv_sec = timer / 1000;
986 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 987
4322dea7 988 return _thread_add_timer_timeval(xref, m, func, arg, &trel, t_ptr);
a48b4e6d 989}
990
4322dea7 991/* Add timer event thread with "timeval" resolution */
60a3efec
DL
992struct thread *_thread_add_timer_tv(const struct xref_threadsched *xref,
993 struct thread_master *m,
994 int (*func)(struct thread *),
995 void *arg, struct timeval *tv,
996 struct thread **t_ptr)
d03c4cbd 997{
4322dea7 998 return _thread_add_timer_timeval(xref, m, func, arg, tv, t_ptr);
d03c4cbd
DL
999}
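/*
 * Illustrative scheduling of the timer variants via the thread.h macros
 * (expire_cb, ctx and ctx->t_expire are hypothetical caller names):
 *
 *     thread_add_timer(master, expire_cb, ctx, 30, &ctx->t_expire);
 *     thread_add_timer_msec(master, expire_cb, ctx, 250, &ctx->t_expire);
 *
 * Passing a back-reference (&ctx->t_expire) means a second call while the
 * timer is still pending is a no-op, and lets the timer later be stopped
 * with thread_cancel(&ctx->t_expire).
 */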
1000
718e3744 1001/* Add simple event thread. */
60a3efec
DL
1002struct thread *_thread_add_event(const struct xref_threadsched *xref,
1003 struct thread_master *m,
1004 int (*func)(struct thread *),
1005 void *arg, int val, struct thread **t_ptr)
718e3744 1006{
00dffa8c 1007 struct thread *thread = NULL;
d62a17ae 1008
6c3aa850
DL
1009 frrtrace(9, frr_libfrr, schedule_event, m,
1010 xref->funcname, xref->xref.file, xref->xref.line,
c7bb4f00 1011 t_ptr, 0, val, arg, 0);
abf96a87 1012
d62a17ae 1013 assert(m != NULL);
1014
00dffa8c
DL
1015 frr_with_mutex(&m->mtx) {
1016 if (t_ptr && *t_ptr)
d279ef57 1017 /* thread is already scheduled; don't reschedule */
00dffa8c 1018 break;
d62a17ae 1019
60a3efec 1020 thread = thread_get(m, THREAD_EVENT, func, arg, xref);
00dffa8c 1021 frr_with_mutex(&thread->mtx) {
d62a17ae 1022 thread->u.val = val;
c284542b 1023 thread_list_add_tail(&m->event, thread);
d62a17ae 1024 }
d62a17ae 1025
1026 if (t_ptr) {
1027 *t_ptr = thread;
1028 thread->ref = t_ptr;
1029 }
1030
1031 AWAKEN(m);
1032 }
d62a17ae 1033
1034 return thread;
718e3744 1035}
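/*
 * Sketch of queueing an immediate event through the thread_add_event()
 * macro (process_queue, ctx and ctx->t_process are hypothetical):
 *
 *     thread_add_event(master, process_queue, ctx, 0, &ctx->t_process);
 *
 * The event goes onto m->event and is moved to the ready queue on the next
 * pass through thread_fetch(); AWAKEN() above ensures a sleeping owner
 * pthread notices it promptly.
 */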
1036
63ccb9cb
QY
1037/* Thread cancellation ------------------------------------------------------ */
1038
8797240e
QY
1039/**
1040 * NOT's out the .events field of pollfd corresponding to the given file
1041 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
1042 *
1043 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1044 * implementation for details.
1045 *
1046 * @param master
1047 * @param fd
1048 * @param state the event to cancel. One or more (OR'd together) of the
1049 * following:
1050 * - POLLIN
1051 * - POLLOUT
1052 */
d62a17ae 1053static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 1054{
42d74538
QY
1055 bool found = false;
1056
d62a17ae 1057 /* Cancel POLLHUP too just in case some bozo set it */
1058 state |= POLLHUP;
1059
1060 /* find the index of corresponding pollfd */
1061 nfds_t i;
1062
1063 for (i = 0; i < master->handler.pfdcount; i++)
42d74538
QY
1064 if (master->handler.pfds[i].fd == fd) {
1065 found = true;
d62a17ae 1066 break;
42d74538
QY
1067 }
1068
1069 if (!found) {
1070 zlog_debug(
1071 "[!] Received cancellation request for nonexistent rw job");
1072 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 1073 master->name ? master->name : "", fd);
42d74538
QY
1074 return;
1075 }
d62a17ae 1076
1077 /* NOT out event. */
1078 master->handler.pfds[i].events &= ~(state);
1079
1080 /* If all events are canceled, delete / resize the pollfd array. */
1081 if (master->handler.pfds[i].events == 0) {
1082 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1083 (master->handler.pfdcount - i - 1)
1084 * sizeof(struct pollfd));
1085 master->handler.pfdcount--;
e985cda0
S
1086 master->handler.pfds[master->handler.pfdcount].fd = 0;
1087 master->handler.pfds[master->handler.pfdcount].events = 0;
d62a17ae 1088 }
1089
1090 /* If we have the same pollfd in the copy, perform the same operations,
1091 * otherwise return. */
1092 if (i >= master->handler.copycount)
1093 return;
1094
1095 master->handler.copy[i].events &= ~(state);
1096
1097 if (master->handler.copy[i].events == 0) {
1098 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1099 (master->handler.copycount - i - 1)
1100 * sizeof(struct pollfd));
1101 master->handler.copycount--;
e985cda0
S
1102 master->handler.copy[master->handler.copycount].fd = 0;
1103 master->handler.copy[master->handler.copycount].events = 0;
d62a17ae 1104 }
0a95a0d0
DS
1105}
1106
1189d95f 1107/**
63ccb9cb 1108 * Process cancellation requests.
1189d95f 1109 *
63ccb9cb
QY
1110 * This may only be run from the pthread which owns the thread_master.
1111 *
1112 * @param master the thread master to process
1113 * @REQUIRE master->mtx
1189d95f 1114 */
d62a17ae 1115static void do_thread_cancel(struct thread_master *master)
718e3744 1116{
c284542b 1117 struct thread_list_head *list = NULL;
d62a17ae 1118 struct thread **thread_array = NULL;
1119 struct thread *thread;
1120
1121 struct cancel_req *cr;
1122 struct listnode *ln;
1123 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
d279ef57
DS
1124 /*
1125 * If this is an event object cancellation, linear search
1126 * through event list deleting any events which have the
1127 * specified argument. We also need to check every thread
1128 * in the ready queue.
1129 */
d62a17ae 1130 if (cr->eventobj) {
1131 struct thread *t;
c284542b 1132
81fddbe7 1133 frr_each_safe(thread_list, &master->event, t) {
c284542b
DL
1134 if (t->arg != cr->eventobj)
1135 continue;
1136 thread_list_del(&master->event, t);
1137 if (t->ref)
1138 *t->ref = NULL;
1139 thread_add_unuse(master, t);
d62a17ae 1140 }
1141
81fddbe7 1142 frr_each_safe(thread_list, &master->ready, t) {
c284542b
DL
1143 if (t->arg != cr->eventobj)
1144 continue;
1145 thread_list_del(&master->ready, t);
1146 if (t->ref)
1147 *t->ref = NULL;
1148 thread_add_unuse(master, t);
d62a17ae 1149 }
1150 continue;
1151 }
1152
d279ef57
DS
1153 /*
1154 * The pointer varies depending on whether the cancellation
1155 * request was made asynchronously or not. If it was, we
1156 * need to check whether the thread even exists anymore
1157 * before cancelling it.
1158 */
d62a17ae 1159 thread = (cr->thread) ? cr->thread : *cr->threadref;
1160
1161 if (!thread)
1162 continue;
1163
1164 /* Determine the appropriate queue to cancel the thread from */
1165 switch (thread->type) {
1166 case THREAD_READ:
1167 thread_cancel_rw(master, thread->u.fd, POLLIN);
1168 thread_array = master->read;
1169 break;
1170 case THREAD_WRITE:
1171 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1172 thread_array = master->write;
1173 break;
1174 case THREAD_TIMER:
27d29ced 1175 thread_timer_list_del(&master->timer, thread);
d62a17ae 1176 break;
1177 case THREAD_EVENT:
1178 list = &master->event;
1179 break;
1180 case THREAD_READY:
1181 list = &master->ready;
1182 break;
1183 default:
1184 continue;
1185 break;
1186 }
1187
27d29ced 1188 if (list) {
c284542b 1189 thread_list_del(list, thread);
d62a17ae 1190 } else if (thread_array) {
1191 thread_array[thread->u.fd] = NULL;
d62a17ae 1192 }
1193
1194 if (thread->ref)
1195 *thread->ref = NULL;
1196
1197 thread_add_unuse(thread->master, thread);
1198 }
1199
1200 /* Delete and free all cancellation requests */
41b21bfa
MS
1201 if (master->cancel_req)
1202 list_delete_all_node(master->cancel_req);
d62a17ae 1203
1204 /* Wake up any threads which may be blocked in thread_cancel_async() */
1205 master->canceled = true;
1206 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1207}
1208
63ccb9cb
QY
1209/**
1210 * Cancel any events which have the specified argument.
1211 *
1212 * MT-Unsafe
1213 *
1214 * @param m the thread_master to cancel from
1215 * @param arg the argument passed when creating the event
1216 */
d62a17ae 1217void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1218{
d62a17ae 1219 assert(master->owner == pthread_self());
1220
00dffa8c 1221 frr_with_mutex(&master->mtx) {
d62a17ae 1222 struct cancel_req *cr =
1223 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1224 cr->eventobj = arg;
1225 listnode_add(master->cancel_req, cr);
1226 do_thread_cancel(master);
1227 }
63ccb9cb 1228}
1189d95f 1229
63ccb9cb
QY
1230/**
1231 * Cancel a specific task.
1232 *
1233 * MT-Unsafe
1234 *
1235 * @param thread task to cancel
1236 */
b3d6bc6e 1237void thread_cancel(struct thread **thread)
63ccb9cb 1238{
b3d6bc6e
MS
1239 struct thread_master *master;
1240
1241 if (thread == NULL || *thread == NULL)
1242 return;
1243
1244 master = (*thread)->master;
d62a17ae 1245
6c3aa850
DL
1246 frrtrace(9, frr_libfrr, thread_cancel, master,
1247 (*thread)->xref->funcname, (*thread)->xref->xref.file,
1248 (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
b4d6e855 1249 (*thread)->u.val, (*thread)->arg, (*thread)->u.sands.tv_sec);
abf96a87 1250
6ed04aa2
DS
1251 assert(master->owner == pthread_self());
1252
00dffa8c 1253 frr_with_mutex(&master->mtx) {
d62a17ae 1254 struct cancel_req *cr =
1255 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
b3d6bc6e 1256 cr->thread = *thread;
6ed04aa2
DS
1257 listnode_add(master->cancel_req, cr);
1258 do_thread_cancel(master);
d62a17ae 1259 }
b3d6bc6e
MS
1260
1261 *thread = NULL;
63ccb9cb 1262}
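/*
 * Typical synchronous cancellation from the owning pthread, using the
 * back-reference stored when the task was scheduled (peer->t_read is the
 * hypothetical struct thread * from the earlier sketch):
 *
 *     thread_cancel(&peer->t_read);   // safe even if already NULL
 *
 * On return the back-reference is NULLed, so a double cancel is harmless.
 */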
1189d95f 1263
63ccb9cb
QY
1264/**
1265 * Asynchronous cancellation.
1266 *
8797240e
QY
1267 * Called with either a struct thread ** or void * to an event argument,
1268 * this function posts the correct cancellation request and blocks until it is
1269 * serviced.
63ccb9cb
QY
1270 *
1271 * If the thread is currently running, execution blocks until it completes.
1272 *
8797240e
QY
1273 * The last two parameters are mutually exclusive, i.e. if you pass one the
1274 * other must be NULL.
1275 *
1276 * When the cancellation procedure executes on the target thread_master, the
1277 * thread * provided is checked for nullity. If it is null, the thread is
1278 * assumed to no longer exist and the cancellation request is a no-op. Thus
1279 * users of this API must pass a back-reference when scheduling the original
1280 * task.
1281 *
63ccb9cb
QY
1282 * MT-Safe
1283 *
8797240e
QY
1284 * @param master the thread master with the relevant event / task
1285 * @param thread pointer to thread to cancel
1286 * @param eventobj the event
63ccb9cb 1287 */
d62a17ae 1288void thread_cancel_async(struct thread_master *master, struct thread **thread,
1289 void *eventobj)
63ccb9cb 1290{
d62a17ae 1291 assert(!(thread && eventobj) && (thread || eventobj));
abf96a87
QY
1292
1293 if (thread && *thread)
c7bb4f00 1294 frrtrace(9, frr_libfrr, thread_cancel_async, master,
6c3aa850
DL
1295 (*thread)->xref->funcname, (*thread)->xref->xref.file,
1296 (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
c7bb4f00
QY
1297 (*thread)->u.val, (*thread)->arg,
1298 (*thread)->u.sands.tv_sec);
abf96a87 1299 else
c7bb4f00
QY
1300 frrtrace(9, frr_libfrr, thread_cancel_async, master, NULL, NULL,
1301 0, NULL, 0, 0, eventobj, 0);
abf96a87 1302
d62a17ae 1303 assert(master->owner != pthread_self());
1304
00dffa8c 1305 frr_with_mutex(&master->mtx) {
d62a17ae 1306 master->canceled = false;
1307
1308 if (thread) {
1309 struct cancel_req *cr =
1310 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1311 cr->threadref = thread;
1312 listnode_add(master->cancel_req, cr);
1313 } else if (eventobj) {
1314 struct cancel_req *cr =
1315 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1316 cr->eventobj = eventobj;
1317 listnode_add(master->cancel_req, cr);
1318 }
1319 AWAKEN(master);
1320
1321 while (!master->canceled)
1322 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1323 }
50478845
MS
1324
1325 if (thread)
1326 *thread = NULL;
718e3744 1327}
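/*
 * Sketch of cross-pthread cancellation; exactly one of the last two
 * arguments may be non-NULL (ctx and ctx->t_work are hypothetical):
 *
 *     thread_cancel_async(master, &ctx->t_work, NULL);  // by task
 *     thread_cancel_async(master, NULL, ctx);           // by event argument
 *
 * Both forms block until the owning pthread has serviced the request, so
 * the caller may safely free ctx afterwards.
 */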
63ccb9cb 1328/* ------------------------------------------------------------------------- */
718e3744 1329
27d29ced 1330static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
d62a17ae 1331 struct timeval *timer_val)
718e3744 1332{
27d29ced
DL
1333 if (!thread_timer_list_count(timers))
1334 return NULL;
1335
1336 struct thread *next_timer = thread_timer_list_first(timers);
1337 monotime_until(&next_timer->u.sands, timer_val);
1338 return timer_val;
718e3744 1339}
718e3744 1340
d62a17ae 1341static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1342 struct thread *fetch)
718e3744 1343{
d62a17ae 1344 *fetch = *thread;
1345 thread_add_unuse(m, thread);
1346 return fetch;
718e3744 1347}
1348
d62a17ae 1349static int thread_process_io_helper(struct thread_master *m,
45f3d590
DS
1350 struct thread *thread, short state,
1351 short actual_state, int pos)
5d4ccd4e 1352{
d62a17ae 1353 struct thread **thread_array;
1354
45f3d590
DS
1355 /*
1356 * poll() clears the .events field, but the pollfd array we
1357 * pass to poll() is a copy of the one used to schedule threads.
1358 * We need to synchronize state between the two here by applying
1359 * the same changes poll() made on the copy of the "real" pollfd
1360 * array.
1361 *
1362 * This cleans up a possible infinite loop where we refuse
1363 * to respond to a poll event but poll is insistent that
1364 * we should.
1365 */
1366 m->handler.pfds[pos].events &= ~(state);
1367
1368 if (!thread) {
1369 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1370 flog_err(EC_LIB_NO_THREAD,
1d5453d6 1371 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!",
45f3d590 1372 m->handler.pfds[pos].fd, actual_state);
d62a17ae 1373 return 0;
45f3d590 1374 }
d62a17ae 1375
1376 if (thread->type == THREAD_READ)
1377 thread_array = m->read;
1378 else
1379 thread_array = m->write;
1380
1381 thread_array[thread->u.fd] = NULL;
c284542b 1382 thread_list_add_tail(&m->ready, thread);
d62a17ae 1383 thread->type = THREAD_READY;
45f3d590 1384
d62a17ae 1385 return 1;
5d4ccd4e
DS
1386}
1387
8797240e
QY
1388/**
1389 * Process I/O events.
1390 *
1391 * Walks through file descriptor array looking for those pollfds whose .revents
1392 * field has something interesting. Deletes any invalid file descriptors.
1393 *
1394 * @param m the thread master
1395 * @param num the number of active file descriptors (return value of poll())
1396 */
d62a17ae 1397static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1398{
d62a17ae 1399 unsigned int ready = 0;
1400 struct pollfd *pfds = m->handler.copy;
1401
1402 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1403 /* no event for current fd? immediately continue */
1404 if (pfds[i].revents == 0)
1405 continue;
1406
1407 ready++;
1408
d279ef57
DS
1409 /*
1410 * Unless someone has called thread_cancel from another
1411 * pthread, the only thing that could have changed in
1412 * m->handler.pfds while we were asleep is the .events
1413 * field in a given pollfd. Barring thread_cancel() that
1414 * value should be a superset of the values we have in our
1415 * copy, so there's no need to update it. Similarly,
1416 * barring deletion, the fd should still be a valid index
1417 * into the master's pfds.
d142453d
DS
1418 *
1419 * We are including POLLERR here to trigger a READ event
1420 * because the read should then fail and the read
1421 * handler is expected to deal with it appropriately.
d279ef57 1422 */
d142453d 1423 if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
d62a17ae 1424 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
45f3d590
DS
1425 pfds[i].revents, i);
1426 }
d62a17ae 1427 if (pfds[i].revents & POLLOUT)
1428 thread_process_io_helper(m, m->write[pfds[i].fd],
45f3d590 1429 POLLOUT, pfds[i].revents, i);
d62a17ae 1430
1431 /* If one of our file descriptors is garbage, remove it
1432 * from both pfds arrays and update the sizes and the
1433 * index accordingly. */
1434 if (pfds[i].revents & POLLNVAL) {
1435 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1436 (m->handler.pfdcount - i - 1)
1437 * sizeof(struct pollfd));
1438 m->handler.pfdcount--;
e985cda0
S
1439 m->handler.pfds[m->handler.pfdcount].fd = 0;
1440 m->handler.pfds[m->handler.pfdcount].events = 0;
d62a17ae 1441
1442 memmove(pfds + i, pfds + i + 1,
1443 (m->handler.copycount - i - 1)
1444 * sizeof(struct pollfd));
1445 m->handler.copycount--;
e985cda0
S
1446 m->handler.copy[m->handler.copycount].fd = 0;
1447 m->handler.copy[m->handler.copycount].events = 0;
d62a17ae 1448
1449 i--;
1450 }
1451 }
718e3744 1452}
1453
8b70d0b0 1454/* Add all timers that have popped to the ready list. */
27d29ced 1455static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
d62a17ae 1456 struct timeval *timenow)
a48b4e6d 1457{
d62a17ae 1458 struct thread *thread;
1459 unsigned int ready = 0;
1460
27d29ced 1461 while ((thread = thread_timer_list_first(timers))) {
d62a17ae 1462 if (timercmp(timenow, &thread->u.sands, <))
1463 return ready;
27d29ced 1464 thread_timer_list_pop(timers);
d62a17ae 1465 thread->type = THREAD_READY;
c284542b 1466 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1467 ready++;
1468 }
1469 return ready;
a48b4e6d 1470}
1471
2613abe6 1472/* process a list en masse, e.g. for event thread lists */
c284542b 1473static unsigned int thread_process(struct thread_list_head *list)
2613abe6 1474{
d62a17ae 1475 struct thread *thread;
d62a17ae 1476 unsigned int ready = 0;
1477
c284542b 1478 while ((thread = thread_list_pop(list))) {
d62a17ae 1479 thread->type = THREAD_READY;
c284542b 1480 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1481 ready++;
1482 }
1483 return ready;
2613abe6
PJ
1484}
1485
1486
718e3744 1487/* Fetch next ready thread. */
d62a17ae 1488struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1489{
d62a17ae 1490 struct thread *thread = NULL;
1491 struct timeval now;
1492 struct timeval zerotime = {0, 0};
1493 struct timeval tv;
1494 struct timeval *tw = NULL;
d81ca9a3 1495 bool eintr_p = false;
d62a17ae 1496 int num = 0;
1497
1498 do {
1499 /* Handle signals if any */
1500 if (m->handle_signals)
1501 quagga_sigevent_process();
1502
1503 pthread_mutex_lock(&m->mtx);
1504
1505 /* Process any pending cancellation requests */
1506 do_thread_cancel(m);
1507
e3c9529e
QY
1508 /*
1509 * Attempt to flush ready queue before going into poll().
1510 * This is performance-critical. Think twice before modifying.
1511 */
c284542b 1512 if ((thread = thread_list_pop(&m->ready))) {
e3c9529e
QY
1513 fetch = thread_run(m, thread, fetch);
1514 if (fetch->ref)
1515 *fetch->ref = NULL;
1516 pthread_mutex_unlock(&m->mtx);
1517 break;
1518 }
1519
1520 /* otherwise, tick through scheduling sequence */
1521
bca37d17
QY
1522 /*
1523 * Post events to ready queue. This must come before the
1524 * following block since events should occur immediately
1525 */
d62a17ae 1526 thread_process(&m->event);
1527
bca37d17
QY
1528 /*
1529 * If there are no tasks on the ready queue, we will poll()
1530 * until a timer expires or we receive I/O, whichever comes
1531 * first. The strategy for doing this is:
d62a17ae 1532 *
1533 * - If there are events pending, set the poll() timeout to zero
1534 * - If there are no events pending, but there are timers
d279ef57
DS
1535 * pending, set the timeout to the smallest remaining time on
1536 * any timer.
d62a17ae 1537 * - If there are neither timers nor events pending, but there
d279ef57 1538 * are file descriptors pending, block indefinitely in poll()
d62a17ae 1539 * - If nothing is pending, it's time for the application to die
1540 *
1541 * In every case except the last, we need to hit poll() at least
bca37d17
QY
1542 * once per loop to avoid starvation by events
1543 */
c284542b 1544 if (!thread_list_count(&m->ready))
27d29ced 1545 tw = thread_timer_wait(&m->timer, &tv);
d62a17ae 1546
c284542b
DL
1547 if (thread_list_count(&m->ready) ||
1548 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1549 tw = &zerotime;
1550
1551 if (!tw && m->handler.pfdcount == 0) { /* die */
1552 pthread_mutex_unlock(&m->mtx);
1553 fetch = NULL;
1554 break;
1555 }
1556
bca37d17
QY
1557 /*
1558 * Copy pollfd array + # active pollfds in it. Not necessary to
1559 * copy the array size as this is fixed.
1560 */
d62a17ae 1561 m->handler.copycount = m->handler.pfdcount;
1562 memcpy(m->handler.copy, m->handler.pfds,
1563 m->handler.copycount * sizeof(struct pollfd));
1564
e3c9529e
QY
1565 pthread_mutex_unlock(&m->mtx);
1566 {
d81ca9a3
MS
1567 eintr_p = false;
1568 num = fd_poll(m, tw, &eintr_p);
e3c9529e
QY
1569 }
1570 pthread_mutex_lock(&m->mtx);
d764d2cc 1571
e3c9529e
QY
1572 /* Handle any errors received in poll() */
1573 if (num < 0) {
d81ca9a3 1574 if (eintr_p) {
d62a17ae 1575 pthread_mutex_unlock(&m->mtx);
e3c9529e
QY
1576 /* loop around to signal handler */
1577 continue;
d62a17ae 1578 }
1579
e3c9529e 1580 /* else die */
450971aa 1581 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1582 safe_strerror(errno));
e3c9529e
QY
1583 pthread_mutex_unlock(&m->mtx);
1584 fetch = NULL;
1585 break;
bca37d17 1586 }
d62a17ae 1587
1588 /* Post timers to ready queue. */
1589 monotime(&now);
27d29ced 1590 thread_process_timers(&m->timer, &now);
d62a17ae 1591
1592 /* Post I/O to ready queue. */
1593 if (num > 0)
1594 thread_process_io(m, num);
1595
d62a17ae 1596 pthread_mutex_unlock(&m->mtx);
1597
1598 } while (!thread && m->spin);
1599
1600 return fetch;
718e3744 1601}
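/*
 * thread_fetch() is the heart of the event loop; the owning pthread
 * normally sits in a loop of the following shape (this is how libfrr's
 * frr_run() drives the main pthread):
 *
 *     struct thread task;
 *
 *     while (thread_fetch(m, &task))
 *             thread_call(&task);
 *
 * NULL is returned when there is nothing left to schedule (or on an
 * unrecoverable poll() error), which signals the loop to exit.
 */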
1602
d62a17ae 1603static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1604{
d62a17ae 1605 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1606 + (a.tv_usec - b.tv_usec));
62f44022
QY
1607}
1608
d62a17ae 1609unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1610 unsigned long *cputime)
718e3744 1611{
d62a17ae 1612 /* This is 'user + sys' time. */
1613 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1614 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1615 return timeval_elapsed(now->real, start->real);
8b70d0b0 1616}
1617
50596be0
DS
1618/* We should aim to yield after yield milliseconds, which defaults
1619 to THREAD_YIELD_TIME_SLOT .
8b70d0b0 1620 Note: we are using real (wall clock) time for this calculation.
1621 It could be argued that CPU time may make more sense in certain
1622 contexts. The things to consider are whether the thread may have
1623 blocked (in which case wall time increases, but CPU time does not),
1624 or whether the system is heavily loaded with other processes competing
d62a17ae 1625 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1626 Plus it has the added benefit that gettimeofday should be faster
1627 than calling getrusage. */
d62a17ae 1628int thread_should_yield(struct thread *thread)
718e3744 1629{
d62a17ae 1630 int result;
00dffa8c 1631 frr_with_mutex(&thread->mtx) {
d62a17ae 1632 result = monotime_since(&thread->real, NULL)
1633 > (int64_t)thread->yield;
1634 }
d62a17ae 1635 return result;
50596be0
DS
1636}
1637
d62a17ae 1638void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1639{
00dffa8c 1640 frr_with_mutex(&thread->mtx) {
d62a17ae 1641 thread->yield = yield_time;
1642 }
718e3744 1643}
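/*
 * Sketch of the cooperative-yield pattern these two helpers support: a
 * long-running job periodically checks thread_should_yield() and, once its
 * slot is used up, reschedules itself instead of hogging the loop
 * (work_cb, work_remains, do_one_chunk, ctx and ctx->t_work are all
 * hypothetical caller names):
 *
 *     static int work_cb(struct thread *thread)
 *     {
 *             struct work_ctx *ctx = THREAD_ARG(thread);
 *
 *             while (work_remains(ctx)) {
 *                     do_one_chunk(ctx);
 *                     if (thread_should_yield(thread)) {
 *                             thread_add_event(thread->master, work_cb,
 *                                              ctx, 0, &ctx->t_work);
 *                             return 0;
 *                     }
 *             }
 *             return 0;
 *     }
 *
 * thread->yield is expressed in microseconds; THREAD_YIELD_TIME_SLOT is
 * the default.
 */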
1644
d62a17ae 1645void thread_getrusage(RUSAGE_T *r)
db9c0df9 1646{
231db9a6
DS
1647#if defined RUSAGE_THREAD
1648#define FRR_RUSAGE RUSAGE_THREAD
1649#else
1650#define FRR_RUSAGE RUSAGE_SELF
1651#endif
d62a17ae 1652 monotime(&r->real);
f75e802d 1653#ifndef EXCLUDE_CPU_TIME
231db9a6 1654 getrusage(FRR_RUSAGE, &(r->cpu));
f75e802d 1655#endif
db9c0df9
PJ
1656}
1657
fbcac826
QY
1658/*
1659 * Call a thread.
1660 *
1661 * This function will atomically update the thread's usage history. At present
1662 * this is the only spot where usage history is written. Nevertheless the code
1663 * has been written such that the introduction of writers in the future should
1664 * not need to update it provided the writers atomically perform only the
1665 * operations done here, i.e. updating the total and maximum times. In
1666 * particular, the maximum real and cpu times must be monotonically increasing
1667 * or this code is not correct.
1668 */
d62a17ae 1669void thread_call(struct thread *thread)
718e3744 1670{
f75e802d 1671#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1672 _Atomic unsigned long realtime, cputime;
1673 unsigned long exp;
1674 unsigned long helper;
f75e802d 1675#endif
d62a17ae 1676 RUSAGE_T before, after;
cc8b13a0 1677
d62a17ae 1678 GETRUSAGE(&before);
1679 thread->real = before.real;
718e3744 1680
6c3aa850
DL
1681 frrtrace(9, frr_libfrr, thread_call, thread->master,
1682 thread->xref->funcname, thread->xref->xref.file,
1683 thread->xref->xref.line, NULL, thread->u.fd,
c7bb4f00 1684 thread->u.val, thread->arg, thread->u.sands.tv_sec);
abf96a87 1685
d62a17ae 1686 pthread_setspecific(thread_current, thread);
1687 (*thread->func)(thread);
1688 pthread_setspecific(thread_current, NULL);
718e3744 1689
d62a17ae 1690 GETRUSAGE(&after);
718e3744 1691
f75e802d 1692#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1693 realtime = thread_consumed_time(&after, &before, &helper);
1694 cputime = helper;
1695
1696 /* update realtime */
1697 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1698 memory_order_seq_cst);
1699 exp = atomic_load_explicit(&thread->hist->real.max,
1700 memory_order_seq_cst);
1701 while (exp < realtime
1702 && !atomic_compare_exchange_weak_explicit(
1703 &thread->hist->real.max, &exp, realtime,
1704 memory_order_seq_cst, memory_order_seq_cst))
1705 ;
1706
1707 /* update cputime */
1708 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1709 memory_order_seq_cst);
1710 exp = atomic_load_explicit(&thread->hist->cpu.max,
1711 memory_order_seq_cst);
1712 while (exp < cputime
1713 && !atomic_compare_exchange_weak_explicit(
1714 &thread->hist->cpu.max, &exp, cputime,
1715 memory_order_seq_cst, memory_order_seq_cst))
1716 ;
1717
1718 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1719 memory_order_seq_cst);
1720 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1721 memory_order_seq_cst);
718e3744 1722
924b9229 1723#ifdef CONSUMED_TIME_CHECK
d62a17ae 1724 if (realtime > CONSUMED_TIME_CHECK) {
1725 /*
1726 * We have a CPU Hog on our hands.
1727 * Whinge about it now, so we're aware this is yet another task
1728 * to fix.
1729 */
9ef9495e 1730 flog_warn(
450971aa 1731 EC_LIB_SLOW_THREAD,
d62a17ae 1732 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
60a3efec 1733 thread->xref->funcname, (unsigned long)thread->func,
d62a17ae 1734 realtime / 1000, cputime / 1000);
1735 }
924b9229 1736#endif /* CONSUMED_TIME_CHECK */
f75e802d 1737#endif /* Exclude CPU Time */
718e3744 1738}
1739
1740/* Execute thread */
60a3efec
DL
1741void _thread_execute(const struct xref_threadsched *xref,
1742 struct thread_master *m, int (*func)(struct thread *),
1743 void *arg, int val)
718e3744 1744{
c4345fbf 1745 struct thread *thread;
718e3744 1746
c4345fbf 1747 /* Get or allocate new thread to execute. */
00dffa8c 1748 frr_with_mutex(&m->mtx) {
60a3efec 1749 thread = thread_get(m, THREAD_EVENT, func, arg, xref);
9c7753e4 1750
c4345fbf 1751 /* Set its event value. */
00dffa8c 1752 frr_with_mutex(&thread->mtx) {
c4345fbf
RZ
1753 thread->add_type = THREAD_EXECUTE;
1754 thread->u.val = val;
1755 thread->ref = &thread;
1756 }
c4345fbf 1757 }
f7c62e11 1758
c4345fbf
RZ
1759 /* Execute thread doing all accounting. */
1760 thread_call(thread);
9c7753e4 1761
c4345fbf
RZ
1762 /* Give back or free thread. */
1763 thread_add_unuse(m, thread);
718e3744 1764}
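/*
 * Unlike the thread_add_*() calls above, this runs the callback
 * synchronously on the caller's stack (with CPU accounting) instead of
 * queueing it. Callers use the thread_execute() macro from thread.h
 * (handler and ctx are caller-supplied):
 *
 *     thread_execute(master, handler, ctx, 0);
 *
 * No back-reference is involved because the task never sits on a queue.
 */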
1543c387
MS
1765
1766/* Debug signal mask - if 'sigs' is NULL, use current effective mask. */
1767void debug_signals(const sigset_t *sigs)
1768{
1769 int i, found;
1770 sigset_t tmpsigs;
1771 char buf[300];
1772
1773 /*
1774 * We're only looking at the non-realtime signals here, so we need
1775 * some limit value. Platform differences mean at some point we just
1776 * need to pick a reasonable value.
1777 */
1778#if defined SIGRTMIN
1779# define LAST_SIGNAL SIGRTMIN
1780#else
1781# define LAST_SIGNAL 32
1782#endif
1783
1784
1785 if (sigs == NULL) {
1786 sigemptyset(&tmpsigs);
1787 pthread_sigmask(SIG_BLOCK, NULL, &tmpsigs);
1788 sigs = &tmpsigs;
1789 }
1790
1791 found = 0;
1792 buf[0] = '\0';
1793
1794 for (i = 0; i < LAST_SIGNAL; i++) {
1795 char tmp[20];
1796
1797 if (sigismember(sigs, i) > 0) {
1798 if (found > 0)
1799 strlcat(buf, ",", sizeof(buf));
1800 snprintf(tmp, sizeof(tmp), "%d", i);
1801 strlcat(buf, tmp, sizeof(buf));
1802 found++;
1803 }
1804 }
1805
1806 if (found == 0)
1807 snprintf(buf, sizeof(buf), "<none>");
1808
1809 zlog_debug("%s: %s", __func__, buf);
1810}