1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "frrcu.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "frr_pthread.h"
37 #include "lib_errors.h"
38
39 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
42 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
43
44 DECLARE_LIST(thread_list, struct thread, threaditem)
45
46 static int thread_timer_cmp(const struct thread *a, const struct thread *b)
47 {
48 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
49 return -1;
50 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
51 return 1;
52 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
53 return -1;
54 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
55 return 1;
56 return 0;
57 }
58
59 DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
60 thread_timer_cmp)
61
62 #if defined(__APPLE__)
63 #include <mach/mach.h>
64 #include <mach/mach_time.h>
65 #endif
66
67 #define AWAKEN(m) \
68 do { \
69 const unsigned char wakebyte = 0x01; \
70 write(m->io_pipe[1], &wakebyte, 1); \
71 } while (0);
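/*
 * Editor's note (illustrative, not part of the original source): AWAKEN() is
 * the "pipe poker".  Every scheduling function below calls it after queueing
 * work, so that a thread_fetch() currently blocked in poll() sees POLLIN on
 * io_pipe[0], wakes up and re-evaluates its queues; fd_poll() drains the
 * poked bytes again before returning.
 */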
72
73 /* control variable for initializer */
74 static pthread_once_t init_once = PTHREAD_ONCE_INIT;
75 pthread_key_t thread_current;
76
77 static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
78 static struct list *masters;
79
80 static void thread_free(struct thread_master *master, struct thread *thread);
81
82 /* CLI start ---------------------------------------------------------------- */
83 static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
84 {
85 int size = sizeof(a->func);
86
87 return jhash(&a->func, size, 0);
88 }
89
90 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
91 const struct cpu_thread_history *b)
92 {
93 return a->func == b->func;
94 }
95
96 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
97 {
98 struct cpu_thread_history *new;
99 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
100 new->func = a->func;
101 new->funcname = a->funcname;
102 return new;
103 }
104
105 static void cpu_record_hash_free(void *a)
106 {
107 struct cpu_thread_history *hist = a;
108
109 XFREE(MTYPE_THREAD_STATS, hist);
110 }
111
112 #ifndef EXCLUDE_CPU_TIME
113 static void vty_out_cpu_thread_history(struct vty *vty,
114 struct cpu_thread_history *a)
115 {
116 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
117 (size_t)a->total_active,
118 a->cpu.total / 1000, a->cpu.total % 1000,
119 (size_t)a->total_calls,
120 a->cpu.total / a->total_calls, a->cpu.max,
121 a->real.total / a->total_calls, a->real.max);
122 vty_out(vty, " %c%c%c%c%c %s\n",
123 a->types & (1 << THREAD_READ) ? 'R' : ' ',
124 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
125 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
126 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
127 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
128 }
129
130 static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
131 {
132 struct cpu_thread_history *totals = args[0];
133 struct cpu_thread_history copy;
134 struct vty *vty = args[1];
135 uint8_t *filter = args[2];
136
137 struct cpu_thread_history *a = bucket->data;
138
139 copy.total_active =
140 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
141 copy.total_calls =
142 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
143 copy.cpu.total =
144 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
145 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
146 copy.real.total =
147 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
148 copy.real.max =
149 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
150 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
151 copy.funcname = a->funcname;
152
153 if (!(copy.types & *filter))
154 return;
155
156 vty_out_cpu_thread_history(vty, &copy);
157 totals->total_active += copy.total_active;
158 totals->total_calls += copy.total_calls;
159 totals->real.total += copy.real.total;
160 if (totals->real.max < copy.real.max)
161 totals->real.max = copy.real.max;
162 totals->cpu.total += copy.cpu.total;
163 if (totals->cpu.max < copy.cpu.max)
164 totals->cpu.max = copy.cpu.max;
165 }
166
167 static void cpu_record_print(struct vty *vty, uint8_t filter)
168 {
169 struct cpu_thread_history tmp;
170 void *args[3] = {&tmp, vty, &filter};
171 struct thread_master *m;
172 struct listnode *ln;
173
174 memset(&tmp, 0, sizeof tmp);
175 tmp.funcname = "TOTAL";
176 tmp.types = filter;
177
178 frr_with_mutex(&masters_mtx) {
179 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
180 const char *name = m->name ? m->name : "main";
181
182 char underline[strlen(name) + 1];
183 memset(underline, '-', sizeof(underline));
184 underline[sizeof(underline) - 1] = '\0';
185
186 vty_out(vty, "\n");
187 vty_out(vty, "Showing statistics for pthread %s\n",
188 name);
189 vty_out(vty, "-------------------------------%s\n",
190 underline);
191 vty_out(vty, "%21s %18s %18s\n", "",
192 "CPU (user+system):", "Real (wall-clock):");
193 vty_out(vty,
194 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
195 vty_out(vty, " Avg uSec Max uSecs");
196 vty_out(vty, " Type Thread\n");
197
198 if (m->cpu_record->count)
199 hash_iterate(
200 m->cpu_record,
201 (void (*)(struct hash_bucket *,
202 void *))cpu_record_hash_print,
203 args);
204 else
205 vty_out(vty, "No data to display yet.\n");
206
207 vty_out(vty, "\n");
208 }
209 }
210
211 vty_out(vty, "\n");
212 vty_out(vty, "Total thread statistics\n");
213 vty_out(vty, "-------------------------\n");
214 vty_out(vty, "%21s %18s %18s\n", "",
215 "CPU (user+system):", "Real (wall-clock):");
216 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
217 vty_out(vty, " Avg uSec Max uSecs");
218 vty_out(vty, " Type Thread\n");
219
220 if (tmp.total_calls > 0)
221 vty_out_cpu_thread_history(vty, &tmp);
222 }
223 #endif
224
225 static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
226 {
227 uint8_t *filter = args[0];
228 struct hash *cpu_record = args[1];
229
230 struct cpu_thread_history *a = bucket->data;
231
232 if (!(a->types & *filter))
233 return;
234
235 hash_release(cpu_record, bucket->data);
236 }
237
238 static void cpu_record_clear(uint8_t filter)
239 {
240 uint8_t *tmp = &filter;
241 struct thread_master *m;
242 struct listnode *ln;
243
244 frr_with_mutex(&masters_mtx) {
245 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
246 frr_with_mutex(&m->mtx) {
247 void *args[2] = {tmp, m->cpu_record};
248 hash_iterate(
249 m->cpu_record,
250 (void (*)(struct hash_bucket *,
251 void *))cpu_record_hash_clear,
252 args);
253 }
254 }
255 }
256 }
257
258 static uint8_t parse_filter(const char *filterstr)
259 {
260 int i = 0;
261 int filter = 0;
262
263 while (filterstr[i] != '\0') {
264 switch (filterstr[i]) {
265 case 'r':
266 case 'R':
267 filter |= (1 << THREAD_READ);
268 break;
269 case 'w':
270 case 'W':
271 filter |= (1 << THREAD_WRITE);
272 break;
273 case 't':
274 case 'T':
275 filter |= (1 << THREAD_TIMER);
276 break;
277 case 'e':
278 case 'E':
279 filter |= (1 << THREAD_EVENT);
280 break;
281 case 'x':
282 case 'X':
283 filter |= (1 << THREAD_EXECUTE);
284 break;
285 default:
286 break;
287 }
288 ++i;
289 }
290 return filter;
291 }
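/*
 * Editor's example (illustrative): parse_filter("rw") returns
 * (1 << THREAD_READ) | (1 << THREAD_WRITE); characters outside 'rwtex' are
 * silently ignored, and a string with no recognized characters yields 0,
 * which the CLI handlers below reject as an invalid filter.
 */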
292
293 #ifndef EXCLUDE_CPU_TIME
294 DEFUN (show_thread_cpu,
295 show_thread_cpu_cmd,
296 "show thread cpu [FILTER]",
297 SHOW_STR
298 "Thread information\n"
299 "Thread CPU usage\n"
300 "Display filter (rwtex)\n")
301 {
302 uint8_t filter = (uint8_t)-1U;
303 int idx = 0;
304
305 if (argv_find(argv, argc, "FILTER", &idx)) {
306 filter = parse_filter(argv[idx]->arg);
307 if (!filter) {
308 vty_out(vty,
309 "Invalid filter \"%s\" specified; must contain at least"
310 "one of 'RWTEXB'\n",
311 argv[idx]->arg);
312 return CMD_WARNING;
313 }
314 }
315
316 cpu_record_print(vty, filter);
317 return CMD_SUCCESS;
318 }
319 #endif
320
321 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
322 {
323 const char *name = m->name ? m->name : "main";
324 char underline[strlen(name) + 1];
325 struct thread *thread;
326 uint32_t i;
327
328 memset(underline, '-', sizeof(underline));
329 underline[sizeof(underline) - 1] = '\0';
330
331 vty_out(vty, "\nShowing poll FD's for %s\n", name);
332 vty_out(vty, "----------------------%s\n", underline);
333 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
334 m->fd_limit);
335 for (i = 0; i < m->handler.pfdcount; i++) {
336 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
337 m->handler.pfds[i].fd, m->handler.pfds[i].events,
338 m->handler.pfds[i].revents);
339
340 if (m->handler.pfds[i].events & POLLIN) {
341 thread = m->read[m->handler.pfds[i].fd];
342
343 if (!thread)
344 vty_out(vty, "ERROR ");
345 else
346 vty_out(vty, "%s ", thread->funcname);
347 } else
348 vty_out(vty, " ");
349
350 if (m->handler.pfds[i].events & POLLOUT) {
351 thread = m->write[m->handler.pfds[i].fd];
352
353 if (!thread)
354 vty_out(vty, "ERROR\n");
355 else
356 vty_out(vty, "%s\n", thread->funcname);
357 } else
358 vty_out(vty, "\n");
359 }
360 }
361
362 DEFUN (show_thread_poll,
363 show_thread_poll_cmd,
364 "show thread poll",
365 SHOW_STR
366 "Thread information\n"
367 "Show poll FD's and information\n")
368 {
369 struct listnode *node;
370 struct thread_master *m;
371
372 frr_with_mutex(&masters_mtx) {
373 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
374 show_thread_poll_helper(vty, m);
375 }
376 }
377
378 return CMD_SUCCESS;
379 }
380
381
382 DEFUN (clear_thread_cpu,
383 clear_thread_cpu_cmd,
384 "clear thread cpu [FILTER]",
385 "Clear stored data in all pthreads\n"
386 "Thread information\n"
387 "Thread CPU usage\n"
388 "Display filter (rwtexb)\n")
389 {
390 uint8_t filter = (uint8_t)-1U;
391 int idx = 0;
392
393 if (argv_find(argv, argc, "FILTER", &idx)) {
394 filter = parse_filter(argv[idx]->arg);
395 if (!filter) {
396 vty_out(vty,
397 "Invalid filter \"%s\" specified; must contain at least"
398 "one of 'RWTEXB'\n",
399 argv[idx]->arg);
400 return CMD_WARNING;
401 }
402 }
403
404 cpu_record_clear(filter);
405 return CMD_SUCCESS;
406 }
407
408 void thread_cmd_init(void)
409 {
410 #ifndef EXCLUDE_CPU_TIME
411 install_element(VIEW_NODE, &show_thread_cpu_cmd);
412 #endif
413 install_element(VIEW_NODE, &show_thread_poll_cmd);
414 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
415 }
416 /* CLI end ------------------------------------------------------------------ */
417
418
419 static void cancelreq_del(void *cr)
420 {
421 XFREE(MTYPE_TMP, cr);
422 }
423
424 /* initializer, only ever called once */
425 static void initializer(void)
426 {
427 pthread_key_create(&thread_current, NULL);
428 }
429
430 struct thread_master *thread_master_create(const char *name)
431 {
432 struct thread_master *rv;
433 struct rlimit limit;
434
435 pthread_once(&init_once, &initializer);
436
437 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
438
439 /* Initialize master mutex */
440 pthread_mutex_init(&rv->mtx, NULL);
441 pthread_cond_init(&rv->cancel_cond, NULL);
442
443 /* Set name */
444 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
445
446 /* Initialize I/O task data structures */
447 getrlimit(RLIMIT_NOFILE, &limit);
448 rv->fd_limit = (int)limit.rlim_cur;
449 rv->read = XCALLOC(MTYPE_THREAD_POLL,
450 sizeof(struct thread *) * rv->fd_limit);
451
452 rv->write = XCALLOC(MTYPE_THREAD_POLL,
453 sizeof(struct thread *) * rv->fd_limit);
454
455 rv->cpu_record = hash_create_size(
456 8, (unsigned int (*)(const void *))cpu_record_hash_key,
457 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
458 "Thread Hash");
459
460 thread_list_init(&rv->event);
461 thread_list_init(&rv->ready);
462 thread_list_init(&rv->unuse);
463 thread_timer_list_init(&rv->timer);
464
465 /* Initialize thread_fetch() settings */
466 rv->spin = true;
467 rv->handle_signals = true;
468
469 /* Set pthread owner, should be updated by actual owner */
470 rv->owner = pthread_self();
471 rv->cancel_req = list_new();
472 rv->cancel_req->del = cancelreq_del;
473 rv->canceled = true;
474
475 /* Initialize pipe poker */
476 pipe(rv->io_pipe);
477 set_nonblocking(rv->io_pipe[0]);
478 set_nonblocking(rv->io_pipe[1]);
479
480 /* Initialize data structures for poll() */
481 rv->handler.pfdsize = rv->fd_limit;
482 rv->handler.pfdcount = 0;
483 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
484 sizeof(struct pollfd) * rv->handler.pfdsize);
485 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
486 sizeof(struct pollfd) * rv->handler.pfdsize);
487
488 /* add to list of threadmasters */
489 frr_with_mutex(&masters_mtx) {
490 if (!masters)
491 masters = list_new();
492
493 listnode_add(masters, rv);
494 }
495
496 return rv;
497 }
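/*
 * Editor's sketch of the typical lifecycle around a thread_master.  Daemons
 * normally get this loop from frr_run(); the names "my_master" and
 * "my_timer_cb" below are placeholders, and thread_add_timer() is the
 * convenience wrapper from thread.h that supplies the debugargdef arguments:
 *
 *     static int my_timer_cb(struct thread *t)
 *     {
 *         void *ctx = t->arg;    // argument given at schedule time
 *         (void)ctx;
 *         return 0;
 *     }
 *
 *     struct thread_master *my_master = thread_master_create("main");
 *     struct thread fetch;
 *
 *     thread_add_timer(my_master, my_timer_cb, NULL, 5, NULL);
 *
 *     while (thread_fetch(my_master, &fetch))
 *         thread_call(&fetch);
 *
 *     thread_master_free(my_master);
 */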
498
499 void thread_master_set_name(struct thread_master *master, const char *name)
500 {
501 frr_with_mutex(&master->mtx) {
502 XFREE(MTYPE_THREAD_MASTER, master->name);
503 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
504 }
505 }
506
507 #define THREAD_UNUSED_DEPTH 10
508
509 /* Move thread to unuse list. */
510 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
511 {
512 pthread_mutex_t mtxc = thread->mtx;
513
514 assert(m != NULL && thread != NULL);
515
516 thread->hist->total_active--;
517 memset(thread, 0, sizeof(struct thread));
518 thread->type = THREAD_UNUSED;
519
520 /* Restore the thread mutex context. */
521 thread->mtx = mtxc;
522
523 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
524 thread_list_add_tail(&m->unuse, thread);
525 return;
526 }
527
528 thread_free(m, thread);
529 }
530
531 /* Free all threads on the given list. */
532 static void thread_list_free(struct thread_master *m,
533 struct thread_list_head *list)
534 {
535 struct thread *t;
536
537 while ((t = thread_list_pop(list)))
538 thread_free(m, t);
539 }
540
541 static void thread_array_free(struct thread_master *m,
542 struct thread **thread_array)
543 {
544 struct thread *t;
545 int index;
546
547 for (index = 0; index < m->fd_limit; ++index) {
548 t = thread_array[index];
549 if (t) {
550 thread_array[index] = NULL;
551 thread_free(m, t);
552 }
553 }
554 XFREE(MTYPE_THREAD_POLL, thread_array);
555 }
556
557 /*
558 * thread_master_free_unused
559 *
560 * As threads finish, they are put on the
561 * unuse list for later reuse.
562 * If we are shutting down, free the unused threads
563 * so we can see if we forgot to shut anything off.
564 */
565 void thread_master_free_unused(struct thread_master *m)
566 {
567 frr_with_mutex(&m->mtx) {
568 struct thread *t;
569 while ((t = thread_list_pop(&m->unuse)))
570 thread_free(m, t);
571 }
572 }
573
574 /* Stop thread scheduler. */
575 void thread_master_free(struct thread_master *m)
576 {
577 struct thread *t;
578
579 frr_with_mutex(&masters_mtx) {
580 listnode_delete(masters, m);
581 if (masters->count == 0) {
582 list_delete(&masters);
583 }
584 }
585
586 thread_array_free(m, m->read);
587 thread_array_free(m, m->write);
588 while ((t = thread_timer_list_pop(&m->timer)))
589 thread_free(m, t);
590 thread_list_free(m, &m->event);
591 thread_list_free(m, &m->ready);
592 thread_list_free(m, &m->unuse);
593 pthread_mutex_destroy(&m->mtx);
594 pthread_cond_destroy(&m->cancel_cond);
595 close(m->io_pipe[0]);
596 close(m->io_pipe[1]);
597 list_delete(&m->cancel_req);
598 m->cancel_req = NULL;
599
600 hash_clean(m->cpu_record, cpu_record_hash_free);
601 hash_free(m->cpu_record);
602 m->cpu_record = NULL;
603
604 XFREE(MTYPE_THREAD_MASTER, m->name);
605 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
606 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
607 XFREE(MTYPE_THREAD_MASTER, m);
608 }
609
610 /* Return remaining time in milliseconds. */
611 unsigned long thread_timer_remain_msec(struct thread *thread)
612 {
613 int64_t remain;
614
615 frr_with_mutex(&thread->mtx) {
616 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
617 }
618
619 return remain < 0 ? 0 : remain;
620 }
621
622 /* Return remaining time in seconds. */
623 unsigned long thread_timer_remain_second(struct thread *thread)
624 {
625 return thread_timer_remain_msec(thread) / 1000LL;
626 }
627
628 #define debugargdef const char *funcname, const char *schedfrom, int fromln
629 #define debugargpass funcname, schedfrom, fromln
630
631 struct timeval thread_timer_remain(struct thread *thread)
632 {
633 struct timeval remain;
634 frr_with_mutex(&thread->mtx) {
635 monotime_until(&thread->u.sands, &remain);
636 }
637 return remain;
638 }
639
640 /* Get new thread. */
641 static struct thread *thread_get(struct thread_master *m, uint8_t type,
642 int (*func)(struct thread *), void *arg,
643 debugargdef)
644 {
645 struct thread *thread = thread_list_pop(&m->unuse);
646 struct cpu_thread_history tmp;
647
648 if (!thread) {
649 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
650 /* mutex only needs to be initialized at struct creation. */
651 pthread_mutex_init(&thread->mtx, NULL);
652 m->alloc++;
653 }
654
655 thread->type = type;
656 thread->add_type = type;
657 thread->master = m;
658 thread->arg = arg;
659 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
660 thread->ref = NULL;
661
662 /*
663 * If the passed-in funcname is not what we have
664 * stored, that means thread->hist needs to be
665 * updated. We keep the last one around on the unuse list
666 * under the assumption that we are probably
667 * going to immediately allocate the same
668 * type of thread.
669 * This hopefully saves us some serious
670 * hash_get lookups.
671 */
672 if (thread->funcname != funcname || thread->func != func) {
673 tmp.func = func;
674 tmp.funcname = funcname;
675 thread->hist =
676 hash_get(m->cpu_record, &tmp,
677 (void *(*)(void *))cpu_record_hash_alloc);
678 }
679 thread->hist->total_active++;
680 thread->func = func;
681 thread->funcname = funcname;
682 thread->schedfrom = schedfrom;
683 thread->schedfrom_line = fromln;
684
685 return thread;
686 }
687
688 static void thread_free(struct thread_master *master, struct thread *thread)
689 {
690 /* Update statistics. */
691 assert(master->alloc > 0);
692 master->alloc--;
693
694 /* Free allocated resources. */
695 pthread_mutex_destroy(&thread->mtx);
696 XFREE(MTYPE_THREAD, thread);
697 }
698
699 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
700 nfds_t count, const struct timeval *timer_wait)
701 {
702 /* If timer_wait is null here, that means poll() should
703 * block indefinitely, unless the thread_master has
704 * overridden it by setting ->selectpoll_timeout.
705 * If that value is positive, it specifies the maximum
706 * number of milliseconds to wait.
707 * If it is -1, we should never wait and always return
708 * immediately even if no event is detected.
709 * If it is zero, the default behavior described above
710 * applies (use timer_wait as the timeout, or block
711 * indefinitely when timer_wait is null).
712 */
713 int timeout = -1;
714
715 /* number of file descriptors with events */
716 int num;
717
718 if (timer_wait != NULL
719 && m->selectpoll_timeout == 0) // use the default value
720 timeout = (timer_wait->tv_sec * 1000)
721 + (timer_wait->tv_usec / 1000);
722 else if (m->selectpoll_timeout > 0) // use the user's timeout
723 timeout = m->selectpoll_timeout;
724 else if (m->selectpoll_timeout
725 < 0) // effect a poll (return immediately)
726 timeout = 0;
727
728 rcu_read_unlock();
729 rcu_assert_read_unlocked();
730
731 /* add poll pipe poker */
732 assert(count + 1 < pfdsize);
733 pfds[count].fd = m->io_pipe[0];
734 pfds[count].events = POLLIN;
735 pfds[count].revents = 0x00;
736
737 num = poll(pfds, count + 1, timeout);
738
739 unsigned char trash[64];
740 if (num > 0 && pfds[count].revents != 0 && num--)
741 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
742 ;
743
744 rcu_read_lock();
745
746 return num;
747 }
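/*
 * Editor's summary of the timeout selection above, where timer_wait is the
 * value computed by thread_fetch():
 *
 *     m->selectpoll_timeout == 0  ->  timeout = timer_wait in milliseconds,
 *                                     or -1 (block) when timer_wait is NULL
 *     m->selectpoll_timeout  > 0  ->  timeout = m->selectpoll_timeout
 *     m->selectpoll_timeout  < 0  ->  timeout = 0 (return immediately)
 */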
748
749 /* Add new read/write thread. */
750 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
751 int (*func)(struct thread *),
752 void *arg, int fd,
753 struct thread **t_ptr,
754 debugargdef)
755 {
756 struct thread *thread = NULL;
757 struct thread **thread_array;
758
759 assert(fd >= 0 && fd < m->fd_limit);
760 frr_with_mutex(&m->mtx) {
761 if (t_ptr && *t_ptr)
762 // thread is already scheduled; don't reschedule
763 break;
764
765 /* default to a new pollfd */
766 nfds_t queuepos = m->handler.pfdcount;
767
768 if (dir == THREAD_READ)
769 thread_array = m->read;
770 else
771 thread_array = m->write;
772
773 /* if we already have a pollfd for our file descriptor, find and
774 * use it */
775 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
776 if (m->handler.pfds[i].fd == fd) {
777 queuepos = i;
778
779 #ifdef DEV_BUILD
780 /*
781 * What happens if we have a thread already
782 * created for this event?
783 */
784 if (thread_array[fd])
785 assert(!"Thread already scheduled for file descriptor");
786 #endif
787 break;
788 }
789
790 /* make sure we have room for this fd + pipe poker fd */
791 assert(queuepos + 1 < m->handler.pfdsize);
792
793 thread = thread_get(m, dir, func, arg, debugargpass);
794
795 m->handler.pfds[queuepos].fd = fd;
796 m->handler.pfds[queuepos].events |=
797 (dir == THREAD_READ ? POLLIN : POLLOUT);
798
799 if (queuepos == m->handler.pfdcount)
800 m->handler.pfdcount++;
801
802 if (thread) {
803 frr_with_mutex(&thread->mtx) {
804 thread->u.fd = fd;
805 thread_array[thread->u.fd] = thread;
806 }
807
808 if (t_ptr) {
809 *t_ptr = thread;
810 thread->ref = t_ptr;
811 }
812 }
813
814 AWAKEN(m);
815 }
816
817 return thread;
818 }
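/*
 * Editor's usage sketch, assuming the thread_add_read()/thread_add_write()
 * wrappers from thread.h; "my_sock_cb", "my_ctx" and "sock" are placeholders.
 * Read/write tasks are one-shot, so the callback re-arms itself if it wants
 * to keep watching the descriptor:
 *
 *     static struct thread *t_read;
 *
 *     static int my_sock_cb(struct thread *t)
 *     {
 *         int fd = t->u.fd;        // the descriptor that became readable
 *         void *ctx = t->arg;      // argument given at schedule time
 *
 *         thread_add_read(t->master, my_sock_cb, ctx, fd, &t_read);
 *         return 0;
 *     }
 *
 *     thread_add_read(m, my_sock_cb, my_ctx, sock, &t_read);
 */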
819
820 static struct thread *
821 funcname_thread_add_timer_timeval(struct thread_master *m,
822 int (*func)(struct thread *), int type,
823 void *arg, struct timeval *time_relative,
824 struct thread **t_ptr, debugargdef)
825 {
826 struct thread *thread;
827
828 assert(m != NULL);
829
830 assert(type == THREAD_TIMER);
831 assert(time_relative);
832
833 frr_with_mutex(&m->mtx) {
834 if (t_ptr && *t_ptr)
835 // thread is already scheduled; don't reschedule
836 return NULL;
837
838 thread = thread_get(m, type, func, arg, debugargpass);
839
840 frr_with_mutex(&thread->mtx) {
841 monotime(&thread->u.sands);
842 timeradd(&thread->u.sands, time_relative,
843 &thread->u.sands);
844 thread_timer_list_add(&m->timer, thread);
845 if (t_ptr) {
846 *t_ptr = thread;
847 thread->ref = t_ptr;
848 }
849 }
850
851 AWAKEN(m);
852 }
853
854 return thread;
855 }
856
857
858 /* Add timer event thread. */
859 struct thread *funcname_thread_add_timer(struct thread_master *m,
860 int (*func)(struct thread *),
861 void *arg, long timer,
862 struct thread **t_ptr, debugargdef)
863 {
864 struct timeval trel;
865
866 assert(m != NULL);
867
868 trel.tv_sec = timer;
869 trel.tv_usec = 0;
870
871 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
872 &trel, t_ptr, debugargpass);
873 }
874
875 /* Add timer event thread with "millisecond" resolution */
876 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
877 int (*func)(struct thread *),
878 void *arg, long timer,
879 struct thread **t_ptr,
880 debugargdef)
881 {
882 struct timeval trel;
883
884 assert(m != NULL);
885
886 trel.tv_sec = timer / 1000;
887 trel.tv_usec = 1000 * (timer % 1000);
888
889 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
890 &trel, t_ptr, debugargpass);
891 }
892
893 /* Add timer event thread with "struct timeval" resolution */
894 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
895 int (*func)(struct thread *),
896 void *arg, struct timeval *tv,
897 struct thread **t_ptr, debugargdef)
898 {
899 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
900 t_ptr, debugargpass);
901 }
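/*
 * Editor's usage sketch for the timer variants, assuming the thread_add_timer*
 * wrappers from thread.h; the names are placeholders:
 *
 *     thread_add_timer(m, my_timer_cb, my_ctx, 10, &t_timer);       // 10 s
 *     thread_add_timer_msec(m, my_timer_cb, my_ctx, 250, &t_timer); // 250 ms
 *
 * Because a back-reference is passed, scheduling again while the timer is
 * still pending is a no-op (see the "already scheduled" check above), and
 * the reference is cleared when the timer fires or is cancelled.
 */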
902
903 /* Add simple event thread. */
904 struct thread *funcname_thread_add_event(struct thread_master *m,
905 int (*func)(struct thread *),
906 void *arg, int val,
907 struct thread **t_ptr, debugargdef)
908 {
909 struct thread *thread = NULL;
910
911 assert(m != NULL);
912
913 frr_with_mutex(&m->mtx) {
914 if (t_ptr && *t_ptr)
915 // thread is already scheduled; don't reschedule
916 break;
917
918 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
919 frr_with_mutex(&thread->mtx) {
920 thread->u.val = val;
921 thread_list_add_tail(&m->event, thread);
922 }
923
924 if (t_ptr) {
925 *t_ptr = thread;
926 thread->ref = t_ptr;
927 }
928
929 AWAKEN(m);
930 }
931
932 return thread;
933 }
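/*
 * Editor's usage sketch, assuming the thread_add_event() wrapper from
 * thread.h; "my_event_cb" and "my_ctx" are placeholders:
 *
 *     thread_add_event(m, my_event_cb, my_ctx, 0, NULL);
 *
 * An event carries an integer in thread->u.val rather than an fd or a
 * deadline, and is moved to the ready queue on the next thread_fetch()
 * pass, ahead of timers and I/O.
 */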
934
935 /* Thread cancellation ------------------------------------------------------ */
936
937 /**
938 * NOT's out the .events field of pollfd corresponding to the given file
939 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
940 *
941 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
942 * implementation for details.
943 *
944 * @param master
945 * @param fd
946 * @param state the event to cancel. One or more (OR'd together) of the
947 * following:
948 * - POLLIN
949 * - POLLOUT
950 */
951 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
952 {
953 bool found = false;
954
955 /* Cancel POLLHUP too just in case some bozo set it */
956 state |= POLLHUP;
957
958 /* find the index of corresponding pollfd */
959 nfds_t i;
960
961 for (i = 0; i < master->handler.pfdcount; i++)
962 if (master->handler.pfds[i].fd == fd) {
963 found = true;
964 break;
965 }
966
967 if (!found) {
968 zlog_debug(
969 "[!] Received cancellation request for nonexistent rw job");
970 zlog_debug("[!] threadmaster: %s | fd: %d",
971 master->name ? master->name : "", fd);
972 return;
973 }
974
975 /* NOT out event. */
976 master->handler.pfds[i].events &= ~(state);
977
978 /* If all events are canceled, delete / resize the pollfd array. */
979 if (master->handler.pfds[i].events == 0) {
980 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
981 (master->handler.pfdcount - i - 1)
982 * sizeof(struct pollfd));
983 master->handler.pfdcount--;
984 master->handler.pfds[master->handler.pfdcount].fd = 0;
985 master->handler.pfds[master->handler.pfdcount].events = 0;
986 }
987
988 /* If we have the same pollfd in the copy, perform the same operations,
989 * otherwise return. */
990 if (i >= master->handler.copycount)
991 return;
992
993 master->handler.copy[i].events &= ~(state);
994
995 if (master->handler.copy[i].events == 0) {
996 memmove(master->handler.copy + i, master->handler.copy + i + 1,
997 (master->handler.copycount - i - 1)
998 * sizeof(struct pollfd));
999 master->handler.copycount--;
1000 master->handler.copy[master->handler.copycount].fd = 0;
1001 master->handler.copy[master->handler.copycount].events = 0;
1002 }
1003 }
1004
1005 /**
1006 * Process cancellation requests.
1007 *
1008 * This may only be run from the pthread which owns the thread_master.
1009 *
1010 * @param master the thread master to process
1011 * @REQUIRE master->mtx
1012 */
1013 static void do_thread_cancel(struct thread_master *master)
1014 {
1015 struct thread_list_head *list = NULL;
1016 struct thread **thread_array = NULL;
1017 struct thread *thread;
1018
1019 struct cancel_req *cr;
1020 struct listnode *ln;
1021 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1022 /* If this is an event object cancellation, linearly search
1023 * through the event list and delete any events which have
1024 * the specified argument.
1025 * We also need to check every thread in the ready
1026 * queue. */
1027 if (cr->eventobj) {
1028 struct thread *t;
1029
1030 frr_each_safe(thread_list, &master->event, t) {
1031 if (t->arg != cr->eventobj)
1032 continue;
1033 thread_list_del(&master->event, t);
1034 if (t->ref)
1035 *t->ref = NULL;
1036 thread_add_unuse(master, t);
1037 }
1038
1039 frr_each_safe(thread_list, &master->ready, t) {
1040 if (t->arg != cr->eventobj)
1041 continue;
1042 thread_list_del(&master->ready, t);
1043 if (t->ref)
1044 *t->ref = NULL;
1045 thread_add_unuse(master, t);
1046 }
1047 continue;
1048 }
1049
1050 /* The pointer varies depending on whether the cancellation
1051 * request was made asynchronously or not.
1052 * If it was, we need to check whether the thread even
1053 * exists anymore before cancelling it.
1054 */
1055 thread = (cr->thread) ? cr->thread : *cr->threadref;
1056
1057 if (!thread)
1058 continue;
1059
1060 /* Determine the appropriate queue to cancel the thread from */
1061 switch (thread->type) {
1062 case THREAD_READ:
1063 thread_cancel_rw(master, thread->u.fd, POLLIN);
1064 thread_array = master->read;
1065 break;
1066 case THREAD_WRITE:
1067 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1068 thread_array = master->write;
1069 break;
1070 case THREAD_TIMER:
1071 thread_timer_list_del(&master->timer, thread);
1072 break;
1073 case THREAD_EVENT:
1074 list = &master->event;
1075 break;
1076 case THREAD_READY:
1077 list = &master->ready;
1078 break;
1079 default:
1080 continue;
1081 break;
1082 }
1083
1084 if (list) {
1085 thread_list_del(list, thread);
1086 } else if (thread_array) {
1087 thread_array[thread->u.fd] = NULL;
1088 }
1089
1090 if (thread->ref)
1091 *thread->ref = NULL;
1092
1093 thread_add_unuse(thread->master, thread);
1094 }
1095
1096 /* Delete and free all cancellation requests */
1097 list_delete_all_node(master->cancel_req);
1098
1099 /* Wake up any threads which may be blocked in thread_cancel_async() */
1100 master->canceled = true;
1101 pthread_cond_broadcast(&master->cancel_cond);
1102 }
1103
1104 /**
1105 * Cancel any events which have the specified argument.
1106 *
1107 * MT-Unsafe
1108 *
1109 * @param master the thread_master to cancel from
1110 * @param arg the argument passed when creating the event
1111 */
1112 void thread_cancel_event(struct thread_master *master, void *arg)
1113 {
1114 assert(master->owner == pthread_self());
1115
1116 frr_with_mutex(&master->mtx) {
1117 struct cancel_req *cr =
1118 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1119 cr->eventobj = arg;
1120 listnode_add(master->cancel_req, cr);
1121 do_thread_cancel(master);
1122 }
1123 }
1124
1125 /**
1126 * Cancel a specific task.
1127 *
1128 * MT-Unsafe
1129 *
1130 * @param thread task to cancel
1131 */
1132 void thread_cancel(struct thread *thread)
1133 {
1134 struct thread_master *master = thread->master;
1135
1136 assert(master->owner == pthread_self());
1137
1138 frr_with_mutex(&master->mtx) {
1139 struct cancel_req *cr =
1140 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1141 cr->thread = thread;
1142 listnode_add(master->cancel_req, cr);
1143 do_thread_cancel(master);
1144 }
1145 }
1146
1147 /**
1148 * Asynchronous cancellation.
1149 *
1150 * Called with either a struct thread ** or void * to an event argument,
1151 * this function posts the correct cancellation request and blocks until it is
1152 * serviced.
1153 *
1154 * If the thread is currently running, execution blocks until it completes.
1155 *
1156 * The last two parameters are mutually exclusive, i.e. if you pass one the
1157 * other must be NULL.
1158 *
1159 * When the cancellation procedure executes on the target thread_master, the
1160 * thread * provided is checked for nullity. If it is null, the thread is
1161 * assumed to no longer exist and the cancellation request is a no-op. Thus
1162 * users of this API must pass a back-reference when scheduling the original
1163 * task.
1164 *
1165 * MT-Safe
1166 *
1167 * @param master the thread master with the relevant event / task
1168 * @param thread pointer to thread to cancel
1169 * @param eventobj the event
1170 */
1171 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1172 void *eventobj)
1173 {
1174 assert(!(thread && eventobj) && (thread || eventobj));
1175 assert(master->owner != pthread_self());
1176
1177 frr_with_mutex(&master->mtx) {
1178 master->canceled = false;
1179
1180 if (thread) {
1181 struct cancel_req *cr =
1182 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1183 cr->threadref = thread;
1184 listnode_add(master->cancel_req, cr);
1185 } else if (eventobj) {
1186 struct cancel_req *cr =
1187 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1188 cr->eventobj = eventobj;
1189 listnode_add(master->cancel_req, cr);
1190 }
1191 AWAKEN(master);
1192
1193 while (!master->canceled)
1194 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1195 }
1196 }
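/*
 * Editor's usage sketch for the three cancellation paths; "t_ref" and
 * "my_ctx" are placeholders:
 *
 *     // from the pthread that owns the thread_master:
 *     if (t_ref)
 *             thread_cancel(t_ref);        // one specific task
 *     thread_cancel_event(m, my_ctx);      // all events whose arg == my_ctx
 *
 *     // from any other pthread (blocks until the request is serviced):
 *     thread_cancel_async(m, &t_ref, NULL);
 *
 * THREAD_OFF() in thread.h wraps the NULL check, thread_cancel() and the
 * clearing of the reference for the common synchronous case.
 */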
1197 /* ------------------------------------------------------------------------- */
1198
1199 static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
1200 struct timeval *timer_val)
1201 {
1202 if (!thread_timer_list_count(timers))
1203 return NULL;
1204
1205 struct thread *next_timer = thread_timer_list_first(timers);
1206 monotime_until(&next_timer->u.sands, timer_val);
1207 return timer_val;
1208 }
1209
1210 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1211 struct thread *fetch)
1212 {
1213 *fetch = *thread;
1214 thread_add_unuse(m, thread);
1215 return fetch;
1216 }
1217
1218 static int thread_process_io_helper(struct thread_master *m,
1219 struct thread *thread, short state,
1220 short actual_state, int pos)
1221 {
1222 struct thread **thread_array;
1223
1224 /*
1225 * poll() clears the .events field, but the pollfd array we
1226 * pass to poll() is a copy of the one used to schedule threads.
1227 * We need to synchronize state between the two here by applying
1228 * the same changes poll() made on the copy of the "real" pollfd
1229 * array.
1230 *
1231 * This cleans up a possible infinite loop where we refuse
1232 * to respond to a poll event but poll is insistent that
1233 * we should.
1234 */
1235 m->handler.pfds[pos].events &= ~(state);
1236
1237 if (!thread) {
1238 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1239 flog_err(EC_LIB_NO_THREAD,
1240 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1241 m->handler.pfds[pos].fd, actual_state);
1242 return 0;
1243 }
1244
1245 if (thread->type == THREAD_READ)
1246 thread_array = m->read;
1247 else
1248 thread_array = m->write;
1249
1250 thread_array[thread->u.fd] = NULL;
1251 thread_list_add_tail(&m->ready, thread);
1252 thread->type = THREAD_READY;
1253
1254 return 1;
1255 }
1256
1257 /**
1258 * Process I/O events.
1259 *
1260 * Walks through file descriptor array looking for those pollfds whose .revents
1261 * field has something interesting. Deletes any invalid file descriptors.
1262 *
1263 * @param m the thread master
1264 * @param num the number of active file descriptors (return value of poll())
1265 */
1266 static void thread_process_io(struct thread_master *m, unsigned int num)
1267 {
1268 unsigned int ready = 0;
1269 struct pollfd *pfds = m->handler.copy;
1270
1271 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1272 /* no event for current fd? immediately continue */
1273 if (pfds[i].revents == 0)
1274 continue;
1275
1276 ready++;
1277
1278 /* Unless someone has called thread_cancel
1279 * from another pthread, the only thing that
1280 * could have changed in m->handler.pfds
1281 * while we were asleep is the .events field
1282 * in a given pollfd.
1283 * Barring thread_cancel(), that value should
1284 * be a superset of the values we have in our
1285 * copy, so there's no need to update it.
1286 * Similarly, barring deletion, the fd should
1287 * still be a valid index into the master's
1288 * pfds. */
1289 if (pfds[i].revents & (POLLIN | POLLHUP)) {
1290 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1291 pfds[i].revents, i);
1292 }
1293 if (pfds[i].revents & POLLOUT)
1294 thread_process_io_helper(m, m->write[pfds[i].fd],
1295 POLLOUT, pfds[i].revents, i);
1296
1297 /* if one of our file descriptors is garbage, remove the same
1298 * from
1299 * both pfds + update sizes and index */
1300 if (pfds[i].revents & POLLNVAL) {
1301 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1302 (m->handler.pfdcount - i - 1)
1303 * sizeof(struct pollfd));
1304 m->handler.pfdcount--;
1305 m->handler.pfds[m->handler.pfdcount].fd = 0;
1306 m->handler.pfds[m->handler.pfdcount].events = 0;
1307
1308 memmove(pfds + i, pfds + i + 1,
1309 (m->handler.copycount - i - 1)
1310 * sizeof(struct pollfd));
1311 m->handler.copycount--;
1312 m->handler.copy[m->handler.copycount].fd = 0;
1313 m->handler.copy[m->handler.copycount].events = 0;
1314
1315 i--;
1316 }
1317 }
1318 }
1319
1320 /* Add all timers that have popped to the ready list. */
1321 static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
1322 struct timeval *timenow)
1323 {
1324 struct thread *thread;
1325 unsigned int ready = 0;
1326
1327 while ((thread = thread_timer_list_first(timers))) {
1328 if (timercmp(timenow, &thread->u.sands, <))
1329 return ready;
1330 thread_timer_list_pop(timers);
1331 thread->type = THREAD_READY;
1332 thread_list_add_tail(&thread->master->ready, thread);
1333 ready++;
1334 }
1335 return ready;
1336 }
1337
1338 /* process a list en masse, e.g. for event thread lists */
1339 static unsigned int thread_process(struct thread_list_head *list)
1340 {
1341 struct thread *thread;
1342 unsigned int ready = 0;
1343
1344 while ((thread = thread_list_pop(list))) {
1345 thread->type = THREAD_READY;
1346 thread_list_add_tail(&thread->master->ready, thread);
1347 ready++;
1348 }
1349 return ready;
1350 }
1351
1352
1353 /* Fetch next ready thread. */
1354 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1355 {
1356 struct thread *thread = NULL;
1357 struct timeval now;
1358 struct timeval zerotime = {0, 0};
1359 struct timeval tv;
1360 struct timeval *tw = NULL;
1361
1362 int num = 0;
1363
1364 do {
1365 /* Handle signals if any */
1366 if (m->handle_signals)
1367 quagga_sigevent_process();
1368
1369 pthread_mutex_lock(&m->mtx);
1370
1371 /* Process any pending cancellation requests */
1372 do_thread_cancel(m);
1373
1374 /*
1375 * Attempt to flush ready queue before going into poll().
1376 * This is performance-critical. Think twice before modifying.
1377 */
1378 if ((thread = thread_list_pop(&m->ready))) {
1379 fetch = thread_run(m, thread, fetch);
1380 if (fetch->ref)
1381 *fetch->ref = NULL;
1382 pthread_mutex_unlock(&m->mtx);
1383 break;
1384 }
1385
1386 /* otherwise, tick through scheduling sequence */
1387
1388 /*
1389 * Post events to ready queue. This must come before the
1390 * following block since events should occur immediately
1391 */
1392 thread_process(&m->event);
1393
1394 /*
1395 * If there are no tasks on the ready queue, we will poll()
1396 * until a timer expires or we receive I/O, whichever comes
1397 * first. The strategy for doing this is:
1398 *
1399 * - If there are events pending, set the poll() timeout to
1400 *   zero
1401 * - If there are no events pending, but there are timers
1402 *   pending, set the timeout to the smallest remaining time
1403 *   on any timer
1404 * - If there are neither timers nor events pending, but there
1405 *   are file descriptors pending, block indefinitely in poll()
1406 * - If nothing is pending, it's time for the application to die
1407 *
1408 * In every case except the last, we need to hit poll() at least
1409 * once per loop to avoid starvation by events
1410 */
1411 if (!thread_list_count(&m->ready))
1412 tw = thread_timer_wait(&m->timer, &tv);
1413
1414 if (thread_list_count(&m->ready) ||
1415 (tw && !timercmp(tw, &zerotime, >)))
1416 tw = &zerotime;
1417
1418 if (!tw && m->handler.pfdcount == 0) { /* die */
1419 pthread_mutex_unlock(&m->mtx);
1420 fetch = NULL;
1421 break;
1422 }
1423
1424 /*
1425 * Copy pollfd array + # active pollfds in it. Not necessary to
1426 * copy the array size as this is fixed.
1427 */
1428 m->handler.copycount = m->handler.pfdcount;
1429 memcpy(m->handler.copy, m->handler.pfds,
1430 m->handler.copycount * sizeof(struct pollfd));
1431
1432 pthread_mutex_unlock(&m->mtx);
1433 {
1434 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1435 m->handler.copycount, tw);
1436 }
1437 pthread_mutex_lock(&m->mtx);
1438
1439 /* Handle any errors received in poll() */
1440 if (num < 0) {
1441 if (errno == EINTR) {
1442 pthread_mutex_unlock(&m->mtx);
1443 /* loop around to signal handler */
1444 continue;
1445 }
1446
1447 /* else die */
1448 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1449 safe_strerror(errno));
1450 pthread_mutex_unlock(&m->mtx);
1451 fetch = NULL;
1452 break;
1453 }
1454
1455 /* Post timers to ready queue. */
1456 monotime(&now);
1457 thread_process_timers(&m->timer, &now);
1458
1459 /* Post I/O to ready queue. */
1460 if (num > 0)
1461 thread_process_io(m, num);
1462
1463 pthread_mutex_unlock(&m->mtx);
1464
1465 } while (!thread && m->spin);
1466
1467 return fetch;
1468 }
1469
1470 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1471 {
1472 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1473 + (a.tv_usec - b.tv_usec));
1474 }
1475
1476 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1477 unsigned long *cputime)
1478 {
1479 /* This is 'user + sys' time. */
1480 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1481 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1482 return timeval_elapsed(now->real, start->real);
1483 }
1484
1485 /* We should aim to yield after thread->yield microseconds, which
1486 defaults to THREAD_YIELD_TIME_SLOT.
1487 Note: we are using real (wall clock) time for this calculation.
1488 It could be argued that CPU time may make more sense in certain
1489 contexts. The things to consider are whether the thread may have
1490 blocked (in which case wall time increases, but CPU time does not),
1491 or whether the system is heavily loaded with other processes competing
1492 for CPU time. On balance, wall clock time seems to make sense.
1493 Plus it has the added benefit that gettimeofday should be faster
1494 than calling getrusage. */
1495 int thread_should_yield(struct thread *thread)
1496 {
1497 int result;
1498 frr_with_mutex(&thread->mtx) {
1499 result = monotime_since(&thread->real, NULL)
1500 > (int64_t)thread->yield;
1501 }
1502 return result;
1503 }
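/*
 * Editor's sketch of the intended pattern for long-running work; the names
 * "my_walk_cb", "walk_has_more" and "walk_do_chunk" are placeholders:
 *
 *     static int my_walk_cb(struct thread *t)
 *     {
 *         struct my_walk_ctx *ctx = t->arg;
 *
 *         while (walk_has_more(ctx)) {
 *             walk_do_chunk(ctx);
 *             if (thread_should_yield(t)) {
 *                 // give other tasks a turn, then continue later
 *                 thread_add_event(t->master, my_walk_cb, ctx, 0, NULL);
 *                 break;
 *             }
 *         }
 *         return 0;
 *     }
 */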
1504
1505 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1506 {
1507 frr_with_mutex(&thread->mtx) {
1508 thread->yield = yield_time;
1509 }
1510 }
1511
1512 void thread_getrusage(RUSAGE_T *r)
1513 {
1514 #if defined RUSAGE_THREAD
1515 #define FRR_RUSAGE RUSAGE_THREAD
1516 #else
1517 #define FRR_RUSAGE RUSAGE_SELF
1518 #endif
1519 monotime(&r->real);
1520 #ifndef EXCLUDE_CPU_TIME
1521 getrusage(FRR_RUSAGE, &(r->cpu));
1522 #endif
1523 }
1524
1525 /*
1526 * Call a thread.
1527 *
1528 * This function will atomically update the thread's usage history. At present
1529 * this is the only spot where usage history is written. Nevertheless the code
1530 * has been written such that the introduction of writers in the future should
1531 * not need to update it provided the writers atomically perform only the
1532 * operations done here, i.e. updating the total and maximum times. In
1533 * particular, the maximum real and cpu times must be monotonically increasing
1534 * or this code is not correct.
1535 */
1536 void thread_call(struct thread *thread)
1537 {
1538 #ifndef EXCLUDE_CPU_TIME
1539 _Atomic unsigned long realtime, cputime;
1540 unsigned long exp;
1541 unsigned long helper;
1542 #endif
1543 RUSAGE_T before, after;
1544
1545 GETRUSAGE(&before);
1546 thread->real = before.real;
1547
1548 pthread_setspecific(thread_current, thread);
1549 (*thread->func)(thread);
1550 pthread_setspecific(thread_current, NULL);
1551
1552 GETRUSAGE(&after);
1553
1554 #ifndef EXCLUDE_CPU_TIME
1555 realtime = thread_consumed_time(&after, &before, &helper);
1556 cputime = helper;
1557
1558 /* update realtime */
1559 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1560 memory_order_seq_cst);
1561 exp = atomic_load_explicit(&thread->hist->real.max,
1562 memory_order_seq_cst);
1563 while (exp < realtime
1564 && !atomic_compare_exchange_weak_explicit(
1565 &thread->hist->real.max, &exp, realtime,
1566 memory_order_seq_cst, memory_order_seq_cst))
1567 ;
1568
1569 /* update cputime */
1570 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1571 memory_order_seq_cst);
1572 exp = atomic_load_explicit(&thread->hist->cpu.max,
1573 memory_order_seq_cst);
1574 while (exp < cputime
1575 && !atomic_compare_exchange_weak_explicit(
1576 &thread->hist->cpu.max, &exp, cputime,
1577 memory_order_seq_cst, memory_order_seq_cst))
1578 ;
1579
1580 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1581 memory_order_seq_cst);
1582 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1583 memory_order_seq_cst);
1584
1585 #ifdef CONSUMED_TIME_CHECK
1586 if (realtime > CONSUMED_TIME_CHECK) {
1587 /*
1588 * We have a CPU Hog on our hands.
1589 * Whinge about it now, so we're aware this is yet another task
1590 * to fix.
1591 */
1592 flog_warn(
1593 EC_LIB_SLOW_THREAD,
1594 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1595 thread->funcname, (unsigned long)thread->func,
1596 realtime / 1000, cputime / 1000);
1597 }
1598 #endif /* CONSUMED_TIME_CHECK */
1599 #endif /* Exclude CPU Time */
1600 }
1601
1602 /* Execute thread */
1603 void funcname_thread_execute(struct thread_master *m,
1604 int (*func)(struct thread *), void *arg, int val,
1605 debugargdef)
1606 {
1607 struct thread *thread;
1608
1609 /* Get or allocate new thread to execute. */
1610 frr_with_mutex(&m->mtx) {
1611 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1612
1613 /* Set its event value. */
1614 frr_with_mutex(&thread->mtx) {
1615 thread->add_type = THREAD_EXECUTE;
1616 thread->u.val = val;
1617 thread->ref = &thread;
1618 }
1619 }
1620
1621 /* Execute thread doing all accounting. */
1622 thread_call(thread);
1623
1624 /* Give back or free thread. */
1625 thread_add_unuse(m, thread);
1626 }
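/*
 * Editor's usage note, assuming the thread_execute() wrapper from thread.h:
 *
 *     thread_execute(m, my_event_cb, my_ctx, 0);
 *
 * This runs the callback synchronously on the caller's stack while still
 * recording it in the per-master CPU statistics, which a direct function
 * call would not do.
 */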