718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
896014f4
DL
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
3e41733f 28#include "frrcu.h"
718e3744 29#include "log.h"
e04ab74d 30#include "hash.h"
31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
00dffa8c 36#include "frr_pthread.h"
9ef9495e 37#include "lib_errors.h"
d6be5fb9 38
d62a17ae 39DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 40DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 41DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 42DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 43
c284542b
DL
44DECLARE_LIST(thread_list, struct thread, threaditem)
45
27d29ced
DL
46static int thread_timer_cmp(const struct thread *a, const struct thread *b)
47{
48 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
49 return -1;
50 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
51 return 1;
52 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
53 return -1;
54 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
55 return 1;
56 return 0;
57}
58
59DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
60 thread_timer_cmp)
61
3b96b781
HT
62#if defined(__APPLE__)
63#include <mach/mach.h>
64#include <mach/mach_time.h>
65#endif
66
d62a17ae 67#define AWAKEN(m) \
68 do { \
2b64873d 69 const unsigned char wakebyte = 0x01; \
d62a17ae 70 write(m->io_pipe[1], &wakebyte, 1); \
71 } while (0);
3bf2673b 72
62f44022 73/* control variable for initializer */
c17faa4b 74static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 75pthread_key_t thread_current;
6b0655a2 76
c17faa4b 77static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
62f44022
QY
78static struct list *masters;
79
6655966d 80static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 81
62f44022 82/* CLI start ---------------------------------------------------------------- */
d8b87afe 83static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
e04ab74d 84{
883cc51d 85 int size = sizeof(a->func);
bd74dc61
DS
86
87 return jhash(&a->func, size, 0);
e04ab74d 88}
89
74df8d6d 90static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 91 const struct cpu_thread_history *b)
e04ab74d 92{
d62a17ae 93 return a->func == b->func;
e04ab74d 94}
95
d62a17ae 96static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 97{
d62a17ae 98 struct cpu_thread_history *new;
99 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
100 new->func = a->func;
101 new->funcname = a->funcname;
102 return new;
e04ab74d 103}
104
d62a17ae 105static void cpu_record_hash_free(void *a)
228da428 106{
d62a17ae 107 struct cpu_thread_history *hist = a;
108
109 XFREE(MTYPE_THREAD_STATS, hist);
228da428
CC
110}
111
f75e802d 112#ifndef EXCLUDE_CPU_TIME
d62a17ae 113static void vty_out_cpu_thread_history(struct vty *vty,
114 struct cpu_thread_history *a)
e04ab74d 115{
9a8a7b0e 116 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
566bdaf6
DL
117 (size_t)a->total_active, a->cpu.total / 1000,
118 a->cpu.total % 1000, (size_t)a->total_calls,
119 (size_t)(a->cpu.total / a->total_calls), a->cpu.max,
120 (size_t)(a->real.total / a->total_calls), a->real.max);
d62a17ae 121 vty_out(vty, " %c%c%c%c%c %s\n",
122 a->types & (1 << THREAD_READ) ? 'R' : ' ',
123 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
124 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
125 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
126 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 127}
128
e3b78da8 129static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 130{
d62a17ae 131 struct cpu_thread_history *totals = args[0];
fbcac826 132 struct cpu_thread_history copy;
d62a17ae 133 struct vty *vty = args[1];
fbcac826 134 uint8_t *filter = args[2];
d62a17ae 135
136 struct cpu_thread_history *a = bucket->data;
137
fbcac826
QY
138 copy.total_active =
139 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
140 copy.total_calls =
141 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
142 copy.cpu.total =
143 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
144 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
145 copy.real.total =
146 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
147 copy.real.max =
148 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
149 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
150 copy.funcname = a->funcname;
151
152 if (!(copy.types & *filter))
d62a17ae 153 return;
fbcac826
QY
154
155 vty_out_cpu_thread_history(vty, &copy);
156 totals->total_active += copy.total_active;
157 totals->total_calls += copy.total_calls;
158 totals->real.total += copy.real.total;
159 if (totals->real.max < copy.real.max)
160 totals->real.max = copy.real.max;
161 totals->cpu.total += copy.cpu.total;
162 if (totals->cpu.max < copy.cpu.max)
163 totals->cpu.max = copy.cpu.max;
e04ab74d 164}
165
fbcac826 166static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 167{
d62a17ae 168 struct cpu_thread_history tmp;
169 void *args[3] = {&tmp, vty, &filter};
170 struct thread_master *m;
171 struct listnode *ln;
172
0d6f7fd6 173 memset(&tmp, 0, sizeof(tmp));
d62a17ae 174 tmp.funcname = "TOTAL";
175 tmp.types = filter;
176
00dffa8c 177 frr_with_mutex(&masters_mtx) {
d62a17ae 178 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
179 const char *name = m->name ? m->name : "main";
180
181 char underline[strlen(name) + 1];
182 memset(underline, '-', sizeof(underline));
4f113d60 183 underline[sizeof(underline) - 1] = '\0';
d62a17ae 184
185 vty_out(vty, "\n");
186 vty_out(vty, "Showing statistics for pthread %s\n",
187 name);
188 vty_out(vty, "-------------------------------%s\n",
189 underline);
190 vty_out(vty, "%21s %18s %18s\n", "",
191 "CPU (user+system):", "Real (wall-clock):");
192 vty_out(vty,
193 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (m->cpu_record->count)
198 hash_iterate(
199 m->cpu_record,
e3b78da8 200 (void (*)(struct hash_bucket *,
d62a17ae 201 void *))cpu_record_hash_print,
202 args);
203 else
204 vty_out(vty, "No data to display yet.\n");
205
206 vty_out(vty, "\n");
207 }
208 }
d62a17ae 209
210 vty_out(vty, "\n");
211 vty_out(vty, "Total thread statistics\n");
212 vty_out(vty, "-------------------------\n");
213 vty_out(vty, "%21s %18s %18s\n", "",
214 "CPU (user+system):", "Real (wall-clock):");
215 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
216 vty_out(vty, " Avg uSec Max uSecs");
217 vty_out(vty, " Type Thread\n");
218
219 if (tmp.total_calls > 0)
220 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 221}
f75e802d 222#endif
e04ab74d 223
e3b78da8 224static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 225{
fbcac826 226 uint8_t *filter = args[0];
d62a17ae 227 struct hash *cpu_record = args[1];
228
229 struct cpu_thread_history *a = bucket->data;
62f44022 230
d62a17ae 231 if (!(a->types & *filter))
232 return;
f48f65d2 233
d62a17ae 234 hash_release(cpu_record, bucket->data);
e276eb82
PJ
235}
236
fbcac826 237static void cpu_record_clear(uint8_t filter)
e276eb82 238{
fbcac826 239 uint8_t *tmp = &filter;
d62a17ae 240 struct thread_master *m;
241 struct listnode *ln;
242
00dffa8c 243 frr_with_mutex(&masters_mtx) {
d62a17ae 244 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
00dffa8c 245 frr_with_mutex(&m->mtx) {
d62a17ae 246 void *args[2] = {tmp, m->cpu_record};
247 hash_iterate(
248 m->cpu_record,
e3b78da8 249 (void (*)(struct hash_bucket *,
d62a17ae 250 void *))cpu_record_hash_clear,
251 args);
252 }
d62a17ae 253 }
254 }
62f44022
QY
255}
256
fbcac826 257static uint8_t parse_filter(const char *filterstr)
62f44022 258{
d62a17ae 259 int i = 0;
260 int filter = 0;
261
262 while (filterstr[i] != '\0') {
263 switch (filterstr[i]) {
264 case 'r':
265 case 'R':
266 filter |= (1 << THREAD_READ);
267 break;
268 case 'w':
269 case 'W':
270 filter |= (1 << THREAD_WRITE);
271 break;
272 case 't':
273 case 'T':
274 filter |= (1 << THREAD_TIMER);
275 break;
276 case 'e':
277 case 'E':
278 filter |= (1 << THREAD_EVENT);
279 break;
280 case 'x':
281 case 'X':
282 filter |= (1 << THREAD_EXECUTE);
283 break;
284 default:
285 break;
286 }
287 ++i;
288 }
289 return filter;
62f44022
QY
290}
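/* For example, parse_filter("tw") yields
 * (1 << THREAD_TIMER) | (1 << THREAD_WRITE); letters outside of
 * r/w/t/e/x are silently ignored, which is why callers treat a
 * return value of 0 as "invalid filter". */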
291
f75e802d 292#ifndef EXCLUDE_CPU_TIME
62f44022
QY
293DEFUN (show_thread_cpu,
294 show_thread_cpu_cmd,
295 "show thread cpu [FILTER]",
296 SHOW_STR
297 "Thread information\n"
298 "Thread CPU usage\n"
61fa0b97 299 "Display filter (rwtex)\n")
62f44022 300{
fbcac826 301 uint8_t filter = (uint8_t)-1U;
d62a17ae 302 int idx = 0;
303
304 if (argv_find(argv, argc, "FILTER", &idx)) {
305 filter = parse_filter(argv[idx]->arg);
306 if (!filter) {
307 vty_out(vty,
308 "Invalid filter \"%s\" specified; must contain at least"
309 "one of 'RWTEXB'\n",
310 argv[idx]->arg);
311 return CMD_WARNING;
312 }
313 }
314
315 cpu_record_print(vty, filter);
316 return CMD_SUCCESS;
e276eb82 317}
f75e802d 318#endif
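/* Usage note (illustrative): from a vtysh session, "show thread cpu t"
 * restricts the statistics to timer tasks, while plain "show thread cpu"
 * shows every type, matching the filter letters handled by parse_filter()
 * above. */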
e276eb82 319
8872626b
DS
320static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
321{
322 const char *name = m->name ? m->name : "main";
323 char underline[strlen(name) + 1];
a0b36ae6 324 struct thread *thread;
8872626b
DS
325 uint32_t i;
326
327 memset(underline, '-', sizeof(underline));
328 underline[sizeof(underline) - 1] = '\0';
329
330 vty_out(vty, "\nShowing poll FD's for %s\n", name);
331 vty_out(vty, "----------------------%s\n", underline);
6c19478a
DS
332 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
333 m->fd_limit);
a0b36ae6
DS
334 for (i = 0; i < m->handler.pfdcount; i++) {
335 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
336 m->handler.pfds[i].fd, m->handler.pfds[i].events,
8872626b 337 m->handler.pfds[i].revents);
a0b36ae6
DS
338
339 if (m->handler.pfds[i].events & POLLIN) {
340 thread = m->read[m->handler.pfds[i].fd];
341
342 if (!thread)
343 vty_out(vty, "ERROR ");
344 else
345 vty_out(vty, "%s ", thread->funcname);
346 } else
347 vty_out(vty, " ");
348
349 if (m->handler.pfds[i].events & POLLOUT) {
350 thread = m->write[m->handler.pfds[i].fd];
351
352 if (!thread)
353 vty_out(vty, "ERROR\n");
354 else
355 vty_out(vty, "%s\n", thread->funcname);
356 } else
357 vty_out(vty, "\n");
358 }
8872626b
DS
359}
360
361DEFUN (show_thread_poll,
362 show_thread_poll_cmd,
363 "show thread poll",
364 SHOW_STR
365 "Thread information\n"
366 "Show poll FD's and information\n")
367{
368 struct listnode *node;
369 struct thread_master *m;
370
00dffa8c 371 frr_with_mutex(&masters_mtx) {
8872626b
DS
372 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
373 show_thread_poll_helper(vty, m);
374 }
375 }
8872626b
DS
376
377 return CMD_SUCCESS;
378}
379
380
49d41a26
DS
381DEFUN (clear_thread_cpu,
382 clear_thread_cpu_cmd,
383 "clear thread cpu [FILTER]",
62f44022 384 "Clear stored data in all pthreads\n"
49d41a26
DS
385 "Thread information\n"
386 "Thread CPU usage\n"
387 "Display filter (rwtexb)\n")
e276eb82 388{
fbcac826 389 uint8_t filter = (uint8_t)-1U;
d62a17ae 390 int idx = 0;
391
392 if (argv_find(argv, argc, "FILTER", &idx)) {
393 filter = parse_filter(argv[idx]->arg);
394 if (!filter) {
395 vty_out(vty,
396 "Invalid filter \"%s\" specified; must contain at least"
397 "one of 'RWTEXB'\n",
398 argv[idx]->arg);
399 return CMD_WARNING;
400 }
401 }
402
403 cpu_record_clear(filter);
404 return CMD_SUCCESS;
e276eb82 405}
6b0655a2 406
d62a17ae 407void thread_cmd_init(void)
0b84f294 408{
f75e802d 409#ifndef EXCLUDE_CPU_TIME
d62a17ae 410 install_element(VIEW_NODE, &show_thread_cpu_cmd);
f75e802d 411#endif
8872626b 412 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 413 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 414}
62f44022
QY
415/* CLI end ------------------------------------------------------------------ */
416
0b84f294 417
d62a17ae 418static void cancelreq_del(void *cr)
63ccb9cb 419{
d62a17ae 420 XFREE(MTYPE_TMP, cr);
63ccb9cb
QY
421}
422
e0bebc7c 423/* initializer, only ever called once */
4d762f26 424static void initializer(void)
e0bebc7c 425{
d62a17ae 426 pthread_key_create(&thread_current, NULL);
e0bebc7c
QY
427}
428
d62a17ae 429struct thread_master *thread_master_create(const char *name)
718e3744 430{
d62a17ae 431 struct thread_master *rv;
432 struct rlimit limit;
433
434 pthread_once(&init_once, &initializer);
435
436 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 437
438 /* Initialize master mutex */
439 pthread_mutex_init(&rv->mtx, NULL);
440 pthread_cond_init(&rv->cancel_cond, NULL);
441
442 /* Set name */
443 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
444
445 /* Initialize I/O task data structures */
446 getrlimit(RLIMIT_NOFILE, &limit);
447 rv->fd_limit = (int)limit.rlim_cur;
a6f235f3
DS
448 rv->read = XCALLOC(MTYPE_THREAD_POLL,
449 sizeof(struct thread *) * rv->fd_limit);
450
451 rv->write = XCALLOC(MTYPE_THREAD_POLL,
452 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 453
bd74dc61 454 rv->cpu_record = hash_create_size(
d8b87afe 455 8, (unsigned int (*)(const void *))cpu_record_hash_key,
74df8d6d 456 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
bd74dc61 457 "Thread Hash");
d62a17ae 458
c284542b
DL
459 thread_list_init(&rv->event);
460 thread_list_init(&rv->ready);
461 thread_list_init(&rv->unuse);
27d29ced 462 thread_timer_list_init(&rv->timer);
d62a17ae 463
464 /* Initialize thread_fetch() settings */
465 rv->spin = true;
466 rv->handle_signals = true;
467
468 /* Set pthread owner, should be updated by actual owner */
469 rv->owner = pthread_self();
470 rv->cancel_req = list_new();
471 rv->cancel_req->del = cancelreq_del;
472 rv->canceled = true;
473
474 /* Initialize pipe poker */
475 pipe(rv->io_pipe);
476 set_nonblocking(rv->io_pipe[0]);
477 set_nonblocking(rv->io_pipe[1]);
478
479 /* Initialize data structures for poll() */
480 rv->handler.pfdsize = rv->fd_limit;
481 rv->handler.pfdcount = 0;
482 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
483 sizeof(struct pollfd) * rv->handler.pfdsize);
484 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
485 sizeof(struct pollfd) * rv->handler.pfdsize);
486
eff09c66 487 /* add to list of threadmasters */
00dffa8c 488 frr_with_mutex(&masters_mtx) {
eff09c66
QY
489 if (!masters)
490 masters = list_new();
491
d62a17ae 492 listnode_add(masters, rv);
493 }
d62a17ae 494
495 return rv;
718e3744 496}
497
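/*
 * Illustrative sketch (not part of this file): the pthread that owns a
 * thread_master typically creates it, seeds it with an initial task and
 * then spins on thread_fetch()/thread_call() until nothing is left to
 * run.  The example_* names below are made up; the calls are the public
 * API implemented in this file and declared in thread.h.
 */
static int example_bootstrap(struct thread *t)
{
	/* schedule further work from here with thread_add_*() */
	return 0;
}

static void example_owner_loop(void)
{
	struct thread_master *master = thread_master_create("example");
	struct thread fetched;

	thread_add_event(master, example_bootstrap, NULL, 0, NULL);

	/* thread_fetch() returns NULL once no timers, events or I/O
	 * tasks remain, which ends the loop. */
	while (thread_fetch(master, &fetched))
		thread_call(&fetched);

	thread_master_free(master);
}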
d8a8a8de
QY
498void thread_master_set_name(struct thread_master *master, const char *name)
499{
00dffa8c 500 frr_with_mutex(&master->mtx) {
0a22ddfb 501 XFREE(MTYPE_THREAD_MASTER, master->name);
d8a8a8de
QY
502 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
503 }
d8a8a8de
QY
504}
505
6ed04aa2
DS
506#define THREAD_UNUSED_DEPTH 10
507
718e3744 508/* Move thread to unuse list. */
d62a17ae 509static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 510{
6655966d
RZ
511 pthread_mutex_t mtxc = thread->mtx;
512
d62a17ae 513 assert(m != NULL && thread != NULL);
d62a17ae 514
d62a17ae 515 thread->hist->total_active--;
6ed04aa2
DS
516 memset(thread, 0, sizeof(struct thread));
517 thread->type = THREAD_UNUSED;
518
6655966d
RZ
519 /* Restore the thread mutex context. */
520 thread->mtx = mtxc;
521
c284542b
DL
522 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
523 thread_list_add_tail(&m->unuse, thread);
6655966d
RZ
524 return;
525 }
526
527 thread_free(m, thread);
718e3744 528}
529
 530/* Free all unused threads. */
c284542b
DL
531static void thread_list_free(struct thread_master *m,
532 struct thread_list_head *list)
718e3744 533{
d62a17ae 534 struct thread *t;
d62a17ae 535
c284542b 536 while ((t = thread_list_pop(list)))
6655966d 537 thread_free(m, t);
718e3744 538}
539
d62a17ae 540static void thread_array_free(struct thread_master *m,
541 struct thread **thread_array)
308d14ae 542{
d62a17ae 543 struct thread *t;
544 int index;
545
546 for (index = 0; index < m->fd_limit; ++index) {
547 t = thread_array[index];
548 if (t) {
549 thread_array[index] = NULL;
6655966d 550 thread_free(m, t);
d62a17ae 551 }
552 }
a6f235f3 553 XFREE(MTYPE_THREAD_POLL, thread_array);
308d14ae
DV
554}
555
495f0b13
DS
556/*
557 * thread_master_free_unused
558 *
 559 * As threads are finished with, they are put on the
 560 * unuse list for later reuse.
 561 * If we are shutting down, free up the unused threads
 562 * so we can see if we forgot to shut anything off.
563 */
d62a17ae 564void thread_master_free_unused(struct thread_master *m)
495f0b13 565{
00dffa8c 566 frr_with_mutex(&m->mtx) {
d62a17ae 567 struct thread *t;
c284542b 568 while ((t = thread_list_pop(&m->unuse)))
6655966d 569 thread_free(m, t);
d62a17ae 570 }
495f0b13
DS
571}
572
718e3744 573/* Stop thread scheduler. */
d62a17ae 574void thread_master_free(struct thread_master *m)
718e3744 575{
27d29ced
DL
576 struct thread *t;
577
00dffa8c 578 frr_with_mutex(&masters_mtx) {
d62a17ae 579 listnode_delete(masters, m);
eff09c66 580 if (masters->count == 0) {
6a154c88 581 list_delete(&masters);
eff09c66 582 }
d62a17ae 583 }
d62a17ae 584
585 thread_array_free(m, m->read);
586 thread_array_free(m, m->write);
27d29ced
DL
587 while ((t = thread_timer_list_pop(&m->timer)))
588 thread_free(m, t);
d62a17ae 589 thread_list_free(m, &m->event);
590 thread_list_free(m, &m->ready);
591 thread_list_free(m, &m->unuse);
592 pthread_mutex_destroy(&m->mtx);
33844bbe 593 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 594 close(m->io_pipe[0]);
595 close(m->io_pipe[1]);
6a154c88 596 list_delete(&m->cancel_req);
1a0a92ea 597 m->cancel_req = NULL;
d62a17ae 598
599 hash_clean(m->cpu_record, cpu_record_hash_free);
600 hash_free(m->cpu_record);
601 m->cpu_record = NULL;
602
0a22ddfb 603 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 604 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
605 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
606 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 607}
608
78ca0342
CF
 609/* Return remaining time in milliseconds. */
610unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 611{
d62a17ae 612 int64_t remain;
1189d95f 613
00dffa8c 614 frr_with_mutex(&thread->mtx) {
78ca0342 615 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 616 }
1189d95f 617
d62a17ae 618 return remain < 0 ? 0 : remain;
718e3744 619}
620
78ca0342
CF
 621/* Return remaining time in seconds. */
622unsigned long thread_timer_remain_second(struct thread *thread)
623{
624 return thread_timer_remain_msec(thread) / 1000LL;
625}
626
9c7753e4
DL
627#define debugargdef const char *funcname, const char *schedfrom, int fromln
628#define debugargpass funcname, schedfrom, fromln
e04ab74d 629
d62a17ae 630struct timeval thread_timer_remain(struct thread *thread)
6ac44687 631{
d62a17ae 632 struct timeval remain;
00dffa8c 633 frr_with_mutex(&thread->mtx) {
d62a17ae 634 monotime_until(&thread->u.sands, &remain);
635 }
d62a17ae 636 return remain;
6ac44687
CF
637}
638
718e3744 639/* Get new thread. */
d7c0a89a 640static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 641 int (*func)(struct thread *), void *arg,
642 debugargdef)
718e3744 643{
c284542b 644 struct thread *thread = thread_list_pop(&m->unuse);
d62a17ae 645 struct cpu_thread_history tmp;
646
647 if (!thread) {
648 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
649 /* mutex only needs to be initialized at struct creation. */
650 pthread_mutex_init(&thread->mtx, NULL);
651 m->alloc++;
652 }
653
654 thread->type = type;
655 thread->add_type = type;
656 thread->master = m;
657 thread->arg = arg;
d62a17ae 658 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
659 thread->ref = NULL;
660
661 /*
 662	 * So if the passed-in funcname is not what we have
 663	 * stored, that means the thread->hist needs to be
 664	 * updated. We keep the last one around on the unuse
 665	 * list under the assumption that we are probably
 666	 * going to immediately allocate the same
 667	 * type of thread.
 668	 * This hopefully saves us some serious
 669	 * hash_get lookups.
670 */
671 if (thread->funcname != funcname || thread->func != func) {
672 tmp.func = func;
673 tmp.funcname = funcname;
674 thread->hist =
675 hash_get(m->cpu_record, &tmp,
676 (void *(*)(void *))cpu_record_hash_alloc);
677 }
678 thread->hist->total_active++;
679 thread->func = func;
680 thread->funcname = funcname;
681 thread->schedfrom = schedfrom;
682 thread->schedfrom_line = fromln;
683
684 return thread;
718e3744 685}
686
6655966d
RZ
687static void thread_free(struct thread_master *master, struct thread *thread)
688{
689 /* Update statistics. */
690 assert(master->alloc > 0);
691 master->alloc--;
692
693 /* Free allocated resources. */
694 pthread_mutex_destroy(&thread->mtx);
695 XFREE(MTYPE_THREAD, thread);
696}
697
d62a17ae 698static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
699 nfds_t count, const struct timeval *timer_wait)
209a72a6 700{
d62a17ae 701	/* If timer_wait is null here, that means poll() should block
 702	 * indefinitely, unless the thread_master has overridden it
f79f7a7b 703	 * by setting ->selectpoll_timeout.
d62a17ae 704	 *
 705	 * If the value is positive, it specifies the maximum number
 706	 * of milliseconds to wait. If the timeout is -1, it specifies
 707	 * that we should never wait and always return immediately
 708	 * even if no event is detected.
 709	 *
 710	 * If the value is zero, the behavior is default.
 711	 */
712 int timeout = -1;
713
714 /* number of file descriptors with events */
715 int num;
716
717 if (timer_wait != NULL
718 && m->selectpoll_timeout == 0) // use the default value
719 timeout = (timer_wait->tv_sec * 1000)
720 + (timer_wait->tv_usec / 1000);
721 else if (m->selectpoll_timeout > 0) // use the user's timeout
722 timeout = m->selectpoll_timeout;
723 else if (m->selectpoll_timeout
724 < 0) // effect a poll (return immediately)
725 timeout = 0;
726
0bdeb5e5 727 zlog_tls_buffer_flush();
3e41733f
DL
728 rcu_read_unlock();
729 rcu_assert_read_unlocked();
730
d62a17ae 731 /* add poll pipe poker */
732 assert(count + 1 < pfdsize);
733 pfds[count].fd = m->io_pipe[0];
734 pfds[count].events = POLLIN;
735 pfds[count].revents = 0x00;
736
737 num = poll(pfds, count + 1, timeout);
738
739 unsigned char trash[64];
740 if (num > 0 && pfds[count].revents != 0 && num--)
741 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
742 ;
743
3e41733f
DL
744 rcu_read_lock();
745
d62a17ae 746 return num;
209a72a6
DS
747}
748
718e3744 749/* Add new read or write thread. */
d62a17ae 750struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
751 int (*func)(struct thread *),
752 void *arg, int fd,
753 struct thread **t_ptr,
754 debugargdef)
718e3744 755{
d62a17ae 756 struct thread *thread = NULL;
1ef14bee 757 struct thread **thread_array;
d62a17ae 758
9b864cd3 759 assert(fd >= 0 && fd < m->fd_limit);
00dffa8c
DL
760 frr_with_mutex(&m->mtx) {
761 if (t_ptr && *t_ptr)
762 // thread is already scheduled; don't reschedule
763 break;
d62a17ae 764
765 /* default to a new pollfd */
766 nfds_t queuepos = m->handler.pfdcount;
767
1ef14bee
DS
768 if (dir == THREAD_READ)
769 thread_array = m->read;
770 else
771 thread_array = m->write;
772
d62a17ae 773 /* if we already have a pollfd for our file descriptor, find and
774 * use it */
775 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
776 if (m->handler.pfds[i].fd == fd) {
777 queuepos = i;
1ef14bee
DS
778
779#ifdef DEV_BUILD
780 /*
781 * What happens if we have a thread already
782 * created for this event?
783 */
784 if (thread_array[fd])
785 assert(!"Thread already scheduled for file descriptor");
786#endif
d62a17ae 787 break;
788 }
789
790 /* make sure we have room for this fd + pipe poker fd */
791 assert(queuepos + 1 < m->handler.pfdsize);
792
793 thread = thread_get(m, dir, func, arg, debugargpass);
794
795 m->handler.pfds[queuepos].fd = fd;
796 m->handler.pfds[queuepos].events |=
797 (dir == THREAD_READ ? POLLIN : POLLOUT);
798
799 if (queuepos == m->handler.pfdcount)
800 m->handler.pfdcount++;
801
802 if (thread) {
00dffa8c 803 frr_with_mutex(&thread->mtx) {
d62a17ae 804 thread->u.fd = fd;
1ef14bee 805 thread_array[thread->u.fd] = thread;
d62a17ae 806 }
d62a17ae 807
808 if (t_ptr) {
809 *t_ptr = thread;
810 thread->ref = t_ptr;
811 }
812 }
813
814 AWAKEN(m);
815 }
d62a17ae 816
817 return thread;
718e3744 818}
819
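/*
 * Illustrative sketch (not part of this file): read/write tasks are
 * one-shot, so a handler that wants to keep watching its descriptor
 * re-adds itself.  This assumes the thread_add_read(), THREAD_ARG() and
 * THREAD_FD() convenience macros from thread.h; the example_* names are
 * made up.
 */
struct example_peer {
	int fd;
	struct thread *t_read;
};

static int example_peer_read(struct thread *t)
{
	struct example_peer *peer = THREAD_ARG(t);
	int fd = THREAD_FD(t);

	/* thread_fetch() already cleared peer->t_read through the
	 * back-reference before running us. */

	/* ... read() from fd and process the data ... */
	(void)fd;

	/* Re-arm so we are scheduled again for the next readable event. */
	thread_add_read(t->master, example_peer_read, peer, peer->fd,
			&peer->t_read);
	return 0;
}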
56a94b36 820static struct thread *
d62a17ae 821funcname_thread_add_timer_timeval(struct thread_master *m,
822 int (*func)(struct thread *), int type,
823 void *arg, struct timeval *time_relative,
824 struct thread **t_ptr, debugargdef)
718e3744 825{
d62a17ae 826 struct thread *thread;
d62a17ae 827
828 assert(m != NULL);
829
830 assert(type == THREAD_TIMER);
831 assert(time_relative);
832
00dffa8c
DL
833 frr_with_mutex(&m->mtx) {
834 if (t_ptr && *t_ptr)
835 // thread is already scheduled; don't reschedule
d62a17ae 836 return NULL;
d62a17ae 837
d62a17ae 838 thread = thread_get(m, type, func, arg, debugargpass);
839
00dffa8c 840 frr_with_mutex(&thread->mtx) {
d62a17ae 841 monotime(&thread->u.sands);
842 timeradd(&thread->u.sands, time_relative,
843 &thread->u.sands);
27d29ced 844 thread_timer_list_add(&m->timer, thread);
d62a17ae 845 if (t_ptr) {
846 *t_ptr = thread;
847 thread->ref = t_ptr;
848 }
849 }
d62a17ae 850
851 AWAKEN(m);
852 }
d62a17ae 853
854 return thread;
9e867fe6 855}
856
98c91ac6 857
858/* Add timer event thread. */
d62a17ae 859struct thread *funcname_thread_add_timer(struct thread_master *m,
860 int (*func)(struct thread *),
861 void *arg, long timer,
862 struct thread **t_ptr, debugargdef)
9e867fe6 863{
d62a17ae 864 struct timeval trel;
9e867fe6 865
d62a17ae 866 assert(m != NULL);
9e867fe6 867
d62a17ae 868 trel.tv_sec = timer;
869 trel.tv_usec = 0;
9e867fe6 870
d62a17ae 871 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
872 &trel, t_ptr, debugargpass);
98c91ac6 873}
9e867fe6 874
98c91ac6 875/* Add timer event thread with "millisecond" resolution */
d62a17ae 876struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
877 int (*func)(struct thread *),
878 void *arg, long timer,
879 struct thread **t_ptr,
880 debugargdef)
98c91ac6 881{
d62a17ae 882 struct timeval trel;
9e867fe6 883
d62a17ae 884 assert(m != NULL);
718e3744 885
d62a17ae 886 trel.tv_sec = timer / 1000;
887 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 888
d62a17ae 889 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
890 &trel, t_ptr, debugargpass);
a48b4e6d 891}
892
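/*
 * Illustrative sketch (not part of this file): timers are one-shot as
 * well, so a periodic job re-schedules itself from its own callback.
 * Assumes the thread_add_timer_msec() and THREAD_ARG() macros from
 * thread.h; the example_* names are made up.
 */
struct example_poller {
	struct thread_master *master;
	struct thread *t_poll;
};

static int example_poll_cb(struct thread *t)
{
	struct example_poller *p = THREAD_ARG(t);

	/* ... do the periodic work ... */

	/* Fire again in 250 milliseconds. */
	thread_add_timer_msec(p->master, example_poll_cb, p, 250, &p->t_poll);
	return 0;
}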
d03c4cbd 893/* Add timer event thread with "struct timeval" resolution */
d62a17ae 894struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
895 int (*func)(struct thread *),
896 void *arg, struct timeval *tv,
897 struct thread **t_ptr, debugargdef)
d03c4cbd 898{
d62a17ae 899 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
900 t_ptr, debugargpass);
d03c4cbd
DL
901}
902
718e3744 903/* Add simple event thread. */
d62a17ae 904struct thread *funcname_thread_add_event(struct thread_master *m,
905 int (*func)(struct thread *),
906 void *arg, int val,
907 struct thread **t_ptr, debugargdef)
718e3744 908{
00dffa8c 909 struct thread *thread = NULL;
d62a17ae 910
911 assert(m != NULL);
912
00dffa8c
DL
913 frr_with_mutex(&m->mtx) {
914 if (t_ptr && *t_ptr)
915 // thread is already scheduled; don't reschedule
916 break;
d62a17ae 917
918 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
00dffa8c 919 frr_with_mutex(&thread->mtx) {
d62a17ae 920 thread->u.val = val;
c284542b 921 thread_list_add_tail(&m->event, thread);
d62a17ae 922 }
d62a17ae 923
924 if (t_ptr) {
925 *t_ptr = thread;
926 thread->ref = t_ptr;
927 }
928
929 AWAKEN(m);
930 }
d62a17ae 931
932 return thread;
718e3744 933}
934
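/*
 * Illustrative sketch (not part of this file): because a still-scheduled
 * back-reference makes the add a no-op (see the *t_ptr check above),
 * posting the same event many times through one back-reference collapses
 * into a single queued task.  The example_* names are made up.
 */
static struct thread *t_example_flush;

static int example_flush(struct thread *t)
{
	/* ... flush whatever was batched up ... */
	return 0;
}

static void example_request_flush(struct thread_master *m)
{
	/* Safe to call repeatedly; at most one flush event stays queued. */
	thread_add_event(m, example_flush, NULL, 0, &t_example_flush);
}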
63ccb9cb
QY
935/* Thread cancellation ------------------------------------------------------ */
936
8797240e
QY
937/**
938 * NOT's out the .events field of pollfd corresponding to the given file
939 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
940 *
941 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
942 * implementation for details.
943 *
944 * @param master
945 * @param fd
946 * @param state the event to cancel. One or more (OR'd together) of the
947 * following:
948 * - POLLIN
949 * - POLLOUT
950 */
d62a17ae 951static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 952{
42d74538
QY
953 bool found = false;
954
d62a17ae 955 /* Cancel POLLHUP too just in case some bozo set it */
956 state |= POLLHUP;
957
958 /* find the index of corresponding pollfd */
959 nfds_t i;
960
961 for (i = 0; i < master->handler.pfdcount; i++)
42d74538
QY
962 if (master->handler.pfds[i].fd == fd) {
963 found = true;
d62a17ae 964 break;
42d74538
QY
965 }
966
967 if (!found) {
968 zlog_debug(
969 "[!] Received cancellation request for nonexistent rw job");
970 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 971 master->name ? master->name : "", fd);
42d74538
QY
972 return;
973 }
d62a17ae 974
975 /* NOT out event. */
976 master->handler.pfds[i].events &= ~(state);
977
978 /* If all events are canceled, delete / resize the pollfd array. */
979 if (master->handler.pfds[i].events == 0) {
980 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
981 (master->handler.pfdcount - i - 1)
982 * sizeof(struct pollfd));
983 master->handler.pfdcount--;
e985cda0
S
984 master->handler.pfds[master->handler.pfdcount].fd = 0;
985 master->handler.pfds[master->handler.pfdcount].events = 0;
d62a17ae 986 }
987
988 /* If we have the same pollfd in the copy, perform the same operations,
989 * otherwise return. */
990 if (i >= master->handler.copycount)
991 return;
992
993 master->handler.copy[i].events &= ~(state);
994
995 if (master->handler.copy[i].events == 0) {
996 memmove(master->handler.copy + i, master->handler.copy + i + 1,
997 (master->handler.copycount - i - 1)
998 * sizeof(struct pollfd));
999 master->handler.copycount--;
e985cda0
S
1000 master->handler.copy[master->handler.copycount].fd = 0;
1001 master->handler.copy[master->handler.copycount].events = 0;
d62a17ae 1002 }
0a95a0d0
DS
1003}
1004
1189d95f 1005/**
63ccb9cb 1006 * Process cancellation requests.
1189d95f 1007 *
63ccb9cb
QY
1008 * This may only be run from the pthread which owns the thread_master.
1009 *
1010 * @param master the thread master to process
1011 * @REQUIRE master->mtx
1189d95f 1012 */
d62a17ae 1013static void do_thread_cancel(struct thread_master *master)
718e3744 1014{
c284542b 1015 struct thread_list_head *list = NULL;
d62a17ae 1016 struct thread **thread_array = NULL;
1017 struct thread *thread;
1018
1019 struct cancel_req *cr;
1020 struct listnode *ln;
1021 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
 1022		/* If this is an event object cancellation, linear
 1023		 * search through the event list deleting any events
 1024		 * which have the specified argument.
 1025		 * We also need to check every thread in the ready
 1026		 * queue. */
1027 if (cr->eventobj) {
1028 struct thread *t;
c284542b 1029
81fddbe7 1030 frr_each_safe(thread_list, &master->event, t) {
c284542b
DL
1031 if (t->arg != cr->eventobj)
1032 continue;
1033 thread_list_del(&master->event, t);
1034 if (t->ref)
1035 *t->ref = NULL;
1036 thread_add_unuse(master, t);
d62a17ae 1037 }
1038
81fddbe7 1039 frr_each_safe(thread_list, &master->ready, t) {
c284542b
DL
1040 if (t->arg != cr->eventobj)
1041 continue;
1042 thread_list_del(&master->ready, t);
1043 if (t->ref)
1044 *t->ref = NULL;
1045 thread_add_unuse(master, t);
d62a17ae 1046 }
1047 continue;
1048 }
1049
 1050		/* The pointer varies depending on whether the
 1051		 * cancellation request was made asynchronously or
 1052		 * not. If it was, we need to check whether the
 1053		 * thread even exists anymore before cancelling
 1054		 * it. */
1055 thread = (cr->thread) ? cr->thread : *cr->threadref;
1056
1057 if (!thread)
1058 continue;
1059
1060 /* Determine the appropriate queue to cancel the thread from */
1061 switch (thread->type) {
1062 case THREAD_READ:
1063 thread_cancel_rw(master, thread->u.fd, POLLIN);
1064 thread_array = master->read;
1065 break;
1066 case THREAD_WRITE:
1067 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1068 thread_array = master->write;
1069 break;
1070 case THREAD_TIMER:
27d29ced 1071 thread_timer_list_del(&master->timer, thread);
d62a17ae 1072 break;
1073 case THREAD_EVENT:
1074 list = &master->event;
1075 break;
1076 case THREAD_READY:
1077 list = &master->ready;
1078 break;
1079 default:
1080 continue;
1081 break;
1082 }
1083
27d29ced 1084 if (list) {
c284542b 1085 thread_list_del(list, thread);
d62a17ae 1086 } else if (thread_array) {
1087 thread_array[thread->u.fd] = NULL;
d62a17ae 1088 }
1089
1090 if (thread->ref)
1091 *thread->ref = NULL;
1092
1093 thread_add_unuse(thread->master, thread);
1094 }
1095
1096 /* Delete and free all cancellation requests */
1097 list_delete_all_node(master->cancel_req);
1098
1099 /* Wake up any threads which may be blocked in thread_cancel_async() */
1100 master->canceled = true;
1101 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1102}
1103
63ccb9cb
QY
1104/**
1105 * Cancel any events which have the specified argument.
1106 *
1107 * MT-Unsafe
1108 *
1109 * @param m the thread_master to cancel from
1110 * @param arg the argument passed when creating the event
1111 */
d62a17ae 1112void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1113{
d62a17ae 1114 assert(master->owner == pthread_self());
1115
00dffa8c 1116 frr_with_mutex(&master->mtx) {
d62a17ae 1117 struct cancel_req *cr =
1118 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1119 cr->eventobj = arg;
1120 listnode_add(master->cancel_req, cr);
1121 do_thread_cancel(master);
1122 }
63ccb9cb 1123}
1189d95f 1124
63ccb9cb
QY
1125/**
1126 * Cancel a specific task.
1127 *
1128 * MT-Unsafe
1129 *
1130 * @param thread task to cancel
1131 */
d62a17ae 1132void thread_cancel(struct thread *thread)
63ccb9cb 1133{
6ed04aa2 1134 struct thread_master *master = thread->master;
d62a17ae 1135
6ed04aa2
DS
1136 assert(master->owner == pthread_self());
1137
00dffa8c 1138 frr_with_mutex(&master->mtx) {
d62a17ae 1139 struct cancel_req *cr =
1140 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1141 cr->thread = thread;
6ed04aa2
DS
1142 listnode_add(master->cancel_req, cr);
1143 do_thread_cancel(master);
d62a17ae 1144 }
63ccb9cb 1145}
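/*
 * Illustrative sketch (not part of this file): synchronous cancellation
 * is usually done through the THREAD_OFF() convenience macro from
 * thread.h, which is assumed here to call thread_cancel() only when a
 * task is actually scheduled and then clear the caller's pointer.  The
 * example_* names are made up.
 */
struct example_session {
	struct thread *t_reconnect;
};

static void example_session_stop(struct example_session *s)
{
	/* No-op when nothing is scheduled; otherwise cancel and NULL out. */
	THREAD_OFF(s->t_reconnect);
}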
1189d95f 1146
63ccb9cb
QY
1147/**
1148 * Asynchronous cancellation.
1149 *
8797240e
QY
1150 * Called with either a struct thread ** or void * to an event argument,
1151 * this function posts the correct cancellation request and blocks until it is
1152 * serviced.
63ccb9cb
QY
1153 *
1154 * If the thread is currently running, execution blocks until it completes.
1155 *
8797240e
QY
1156 * The last two parameters are mutually exclusive, i.e. if you pass one the
1157 * other must be NULL.
1158 *
1159 * When the cancellation procedure executes on the target thread_master, the
1160 * thread * provided is checked for nullity. If it is null, the thread is
1161 * assumed to no longer exist and the cancellation request is a no-op. Thus
1162 * users of this API must pass a back-reference when scheduling the original
1163 * task.
1164 *
63ccb9cb
QY
1165 * MT-Safe
1166 *
8797240e
QY
1167 * @param master the thread master with the relevant event / task
1168 * @param thread pointer to thread to cancel
1169 * @param eventobj the event
63ccb9cb 1170 */
d62a17ae 1171void thread_cancel_async(struct thread_master *master, struct thread **thread,
1172 void *eventobj)
63ccb9cb 1173{
d62a17ae 1174 assert(!(thread && eventobj) && (thread || eventobj));
1175 assert(master->owner != pthread_self());
1176
00dffa8c 1177 frr_with_mutex(&master->mtx) {
d62a17ae 1178 master->canceled = false;
1179
1180 if (thread) {
1181 struct cancel_req *cr =
1182 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1183 cr->threadref = thread;
1184 listnode_add(master->cancel_req, cr);
1185 } else if (eventobj) {
1186 struct cancel_req *cr =
1187 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1188 cr->eventobj = eventobj;
1189 listnode_add(master->cancel_req, cr);
1190 }
1191 AWAKEN(master);
1192
1193 while (!master->canceled)
1194 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1195 }
718e3744 1196}
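/*
 * Illustrative sketch (not part of this file): cancelling a task from a
 * pthread that does not own the master.  The task must have been
 * scheduled with a back-reference, since that pointer is what the owning
 * pthread checks when it finally services the request.  The example_*
 * names are made up.
 */
static struct thread *t_example_io;

static void example_stop_io_from_other_pthread(struct thread_master *owner)
{
	/* Blocks until the owning pthread has processed the request. */
	thread_cancel_async(owner, &t_example_io, NULL);
}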
63ccb9cb 1197/* ------------------------------------------------------------------------- */
718e3744 1198
27d29ced 1199static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
d62a17ae 1200 struct timeval *timer_val)
718e3744 1201{
27d29ced
DL
1202 if (!thread_timer_list_count(timers))
1203 return NULL;
1204
1205 struct thread *next_timer = thread_timer_list_first(timers);
1206 monotime_until(&next_timer->u.sands, timer_val);
1207 return timer_val;
718e3744 1208}
718e3744 1209
d62a17ae 1210static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1211 struct thread *fetch)
718e3744 1212{
d62a17ae 1213 *fetch = *thread;
1214 thread_add_unuse(m, thread);
1215 return fetch;
718e3744 1216}
1217
d62a17ae 1218static int thread_process_io_helper(struct thread_master *m,
45f3d590
DS
1219 struct thread *thread, short state,
1220 short actual_state, int pos)
5d4ccd4e 1221{
d62a17ae 1222 struct thread **thread_array;
1223
45f3d590
DS
1224 /*
1225 * poll() clears the .events field, but the pollfd array we
1226 * pass to poll() is a copy of the one used to schedule threads.
1227 * We need to synchronize state between the two here by applying
1228 * the same changes poll() made on the copy of the "real" pollfd
1229 * array.
1230 *
1231 * This cleans up a possible infinite loop where we refuse
1232 * to respond to a poll event but poll is insistent that
1233 * we should.
1234 */
1235 m->handler.pfds[pos].events &= ~(state);
1236
1237 if (!thread) {
1238 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1239 flog_err(EC_LIB_NO_THREAD,
1240 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1241 m->handler.pfds[pos].fd, actual_state);
d62a17ae 1242 return 0;
45f3d590 1243 }
d62a17ae 1244
1245 if (thread->type == THREAD_READ)
1246 thread_array = m->read;
1247 else
1248 thread_array = m->write;
1249
1250 thread_array[thread->u.fd] = NULL;
c284542b 1251 thread_list_add_tail(&m->ready, thread);
d62a17ae 1252 thread->type = THREAD_READY;
45f3d590 1253
d62a17ae 1254 return 1;
5d4ccd4e
DS
1255}
1256
8797240e
QY
1257/**
1258 * Process I/O events.
1259 *
1260 * Walks through file descriptor array looking for those pollfds whose .revents
1261 * field has something interesting. Deletes any invalid file descriptors.
1262 *
1263 * @param m the thread master
1264 * @param num the number of active file descriptors (return value of poll())
1265 */
d62a17ae 1266static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1267{
d62a17ae 1268 unsigned int ready = 0;
1269 struct pollfd *pfds = m->handler.copy;
1270
1271 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1272 /* no event for current fd? immediately continue */
1273 if (pfds[i].revents == 0)
1274 continue;
1275
1276 ready++;
1277
 1278		/* Unless someone has called thread_cancel from another
 1279		 * pthread, the only thing that could have changed in
 1280		 * m->handler.pfds while we were asleep is the .events
 1281		 * field in a given pollfd.
 1282		 *
 1283		 * Barring thread_cancel(), that value should be a
d62a17ae 1284		 * superset of the values we have in our copy, so there
 1285		 * is no need to update it.
 1286		 *
 1287		 * Similarly, barring deletion, the fd should still be
 1288		 * a valid index into the master's pfds. */
45f3d590 1289 if (pfds[i].revents & (POLLIN | POLLHUP)) {
d62a17ae 1290 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
45f3d590
DS
1291 pfds[i].revents, i);
1292 }
d62a17ae 1293 if (pfds[i].revents & POLLOUT)
1294 thread_process_io_helper(m, m->write[pfds[i].fd],
45f3d590 1295 POLLOUT, pfds[i].revents, i);
d62a17ae 1296
1297 /* if one of our file descriptors is garbage, remove the same
1298 * from
1299 * both pfds + update sizes and index */
1300 if (pfds[i].revents & POLLNVAL) {
1301 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1302 (m->handler.pfdcount - i - 1)
1303 * sizeof(struct pollfd));
1304 m->handler.pfdcount--;
e985cda0
S
1305 m->handler.pfds[m->handler.pfdcount].fd = 0;
1306 m->handler.pfds[m->handler.pfdcount].events = 0;
d62a17ae 1307
1308 memmove(pfds + i, pfds + i + 1,
1309 (m->handler.copycount - i - 1)
1310 * sizeof(struct pollfd));
1311 m->handler.copycount--;
e985cda0
S
1312 m->handler.copy[m->handler.copycount].fd = 0;
1313 m->handler.copy[m->handler.copycount].events = 0;
d62a17ae 1314
1315 i--;
1316 }
1317 }
718e3744 1318}
1319
8b70d0b0 1320/* Add all timers that have popped to the ready list. */
27d29ced 1321static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
d62a17ae 1322 struct timeval *timenow)
a48b4e6d 1323{
d62a17ae 1324 struct thread *thread;
1325 unsigned int ready = 0;
1326
27d29ced 1327 while ((thread = thread_timer_list_first(timers))) {
d62a17ae 1328 if (timercmp(timenow, &thread->u.sands, <))
1329 return ready;
27d29ced 1330 thread_timer_list_pop(timers);
d62a17ae 1331 thread->type = THREAD_READY;
c284542b 1332 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1333 ready++;
1334 }
1335 return ready;
a48b4e6d 1336}
1337
2613abe6 1338/* process a list en masse, e.g. for event thread lists */
c284542b 1339static unsigned int thread_process(struct thread_list_head *list)
2613abe6 1340{
d62a17ae 1341 struct thread *thread;
d62a17ae 1342 unsigned int ready = 0;
1343
c284542b 1344 while ((thread = thread_list_pop(list))) {
d62a17ae 1345 thread->type = THREAD_READY;
c284542b 1346 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1347 ready++;
1348 }
1349 return ready;
2613abe6
PJ
1350}
1351
1352
718e3744 1353/* Fetch next ready thread. */
d62a17ae 1354struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1355{
d62a17ae 1356 struct thread *thread = NULL;
1357 struct timeval now;
1358 struct timeval zerotime = {0, 0};
1359 struct timeval tv;
1360 struct timeval *tw = NULL;
1361
1362 int num = 0;
1363
1364 do {
1365 /* Handle signals if any */
1366 if (m->handle_signals)
1367 quagga_sigevent_process();
1368
1369 pthread_mutex_lock(&m->mtx);
1370
1371 /* Process any pending cancellation requests */
1372 do_thread_cancel(m);
1373
e3c9529e
QY
1374 /*
1375 * Attempt to flush ready queue before going into poll().
1376 * This is performance-critical. Think twice before modifying.
1377 */
c284542b 1378 if ((thread = thread_list_pop(&m->ready))) {
e3c9529e
QY
1379 fetch = thread_run(m, thread, fetch);
1380 if (fetch->ref)
1381 *fetch->ref = NULL;
1382 pthread_mutex_unlock(&m->mtx);
1383 break;
1384 }
1385
1386 /* otherwise, tick through scheduling sequence */
1387
bca37d17
QY
1388 /*
1389 * Post events to ready queue. This must come before the
1390 * following block since events should occur immediately
1391 */
d62a17ae 1392 thread_process(&m->event);
1393
bca37d17
QY
1394 /*
1395 * If there are no tasks on the ready queue, we will poll()
1396 * until a timer expires or we receive I/O, whichever comes
1397 * first. The strategy for doing this is:
d62a17ae 1398 *
1399 * - If there are events pending, set the poll() timeout to zero
1400 * - If there are no events pending, but there are timers
1401 * pending, set the
1402 * timeout to the smallest remaining time on any timer
1403 * - If there are neither timers nor events pending, but there
1404 * are file
1405 * descriptors pending, block indefinitely in poll()
1406 * - If nothing is pending, it's time for the application to die
1407 *
1408 * In every case except the last, we need to hit poll() at least
bca37d17
QY
1409 * once per loop to avoid starvation by events
1410 */
c284542b 1411 if (!thread_list_count(&m->ready))
27d29ced 1412 tw = thread_timer_wait(&m->timer, &tv);
d62a17ae 1413
c284542b
DL
1414 if (thread_list_count(&m->ready) ||
1415 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1416 tw = &zerotime;
1417
1418 if (!tw && m->handler.pfdcount == 0) { /* die */
1419 pthread_mutex_unlock(&m->mtx);
1420 fetch = NULL;
1421 break;
1422 }
1423
bca37d17
QY
1424 /*
1425 * Copy pollfd array + # active pollfds in it. Not necessary to
1426 * copy the array size as this is fixed.
1427 */
d62a17ae 1428 m->handler.copycount = m->handler.pfdcount;
1429 memcpy(m->handler.copy, m->handler.pfds,
1430 m->handler.copycount * sizeof(struct pollfd));
1431
e3c9529e
QY
1432 pthread_mutex_unlock(&m->mtx);
1433 {
1434 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1435 m->handler.copycount, tw);
1436 }
1437 pthread_mutex_lock(&m->mtx);
d764d2cc 1438
e3c9529e
QY
1439 /* Handle any errors received in poll() */
1440 if (num < 0) {
1441 if (errno == EINTR) {
d62a17ae 1442 pthread_mutex_unlock(&m->mtx);
e3c9529e
QY
1443 /* loop around to signal handler */
1444 continue;
d62a17ae 1445 }
1446
e3c9529e 1447 /* else die */
450971aa 1448 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1449 safe_strerror(errno));
e3c9529e
QY
1450 pthread_mutex_unlock(&m->mtx);
1451 fetch = NULL;
1452 break;
bca37d17 1453 }
d62a17ae 1454
1455 /* Post timers to ready queue. */
1456 monotime(&now);
27d29ced 1457 thread_process_timers(&m->timer, &now);
d62a17ae 1458
1459 /* Post I/O to ready queue. */
1460 if (num > 0)
1461 thread_process_io(m, num);
1462
d62a17ae 1463 pthread_mutex_unlock(&m->mtx);
1464
1465 } while (!thread && m->spin);
1466
1467 return fetch;
718e3744 1468}
1469
d62a17ae 1470static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1471{
d62a17ae 1472 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1473 + (a.tv_usec - b.tv_usec));
62f44022
QY
1474}
1475
d62a17ae 1476unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1477 unsigned long *cputime)
718e3744 1478{
d62a17ae 1479 /* This is 'user + sys' time. */
1480 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1481 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1482 return timeval_elapsed(now->real, start->real);
8b70d0b0 1483}
1484
50596be0
DS
 1485/* We should aim to yield after 'yield' milliseconds, which defaults
 1486   to THREAD_YIELD_TIME_SLOT.
8b70d0b0 1487 Note: we are using real (wall clock) time for this calculation.
1488 It could be argued that CPU time may make more sense in certain
1489 contexts. The things to consider are whether the thread may have
1490 blocked (in which case wall time increases, but CPU time does not),
1491 or whether the system is heavily loaded with other processes competing
d62a17ae 1492 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1493 Plus it has the added benefit that gettimeofday should be faster
1494 than calling getrusage. */
d62a17ae 1495int thread_should_yield(struct thread *thread)
718e3744 1496{
d62a17ae 1497 int result;
00dffa8c 1498 frr_with_mutex(&thread->mtx) {
d62a17ae 1499 result = monotime_since(&thread->real, NULL)
1500 > (int64_t)thread->yield;
1501 }
d62a17ae 1502 return result;
50596be0
DS
1503}
1504
d62a17ae 1505void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1506{
00dffa8c 1507 frr_with_mutex(&thread->mtx) {
d62a17ae 1508 thread->yield = yield_time;
1509 }
718e3744 1510}
1511
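/*
 * Illustrative sketch (not part of this file): a long-running job checks
 * thread_should_yield() as it works and, instead of hogging its pthread,
 * re-schedules itself as an event to continue later.  Assumes the
 * thread_add_event() and THREAD_ARG() macros from thread.h; the
 * example_* names are made up.
 */
struct example_walk {
	struct thread_master *master;
	struct thread *t_walk;
	unsigned long position;
	unsigned long end;
};

static int example_walk_cb(struct thread *t)
{
	struct example_walk *walk = THREAD_ARG(t);

	while (walk->position < walk->end) {
		/* ... process the item at walk->position ... */
		walk->position++;

		/* Past our time slot (THREAD_YIELD_TIME_SLOT by default)?
		 * Yield and pick up where we left off on the next pass. */
		if (thread_should_yield(t)) {
			thread_add_event(walk->master, example_walk_cb, walk,
					 0, &walk->t_walk);
			return 0;
		}
	}
	return 0;
}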
d62a17ae 1512void thread_getrusage(RUSAGE_T *r)
db9c0df9 1513{
231db9a6
DS
1514#if defined RUSAGE_THREAD
1515#define FRR_RUSAGE RUSAGE_THREAD
1516#else
1517#define FRR_RUSAGE RUSAGE_SELF
1518#endif
d62a17ae 1519 monotime(&r->real);
f75e802d 1520#ifndef EXCLUDE_CPU_TIME
231db9a6 1521 getrusage(FRR_RUSAGE, &(r->cpu));
f75e802d 1522#endif
db9c0df9
PJ
1523}
1524
fbcac826
QY
1525/*
1526 * Call a thread.
1527 *
1528 * This function will atomically update the thread's usage history. At present
1529 * this is the only spot where usage history is written. Nevertheless the code
1530 * has been written such that the introduction of writers in the future should
1531 * not need to update it provided the writers atomically perform only the
1532 * operations done here, i.e. updating the total and maximum times. In
1533 * particular, the maximum real and cpu times must be monotonically increasing
1534 * or this code is not correct.
1535 */
d62a17ae 1536void thread_call(struct thread *thread)
718e3744 1537{
f75e802d 1538#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1539 _Atomic unsigned long realtime, cputime;
1540 unsigned long exp;
1541 unsigned long helper;
f75e802d 1542#endif
d62a17ae 1543 RUSAGE_T before, after;
cc8b13a0 1544
d62a17ae 1545 GETRUSAGE(&before);
1546 thread->real = before.real;
718e3744 1547
d62a17ae 1548 pthread_setspecific(thread_current, thread);
1549 (*thread->func)(thread);
1550 pthread_setspecific(thread_current, NULL);
718e3744 1551
d62a17ae 1552 GETRUSAGE(&after);
718e3744 1553
f75e802d 1554#ifndef EXCLUDE_CPU_TIME
fbcac826
QY
1555 realtime = thread_consumed_time(&after, &before, &helper);
1556 cputime = helper;
1557
1558 /* update realtime */
1559 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1560 memory_order_seq_cst);
1561 exp = atomic_load_explicit(&thread->hist->real.max,
1562 memory_order_seq_cst);
1563 while (exp < realtime
1564 && !atomic_compare_exchange_weak_explicit(
1565 &thread->hist->real.max, &exp, realtime,
1566 memory_order_seq_cst, memory_order_seq_cst))
1567 ;
1568
1569 /* update cputime */
1570 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1571 memory_order_seq_cst);
1572 exp = atomic_load_explicit(&thread->hist->cpu.max,
1573 memory_order_seq_cst);
1574 while (exp < cputime
1575 && !atomic_compare_exchange_weak_explicit(
1576 &thread->hist->cpu.max, &exp, cputime,
1577 memory_order_seq_cst, memory_order_seq_cst))
1578 ;
1579
1580 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1581 memory_order_seq_cst);
1582 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1583 memory_order_seq_cst);
718e3744 1584
924b9229 1585#ifdef CONSUMED_TIME_CHECK
d62a17ae 1586 if (realtime > CONSUMED_TIME_CHECK) {
1587 /*
1588 * We have a CPU Hog on our hands.
1589 * Whinge about it now, so we're aware this is yet another task
1590 * to fix.
1591 */
9ef9495e 1592 flog_warn(
450971aa 1593 EC_LIB_SLOW_THREAD,
d62a17ae 1594 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1595 thread->funcname, (unsigned long)thread->func,
1596 realtime / 1000, cputime / 1000);
1597 }
924b9229 1598#endif /* CONSUMED_TIME_CHECK */
f75e802d 1599#endif /* Exclude CPU Time */
718e3744 1600}
1601
1602/* Execute thread */
d62a17ae 1603void funcname_thread_execute(struct thread_master *m,
1604 int (*func)(struct thread *), void *arg, int val,
1605 debugargdef)
718e3744 1606{
c4345fbf 1607 struct thread *thread;
718e3744 1608
c4345fbf 1609 /* Get or allocate new thread to execute. */
00dffa8c 1610 frr_with_mutex(&m->mtx) {
c4345fbf 1611 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
9c7753e4 1612
c4345fbf 1613 /* Set its event value. */
00dffa8c 1614 frr_with_mutex(&thread->mtx) {
c4345fbf
RZ
1615 thread->add_type = THREAD_EXECUTE;
1616 thread->u.val = val;
1617 thread->ref = &thread;
1618 }
c4345fbf 1619 }
f7c62e11 1620
c4345fbf
RZ
1621 /* Execute thread doing all accounting. */
1622 thread_call(thread);
9c7753e4 1623
c4345fbf
RZ
1624 /* Give back or free thread. */
1625 thread_add_unuse(m, thread);
718e3744 1626}
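/*
 * Illustrative sketch (not part of this file): the thread_execute() macro
 * from thread.h wraps the function above to run a callback immediately on
 * the calling pthread, with the usual CPU accounting, instead of queueing
 * it.  The example_* names are made up.
 */
static int example_oneshot(struct thread *t)
{
	/* runs synchronously inside thread_execute() */
	return 0;
}

static void example_run_now(struct thread_master *m)
{
	thread_execute(m, example_oneshot, NULL, 0);
}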