/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

/* #define DEBUG */

#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "frrcu.h"
#include "log.h"
#include "hash.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "frr_pthread.h"
#include "lib_errors.h"

DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

DECLARE_LIST(thread_list, struct thread, threaditem)

static int thread_timer_cmp(const struct thread *a, const struct thread *b)
{
	if (a->u.sands.tv_sec < b->u.sands.tv_sec)
		return -1;
	if (a->u.sands.tv_sec > b->u.sands.tv_sec)
		return 1;
	if (a->u.sands.tv_usec < b->u.sands.tv_usec)
		return -1;
	if (a->u.sands.tv_usec > b->u.sands.tv_usec)
		return 1;
	return 0;
}

DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
	     thread_timer_cmp)

#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		const unsigned char wakebyte = 0x01;                           \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
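
/*
 * The "pipe poker": fd_poll() always polls io_pipe[0] alongside the real
 * file descriptors, so writing one byte to io_pipe[1] wakes a sleeping
 * poll(). This lets task additions and cancellation requests made from
 * other pthreads interrupt the event loop immediately.
 */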

/* control variable for initializer */
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;

static void thread_free(struct thread_master *master, struct thread *thread);

/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}

static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
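
/*
 * CPU history records are keyed purely on the handler function pointer:
 * every task scheduled with the same function shares one statistics
 * record, regardless of call site or argument.
 */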

static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}

static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}

#ifndef EXCLUDE_CPU_TIME
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
		(size_t)a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000,
		(size_t)a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, "  %c%c%c%c%c  %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}

static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}

static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
#endif

static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}

static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			frr_with_mutex(&m->mtx) {
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
		}
	}
}

static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}
	return filter;
}
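
/*
 * For example, parse_filter("rw") yields
 * (1 << THREAD_READ) | (1 << THREAD_WRITE). Unrecognized characters are
 * ignored, and a zero result is rejected by the CLI handlers below.
 */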

#ifndef EXCLUDE_CPU_TIME
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEX'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
#endif

static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	struct thread *thread;
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
		m->fd_limit);
	for (i = 0; i < m->handler.pfdcount; i++) {
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
			m->handler.pfds[i].fd, m->handler.pfds[i].events,
			m->handler.pfds[i].revents);

		if (m->handler.pfds[i].events & POLLIN) {
			thread = m->read[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR ");
			else
				vty_out(vty, "%s ", thread->funcname);
		} else
			vty_out(vty, " ");

		if (m->handler.pfds[i].events & POLLOUT) {
			thread = m->write[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR\n");
			else
				vty_out(vty, "%s\n", thread->funcname);
		} else
			vty_out(vty, "\n");
	}
}

DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}

	return CMD_SUCCESS;
}


DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEX'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}

void thread_cmd_init(void)
{
#ifndef EXCLUDE_CPU_TIME
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
#endif
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */


static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}

/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}

struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(const void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	thread_list_init(&rv->event);
	thread_list_init(&rv->ready);
	thread_list_init(&rv->unuse);
	thread_timer_list_init(&rv->timer);

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	frr_with_mutex(&masters_mtx) {
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}

	return rv;
}

void thread_master_set_name(struct thread_master *master, const char *name)
{
	frr_with_mutex(&master->mtx) {
		XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
}

#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
		thread_list_add_tail(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
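
/*
 * Keeping up to THREAD_UNUSED_DEPTH thread structs on the unuse list
 * avoids a free()/calloc() cycle for every task on a busy master; anything
 * past that depth goes straight back to the allocator.
 */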

/* Free all unused threads. */
static void thread_list_free(struct thread_master *m,
			     struct thread_list_head *list)
{
	struct thread *t;

	while ((t = thread_list_pop(list)))
		thread_free(m, t);
}

static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}

/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	frr_with_mutex(&m->mtx) {
		struct thread *t;
		while ((t = thread_list_pop(&m->unuse)))
			thread_free(m, t);
	}
}

/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	struct thread *t;

	frr_with_mutex(&masters_mtx) {
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	while ((t = thread_timer_list_pop(&m->timer)))
		thread_free(m, t);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}

/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	frr_with_mutex(&thread->mtx) {
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}

	return remain < 0 ? 0 : remain;
}

/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}

#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	frr_with_mutex(&thread->mtx) {
		monotime_until(&thread->u.sands, &remain);
	}
	return remain;
}

/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_list_pop(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * If the passed-in funcname is not what we have stored, then
	 * thread->hist needs to be updated. We keep the last one around
	 * in unuse under the assumption that we are probably going to
	 * immediately allocate the same type of thread.
	 * This hopefully saves us some serious hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}

static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}

static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by
	 * setting ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that
	 * we should never wait and always return immediately even if no
	 * event is detected. If the value is zero, the behavior is default.
	 */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	rcu_read_unlock();
	rcu_assert_read_unlocked();

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	rcu_read_lock();

	return num;
}

/* Add new read or write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;
	struct thread **thread_array;

	assert(fd >= 0 && fd < m->fd_limit);
	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			// thread is already scheduled; don't reschedule
			break;

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		if (dir == THREAD_READ)
			thread_array = m->read;
		else
			thread_array = m->write;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;

#ifdef DEV_BUILD
				/*
				 * What happens if we have a thread already
				 * created for this event?
				 */
				if (thread_array[fd])
					assert(!"Thread already scheduled for file descriptor");
#endif
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			frr_with_mutex(&thread->mtx) {
				thread->u.fd = fd;
				thread_array[thread->u.fd] = thread;
			}

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}

	return thread;
}
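
/*
 * A scheduling sketch, assuming the thread_add_read()/thread_add_write()
 * convenience macros from thread.h (which forward here with the caller's
 * location as the debug arguments):
 *
 *	struct thread *t_read = NULL;
 *	thread_add_read(master, handle_read, ctx, sock, &t_read);
 *
 * handle_read, ctx and sock are illustrative names only. Passing &t_read
 * lets the library null the pointer when the task runs or is cancelled,
 * and prevents double-scheduling (see the t_ptr check above).
 */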

static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			// thread is already scheduled; don't reschedule
			return NULL;

		thread = thread_get(m, type, func, arg, debugargpass);

		frr_with_mutex(&thread->mtx) {
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			thread_timer_list_add(&m->timer, thread);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}

	return thread;
}


/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
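
/*
 * A sketch using the thread_add_timer() convenience macro from thread.h,
 * which forwards here; timeout_handler and ctx are illustrative names,
 * not symbols defined in this file:
 *
 *	struct thread *t_timeout = NULL;
 *	thread_add_timer(master, timeout_handler, ctx, 30, &t_timeout);
 */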

/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with "struct timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}

/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread = NULL;

	assert(m != NULL);

	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			// thread is already scheduled; don't reschedule
			break;

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		frr_with_mutex(&thread->mtx) {
			thread->u.val = val;
			thread_list_add_tail(&m->event, thread);
		}

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}

	return thread;
}
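
/*
 * Events are processed ahead of timers and I/O on the next pass through
 * the event loop. A sketch, assuming the thread_add_event() convenience
 * macro from thread.h; process_queue and ctx are illustrative names:
 *
 *	thread_add_event(master, process_queue, ctx, 0, NULL);
 */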

/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
		master->handler.pfds[master->handler.pfdcount].fd = 0;
		master->handler.pfds[master->handler.pfdcount].events = 0;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
		master->handler.copy[master->handler.copycount].fd = 0;
		master->handler.copy[master->handler.copycount].events = 0;
	}
}

/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list_head *list = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list, deleting any events which have
		 * the specified argument. We also need to check every
		 * thread in the ready queue. */
		if (cr->eventobj) {
			struct thread *t;

			frr_each_safe(thread_list, &master->event, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->event, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}

			frr_each_safe(thread_list, &master->ready, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->ready, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we
		 * need to check whether the thread even exists anymore
		 * before cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			thread_timer_list_del(&master->timer, thread);
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (list) {
			thread_list_del(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}

/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	frr_with_mutex(&master->mtx) {
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
}

/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	frr_with_mutex(&master->mtx) {
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
}

/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or a void * event argument, this
 * function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	frr_with_mutex(&master->mtx) {
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
}
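
/*
 * A sketch of cancelling from another pthread; t_ref must be the same
 * back-reference passed when the task was scheduled, and my_eventobj an
 * argument tasks were scheduled with (both illustrative names):
 *
 *	thread_cancel_async(master, &t_ref, NULL);       // one task
 *	thread_cancel_async(master, NULL, my_eventobj);  // all by argument
 */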
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
					 struct timeval *timer_val)
{
	if (!thread_timer_list_count(timers))
		return NULL;

	struct thread *next_timer = thread_timer_list_first(timers);
	monotime_until(&next_timer->u.sands, timer_val);
	return timer_val;
}

static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}

static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state,
				    short actual_state, int pos)
{
	struct thread **thread_array;

	/*
	 * poll() clears the .events field, but the pollfd array we
	 * pass to poll() is a copy of the one used to schedule threads.
	 * We need to synchronize state between the two here by applying
	 * the same changes poll() made on the copy of the "real" pollfd
	 * array.
	 *
	 * This cleans up a possible infinite loop where we refuse
	 * to respond to a poll event but poll is insistent that
	 * we should.
	 */
	m->handler.pfds[pos].events &= ~(state);

	if (!thread) {
		if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
			flog_err(EC_LIB_NO_THREAD,
				 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
				 m->handler.pfds[pos].fd, actual_state);
		return 0;
	}

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add_tail(&m->ready, thread);
	thread->type = THREAD_READY;

	return 1;
}

/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events
		 * field in a given pollfd. Barring thread_cancel() that
		 * value should be a superset of the values we have in our
		 * copy, so there's no need to update it. Similarly, barring
		 * deletion, the fd should still be a valid index into the
		 * master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP)) {
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 pfds[i].revents, i);
		}
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, pfds[i].revents, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;
			m->handler.pfds[m->handler.pfdcount].fd = 0;
			m->handler.pfds[m->handler.pfdcount].events = 0;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;
			m->handler.copy[m->handler.copycount].fd = 0;
			m->handler.copy[m->handler.copycount].events = 0;

			i--;
		}
	}
}

/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_timer_list_first(timers))) {
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		thread_timer_list_pop(timers);
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}

/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list_head *list)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_list_pop(list))) {
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}


718e3744 1353/* Fetch next ready thread. */
d62a17ae 1354struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1355{
d62a17ae 1356 struct thread *thread = NULL;
1357 struct timeval now;
1358 struct timeval zerotime = {0, 0};
1359 struct timeval tv;
1360 struct timeval *tw = NULL;
1361
1362 int num = 0;
1363
1364 do {
1365 /* Handle signals if any */
1366 if (m->handle_signals)
1367 quagga_sigevent_process();
1368
1369 pthread_mutex_lock(&m->mtx);
1370
1371 /* Process any pending cancellation requests */
1372 do_thread_cancel(m);
1373
e3c9529e
QY
1374 /*
1375 * Attempt to flush ready queue before going into poll().
1376 * This is performance-critical. Think twice before modifying.
1377 */
c284542b 1378 if ((thread = thread_list_pop(&m->ready))) {
e3c9529e
QY
1379 fetch = thread_run(m, thread, fetch);
1380 if (fetch->ref)
1381 *fetch->ref = NULL;
1382 pthread_mutex_unlock(&m->mtx);
1383 break;
1384 }
1385
1386 /* otherwise, tick through scheduling sequence */
1387
bca37d17
QY
1388 /*
1389 * Post events to ready queue. This must come before the
1390 * following block since events should occur immediately
1391 */
d62a17ae 1392 thread_process(&m->event);
1393
bca37d17
QY
1394 /*
1395 * If there are no tasks on the ready queue, we will poll()
1396 * until a timer expires or we receive I/O, whichever comes
1397 * first. The strategy for doing this is:
d62a17ae 1398 *
1399 * - If there are events pending, set the poll() timeout to zero
1400 * - If there are no events pending, but there are timers
1401 * pending, set the
1402 * timeout to the smallest remaining time on any timer
1403 * - If there are neither timers nor events pending, but there
1404 * are file
1405 * descriptors pending, block indefinitely in poll()
1406 * - If nothing is pending, it's time for the application to die
1407 *
1408 * In every case except the last, we need to hit poll() at least
bca37d17
QY
1409 * once per loop to avoid starvation by events
1410 */
c284542b 1411 if (!thread_list_count(&m->ready))
27d29ced 1412 tw = thread_timer_wait(&m->timer, &tv);
d62a17ae 1413
c284542b
DL
1414 if (thread_list_count(&m->ready) ||
1415 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1416 tw = &zerotime;
1417
1418 if (!tw && m->handler.pfdcount == 0) { /* die */
1419 pthread_mutex_unlock(&m->mtx);
1420 fetch = NULL;
1421 break;
1422 }
1423
bca37d17
QY
1424 /*
1425 * Copy pollfd array + # active pollfds in it. Not necessary to
1426 * copy the array size as this is fixed.
1427 */
d62a17ae 1428 m->handler.copycount = m->handler.pfdcount;
1429 memcpy(m->handler.copy, m->handler.pfds,
1430 m->handler.copycount * sizeof(struct pollfd));
1431
e3c9529e
QY
1432 pthread_mutex_unlock(&m->mtx);
1433 {
1434 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1435 m->handler.copycount, tw);
1436 }
1437 pthread_mutex_lock(&m->mtx);
d764d2cc 1438
e3c9529e
QY
1439 /* Handle any errors received in poll() */
1440 if (num < 0) {
1441 if (errno == EINTR) {
d62a17ae 1442 pthread_mutex_unlock(&m->mtx);
e3c9529e
QY
1443 /* loop around to signal handler */
1444 continue;
d62a17ae 1445 }
1446
e3c9529e 1447 /* else die */
450971aa 1448 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1449 safe_strerror(errno));
e3c9529e
QY
1450 pthread_mutex_unlock(&m->mtx);
1451 fetch = NULL;
1452 break;
bca37d17 1453 }
d62a17ae 1454
1455 /* Post timers to ready queue. */
1456 monotime(&now);
27d29ced 1457 thread_process_timers(&m->timer, &now);
d62a17ae 1458
1459 /* Post I/O to ready queue. */
1460 if (num > 0)
1461 thread_process_io(m, num);
1462
d62a17ae 1463 pthread_mutex_unlock(&m->mtx);
1464
1465 } while (!thread && m->spin);
1466
1467 return fetch;
718e3744 1468}
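
/*
 * thread_fetch() is the core of a daemon's event loop. A sketch of the
 * typical loop (FRR daemons normally get this via the libfrr startup
 * helpers rather than writing it by hand):
 *
 *	struct thread thread;
 *
 *	while (thread_fetch(master, &thread))
 *		thread_call(&thread);
 */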

static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}

unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}

/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;

	frr_with_mutex(&thread->mtx) {
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	return result;
}
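
/*
 * A cooperative-yield sketch for a long-running handler: do a bounded
 * amount of work, then reschedule yourself as an event so other tasks can
 * run. work_handler, work_remains, do_one_unit and ctx are illustrative
 * names; THREAD_ARG() and thread_add_event() come from thread.h.
 *
 *	static int work_handler(struct thread *t)
 *	{
 *		struct work_ctx *ctx = THREAD_ARG(t);
 *
 *		while (work_remains(ctx)) {
 *			do_one_unit(ctx);
 *			if (thread_should_yield(t)) {
 *				thread_add_event(t->master, work_handler,
 *						 ctx, 0, NULL);
 *				break;
 *			}
 *		}
 *		return 0;
 *	}
 */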

void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	frr_with_mutex(&thread->mtx) {
		thread->yield = yield_time;
	}
}

void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
#ifndef EXCLUDE_CPU_TIME
	getrusage(FRR_RUSAGE, &(r->cpu));
#endif
}

/*
 * Call a thread.
 *
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it, provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
#ifndef EXCLUDE_CPU_TIME
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
#endif
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

#ifndef EXCLUDE_CPU_TIME
	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->real.max, &exp, realtime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->cpu.max, &exp, cputime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
#endif /* Exclude CPU Time */
}

/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	frr_with_mutex(&m->mtx) {
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		frr_with_mutex(&thread->mtx) {
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
	}

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}