718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
896014f4
DL
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
3e41733f 28#include "frrcu.h"
718e3744 29#include "log.h"
e04ab74d 30#include "hash.h"
31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
00dffa8c 36#include "frr_pthread.h"
9ef9495e 37#include "lib_errors.h"
912d45a1 38#include "libfrr_trace.h"
1a9f340b 39#include "libfrr.h"
d6be5fb9 40
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 42DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 43DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 44DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 45
c284542b
DL
46DECLARE_LIST(thread_list, struct thread, threaditem)
47
27d29ced
DL
48static int thread_timer_cmp(const struct thread *a, const struct thread *b)
49{
50 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
51 return -1;
52 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
53 return 1;
54 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
55 return -1;
56 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
57 return 1;
58 return 0;
59}
60
61DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
62 thread_timer_cmp)
63
3b96b781
HT
64#if defined(__APPLE__)
65#include <mach/mach.h>
66#include <mach/mach_time.h>
67#endif
68
d62a17ae 69#define AWAKEN(m) \
70 do { \
2b64873d 71 const unsigned char wakebyte = 0x01; \
d62a17ae 72 write(m->io_pipe[1], &wakebyte, 1); \
73 } while (0);
3bf2673b 74
62f44022 75/* control variable for initializer */
c17faa4b 76static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 77pthread_key_t thread_current;
6b0655a2 78
c17faa4b 79static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
62f44022
QY
80static struct list *masters;
81
6655966d 82static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 83
62f44022 84/* CLI start ---------------------------------------------------------------- */
d8b87afe 85static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
e04ab74d 86{
883cc51d 87 int size = sizeof(a->func);
bd74dc61
DS
88
89 return jhash(&a->func, size, 0);
e04ab74d 90}
91
74df8d6d 92static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 93 const struct cpu_thread_history *b)
e04ab74d 94{
d62a17ae 95 return a->func == b->func;
e04ab74d 96}
97
d62a17ae 98static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 99{
d62a17ae 100 struct cpu_thread_history *new;
101 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
102 new->func = a->func;
103 new->funcname = a->funcname;
104 return new;
e04ab74d 105}
106
d62a17ae 107static void cpu_record_hash_free(void *a)
228da428 108{
d62a17ae 109 struct cpu_thread_history *hist = a;
110
111 XFREE(MTYPE_THREAD_STATS, hist);
228da428
CC
112}
113
f75e802d 114#ifndef EXCLUDE_CPU_TIME
d62a17ae 115static void vty_out_cpu_thread_history(struct vty *vty,
116 struct cpu_thread_history *a)
e04ab74d 117{
9a8a7b0e 118 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
72327cf3
MS
119 a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
120 a->total_calls, (a->cpu.total / a->total_calls), a->cpu.max,
121 (a->real.total / a->total_calls), a->real.max);
84d951d0 122 vty_out(vty, " %c%c%c%c%c %s\n",
d62a17ae 123 a->types & (1 << THREAD_READ) ? 'R' : ' ',
124 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
125 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
126 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
127 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 128}
129
e3b78da8 130static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 131{
d62a17ae 132 struct cpu_thread_history *totals = args[0];
fbcac826 133 struct cpu_thread_history copy;
d62a17ae 134 struct vty *vty = args[1];
fbcac826 135 uint8_t *filter = args[2];
d62a17ae 136
137 struct cpu_thread_history *a = bucket->data;
138
fbcac826
QY
139 copy.total_active =
140 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
141 copy.total_calls =
142 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
143 copy.cpu.total =
144 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
145 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
146 copy.real.total =
147 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
148 copy.real.max =
149 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
150 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
151 copy.funcname = a->funcname;
152
153 if (!(copy.types & *filter))
d62a17ae 154 return;
fbcac826
QY
155
156 vty_out_cpu_thread_history(vty, &copy);
157 totals->total_active += copy.total_active;
158 totals->total_calls += copy.total_calls;
159 totals->real.total += copy.real.total;
160 if (totals->real.max < copy.real.max)
161 totals->real.max = copy.real.max;
162 totals->cpu.total += copy.cpu.total;
163 if (totals->cpu.max < copy.cpu.max)
164 totals->cpu.max = copy.cpu.max;
e04ab74d 165}
166
fbcac826 167static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 168{
d62a17ae 169 struct cpu_thread_history tmp;
170 void *args[3] = {&tmp, vty, &filter};
171 struct thread_master *m;
172 struct listnode *ln;
173
0d6f7fd6 174 memset(&tmp, 0, sizeof(tmp));
d62a17ae 175 tmp.funcname = "TOTAL";
176 tmp.types = filter;
177
00dffa8c 178 frr_with_mutex(&masters_mtx) {
d62a17ae 179 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
180 const char *name = m->name ? m->name : "main";
181
182 char underline[strlen(name) + 1];
183 memset(underline, '-', sizeof(underline));
4f113d60 184 underline[sizeof(underline) - 1] = '\0';
d62a17ae 185
186 vty_out(vty, "\n");
187 vty_out(vty, "Showing statistics for pthread %s\n",
188 name);
189 vty_out(vty, "-------------------------------%s\n",
190 underline);
84d951d0 191 vty_out(vty, "%30s %18s %18s\n", "",
d62a17ae 192 "CPU (user+system):", "Real (wall-clock):");
193 vty_out(vty,
194 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
195 vty_out(vty, " Avg uSec Max uSecs");
196 vty_out(vty, " Type Thread\n");
197
198 if (m->cpu_record->count)
199 hash_iterate(
200 m->cpu_record,
e3b78da8 201 (void (*)(struct hash_bucket *,
d62a17ae 202 void *))cpu_record_hash_print,
203 args);
204 else
205 vty_out(vty, "No data to display yet.\n");
206
207 vty_out(vty, "\n");
208 }
209 }
d62a17ae 210
211 vty_out(vty, "\n");
212 vty_out(vty, "Total thread statistics\n");
213 vty_out(vty, "-------------------------\n");
84d951d0 214 vty_out(vty, "%30s %18s %18s\n", "",
d62a17ae 215 "CPU (user+system):", "Real (wall-clock):");
216 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
217 vty_out(vty, " Avg uSec Max uSecs");
218 vty_out(vty, " Type Thread\n");
219
220 if (tmp.total_calls > 0)
221 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 222}
f75e802d 223#endif
e04ab74d 224
e3b78da8 225static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 226{
fbcac826 227 uint8_t *filter = args[0];
d62a17ae 228 struct hash *cpu_record = args[1];
229
230 struct cpu_thread_history *a = bucket->data;
62f44022 231
d62a17ae 232 if (!(a->types & *filter))
233 return;
f48f65d2 234
d62a17ae 235 hash_release(cpu_record, bucket->data);
e276eb82
PJ
236}
237
fbcac826 238static void cpu_record_clear(uint8_t filter)
e276eb82 239{
fbcac826 240 uint8_t *tmp = &filter;
d62a17ae 241 struct thread_master *m;
242 struct listnode *ln;
243
00dffa8c 244 frr_with_mutex(&masters_mtx) {
d62a17ae 245 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
00dffa8c 246 frr_with_mutex(&m->mtx) {
d62a17ae 247 void *args[2] = {tmp, m->cpu_record};
248 hash_iterate(
249 m->cpu_record,
e3b78da8 250 (void (*)(struct hash_bucket *,
d62a17ae 251 void *))cpu_record_hash_clear,
252 args);
253 }
d62a17ae 254 }
255 }
62f44022
QY
256}
257
fbcac826 258static uint8_t parse_filter(const char *filterstr)
62f44022 259{
d62a17ae 260 int i = 0;
261 int filter = 0;
262
263 while (filterstr[i] != '\0') {
264 switch (filterstr[i]) {
265 case 'r':
266 case 'R':
267 filter |= (1 << THREAD_READ);
268 break;
269 case 'w':
270 case 'W':
271 filter |= (1 << THREAD_WRITE);
272 break;
273 case 't':
274 case 'T':
275 filter |= (1 << THREAD_TIMER);
276 break;
277 case 'e':
278 case 'E':
279 filter |= (1 << THREAD_EVENT);
280 break;
281 case 'x':
282 case 'X':
283 filter |= (1 << THREAD_EXECUTE);
284 break;
285 default:
286 break;
287 }
288 ++i;
289 }
290 return filter;
62f44022
QY
291}
292
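/*
 * Worked example (illustrative, not part of the library): parse_filter("rt")
 * returns (1 << THREAD_READ) | (1 << THREAD_TIMER), so "show thread cpu rt"
 * limits the statistics to read and timer tasks. A return value of zero means
 * no valid filter character was given, and the CLI handlers below reject the
 * argument.
 */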
f75e802d 293#ifndef EXCLUDE_CPU_TIME
62f44022
QY
294DEFUN (show_thread_cpu,
295 show_thread_cpu_cmd,
296 "show thread cpu [FILTER]",
297 SHOW_STR
298 "Thread information\n"
299 "Thread CPU usage\n"
61fa0b97 300 "Display filter (rwtex)\n")
62f44022 301{
fbcac826 302 uint8_t filter = (uint8_t)-1U;
d62a17ae 303 int idx = 0;
304
305 if (argv_find(argv, argc, "FILTER", &idx)) {
306 filter = parse_filter(argv[idx]->arg);
307 if (!filter) {
308 vty_out(vty,
3efd0893 309 "Invalid filter \"%s\" specified; must contain at least one of 'RWTEX'\n",
d62a17ae 310 argv[idx]->arg);
311 return CMD_WARNING;
312 }
313 }
314
315 cpu_record_print(vty, filter);
316 return CMD_SUCCESS;
e276eb82 317}
f75e802d 318#endif
e276eb82 319
8872626b
DS
320static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
321{
322 const char *name = m->name ? m->name : "main";
323 char underline[strlen(name) + 1];
a0b36ae6 324 struct thread *thread;
8872626b
DS
325 uint32_t i;
326
327 memset(underline, '-', sizeof(underline));
328 underline[sizeof(underline) - 1] = '\0';
329
330 vty_out(vty, "\nShowing poll FD's for %s\n", name);
331 vty_out(vty, "----------------------%s\n", underline);
6c19478a
DS
332 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
333 m->fd_limit);
a0b36ae6
DS
334 for (i = 0; i < m->handler.pfdcount; i++) {
335 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
336 m->handler.pfds[i].fd, m->handler.pfds[i].events,
8872626b 337 m->handler.pfds[i].revents);
a0b36ae6
DS
338
339 if (m->handler.pfds[i].events & POLLIN) {
340 thread = m->read[m->handler.pfds[i].fd];
341
342 if (!thread)
343 vty_out(vty, "ERROR ");
344 else
60a3efec 345 vty_out(vty, "%s ", thread->xref->funcname);
a0b36ae6
DS
346 } else
347 vty_out(vty, " ");
348
349 if (m->handler.pfds[i].events & POLLOUT) {
350 thread = m->write[m->handler.pfds[i].fd];
351
352 if (!thread)
353 vty_out(vty, "ERROR\n");
354 else
60a3efec 355 vty_out(vty, "%s\n", thread->xref->funcname);
a0b36ae6
DS
356 } else
357 vty_out(vty, "\n");
358 }
8872626b
DS
359}
360
361DEFUN (show_thread_poll,
362 show_thread_poll_cmd,
363 "show thread poll",
364 SHOW_STR
365 "Thread information\n"
366 "Show poll FD's and information\n")
367{
368 struct listnode *node;
369 struct thread_master *m;
370
00dffa8c 371 frr_with_mutex(&masters_mtx) {
8872626b
DS
372 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
373 show_thread_poll_helper(vty, m);
374 }
375 }
8872626b
DS
376
377 return CMD_SUCCESS;
378}
379
380
49d41a26
DS
381DEFUN (clear_thread_cpu,
382 clear_thread_cpu_cmd,
383 "clear thread cpu [FILTER]",
62f44022 384 "Clear stored data in all pthreads\n"
49d41a26
DS
385 "Thread information\n"
386 "Thread CPU usage\n"
387 "Display filter (rwtexb)\n")
e276eb82 388{
fbcac826 389 uint8_t filter = (uint8_t)-1U;
d62a17ae 390 int idx = 0;
391
392 if (argv_find(argv, argc, "FILTER", &idx)) {
393 filter = parse_filter(argv[idx]->arg);
394 if (!filter) {
395 vty_out(vty,
3efd0893 396 "Invalid filter \"%s\" specified; must contain at least one of 'RWTEX'\n",
d62a17ae 397 argv[idx]->arg);
398 return CMD_WARNING;
399 }
400 }
401
402 cpu_record_clear(filter);
403 return CMD_SUCCESS;
e276eb82 404}
6b0655a2 405
d62a17ae 406void thread_cmd_init(void)
0b84f294 407{
f75e802d 408#ifndef EXCLUDE_CPU_TIME
d62a17ae 409 install_element(VIEW_NODE, &show_thread_cpu_cmd);
f75e802d 410#endif
8872626b 411 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 412 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 413}
62f44022
QY
414/* CLI end ------------------------------------------------------------------ */
415
0b84f294 416
d62a17ae 417static void cancelreq_del(void *cr)
63ccb9cb 418{
d62a17ae 419 XFREE(MTYPE_TMP, cr);
63ccb9cb
QY
420}
421
e0bebc7c 422/* initializer, only ever called once */
4d762f26 423static void initializer(void)
e0bebc7c 424{
d62a17ae 425 pthread_key_create(&thread_current, NULL);
e0bebc7c
QY
426}
427
d62a17ae 428struct thread_master *thread_master_create(const char *name)
718e3744 429{
d62a17ae 430 struct thread_master *rv;
431 struct rlimit limit;
432
433 pthread_once(&init_once, &initializer);
434
435 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 436
437 /* Initialize master mutex */
438 pthread_mutex_init(&rv->mtx, NULL);
439 pthread_cond_init(&rv->cancel_cond, NULL);
440
441 /* Set name */
7ffcd8bd
QY
442 name = name ? name : "default";
443 rv->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
d62a17ae 444
445 /* Initialize I/O task data structures */
1a9f340b
MS
446
447 /* Use configured limit if present, ulimit otherwise. */
448 rv->fd_limit = frr_get_fd_limit();
449 if (rv->fd_limit == 0) {
450 getrlimit(RLIMIT_NOFILE, &limit);
451 rv->fd_limit = (int)limit.rlim_cur;
452 }
453
a6f235f3
DS
454 rv->read = XCALLOC(MTYPE_THREAD_POLL,
455 sizeof(struct thread *) * rv->fd_limit);
456
457 rv->write = XCALLOC(MTYPE_THREAD_POLL,
458 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 459
7ffcd8bd
QY
460 char tmhashname[strlen(name) + 32];
461 snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash",
462 name);
bd74dc61 463 rv->cpu_record = hash_create_size(
d8b87afe 464 8, (unsigned int (*)(const void *))cpu_record_hash_key,
74df8d6d 465 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
7ffcd8bd 466 tmhashname);
d62a17ae 467
c284542b
DL
468 thread_list_init(&rv->event);
469 thread_list_init(&rv->ready);
470 thread_list_init(&rv->unuse);
27d29ced 471 thread_timer_list_init(&rv->timer);
d62a17ae 472
473 /* Initialize thread_fetch() settings */
474 rv->spin = true;
475 rv->handle_signals = true;
476
477 /* Set pthread owner, should be updated by actual owner */
478 rv->owner = pthread_self();
479 rv->cancel_req = list_new();
480 rv->cancel_req->del = cancelreq_del;
481 rv->canceled = true;
482
483 /* Initialize pipe poker */
484 pipe(rv->io_pipe);
485 set_nonblocking(rv->io_pipe[0]);
486 set_nonblocking(rv->io_pipe[1]);
487
488 /* Initialize data structures for poll() */
489 rv->handler.pfdsize = rv->fd_limit;
490 rv->handler.pfdcount = 0;
491 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
492 sizeof(struct pollfd) * rv->handler.pfdsize);
493 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
494 sizeof(struct pollfd) * rv->handler.pfdsize);
495
eff09c66 496 /* add to list of threadmasters */
00dffa8c 497 frr_with_mutex(&masters_mtx) {
eff09c66
QY
498 if (!masters)
499 masters = list_new();
500
d62a17ae 501 listnode_add(masters, rv);
502 }
d62a17ae 503
504 return rv;
718e3744 505}
506
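/*
 * Usage sketch (illustrative, not part of the library): a daemon normally
 * obtains its thread_master from libfrr via frr_init(), but a standalone
 * user of this API would do roughly:
 *
 *     struct thread_master *master = thread_master_create("example");
 *
 *     ...schedule tasks with thread_add_read()/thread_add_timer()/...
 *     ...run the fetch/call loop (see thread_fetch() below)...
 *
 *     thread_master_free(master);
 */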
d8a8a8de
QY
507void thread_master_set_name(struct thread_master *master, const char *name)
508{
00dffa8c 509 frr_with_mutex(&master->mtx) {
0a22ddfb 510 XFREE(MTYPE_THREAD_MASTER, master->name);
d8a8a8de
QY
511 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
512 }
d8a8a8de
QY
513}
514
6ed04aa2
DS
515#define THREAD_UNUSED_DEPTH 10
516
718e3744 517/* Move thread to unuse list. */
d62a17ae 518static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 519{
6655966d
RZ
520 pthread_mutex_t mtxc = thread->mtx;
521
d62a17ae 522 assert(m != NULL && thread != NULL);
d62a17ae 523
d62a17ae 524 thread->hist->total_active--;
6ed04aa2
DS
525 memset(thread, 0, sizeof(struct thread));
526 thread->type = THREAD_UNUSED;
527
6655966d
RZ
528 /* Restore the thread mutex context. */
529 thread->mtx = mtxc;
530
c284542b
DL
531 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
532 thread_list_add_tail(&m->unuse, thread);
6655966d
RZ
533 return;
534 }
535
536 thread_free(m, thread);
718e3744 537}
538
539/* Free all unused thread. */
c284542b
DL
540static void thread_list_free(struct thread_master *m,
541 struct thread_list_head *list)
718e3744 542{
d62a17ae 543 struct thread *t;
d62a17ae 544
c284542b 545 while ((t = thread_list_pop(list)))
6655966d 546 thread_free(m, t);
718e3744 547}
548
d62a17ae 549static void thread_array_free(struct thread_master *m,
550 struct thread **thread_array)
308d14ae 551{
d62a17ae 552 struct thread *t;
553 int index;
554
555 for (index = 0; index < m->fd_limit; ++index) {
556 t = thread_array[index];
557 if (t) {
558 thread_array[index] = NULL;
6655966d 559 thread_free(m, t);
d62a17ae 560 }
561 }
a6f235f3 562 XFREE(MTYPE_THREAD_POLL, thread_array);
308d14ae
DV
563}
564
495f0b13
DS
565/*
566 * thread_master_free_unused
567 *
 568 * As threads are finished with, they are put on the
 569 * unuse list for later reuse.
 570 * If we are shutting down, free up the unused threads
 571 * so we can see if we forgot to shut anything off.
572 */
d62a17ae 573void thread_master_free_unused(struct thread_master *m)
495f0b13 574{
00dffa8c 575 frr_with_mutex(&m->mtx) {
d62a17ae 576 struct thread *t;
c284542b 577 while ((t = thread_list_pop(&m->unuse)))
6655966d 578 thread_free(m, t);
d62a17ae 579 }
495f0b13
DS
580}
581
718e3744 582/* Stop thread scheduler. */
d62a17ae 583void thread_master_free(struct thread_master *m)
718e3744 584{
27d29ced
DL
585 struct thread *t;
586
00dffa8c 587 frr_with_mutex(&masters_mtx) {
d62a17ae 588 listnode_delete(masters, m);
eff09c66 589 if (masters->count == 0) {
6a154c88 590 list_delete(&masters);
eff09c66 591 }
d62a17ae 592 }
d62a17ae 593
594 thread_array_free(m, m->read);
595 thread_array_free(m, m->write);
27d29ced
DL
596 while ((t = thread_timer_list_pop(&m->timer)))
597 thread_free(m, t);
d62a17ae 598 thread_list_free(m, &m->event);
599 thread_list_free(m, &m->ready);
600 thread_list_free(m, &m->unuse);
601 pthread_mutex_destroy(&m->mtx);
33844bbe 602 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 603 close(m->io_pipe[0]);
604 close(m->io_pipe[1]);
6a154c88 605 list_delete(&m->cancel_req);
1a0a92ea 606 m->cancel_req = NULL;
d62a17ae 607
608 hash_clean(m->cpu_record, cpu_record_hash_free);
609 hash_free(m->cpu_record);
610 m->cpu_record = NULL;
611
0a22ddfb 612 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 613 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
614 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
615 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 616}
617
78ca0342
CF
 618/* Return remaining time in milliseconds. */
619unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 620{
d62a17ae 621 int64_t remain;
1189d95f 622
00dffa8c 623 frr_with_mutex(&thread->mtx) {
78ca0342 624 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 625 }
1189d95f 626
d62a17ae 627 return remain < 0 ? 0 : remain;
718e3744 628}
629
78ca0342
CF
 630/* Return remaining time in seconds. */
631unsigned long thread_timer_remain_second(struct thread *thread)
632{
633 return thread_timer_remain_msec(thread) / 1000LL;
634}
635
d62a17ae 636struct timeval thread_timer_remain(struct thread *thread)
6ac44687 637{
d62a17ae 638 struct timeval remain;
00dffa8c 639 frr_with_mutex(&thread->mtx) {
d62a17ae 640 monotime_until(&thread->u.sands, &remain);
641 }
d62a17ae 642 return remain;
6ac44687
CF
643}
644
0447957e
AK
645static int time_hhmmss(char *buf, int buf_size, long sec)
646{
647 long hh;
648 long mm;
649 int wr;
650
651 zassert(buf_size >= 8);
652
653 hh = sec / 3600;
654 sec %= 3600;
655 mm = sec / 60;
656 sec %= 60;
657
658 wr = snprintf(buf, buf_size, "%02ld:%02ld:%02ld", hh, mm, sec);
659
660 return wr != 8;
661}
662
663char *thread_timer_to_hhmmss(char *buf, int buf_size,
664 struct thread *t_timer)
665{
666 if (t_timer) {
667 time_hhmmss(buf, buf_size,
668 thread_timer_remain_second(t_timer));
669 } else {
670 snprintf(buf, buf_size, "--:--:--");
671 }
672 return buf;
673}
674
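/*
 * Usage sketch (illustrative): render the time left on a scheduled timer,
 * e.g. for "show" output. The buffer must hold the 8-character HH:MM:SS
 * string plus a terminating NUL.
 *
 *     char buf[16];
 *
 *     vty_out(vty, "Next update in %s\n",
 *             thread_timer_to_hhmmss(buf, sizeof(buf), peer->t_update));
 *
 * (peer->t_update stands in for any struct thread * timer reference.)
 */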
718e3744 675/* Get new thread. */
d7c0a89a 676static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 677 int (*func)(struct thread *), void *arg,
60a3efec 678 const struct xref_threadsched *xref)
718e3744 679{
c284542b 680 struct thread *thread = thread_list_pop(&m->unuse);
d62a17ae 681 struct cpu_thread_history tmp;
682
683 if (!thread) {
684 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
685 /* mutex only needs to be initialized at struct creation. */
686 pthread_mutex_init(&thread->mtx, NULL);
687 m->alloc++;
688 }
689
690 thread->type = type;
691 thread->add_type = type;
692 thread->master = m;
693 thread->arg = arg;
d62a17ae 694 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
695 thread->ref = NULL;
696
697 /*
698 * So if the passed in funcname is not what we have
699 * stored that means the thread->hist needs to be
700 * updated. We keep the last one around in unused
701 * under the assumption that we are probably
702 * going to immediately allocate the same
703 * type of thread.
704 * This hopefully saves us some serious
705 * hash_get lookups.
706 */
60a3efec
DL
707 if ((thread->xref && thread->xref->funcname != xref->funcname)
708 || thread->func != func) {
d62a17ae 709 tmp.func = func;
60a3efec 710 tmp.funcname = xref->funcname;
d62a17ae 711 thread->hist =
712 hash_get(m->cpu_record, &tmp,
713 (void *(*)(void *))cpu_record_hash_alloc);
714 }
715 thread->hist->total_active++;
716 thread->func = func;
60a3efec 717 thread->xref = xref;
d62a17ae 718
719 return thread;
718e3744 720}
721
6655966d
RZ
722static void thread_free(struct thread_master *master, struct thread *thread)
723{
724 /* Update statistics. */
725 assert(master->alloc > 0);
726 master->alloc--;
727
728 /* Free allocated resources. */
729 pthread_mutex_destroy(&thread->mtx);
730 XFREE(MTYPE_THREAD, thread);
731}
732
d81ca9a3
MS
733static int fd_poll(struct thread_master *m, const struct timeval *timer_wait,
734 bool *eintr_p)
209a72a6 735{
d81ca9a3
MS
736 sigset_t origsigs;
737 unsigned char trash[64];
738 nfds_t count = m->handler.copycount;
739
d279ef57
DS
740 /*
741 * If timer_wait is null here, that means poll() should block
742 * indefinitely, unless the thread_master has overridden it by setting
d62a17ae 743 * ->selectpoll_timeout.
d279ef57 744 *
d62a17ae 745 * If the value is positive, it specifies the maximum number of
d279ef57
DS
746 * milliseconds to wait. If the timeout is -1, it specifies that
747 * we should never wait and always return immediately even if no
748 * event is detected. If the value is zero, the behavior is default.
749 */
d62a17ae 750 int timeout = -1;
751
752 /* number of file descriptors with events */
753 int num;
754
755 if (timer_wait != NULL
756 && m->selectpoll_timeout == 0) // use the default value
757 timeout = (timer_wait->tv_sec * 1000)
758 + (timer_wait->tv_usec / 1000);
759 else if (m->selectpoll_timeout > 0) // use the user's timeout
760 timeout = m->selectpoll_timeout;
761 else if (m->selectpoll_timeout
762 < 0) // effect a poll (return immediately)
763 timeout = 0;
764
0bdeb5e5 765 zlog_tls_buffer_flush();
3e41733f
DL
766 rcu_read_unlock();
767 rcu_assert_read_unlocked();
768
d62a17ae 769 /* add poll pipe poker */
d81ca9a3
MS
770 assert(count + 1 < m->handler.pfdsize);
771 m->handler.copy[count].fd = m->io_pipe[0];
772 m->handler.copy[count].events = POLLIN;
773 m->handler.copy[count].revents = 0x00;
774
775 /* We need to deal with a signal-handling race here: we
776 * don't want to miss a crucial signal, such as SIGTERM or SIGINT,
777 * that may arrive just before we enter poll(). We will block the
778 * key signals, then check whether any have arrived - if so, we return
779 * before calling poll(). If not, we'll re-enable the signals
780 * in the ppoll() call.
781 */
782
783 sigemptyset(&origsigs);
784 if (m->handle_signals) {
785 /* Main pthread that handles the app signals */
786 if (frr_sigevent_check(&origsigs)) {
787 /* Signal to process - restore signal mask and return */
788 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
789 num = -1;
790 *eintr_p = true;
791 goto done;
792 }
793 } else {
794 /* Don't make any changes for the non-main pthreads */
795 pthread_sigmask(SIG_SETMASK, NULL, &origsigs);
796 }
d62a17ae 797
d81ca9a3
MS
798#if defined(HAVE_PPOLL)
799 struct timespec ts, *tsp;
800
801 if (timeout >= 0) {
802 ts.tv_sec = timeout / 1000;
803 ts.tv_nsec = (timeout % 1000) * 1000000;
804 tsp = &ts;
805 } else
806 tsp = NULL;
807
808 num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
809 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
810#else
811 /* Not ideal - there is a race after we restore the signal mask */
812 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
813 num = poll(m->handler.copy, count + 1, timeout);
814#endif
d62a17ae 815
d81ca9a3
MS
816done:
817
818 if (num < 0 && errno == EINTR)
819 *eintr_p = true;
820
821 if (num > 0 && m->handler.copy[count].revents != 0 && num--)
d62a17ae 822 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
823 ;
824
3e41733f
DL
825 rcu_read_lock();
826
d62a17ae 827 return num;
209a72a6
DS
828}
829
718e3744 830/* Add new read thread. */
60a3efec
DL
831struct thread *_thread_add_read_write(const struct xref_threadsched *xref,
832 struct thread_master *m,
833 int (*func)(struct thread *),
834 void *arg, int fd, struct thread **t_ptr)
718e3744 835{
60a3efec 836 int dir = xref->thread_type;
d62a17ae 837 struct thread *thread = NULL;
1ef14bee 838 struct thread **thread_array;
d62a17ae 839
abf96a87 840 if (dir == THREAD_READ)
6c3aa850
DL
841 frrtrace(9, frr_libfrr, schedule_read, m,
842 xref->funcname, xref->xref.file, xref->xref.line,
843 t_ptr, fd, 0, arg, 0);
abf96a87 844 else
6c3aa850
DL
845 frrtrace(9, frr_libfrr, schedule_write, m,
846 xref->funcname, xref->xref.file, xref->xref.line,
847 t_ptr, fd, 0, arg, 0);
abf96a87 848
9b864cd3 849 assert(fd >= 0 && fd < m->fd_limit);
00dffa8c
DL
850 frr_with_mutex(&m->mtx) {
851 if (t_ptr && *t_ptr)
852 // thread is already scheduled; don't reschedule
853 break;
d62a17ae 854
855 /* default to a new pollfd */
856 nfds_t queuepos = m->handler.pfdcount;
857
1ef14bee
DS
858 if (dir == THREAD_READ)
859 thread_array = m->read;
860 else
861 thread_array = m->write;
862
d62a17ae 863 /* if we already have a pollfd for our file descriptor, find and
864 * use it */
865 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
866 if (m->handler.pfds[i].fd == fd) {
867 queuepos = i;
1ef14bee
DS
868
869#ifdef DEV_BUILD
870 /*
871 * What happens if we have a thread already
872 * created for this event?
873 */
874 if (thread_array[fd])
875 assert(!"Thread already scheduled for file descriptor");
876#endif
d62a17ae 877 break;
878 }
879
880 /* make sure we have room for this fd + pipe poker fd */
881 assert(queuepos + 1 < m->handler.pfdsize);
882
60a3efec 883 thread = thread_get(m, dir, func, arg, xref);
d62a17ae 884
885 m->handler.pfds[queuepos].fd = fd;
886 m->handler.pfds[queuepos].events |=
887 (dir == THREAD_READ ? POLLIN : POLLOUT);
888
889 if (queuepos == m->handler.pfdcount)
890 m->handler.pfdcount++;
891
892 if (thread) {
00dffa8c 893 frr_with_mutex(&thread->mtx) {
d62a17ae 894 thread->u.fd = fd;
1ef14bee 895 thread_array[thread->u.fd] = thread;
d62a17ae 896 }
d62a17ae 897
898 if (t_ptr) {
899 *t_ptr = thread;
900 thread->ref = t_ptr;
901 }
902 }
903
904 AWAKEN(m);
905 }
d62a17ae 906
907 return thread;
718e3744 908}
909
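/*
 * Usage sketch (illustrative): callers normally go through the
 * thread_add_read()/thread_add_write() wrapper macros in thread.h, which
 * fill in the xref argument with the call site. I/O tasks are one-shot, so
 * a handler that wants to keep watching its descriptor must re-register
 * itself. The context type and its fields below are hypothetical.
 *
 *     static int sock_readable(struct thread *t)
 *     {
 *             struct conn *conn = THREAD_ARG(t);
 *             int fd = THREAD_FD(t);
 *
 *             ...consume data from fd...
 *
 *             thread_add_read(t->master, sock_readable, conn, fd,
 *                             &conn->t_read);
 *             return 0;
 *     }
 *
 *     thread_add_read(master, sock_readable, conn, fd, &conn->t_read);
 */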
56a94b36 910static struct thread *
60a3efec
DL
911_thread_add_timer_timeval(const struct xref_threadsched *xref,
912 struct thread_master *m, int (*func)(struct thread *),
913 int type, void *arg, struct timeval *time_relative,
914 struct thread **t_ptr)
718e3744 915{
d62a17ae 916 struct thread *thread;
d62a17ae 917
918 assert(m != NULL);
919
920 assert(type == THREAD_TIMER);
921 assert(time_relative);
922
6c3aa850
DL
923 frrtrace(9, frr_libfrr, schedule_timer, m,
924 xref->funcname, xref->xref.file, xref->xref.line,
c7bb4f00 925 t_ptr, 0, 0, arg, (long)time_relative->tv_sec);
abf96a87 926
00dffa8c
DL
927 frr_with_mutex(&m->mtx) {
928 if (t_ptr && *t_ptr)
d279ef57 929 /* thread is already scheduled; don't reschedule */
d62a17ae 930 return NULL;
d62a17ae 931
60a3efec 932 thread = thread_get(m, type, func, arg, xref);
d62a17ae 933
00dffa8c 934 frr_with_mutex(&thread->mtx) {
d62a17ae 935 monotime(&thread->u.sands);
936 timeradd(&thread->u.sands, time_relative,
937 &thread->u.sands);
27d29ced 938 thread_timer_list_add(&m->timer, thread);
d62a17ae 939 if (t_ptr) {
940 *t_ptr = thread;
941 thread->ref = t_ptr;
942 }
943 }
d62a17ae 944
945 AWAKEN(m);
946 }
d62a17ae 947
948 return thread;
9e867fe6 949}
950
98c91ac6 951
952/* Add timer event thread. */
60a3efec
DL
953struct thread *_thread_add_timer(const struct xref_threadsched *xref,
954 struct thread_master *m,
955 int (*func)(struct thread *),
956 void *arg, long timer, struct thread **t_ptr)
9e867fe6 957{
d62a17ae 958 struct timeval trel;
9e867fe6 959
d62a17ae 960 assert(m != NULL);
9e867fe6 961
d62a17ae 962 trel.tv_sec = timer;
963 trel.tv_usec = 0;
9e867fe6 964
60a3efec
DL
965 return _thread_add_timer_timeval(xref, m, func, THREAD_TIMER, arg,
966 &trel, t_ptr);
98c91ac6 967}
9e867fe6 968
98c91ac6 969/* Add timer event thread with "millisecond" resolution */
60a3efec
DL
970struct thread *_thread_add_timer_msec(const struct xref_threadsched *xref,
971 struct thread_master *m,
972 int (*func)(struct thread *),
973 void *arg, long timer,
974 struct thread **t_ptr)
98c91ac6 975{
d62a17ae 976 struct timeval trel;
9e867fe6 977
d62a17ae 978 assert(m != NULL);
718e3744 979
d62a17ae 980 trel.tv_sec = timer / 1000;
981 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 982
60a3efec
DL
983 return _thread_add_timer_timeval(xref, m, func, THREAD_TIMER, arg,
984 &trel, t_ptr);
a48b4e6d 985}
986
d03c4cbd 987/* Add timer event thread taking a struct timeval */
60a3efec
DL
988struct thread *_thread_add_timer_tv(const struct xref_threadsched *xref,
989 struct thread_master *m,
990 int (*func)(struct thread *),
991 void *arg, struct timeval *tv,
992 struct thread **t_ptr)
d03c4cbd 993{
60a3efec
DL
994 return _thread_add_timer_timeval(xref, m, func, THREAD_TIMER, arg, tv,
995 t_ptr);
d03c4cbd
DL
996}
997
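/*
 * Usage sketch (illustrative): timers are also one-shot, so a periodic job
 * re-arms itself from its own callback, typically via the
 * thread_add_timer()/thread_add_timer_msec() macros in thread.h. The
 * interface structure and its fields are hypothetical.
 *
 *     static int hello_send(struct thread *t)
 *     {
 *             struct iface *ifp = THREAD_ARG(t);
 *
 *             ...send the hello packet...
 *
 *             thread_add_timer(t->master, hello_send, ifp, 10,
 *                              &ifp->t_hello);
 *             return 0;
 *     }
 */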
718e3744 998/* Add simple event thread. */
60a3efec
DL
999struct thread *_thread_add_event(const struct xref_threadsched *xref,
1000 struct thread_master *m,
1001 int (*func)(struct thread *),
1002 void *arg, int val, struct thread **t_ptr)
718e3744 1003{
00dffa8c 1004 struct thread *thread = NULL;
d62a17ae 1005
6c3aa850
DL
1006 frrtrace(9, frr_libfrr, schedule_event, m,
1007 xref->funcname, xref->xref.file, xref->xref.line,
c7bb4f00 1008 t_ptr, 0, val, arg, 0);
abf96a87 1009
d62a17ae 1010 assert(m != NULL);
1011
00dffa8c
DL
1012 frr_with_mutex(&m->mtx) {
1013 if (t_ptr && *t_ptr)
d279ef57 1014 /* thread is already scheduled; don't reschedule */
00dffa8c 1015 break;
d62a17ae 1016
60a3efec 1017 thread = thread_get(m, THREAD_EVENT, func, arg, xref);
00dffa8c 1018 frr_with_mutex(&thread->mtx) {
d62a17ae 1019 thread->u.val = val;
c284542b 1020 thread_list_add_tail(&m->event, thread);
d62a17ae 1021 }
d62a17ae 1022
1023 if (t_ptr) {
1024 *t_ptr = thread;
1025 thread->ref = t_ptr;
1026 }
1027
1028 AWAKEN(m);
1029 }
d62a17ae 1030
1031 return thread;
718e3744 1032}
1033
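/*
 * Usage sketch (illustrative): events fire on the next pass through the
 * owning pthread's fetch loop and are a common way to defer work or hand it
 * to another thread_master; scheduling takes m->mtx and AWAKEN() pokes the
 * target out of poll(). The last argument is the optional back-reference
 * used for cancellation. peer_process and the peer structure are
 * hypothetical.
 *
 *     thread_add_event(peer->master, peer_process, peer, 0, &peer->t_event);
 */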
63ccb9cb
QY
1034/* Thread cancellation ------------------------------------------------------ */
1035
8797240e
QY
1036/**
1037 * NOT's out the .events field of pollfd corresponding to the given file
1038 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
1039 *
1040 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1041 * implementation for details.
1042 *
1043 * @param master
1044 * @param fd
1045 * @param state the event to cancel. One or more (OR'd together) of the
1046 * following:
1047 * - POLLIN
1048 * - POLLOUT
1049 */
d62a17ae 1050static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 1051{
42d74538
QY
1052 bool found = false;
1053
d62a17ae 1054 /* Cancel POLLHUP too just in case some bozo set it */
1055 state |= POLLHUP;
1056
1057 /* find the index of corresponding pollfd */
1058 nfds_t i;
1059
1060 for (i = 0; i < master->handler.pfdcount; i++)
42d74538
QY
1061 if (master->handler.pfds[i].fd == fd) {
1062 found = true;
d62a17ae 1063 break;
42d74538
QY
1064 }
1065
1066 if (!found) {
1067 zlog_debug(
1068 "[!] Received cancellation request for nonexistent rw job");
1069 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 1070 master->name ? master->name : "", fd);
42d74538
QY
1071 return;
1072 }
d62a17ae 1073
1074 /* NOT out event. */
1075 master->handler.pfds[i].events &= ~(state);
1076
1077 /* If all events are canceled, delete / resize the pollfd array. */
1078 if (master->handler.pfds[i].events == 0) {
1079 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1080 (master->handler.pfdcount - i - 1)
1081 * sizeof(struct pollfd));
1082 master->handler.pfdcount--;
e985cda0
S
1083 master->handler.pfds[master->handler.pfdcount].fd = 0;
1084 master->handler.pfds[master->handler.pfdcount].events = 0;
d62a17ae 1085 }
1086
1087 /* If we have the same pollfd in the copy, perform the same operations,
1088 * otherwise return. */
1089 if (i >= master->handler.copycount)
1090 return;
1091
1092 master->handler.copy[i].events &= ~(state);
1093
1094 if (master->handler.copy[i].events == 0) {
1095 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1096 (master->handler.copycount - i - 1)
1097 * sizeof(struct pollfd));
1098 master->handler.copycount--;
e985cda0
S
1099 master->handler.copy[master->handler.copycount].fd = 0;
1100 master->handler.copy[master->handler.copycount].events = 0;
d62a17ae 1101 }
0a95a0d0
DS
1102}
1103
1189d95f 1104/**
63ccb9cb 1105 * Process cancellation requests.
1189d95f 1106 *
63ccb9cb
QY
1107 * This may only be run from the pthread which owns the thread_master.
1108 *
1109 * @param master the thread master to process
1110 * @REQUIRE master->mtx
1189d95f 1111 */
d62a17ae 1112static void do_thread_cancel(struct thread_master *master)
718e3744 1113{
c284542b 1114 struct thread_list_head *list = NULL;
d62a17ae 1115 struct thread **thread_array = NULL;
1116 struct thread *thread;
1117
1118 struct cancel_req *cr;
1119 struct listnode *ln;
1120 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
d279ef57
DS
1121 /*
1122 * If this is an event object cancellation, linear search
1123 * through event list deleting any events which have the
1124 * specified argument. We also need to check every thread
1125 * in the ready queue.
1126 */
d62a17ae 1127 if (cr->eventobj) {
1128 struct thread *t;
c284542b 1129
81fddbe7 1130 frr_each_safe(thread_list, &master->event, t) {
c284542b
DL
1131 if (t->arg != cr->eventobj)
1132 continue;
1133 thread_list_del(&master->event, t);
1134 if (t->ref)
1135 *t->ref = NULL;
1136 thread_add_unuse(master, t);
d62a17ae 1137 }
1138
81fddbe7 1139 frr_each_safe(thread_list, &master->ready, t) {
c284542b
DL
1140 if (t->arg != cr->eventobj)
1141 continue;
1142 thread_list_del(&master->ready, t);
1143 if (t->ref)
1144 *t->ref = NULL;
1145 thread_add_unuse(master, t);
d62a17ae 1146 }
1147 continue;
1148 }
1149
d279ef57
DS
1150 /*
1151 * The pointer varies depending on whether the cancellation
1152 * request was made asynchronously or not. If it was, we
1153 * need to check whether the thread even exists anymore
1154 * before cancelling it.
1155 */
d62a17ae 1156 thread = (cr->thread) ? cr->thread : *cr->threadref;
1157
1158 if (!thread)
1159 continue;
1160
1161 /* Determine the appropriate queue to cancel the thread from */
1162 switch (thread->type) {
1163 case THREAD_READ:
1164 thread_cancel_rw(master, thread->u.fd, POLLIN);
1165 thread_array = master->read;
1166 break;
1167 case THREAD_WRITE:
1168 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1169 thread_array = master->write;
1170 break;
1171 case THREAD_TIMER:
27d29ced 1172 thread_timer_list_del(&master->timer, thread);
d62a17ae 1173 break;
1174 case THREAD_EVENT:
1175 list = &master->event;
1176 break;
1177 case THREAD_READY:
1178 list = &master->ready;
1179 break;
1180 default:
1181 continue;
1182 break;
1183 }
1184
27d29ced 1185 if (list) {
c284542b 1186 thread_list_del(list, thread);
d62a17ae 1187 } else if (thread_array) {
1188 thread_array[thread->u.fd] = NULL;
d62a17ae 1189 }
1190
1191 if (thread->ref)
1192 *thread->ref = NULL;
1193
1194 thread_add_unuse(thread->master, thread);
1195 }
1196
1197 /* Delete and free all cancellation requests */
41b21bfa
MS
1198 if (master->cancel_req)
1199 list_delete_all_node(master->cancel_req);
d62a17ae 1200
1201 /* Wake up any threads which may be blocked in thread_cancel_async() */
1202 master->canceled = true;
1203 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1204}
1205
63ccb9cb
QY
1206/**
1207 * Cancel any events which have the specified argument.
1208 *
1209 * MT-Unsafe
1210 *
1211 * @param m the thread_master to cancel from
1212 * @param arg the argument passed when creating the event
1213 */
d62a17ae 1214void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1215{
d62a17ae 1216 assert(master->owner == pthread_self());
1217
00dffa8c 1218 frr_with_mutex(&master->mtx) {
d62a17ae 1219 struct cancel_req *cr =
1220 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1221 cr->eventobj = arg;
1222 listnode_add(master->cancel_req, cr);
1223 do_thread_cancel(master);
1224 }
63ccb9cb 1225}
1189d95f 1226
63ccb9cb
QY
1227/**
1228 * Cancel a specific task.
1229 *
1230 * MT-Unsafe
1231 *
1232 * @param thread task to cancel
1233 */
b3d6bc6e 1234void thread_cancel(struct thread **thread)
63ccb9cb 1235{
b3d6bc6e
MS
1236 struct thread_master *master;
1237
1238 if (thread == NULL || *thread == NULL)
1239 return;
1240
1241 master = (*thread)->master;
d62a17ae 1242
6c3aa850
DL
1243 frrtrace(9, frr_libfrr, thread_cancel, master,
1244 (*thread)->xref->funcname, (*thread)->xref->xref.file,
1245 (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
b4d6e855 1246 (*thread)->u.val, (*thread)->arg, (*thread)->u.sands.tv_sec);
abf96a87 1247
6ed04aa2
DS
1248 assert(master->owner == pthread_self());
1249
00dffa8c 1250 frr_with_mutex(&master->mtx) {
d62a17ae 1251 struct cancel_req *cr =
1252 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
b3d6bc6e 1253 cr->thread = *thread;
6ed04aa2
DS
1254 listnode_add(master->cancel_req, cr);
1255 do_thread_cancel(master);
d62a17ae 1256 }
b3d6bc6e
MS
1257
1258 *thread = NULL;
63ccb9cb 1259}
1189d95f 1260
63ccb9cb
QY
1261/**
1262 * Asynchronous cancellation.
1263 *
8797240e
QY
1264 * Called with either a struct thread ** or void * to an event argument,
1265 * this function posts the correct cancellation request and blocks until it is
1266 * serviced.
63ccb9cb
QY
1267 *
1268 * If the thread is currently running, execution blocks until it completes.
1269 *
8797240e
QY
1270 * The last two parameters are mutually exclusive, i.e. if you pass one the
1271 * other must be NULL.
1272 *
1273 * When the cancellation procedure executes on the target thread_master, the
1274 * thread * provided is checked for nullity. If it is null, the thread is
1275 * assumed to no longer exist and the cancellation request is a no-op. Thus
1276 * users of this API must pass a back-reference when scheduling the original
1277 * task.
1278 *
63ccb9cb
QY
1279 * MT-Safe
1280 *
8797240e
QY
1281 * @param master the thread master with the relevant event / task
1282 * @param thread pointer to thread to cancel
1283 * @param eventobj the event
63ccb9cb 1284 */
d62a17ae 1285void thread_cancel_async(struct thread_master *master, struct thread **thread,
1286 void *eventobj)
63ccb9cb 1287{
d62a17ae 1288 assert(!(thread && eventobj) && (thread || eventobj));
abf96a87
QY
1289
1290 if (thread && *thread)
c7bb4f00 1291 frrtrace(9, frr_libfrr, thread_cancel_async, master,
6c3aa850
DL
1292 (*thread)->xref->funcname, (*thread)->xref->xref.file,
1293 (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
c7bb4f00
QY
1294 (*thread)->u.val, (*thread)->arg,
1295 (*thread)->u.sands.tv_sec);
abf96a87 1296 else
c7bb4f00
QY
1297 frrtrace(9, frr_libfrr, thread_cancel_async, master, NULL, NULL,
1298 0, NULL, 0, 0, eventobj, 0);
abf96a87 1299
d62a17ae 1300 assert(master->owner != pthread_self());
1301
00dffa8c 1302 frr_with_mutex(&master->mtx) {
d62a17ae 1303 master->canceled = false;
1304
1305 if (thread) {
1306 struct cancel_req *cr =
1307 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1308 cr->threadref = thread;
1309 listnode_add(master->cancel_req, cr);
1310 } else if (eventobj) {
1311 struct cancel_req *cr =
1312 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1313 cr->eventobj = eventobj;
1314 listnode_add(master->cancel_req, cr);
1315 }
1316 AWAKEN(master);
1317
1318 while (!master->canceled)
1319 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1320 }
50478845
MS
1321
1322 if (thread)
1323 *thread = NULL;
718e3744 1324}
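/*
 * Usage sketch (illustrative): cancelling a task owned by another pthread.
 * The scheduling side must have kept the back-reference (the struct
 * thread ** passed when the task was added) so the cancellation can tell
 * whether the task still exists. ctx, reconnect and pt_master are
 * hypothetical names.
 *
 *     // on the owning pthread, when scheduling:
 *     thread_add_timer(pt_master, reconnect, ctx, 5, &ctx->t_reconnect);
 *
 *     // from a different pthread, later:
 *     thread_cancel_async(pt_master, &ctx->t_reconnect, NULL);
 *     // blocks until the owning pthread services the request;
 *     // ctx->t_reconnect is NULL afterwards.
 */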
63ccb9cb 1325/* ------------------------------------------------------------------------- */
718e3744 1326
27d29ced 1327static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
d62a17ae 1328 struct timeval *timer_val)
718e3744 1329{
27d29ced
DL
1330 if (!thread_timer_list_count(timers))
1331 return NULL;
1332
1333 struct thread *next_timer = thread_timer_list_first(timers);
1334 monotime_until(&next_timer->u.sands, timer_val);
1335 return timer_val;
718e3744 1336}
718e3744 1337
d62a17ae 1338static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1339 struct thread *fetch)
718e3744 1340{
d62a17ae 1341 *fetch = *thread;
1342 thread_add_unuse(m, thread);
1343 return fetch;
718e3744 1344}
1345
d62a17ae 1346static int thread_process_io_helper(struct thread_master *m,
45f3d590
DS
1347 struct thread *thread, short state,
1348 short actual_state, int pos)
5d4ccd4e 1349{
d62a17ae 1350 struct thread **thread_array;
1351
45f3d590
DS
1352 /*
1353 * poll() clears the .events field, but the pollfd array we
1354 * pass to poll() is a copy of the one used to schedule threads.
1355 * We need to synchronize state between the two here by applying
1356 * the same changes poll() made on the copy of the "real" pollfd
1357 * array.
1358 *
1359 * This cleans up a possible infinite loop where we refuse
1360 * to respond to a poll event but poll is insistent that
1361 * we should.
1362 */
1363 m->handler.pfds[pos].events &= ~(state);
1364
1365 if (!thread) {
1366 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1367 flog_err(EC_LIB_NO_THREAD,
1368 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1369 m->handler.pfds[pos].fd, actual_state);
d62a17ae 1370 return 0;
45f3d590 1371 }
d62a17ae 1372
1373 if (thread->type == THREAD_READ)
1374 thread_array = m->read;
1375 else
1376 thread_array = m->write;
1377
1378 thread_array[thread->u.fd] = NULL;
c284542b 1379 thread_list_add_tail(&m->ready, thread);
d62a17ae 1380 thread->type = THREAD_READY;
45f3d590 1381
d62a17ae 1382 return 1;
5d4ccd4e
DS
1383}
1384
8797240e
QY
1385/**
1386 * Process I/O events.
1387 *
1388 * Walks through file descriptor array looking for those pollfds whose .revents
1389 * field has something interesting. Deletes any invalid file descriptors.
1390 *
1391 * @param m the thread master
1392 * @param num the number of active file descriptors (return value of poll())
1393 */
d62a17ae 1394static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1395{
d62a17ae 1396 unsigned int ready = 0;
1397 struct pollfd *pfds = m->handler.copy;
1398
1399 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1400 /* no event for current fd? immediately continue */
1401 if (pfds[i].revents == 0)
1402 continue;
1403
1404 ready++;
1405
d279ef57
DS
1406 /*
1407 * Unless someone has called thread_cancel from another
1408 * pthread, the only thing that could have changed in
1409 * m->handler.pfds while we were asleep is the .events
1410 * field in a given pollfd. Barring thread_cancel() that
1411 * value should be a superset of the values we have in our
 1412 * copy, so there's no need to update it. Similarly,
1413 * barring deletion, the fd should still be a valid index
1414 * into the master's pfds.
d142453d
DS
1415 *
1416 * We are including POLLERR here to do a READ event
1417 * this is because the read should fail and the
1418 * read function should handle it appropriately
d279ef57 1419 */
d142453d 1420 if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
d62a17ae 1421 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
45f3d590
DS
1422 pfds[i].revents, i);
1423 }
d62a17ae 1424 if (pfds[i].revents & POLLOUT)
1425 thread_process_io_helper(m, m->write[pfds[i].fd],
45f3d590 1426 POLLOUT, pfds[i].revents, i);
d62a17ae 1427
 1428 /* if one of our file descriptors is garbage, remove the same
 1429 * from both pfds + update sizes and index */
1431 if (pfds[i].revents & POLLNVAL) {
1432 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1433 (m->handler.pfdcount - i - 1)
1434 * sizeof(struct pollfd));
1435 m->handler.pfdcount--;
e985cda0
S
1436 m->handler.pfds[m->handler.pfdcount].fd = 0;
1437 m->handler.pfds[m->handler.pfdcount].events = 0;
d62a17ae 1438
1439 memmove(pfds + i, pfds + i + 1,
1440 (m->handler.copycount - i - 1)
1441 * sizeof(struct pollfd));
1442 m->handler.copycount--;
e985cda0
S
1443 m->handler.copy[m->handler.copycount].fd = 0;
1444 m->handler.copy[m->handler.copycount].events = 0;
d62a17ae 1445
1446 i--;
1447 }
1448 }
718e3744 1449}
1450
8b70d0b0 1451/* Add all timers that have popped to the ready list. */
27d29ced 1452static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
d62a17ae 1453 struct timeval *timenow)
a48b4e6d 1454{
d62a17ae 1455 struct thread *thread;
1456 unsigned int ready = 0;
1457
27d29ced 1458 while ((thread = thread_timer_list_first(timers))) {
d62a17ae 1459 if (timercmp(timenow, &thread->u.sands, <))
1460 return ready;
27d29ced 1461 thread_timer_list_pop(timers);
d62a17ae 1462 thread->type = THREAD_READY;
c284542b 1463 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1464 ready++;
1465 }
1466 return ready;
a48b4e6d 1467}
1468
2613abe6 1469/* process a list en masse, e.g. for event thread lists */
c284542b 1470static unsigned int thread_process(struct thread_list_head *list)
2613abe6 1471{
d62a17ae 1472 struct thread *thread;
d62a17ae 1473 unsigned int ready = 0;
1474
c284542b 1475 while ((thread = thread_list_pop(list))) {
d62a17ae 1476 thread->type = THREAD_READY;
c284542b 1477 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1478 ready++;
1479 }
1480 return ready;
2613abe6
PJ
1481}
1482
1483
718e3744 1484/* Fetch next ready thread. */
d62a17ae 1485struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1486{
d62a17ae 1487 struct thread *thread = NULL;
1488 struct timeval now;
1489 struct timeval zerotime = {0, 0};
1490 struct timeval tv;
1491 struct timeval *tw = NULL;
d81ca9a3 1492 bool eintr_p = false;
d62a17ae 1493 int num = 0;
1494
1495 do {
1496 /* Handle signals if any */
1497 if (m->handle_signals)
1498 quagga_sigevent_process();
1499
1500 pthread_mutex_lock(&m->mtx);
1501
1502 /* Process any pending cancellation requests */
1503 do_thread_cancel(m);
1504
e3c9529e
QY
1505 /*
1506 * Attempt to flush ready queue before going into poll().
1507 * This is performance-critical. Think twice before modifying.
1508 */
c284542b 1509 if ((thread = thread_list_pop(&m->ready))) {
e3c9529e
QY
1510 fetch = thread_run(m, thread, fetch);
1511 if (fetch->ref)
1512 *fetch->ref = NULL;
1513 pthread_mutex_unlock(&m->mtx);
1514 break;
1515 }
1516
1517 /* otherwise, tick through scheduling sequence */
1518
bca37d17
QY
1519 /*
1520 * Post events to ready queue. This must come before the
1521 * following block since events should occur immediately
1522 */
d62a17ae 1523 thread_process(&m->event);
1524
bca37d17
QY
1525 /*
1526 * If there are no tasks on the ready queue, we will poll()
1527 * until a timer expires or we receive I/O, whichever comes
1528 * first. The strategy for doing this is:
d62a17ae 1529 *
1530 * - If there are events pending, set the poll() timeout to zero
1531 * - If there are no events pending, but there are timers
d279ef57
DS
1532 * pending, set the timeout to the smallest remaining time on
1533 * any timer.
d62a17ae 1534 * - If there are neither timers nor events pending, but there
d279ef57 1535 * are file descriptors pending, block indefinitely in poll()
d62a17ae 1536 * - If nothing is pending, it's time for the application to die
1537 *
1538 * In every case except the last, we need to hit poll() at least
bca37d17
QY
1539 * once per loop to avoid starvation by events
1540 */
c284542b 1541 if (!thread_list_count(&m->ready))
27d29ced 1542 tw = thread_timer_wait(&m->timer, &tv);
d62a17ae 1543
c284542b
DL
1544 if (thread_list_count(&m->ready) ||
1545 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1546 tw = &zerotime;
1547
1548 if (!tw && m->handler.pfdcount == 0) { /* die */
1549 pthread_mutex_unlock(&m->mtx);
1550 fetch = NULL;
1551 break;
1552 }
1553
bca37d17
QY
1554 /*
1555 * Copy pollfd array + # active pollfds in it. Not necessary to
1556 * copy the array size as this is fixed.
1557 */
d62a17ae 1558 m->handler.copycount = m->handler.pfdcount;
1559 memcpy(m->handler.copy, m->handler.pfds,
1560 m->handler.copycount * sizeof(struct pollfd));
1561
e3c9529e
QY
1562 pthread_mutex_unlock(&m->mtx);
1563 {
d81ca9a3
MS
1564 eintr_p = false;
1565 num = fd_poll(m, tw, &eintr_p);
e3c9529e
QY
1566 }
1567 pthread_mutex_lock(&m->mtx);
d764d2cc 1568
e3c9529e
QY
1569 /* Handle any errors received in poll() */
1570 if (num < 0) {
d81ca9a3 1571 if (eintr_p) {
d62a17ae 1572 pthread_mutex_unlock(&m->mtx);
e3c9529e
QY
1573 /* loop around to signal handler */
1574 continue;
d62a17ae 1575 }
1576
e3c9529e 1577 /* else die */
450971aa 1578 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1579 safe_strerror(errno));
e3c9529e
QY
1580 pthread_mutex_unlock(&m->mtx);
1581 fetch = NULL;
1582 break;
bca37d17 1583 }
d62a17ae 1584
1585 /* Post timers to ready queue. */
1586 monotime(&now);
27d29ced 1587 thread_process_timers(&m->timer, &now);
d62a17ae 1588
1589 /* Post I/O to ready queue. */
1590 if (num > 0)
1591 thread_process_io(m, num);
1592
d62a17ae 1593 pthread_mutex_unlock(&m->mtx);
1594
1595 } while (!thread && m->spin);
1596
1597 return fetch;
718e3744 1598}
1599
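/*
 * Usage sketch (illustrative): the main loop of a daemon boils down to
 * fetching and dispatching one task at a time, which is essentially what
 * libfrr's frr_run() does:
 *
 *     struct thread thread;
 *
 *     while (thread_fetch(master, &thread))
 *             thread_call(&thread);
 */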
d62a17ae 1600static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1601{
d62a17ae 1602 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1603 + (a.tv_usec - b.tv_usec));
62f44022
QY
1604}
1605
d62a17ae 1606unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1607 unsigned long *cputime)
718e3744 1608{
d62a17ae 1609 /* This is 'user + sys' time. */
1610 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1611 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1612 return timeval_elapsed(now->real, start->real);
8b70d0b0 1613}
1614
50596be0
DS
1615/* We should aim to yield after yield milliseconds, which defaults
1616 to THREAD_YIELD_TIME_SLOT .
8b70d0b0 1617 Note: we are using real (wall clock) time for this calculation.
1618 It could be argued that CPU time may make more sense in certain
1619 contexts. The things to consider are whether the thread may have
1620 blocked (in which case wall time increases, but CPU time does not),
1621 or whether the system is heavily loaded with other processes competing
d62a17ae 1622 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1623 Plus it has the added benefit that gettimeofday should be faster
1624 than calling getrusage. */
d62a17ae 1625int thread_should_yield(struct thread *thread)
718e3744 1626{
d62a17ae 1627 int result;
00dffa8c 1628 frr_with_mutex(&thread->mtx) {
d62a17ae 1629 result = monotime_since(&thread->real, NULL)
1630 > (int64_t)thread->yield;
1631 }
d62a17ae 1632 return result;
50596be0
DS
1633}
1634
d62a17ae 1635void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1636{
00dffa8c 1637 frr_with_mutex(&thread->mtx) {
d62a17ae 1638 thread->yield = yield_time;
1639 }
718e3744 1640}
1641
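/*
 * Usage sketch (illustrative): a long-running task can use
 * thread_should_yield() to cooperate with the scheduler, stopping once its
 * time slot is used up and rescheduling itself as an event to continue on a
 * later pass. The walk-state structure and helpers are hypothetical.
 *
 *     static int table_walk(struct thread *t)
 *     {
 *             struct walk_state *ws = THREAD_ARG(t);
 *
 *             while (walk_more(ws)) {
 *                     walk_one(ws);
 *                     if (thread_should_yield(t)) {
 *                             thread_add_event(t->master, table_walk, ws,
 *                                              0, &ws->t_walk);
 *                             return 0;
 *                     }
 *             }
 *             return 0;
 *     }
 */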
d62a17ae 1642void thread_getrusage(RUSAGE_T *r)
db9c0df9 1643{
231db9a6
DS
1644#if defined RUSAGE_THREAD
1645#define FRR_RUSAGE RUSAGE_THREAD
1646#else
1647#define FRR_RUSAGE RUSAGE_SELF
1648#endif
d62a17ae 1649 monotime(&r->real);
f75e802d 1650#ifndef EXCLUDE_CPU_TIME
231db9a6 1651 getrusage(FRR_RUSAGE, &(r->cpu));
f75e802d 1652#endif
db9c0df9
PJ
1653}
1654
fbcac826
QY
1655/*
1656 * Call a thread.
1657 *
1658 * This function will atomically update the thread's usage history. At present
1659 * this is the only spot where usage history is written. Nevertheless the code
1660 * has been written such that the introduction of writers in the future should
1661 * not need to update it provided the writers atomically perform only the
1662 * operations done here, i.e. updating the total and maximum times. In
1663 * particular, the maximum real and cpu times must be monotonically increasing
1664 * or this code is not correct.
1665 */
d62a17ae 1666void thread_call(struct thread *thread)
718e3744 1667{
f75e802d 1668#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1669 _Atomic unsigned long realtime, cputime;
1670 unsigned long exp;
1671 unsigned long helper;
f75e802d 1672#endif
d62a17ae 1673 RUSAGE_T before, after;
cc8b13a0 1674
d62a17ae 1675 GETRUSAGE(&before);
1676 thread->real = before.real;
718e3744 1677
6c3aa850
DL
1678 frrtrace(9, frr_libfrr, thread_call, thread->master,
1679 thread->xref->funcname, thread->xref->xref.file,
1680 thread->xref->xref.line, NULL, thread->u.fd,
c7bb4f00 1681 thread->u.val, thread->arg, thread->u.sands.tv_sec);
abf96a87 1682
d62a17ae 1683 pthread_setspecific(thread_current, thread);
1684 (*thread->func)(thread);
1685 pthread_setspecific(thread_current, NULL);
718e3744 1686
d62a17ae 1687 GETRUSAGE(&after);
718e3744 1688
f75e802d 1689#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1690 realtime = thread_consumed_time(&after, &before, &helper);
1691 cputime = helper;
1692
1693 /* update realtime */
1694 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1695 memory_order_seq_cst);
1696 exp = atomic_load_explicit(&thread->hist->real.max,
1697 memory_order_seq_cst);
1698 while (exp < realtime
1699 && !atomic_compare_exchange_weak_explicit(
1700 &thread->hist->real.max, &exp, realtime,
1701 memory_order_seq_cst, memory_order_seq_cst))
1702 ;
1703
1704 /* update cputime */
1705 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1706 memory_order_seq_cst);
1707 exp = atomic_load_explicit(&thread->hist->cpu.max,
1708 memory_order_seq_cst);
1709 while (exp < cputime
1710 && !atomic_compare_exchange_weak_explicit(
1711 &thread->hist->cpu.max, &exp, cputime,
1712 memory_order_seq_cst, memory_order_seq_cst))
1713 ;
1714
1715 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1716 memory_order_seq_cst);
1717 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1718 memory_order_seq_cst);
718e3744 1719
924b9229 1720#ifdef CONSUMED_TIME_CHECK
d62a17ae 1721 if (realtime > CONSUMED_TIME_CHECK) {
1722 /*
1723 * We have a CPU Hog on our hands.
1724 * Whinge about it now, so we're aware this is yet another task
1725 * to fix.
1726 */
9ef9495e 1727 flog_warn(
450971aa 1728 EC_LIB_SLOW_THREAD,
d62a17ae 1729 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
60a3efec 1730 thread->xref->funcname, (unsigned long)thread->func,
d62a17ae 1731 realtime / 1000, cputime / 1000);
1732 }
924b9229 1733#endif /* CONSUMED_TIME_CHECK */
f75e802d 1734#endif /* Exclude CPU Time */
718e3744 1735}
1736
1737/* Execute thread */
60a3efec
DL
1738void _thread_execute(const struct xref_threadsched *xref,
1739 struct thread_master *m, int (*func)(struct thread *),
1740 void *arg, int val)
718e3744 1741{
c4345fbf 1742 struct thread *thread;
718e3744 1743
c4345fbf 1744 /* Get or allocate new thread to execute. */
00dffa8c 1745 frr_with_mutex(&m->mtx) {
60a3efec 1746 thread = thread_get(m, THREAD_EVENT, func, arg, xref);
9c7753e4 1747
c4345fbf 1748 /* Set its event value. */
00dffa8c 1749 frr_with_mutex(&thread->mtx) {
c4345fbf
RZ
1750 thread->add_type = THREAD_EXECUTE;
1751 thread->u.val = val;
1752 thread->ref = &thread;
1753 }
c4345fbf 1754 }
f7c62e11 1755
c4345fbf
RZ
1756 /* Execute thread doing all accounting. */
1757 thread_call(thread);
9c7753e4 1758
c4345fbf
RZ
1759 /* Give back or free thread. */
1760 thread_add_unuse(m, thread);
718e3744 1761}
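/*
 * Usage sketch (illustrative): thread_execute() (the wrapper macro in
 * thread.h) runs the callback synchronously on the calling pthread while
 * still recording CPU/wall-clock statistics for it, so the call shows up in
 * "show thread cpu" like any scheduled task. flush_now and ctx are
 * hypothetical.
 *
 *     thread_execute(master, flush_now, ctx, 0);
 */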
1543c387
MS
1762
1763/* Debug signal mask - if 'sigs' is NULL, use current effective mask. */
1764void debug_signals(const sigset_t *sigs)
1765{
1766 int i, found;
1767 sigset_t tmpsigs;
1768 char buf[300];
1769
1770 /*
1771 * We're only looking at the non-realtime signals here, so we need
1772 * some limit value. Platform differences mean at some point we just
1773 * need to pick a reasonable value.
1774 */
1775#if defined SIGRTMIN
1776# define LAST_SIGNAL SIGRTMIN
1777#else
1778# define LAST_SIGNAL 32
1779#endif
1780
1781
1782 if (sigs == NULL) {
1783 sigemptyset(&tmpsigs);
1784 pthread_sigmask(SIG_BLOCK, NULL, &tmpsigs);
1785 sigs = &tmpsigs;
1786 }
1787
1788 found = 0;
1789 buf[0] = '\0';
1790
1791 for (i = 0; i < LAST_SIGNAL; i++) {
1792 char tmp[20];
1793
1794 if (sigismember(sigs, i) > 0) {
1795 if (found > 0)
1796 strlcat(buf, ",", sizeof(buf));
1797 snprintf(tmp, sizeof(tmp), "%d", i);
1798 strlcat(buf, tmp, sizeof(buf));
1799 found++;
1800 }
1801 }
1802
1803 if (found == 0)
1804 snprintf(buf, sizeof(buf), "<none>");
1805
1806 zlog_debug("%s: %s", __func__, buf);
1807}