lib/thread.c
1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "frrcu.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "frr_pthread.h"
37 #include "lib_errors.h"
38
39 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
42 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
43
44 DECLARE_LIST(thread_list, struct thread, threaditem)
45
46 static int thread_timer_cmp(const struct thread *a, const struct thread *b)
47 {
48 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
49 return -1;
50 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
51 return 1;
52 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
53 return -1;
54 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
55 return 1;
56 return 0;
57 }
58
59 DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
60 thread_timer_cmp)
61
62 #if defined(__APPLE__)
63 #include <mach/mach.h>
64 #include <mach/mach_time.h>
65 #endif
66
67 #define AWAKEN(m) \
68 do { \
69 const unsigned char wakebyte = 0x01; \
70 write(m->io_pipe[1], &wakebyte, 1); \
71 } while (0);
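
/*
 * The write above is the classic self-pipe wakeup: thread_fetch() always
 * keeps m->io_pipe[0] in the pollfd set, so writing one byte to io_pipe[1]
 * makes a sleeping poll() return and re-evaluate the queues; fd_poll()
 * drains the pipe again afterwards.
 */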
72
73 /* control variable for initializer */
74 static pthread_once_t init_once = PTHREAD_ONCE_INIT;
75 pthread_key_t thread_current;
76
77 static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
78 static struct list *masters;
79
80 static void thread_free(struct thread_master *master, struct thread *thread);
81
82 /* CLI start ---------------------------------------------------------------- */
83 static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
84 {
85 int size = sizeof(a->func);
86
87 return jhash(&a->func, size, 0);
88 }
89
90 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
91 const struct cpu_thread_history *b)
92 {
93 return a->func == b->func;
94 }
95
96 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
97 {
98 struct cpu_thread_history *new;
99 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
100 new->func = a->func;
101 new->funcname = a->funcname;
102 return new;
103 }
104
105 static void cpu_record_hash_free(void *a)
106 {
107 struct cpu_thread_history *hist = a;
108
109 XFREE(MTYPE_THREAD_STATS, hist);
110 }
111
112 #ifndef EXCLUDE_CPU_TIME
113 static void vty_out_cpu_thread_history(struct vty *vty,
114 struct cpu_thread_history *a)
115 {
116 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
117 (size_t)a->total_active, a->cpu.total / 1000,
118 a->cpu.total % 1000, (size_t)a->total_calls,
119 (size_t)(a->cpu.total / a->total_calls), a->cpu.max,
120 (size_t)(a->real.total / a->total_calls), a->real.max);
121 vty_out(vty, " %c%c%c%c%c %s\n",
122 a->types & (1 << THREAD_READ) ? 'R' : ' ',
123 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
124 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
125 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
126 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
127 }
128
129 static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
130 {
131 struct cpu_thread_history *totals = args[0];
132 struct cpu_thread_history copy;
133 struct vty *vty = args[1];
134 uint8_t *filter = args[2];
135
136 struct cpu_thread_history *a = bucket->data;
137
138 copy.total_active =
139 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
140 copy.total_calls =
141 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
142 copy.cpu.total =
143 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
144 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
145 copy.real.total =
146 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
147 copy.real.max =
148 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
149 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
150 copy.funcname = a->funcname;
151
152 if (!(copy.types & *filter))
153 return;
154
155 vty_out_cpu_thread_history(vty, &copy);
156 totals->total_active += copy.total_active;
157 totals->total_calls += copy.total_calls;
158 totals->real.total += copy.real.total;
159 if (totals->real.max < copy.real.max)
160 totals->real.max = copy.real.max;
161 totals->cpu.total += copy.cpu.total;
162 if (totals->cpu.max < copy.cpu.max)
163 totals->cpu.max = copy.cpu.max;
164 }
165
166 static void cpu_record_print(struct vty *vty, uint8_t filter)
167 {
168 struct cpu_thread_history tmp;
169 void *args[3] = {&tmp, vty, &filter};
170 struct thread_master *m;
171 struct listnode *ln;
172
173 memset(&tmp, 0, sizeof(tmp));
174 tmp.funcname = "TOTAL";
175 tmp.types = filter;
176
177 frr_with_mutex(&masters_mtx) {
178 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
179 const char *name = m->name ? m->name : "main";
180
181 char underline[strlen(name) + 1];
182 memset(underline, '-', sizeof(underline));
183 underline[sizeof(underline) - 1] = '\0';
184
185 vty_out(vty, "\n");
186 vty_out(vty, "Showing statistics for pthread %s\n",
187 name);
188 vty_out(vty, "-------------------------------%s\n",
189 underline);
190 vty_out(vty, "%21s %18s %18s\n", "",
191 "CPU (user+system):", "Real (wall-clock):");
192 vty_out(vty,
193 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (m->cpu_record->count)
198 hash_iterate(
199 m->cpu_record,
200 (void (*)(struct hash_bucket *,
201 void *))cpu_record_hash_print,
202 args);
203 else
204 vty_out(vty, "No data to display yet.\n");
205
206 vty_out(vty, "\n");
207 }
208 }
209
210 vty_out(vty, "\n");
211 vty_out(vty, "Total thread statistics\n");
212 vty_out(vty, "-------------------------\n");
213 vty_out(vty, "%21s %18s %18s\n", "",
214 "CPU (user+system):", "Real (wall-clock):");
215 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
216 vty_out(vty, " Avg uSec Max uSecs");
217 vty_out(vty, " Type Thread\n");
218
219 if (tmp.total_calls > 0)
220 vty_out_cpu_thread_history(vty, &tmp);
221 }
222 #endif
223
224 static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
225 {
226 uint8_t *filter = args[0];
227 struct hash *cpu_record = args[1];
228
229 struct cpu_thread_history *a = bucket->data;
230
231 if (!(a->types & *filter))
232 return;
233
234 hash_release(cpu_record, bucket->data);
235 }
236
237 static void cpu_record_clear(uint8_t filter)
238 {
239 uint8_t *tmp = &filter;
240 struct thread_master *m;
241 struct listnode *ln;
242
243 frr_with_mutex(&masters_mtx) {
244 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
245 frr_with_mutex(&m->mtx) {
246 void *args[2] = {tmp, m->cpu_record};
247 hash_iterate(
248 m->cpu_record,
249 (void (*)(struct hash_bucket *,
250 void *))cpu_record_hash_clear,
251 args);
252 }
253 }
254 }
255 }
256
257 static uint8_t parse_filter(const char *filterstr)
258 {
259 int i = 0;
260 int filter = 0;
261
262 while (filterstr[i] != '\0') {
263 switch (filterstr[i]) {
264 case 'r':
265 case 'R':
266 filter |= (1 << THREAD_READ);
267 break;
268 case 'w':
269 case 'W':
270 filter |= (1 << THREAD_WRITE);
271 break;
272 case 't':
273 case 'T':
274 filter |= (1 << THREAD_TIMER);
275 break;
276 case 'e':
277 case 'E':
278 filter |= (1 << THREAD_EVENT);
279 break;
280 case 'x':
281 case 'X':
282 filter |= (1 << THREAD_EXECUTE);
283 break;
284 default:
285 break;
286 }
287 ++i;
288 }
289 return filter;
290 }
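
/*
 * Worked example: parse_filter("rw") returns
 * (1 << THREAD_READ) | (1 << THREAD_WRITE), while a string containing only
 * unknown characters returns 0, which the callers below reject as an
 * invalid filter.
 */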
291
292 #ifndef EXCLUDE_CPU_TIME
293 DEFUN (show_thread_cpu,
294 show_thread_cpu_cmd,
295 "show thread cpu [FILTER]",
296 SHOW_STR
297 "Thread information\n"
298 "Thread CPU usage\n"
299 "Display filter (rwtex)\n")
300 {
301 uint8_t filter = (uint8_t)-1U;
302 int idx = 0;
303
304 if (argv_find(argv, argc, "FILTER", &idx)) {
305 filter = parse_filter(argv[idx]->arg);
306 if (!filter) {
307 vty_out(vty,
308 "Invalid filter \"%s\" specified; must contain at least"
309 "one of 'RWTEXB'\n",
310 argv[idx]->arg);
311 return CMD_WARNING;
312 }
313 }
314
315 cpu_record_print(vty, filter);
316 return CMD_SUCCESS;
317 }
318 #endif
319
320 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
321 {
322 const char *name = m->name ? m->name : "main";
323 char underline[strlen(name) + 1];
324 struct thread *thread;
325 uint32_t i;
326
327 memset(underline, '-', sizeof(underline));
328 underline[sizeof(underline) - 1] = '\0';
329
330 vty_out(vty, "\nShowing poll FD's for %s\n", name);
331 vty_out(vty, "----------------------%s\n", underline);
332 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
333 m->fd_limit);
334 for (i = 0; i < m->handler.pfdcount; i++) {
335 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
336 m->handler.pfds[i].fd, m->handler.pfds[i].events,
337 m->handler.pfds[i].revents);
338
339 if (m->handler.pfds[i].events & POLLIN) {
340 thread = m->read[m->handler.pfds[i].fd];
341
342 if (!thread)
343 vty_out(vty, "ERROR ");
344 else
345 vty_out(vty, "%s ", thread->funcname);
346 } else
347 vty_out(vty, " ");
348
349 if (m->handler.pfds[i].events & POLLOUT) {
350 thread = m->write[m->handler.pfds[i].fd];
351
352 if (!thread)
353 vty_out(vty, "ERROR\n");
354 else
355 vty_out(vty, "%s\n", thread->funcname);
356 } else
357 vty_out(vty, "\n");
358 }
359 }
360
361 DEFUN (show_thread_poll,
362 show_thread_poll_cmd,
363 "show thread poll",
364 SHOW_STR
365 "Thread information\n"
366 "Show poll FD's and information\n")
367 {
368 struct listnode *node;
369 struct thread_master *m;
370
371 frr_with_mutex(&masters_mtx) {
372 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
373 show_thread_poll_helper(vty, m);
374 }
375 }
376
377 return CMD_SUCCESS;
378 }
379
380
381 DEFUN (clear_thread_cpu,
382 clear_thread_cpu_cmd,
383 "clear thread cpu [FILTER]",
384 "Clear stored data in all pthreads\n"
385 "Thread information\n"
386 "Thread CPU usage\n"
387 "Display filter (rwtexb)\n")
388 {
389 uint8_t filter = (uint8_t)-1U;
390 int idx = 0;
391
392 if (argv_find(argv, argc, "FILTER", &idx)) {
393 filter = parse_filter(argv[idx]->arg);
394 if (!filter) {
395 vty_out(vty,
396 "Invalid filter \"%s\" specified; must contain at least"
397 "one of 'RWTEXB'\n",
398 argv[idx]->arg);
399 return CMD_WARNING;
400 }
401 }
402
403 cpu_record_clear(filter);
404 return CMD_SUCCESS;
405 }
406
407 void thread_cmd_init(void)
408 {
409 #ifndef EXCLUDE_CPU_TIME
410 install_element(VIEW_NODE, &show_thread_cpu_cmd);
411 #endif
412 install_element(VIEW_NODE, &show_thread_poll_cmd);
413 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
414 }
415 /* CLI end ------------------------------------------------------------------ */
416
417
418 static void cancelreq_del(void *cr)
419 {
420 XFREE(MTYPE_TMP, cr);
421 }
422
423 /* initializer, only ever called once */
424 static void initializer(void)
425 {
426 pthread_key_create(&thread_current, NULL);
427 }
428
429 struct thread_master *thread_master_create(const char *name)
430 {
431 struct thread_master *rv;
432 struct rlimit limit;
433
434 pthread_once(&init_once, &initializer);
435
436 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
437
438 /* Initialize master mutex */
439 pthread_mutex_init(&rv->mtx, NULL);
440 pthread_cond_init(&rv->cancel_cond, NULL);
441
442 /* Set name */
443 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
444
445 /* Initialize I/O task data structures */
446 getrlimit(RLIMIT_NOFILE, &limit);
447 rv->fd_limit = (int)limit.rlim_cur;
448 rv->read = XCALLOC(MTYPE_THREAD_POLL,
449 sizeof(struct thread *) * rv->fd_limit);
450
451 rv->write = XCALLOC(MTYPE_THREAD_POLL,
452 sizeof(struct thread *) * rv->fd_limit);
453
454 rv->cpu_record = hash_create_size(
455 8, (unsigned int (*)(const void *))cpu_record_hash_key,
456 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
457 "Thread Hash");
458
459 thread_list_init(&rv->event);
460 thread_list_init(&rv->ready);
461 thread_list_init(&rv->unuse);
462 thread_timer_list_init(&rv->timer);
463
464 /* Initialize thread_fetch() settings */
465 rv->spin = true;
466 rv->handle_signals = true;
467
468 /* Set pthread owner, should be updated by actual owner */
469 rv->owner = pthread_self();
470 rv->cancel_req = list_new();
471 rv->cancel_req->del = cancelreq_del;
472 rv->canceled = true;
473
474 /* Initialize pipe poker */
475 pipe(rv->io_pipe);
476 set_nonblocking(rv->io_pipe[0]);
477 set_nonblocking(rv->io_pipe[1]);
478
479 /* Initialize data structures for poll() */
480 rv->handler.pfdsize = rv->fd_limit;
481 rv->handler.pfdcount = 0;
482 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
483 sizeof(struct pollfd) * rv->handler.pfdsize);
484 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
485 sizeof(struct pollfd) * rv->handler.pfdsize);
486
487 /* add to list of threadmasters */
488 frr_with_mutex(&masters_mtx) {
489 if (!masters)
490 masters = list_new();
491
492 listnode_add(masters, rv);
493 }
494
495 return rv;
496 }
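
/*
 * Typical lifecycle, as a minimal sketch rather than anything in this file
 * (the handler names, context and socket fd are hypothetical; thread_add_*
 * are the thread.h wrapper macros around the funcname_* schedulers below):
 *
 *     struct thread_master *master = thread_master_create("main");
 *     struct thread *t_read = NULL;
 *     struct thread fetched;
 *
 *     thread_add_read(master, my_read_handler, my_ctx, sock_fd, &t_read);
 *     thread_add_timer(master, my_timer_handler, my_ctx, 5, NULL);
 *
 *     while (thread_fetch(master, &fetched))
 *             thread_call(&fetched);
 *
 *     thread_master_free(master);
 */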
497
498 void thread_master_set_name(struct thread_master *master, const char *name)
499 {
500 frr_with_mutex(&master->mtx) {
501 XFREE(MTYPE_THREAD_MASTER, master->name);
502 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
503 }
504 }
505
506 #define THREAD_UNUSED_DEPTH 10
507
508 /* Move thread to unuse list. */
509 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
510 {
511 pthread_mutex_t mtxc = thread->mtx;
512
513 assert(m != NULL && thread != NULL);
514
515 thread->hist->total_active--;
516 memset(thread, 0, sizeof(struct thread));
517 thread->type = THREAD_UNUSED;
518
519 /* Restore the thread mutex context. */
520 thread->mtx = mtxc;
521
522 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
523 thread_list_add_tail(&m->unuse, thread);
524 return;
525 }
526
527 thread_free(m, thread);
528 }
529
530 /* Free all unused threads. */
531 static void thread_list_free(struct thread_master *m,
532 struct thread_list_head *list)
533 {
534 struct thread *t;
535
536 while ((t = thread_list_pop(list)))
537 thread_free(m, t);
538 }
539
540 static void thread_array_free(struct thread_master *m,
541 struct thread **thread_array)
542 {
543 struct thread *t;
544 int index;
545
546 for (index = 0; index < m->fd_limit; ++index) {
547 t = thread_array[index];
548 if (t) {
549 thread_array[index] = NULL;
550 thread_free(m, t);
551 }
552 }
553 XFREE(MTYPE_THREAD_POLL, thread_array);
554 }
555
556 /*
557 * thread_master_free_unused
558 *
559 * As threads are finished with, they are put on the
560 * unuse list for later reuse.
561 * If we are shutting down, free up the unused threads
562 * so we can see if we forgot to shut anything off.
563 */
564 void thread_master_free_unused(struct thread_master *m)
565 {
566 frr_with_mutex(&m->mtx) {
567 struct thread *t;
568 while ((t = thread_list_pop(&m->unuse)))
569 thread_free(m, t);
570 }
571 }
572
573 /* Stop thread scheduler. */
574 void thread_master_free(struct thread_master *m)
575 {
576 struct thread *t;
577
578 frr_with_mutex(&masters_mtx) {
579 listnode_delete(masters, m);
580 if (masters->count == 0) {
581 list_delete(&masters);
582 }
583 }
584
585 thread_array_free(m, m->read);
586 thread_array_free(m, m->write);
587 while ((t = thread_timer_list_pop(&m->timer)))
588 thread_free(m, t);
589 thread_list_free(m, &m->event);
590 thread_list_free(m, &m->ready);
591 thread_list_free(m, &m->unuse);
592 pthread_mutex_destroy(&m->mtx);
593 pthread_cond_destroy(&m->cancel_cond);
594 close(m->io_pipe[0]);
595 close(m->io_pipe[1]);
596 list_delete(&m->cancel_req);
597 m->cancel_req = NULL;
598
599 hash_clean(m->cpu_record, cpu_record_hash_free);
600 hash_free(m->cpu_record);
601 m->cpu_record = NULL;
602
603 XFREE(MTYPE_THREAD_MASTER, m->name);
604 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
605 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
606 XFREE(MTYPE_THREAD_MASTER, m);
607 }
608
609 /* Return remaining time in milliseconds. */
610 unsigned long thread_timer_remain_msec(struct thread *thread)
611 {
612 int64_t remain;
613
614 frr_with_mutex(&thread->mtx) {
615 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
616 }
617
618 return remain < 0 ? 0 : remain;
619 }
620
621 /* Return remaining time in seconds. */
622 unsigned long thread_timer_remain_second(struct thread *thread)
623 {
624 return thread_timer_remain_msec(thread) / 1000LL;
625 }
626
627 #define debugargdef const char *funcname, const char *schedfrom, int fromln
628 #define debugargpass funcname, schedfrom, fromln
629
630 struct timeval thread_timer_remain(struct thread *thread)
631 {
632 struct timeval remain;
633 frr_with_mutex(&thread->mtx) {
634 monotime_until(&thread->u.sands, &remain);
635 }
636 return remain;
637 }
638
639 /* Get new thread. */
640 static struct thread *thread_get(struct thread_master *m, uint8_t type,
641 int (*func)(struct thread *), void *arg,
642 debugargdef)
643 {
644 struct thread *thread = thread_list_pop(&m->unuse);
645 struct cpu_thread_history tmp;
646
647 if (!thread) {
648 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
649 /* mutex only needs to be initialized at struct creation. */
650 pthread_mutex_init(&thread->mtx, NULL);
651 m->alloc++;
652 }
653
654 thread->type = type;
655 thread->add_type = type;
656 thread->master = m;
657 thread->arg = arg;
658 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
659 thread->ref = NULL;
660
661 /*
662 * So if the passed in funcname is not what we have
663 * stored that means the thread->hist needs to be
664 * updated. We keep the last one around in unused
665 * under the assumption that we are probably
666 * going to immediately allocate the same
667 * type of thread.
668 * This hopefully saves us some serious
669 * hash_get lookups.
670 */
671 if (thread->funcname != funcname || thread->func != func) {
672 tmp.func = func;
673 tmp.funcname = funcname;
674 thread->hist =
675 hash_get(m->cpu_record, &tmp,
676 (void *(*)(void *))cpu_record_hash_alloc);
677 }
678 thread->hist->total_active++;
679 thread->func = func;
680 thread->funcname = funcname;
681 thread->schedfrom = schedfrom;
682 thread->schedfrom_line = fromln;
683
684 return thread;
685 }
686
687 static void thread_free(struct thread_master *master, struct thread *thread)
688 {
689 /* Update statistics. */
690 assert(master->alloc > 0);
691 master->alloc--;
692
693 /* Free allocated resources. */
694 pthread_mutex_destroy(&thread->mtx);
695 XFREE(MTYPE_THREAD, thread);
696 }
697
698 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
699 nfds_t count, const struct timeval *timer_wait)
700 {
701 /* If timer_wait is null here, that means poll() should block
702 * indefinitely, unless the thread_master has overridden it
703 * by setting ->selectpoll_timeout.
704 *
705 * If that value is positive, it specifies the maximum number
706 * of milliseconds to wait.
707 *
708 * If it is -1, it specifies that we should never wait and
709 * always return immediately even if no event is detected.
710 *
711 * If it is zero, the default behavior applies. */
712 int timeout = -1;
713
714 /* number of file descriptors with events */
715 int num;
716
717 if (timer_wait != NULL
718 && m->selectpoll_timeout == 0) // use the default value
719 timeout = (timer_wait->tv_sec * 1000)
720 + (timer_wait->tv_usec / 1000);
721 else if (m->selectpoll_timeout > 0) // use the user's timeout
722 timeout = m->selectpoll_timeout;
723 else if (m->selectpoll_timeout
724 < 0) // effect a poll (return immediately)
725 timeout = 0;
726
727 zlog_tls_buffer_flush();
728 rcu_read_unlock();
729 rcu_assert_read_unlocked();
730
731 /* add poll pipe poker */
732 assert(count + 1 < pfdsize);
733 pfds[count].fd = m->io_pipe[0];
734 pfds[count].events = POLLIN;
735 pfds[count].revents = 0x00;
736
737 num = poll(pfds, count + 1, timeout);
738
739 unsigned char trash[64];
740 if (num > 0 && pfds[count].revents != 0 && num--)
741 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
742 ;
743
744 rcu_read_lock();
745
746 return num;
747 }
748
749 /* Add new read or write thread. */
750 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
751 int (*func)(struct thread *),
752 void *arg, int fd,
753 struct thread **t_ptr,
754 debugargdef)
755 {
756 struct thread *thread = NULL;
757 struct thread **thread_array;
758
759 assert(fd >= 0 && fd < m->fd_limit);
760 frr_with_mutex(&m->mtx) {
761 if (t_ptr && *t_ptr)
762 // thread is already scheduled; don't reschedule
763 break;
764
765 /* default to a new pollfd */
766 nfds_t queuepos = m->handler.pfdcount;
767
768 if (dir == THREAD_READ)
769 thread_array = m->read;
770 else
771 thread_array = m->write;
772
773 /* if we already have a pollfd for our file descriptor, find and
774 * use it */
775 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
776 if (m->handler.pfds[i].fd == fd) {
777 queuepos = i;
778
779 #ifdef DEV_BUILD
780 /*
781 * What happens if we have a thread already
782 * created for this event?
783 */
784 if (thread_array[fd])
785 assert(!"Thread already scheduled for file descriptor");
786 #endif
787 break;
788 }
789
790 /* make sure we have room for this fd + pipe poker fd */
791 assert(queuepos + 1 < m->handler.pfdsize);
792
793 thread = thread_get(m, dir, func, arg, debugargpass);
794
795 m->handler.pfds[queuepos].fd = fd;
796 m->handler.pfds[queuepos].events |=
797 (dir == THREAD_READ ? POLLIN : POLLOUT);
798
799 if (queuepos == m->handler.pfdcount)
800 m->handler.pfdcount++;
801
802 if (thread) {
803 frr_with_mutex(&thread->mtx) {
804 thread->u.fd = fd;
805 thread_array[thread->u.fd] = thread;
806 }
807
808 if (t_ptr) {
809 *t_ptr = thread;
810 thread->ref = t_ptr;
811 }
812 }
813
814 AWAKEN(m);
815 }
816
817 return thread;
818 }
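
/*
 * The t_ptr back-reference above is what makes scheduling idempotent.
 * A minimal sketch (struct and handler names are hypothetical):
 *
 *     struct my_conn { struct thread *t_read; int fd; };
 *
 *     thread_add_read(master, my_read_handler, conn, conn->fd,
 *                     &conn->t_read);
 *
 * While conn->t_read is non-NULL the "already scheduled" early exit above
 * is taken, so at most one read task is pending for the fd; the library
 * clears the back-reference again when the task is fetched for execution
 * or cancelled.
 */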
819
820 static struct thread *
821 funcname_thread_add_timer_timeval(struct thread_master *m,
822 int (*func)(struct thread *), int type,
823 void *arg, struct timeval *time_relative,
824 struct thread **t_ptr, debugargdef)
825 {
826 struct thread *thread;
827
828 assert(m != NULL);
829
830 assert(type == THREAD_TIMER);
831 assert(time_relative);
832
833 frr_with_mutex(&m->mtx) {
834 if (t_ptr && *t_ptr)
835 // thread is already scheduled; don't reschedule
836 return NULL;
837
838 thread = thread_get(m, type, func, arg, debugargpass);
839
840 frr_with_mutex(&thread->mtx) {
841 monotime(&thread->u.sands);
842 timeradd(&thread->u.sands, time_relative,
843 &thread->u.sands);
844 thread_timer_list_add(&m->timer, thread);
845 if (t_ptr) {
846 *t_ptr = thread;
847 thread->ref = t_ptr;
848 }
849 }
850
851 AWAKEN(m);
852 }
853
854 return thread;
855 }
856
857
858 /* Add timer event thread. */
859 struct thread *funcname_thread_add_timer(struct thread_master *m,
860 int (*func)(struct thread *),
861 void *arg, long timer,
862 struct thread **t_ptr, debugargdef)
863 {
864 struct timeval trel;
865
866 assert(m != NULL);
867
868 trel.tv_sec = timer;
869 trel.tv_usec = 0;
870
871 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
872 &trel, t_ptr, debugargpass);
873 }
874
875 /* Add timer event thread with "millisecond" resolution */
876 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
877 int (*func)(struct thread *),
878 void *arg, long timer,
879 struct thread **t_ptr,
880 debugargdef)
881 {
882 struct timeval trel;
883
884 assert(m != NULL);
885
886 trel.tv_sec = timer / 1000;
887 trel.tv_usec = 1000 * (timer % 1000);
888
889 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
890 &trel, t_ptr, debugargpass);
891 }
892
893 /* Add timer event thread with a struct timeval expiration time */
894 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
895 int (*func)(struct thread *),
896 void *arg, struct timeval *tv,
897 struct thread **t_ptr, debugargdef)
898 {
899 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
900 t_ptr, debugargpass);
901 }
902
903 /* Add simple event thread. */
904 struct thread *funcname_thread_add_event(struct thread_master *m,
905 int (*func)(struct thread *),
906 void *arg, int val,
907 struct thread **t_ptr, debugargdef)
908 {
909 struct thread *thread = NULL;
910
911 assert(m != NULL);
912
913 frr_with_mutex(&m->mtx) {
914 if (t_ptr && *t_ptr)
915 // thread is already scheduled; don't reschedule
916 break;
917
918 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
919 frr_with_mutex(&thread->mtx) {
920 thread->u.val = val;
921 thread_list_add_tail(&m->event, thread);
922 }
923
924 if (t_ptr) {
925 *t_ptr = thread;
926 thread->ref = t_ptr;
927 }
928
929 AWAKEN(m);
930 }
931
932 return thread;
933 }
934
935 /* Thread cancellation ------------------------------------------------------ */
936
937 /**
938 * NOT's out the .events field of pollfd corresponding to the given file
939 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
940 *
941 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
942 * implementation for details.
943 *
944 * @param master
945 * @param fd
946 * @param state the event to cancel. One or more (OR'd together) of the
947 * following:
948 * - POLLIN
949 * - POLLOUT
950 */
951 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
952 {
953 bool found = false;
954
955 /* Cancel POLLHUP too just in case some bozo set it */
956 state |= POLLHUP;
957
958 /* find the index of corresponding pollfd */
959 nfds_t i;
960
961 for (i = 0; i < master->handler.pfdcount; i++)
962 if (master->handler.pfds[i].fd == fd) {
963 found = true;
964 break;
965 }
966
967 if (!found) {
968 zlog_debug(
969 "[!] Received cancellation request for nonexistent rw job");
970 zlog_debug("[!] threadmaster: %s | fd: %d",
971 master->name ? master->name : "", fd);
972 return;
973 }
974
975 /* NOT out event. */
976 master->handler.pfds[i].events &= ~(state);
977
978 /* If all events are canceled, delete / resize the pollfd array. */
979 if (master->handler.pfds[i].events == 0) {
980 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
981 (master->handler.pfdcount - i - 1)
982 * sizeof(struct pollfd));
983 master->handler.pfdcount--;
984 master->handler.pfds[master->handler.pfdcount].fd = 0;
985 master->handler.pfds[master->handler.pfdcount].events = 0;
986 }
987
988 /* If we have the same pollfd in the copy, perform the same operations,
989 * otherwise return. */
990 if (i >= master->handler.copycount)
991 return;
992
993 master->handler.copy[i].events &= ~(state);
994
995 if (master->handler.copy[i].events == 0) {
996 memmove(master->handler.copy + i, master->handler.copy + i + 1,
997 (master->handler.copycount - i - 1)
998 * sizeof(struct pollfd));
999 master->handler.copycount--;
1000 master->handler.copy[master->handler.copycount].fd = 0;
1001 master->handler.copy[master->handler.copycount].events = 0;
1002 }
1003 }
1004
1005 /**
1006 * Process cancellation requests.
1007 *
1008 * This may only be run from the pthread which owns the thread_master.
1009 *
1010 * @param master the thread master to process
1011 * @REQUIRE master->mtx
1012 */
1013 static void do_thread_cancel(struct thread_master *master)
1014 {
1015 struct thread_list_head *list = NULL;
1016 struct thread **thread_array = NULL;
1017 struct thread *thread;
1018
1019 struct cancel_req *cr;
1020 struct listnode *ln;
1021 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1022 /* If this is an event object cancellation, linear-search
1023 * through the event list and delete any events which have
1024 * the specified argument.
1025 *
1026 * We also need to check every thread in the ready queue. */
1027 if (cr->eventobj) {
1028 struct thread *t;
1029
1030 frr_each_safe(thread_list, &master->event, t) {
1031 if (t->arg != cr->eventobj)
1032 continue;
1033 thread_list_del(&master->event, t);
1034 if (t->ref)
1035 *t->ref = NULL;
1036 thread_add_unuse(master, t);
1037 }
1038
1039 frr_each_safe(thread_list, &master->ready, t) {
1040 if (t->arg != cr->eventobj)
1041 continue;
1042 thread_list_del(&master->ready, t);
1043 if (t->ref)
1044 *t->ref = NULL;
1045 thread_add_unuse(master, t);
1046 }
1047 continue;
1048 }
1049
1050 /* The pointer varies depending on whether the cancellation
1051 * request was made asynchronously or not.
1052 *
1053 * If it was, we need to check whether the thread even
1054 * exists anymore before cancelling it. */
1055 thread = (cr->thread) ? cr->thread : *cr->threadref;
1056
1057 if (!thread)
1058 continue;
1059
1060 /* Determine the appropriate queue to cancel the thread from */
1061 switch (thread->type) {
1062 case THREAD_READ:
1063 thread_cancel_rw(master, thread->u.fd, POLLIN);
1064 thread_array = master->read;
1065 break;
1066 case THREAD_WRITE:
1067 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1068 thread_array = master->write;
1069 break;
1070 case THREAD_TIMER:
1071 thread_timer_list_del(&master->timer, thread);
1072 break;
1073 case THREAD_EVENT:
1074 list = &master->event;
1075 break;
1076 case THREAD_READY:
1077 list = &master->ready;
1078 break;
1079 default:
1080 continue;
1081 break;
1082 }
1083
1084 if (list) {
1085 thread_list_del(list, thread);
1086 } else if (thread_array) {
1087 thread_array[thread->u.fd] = NULL;
1088 }
1089
1090 if (thread->ref)
1091 *thread->ref = NULL;
1092
1093 thread_add_unuse(thread->master, thread);
1094 }
1095
1096 /* Delete and free all cancellation requests */
1097 list_delete_all_node(master->cancel_req);
1098
1099 /* Wake up any threads which may be blocked in thread_cancel_async() */
1100 master->canceled = true;
1101 pthread_cond_broadcast(&master->cancel_cond);
1102 }
1103
1104 /**
1105 * Cancel any events which have the specified argument.
1106 *
1107 * MT-Unsafe
1108 *
1109 * @param master the thread_master to cancel from
1110 * @param arg the argument passed when creating the event
1111 */
1112 void thread_cancel_event(struct thread_master *master, void *arg)
1113 {
1114 assert(master->owner == pthread_self());
1115
1116 frr_with_mutex(&master->mtx) {
1117 struct cancel_req *cr =
1118 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1119 cr->eventobj = arg;
1120 listnode_add(master->cancel_req, cr);
1121 do_thread_cancel(master);
1122 }
1123 }
1124
1125 /**
1126 * Cancel a specific task.
1127 *
1128 * MT-Unsafe
1129 *
1130 * @param thread task to cancel
1131 */
1132 void thread_cancel(struct thread *thread)
1133 {
1134 struct thread_master *master = thread->master;
1135
1136 assert(master->owner == pthread_self());
1137
1138 frr_with_mutex(&master->mtx) {
1139 struct cancel_req *cr =
1140 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1141 cr->thread = thread;
1142 listnode_add(master->cancel_req, cr);
1143 do_thread_cancel(master);
1144 }
1145 }
1146
1147 /**
1148 * Asynchronous cancellation.
1149 *
1150 * Called with either a struct thread ** or void * to an event argument,
1151 * this function posts the correct cancellation request and blocks until it is
1152 * serviced.
1153 *
1154 * If the thread is currently running, execution blocks until it completes.
1155 *
1156 * The last two parameters are mutually exclusive, i.e. if you pass one the
1157 * other must be NULL.
1158 *
1159 * When the cancellation procedure executes on the target thread_master, the
1160 * thread * provided is checked for nullity. If it is null, the thread is
1161 * assumed to no longer exist and the cancellation request is a no-op. Thus
1162 * users of this API must pass a back-reference when scheduling the original
1163 * task.
1164 *
1165 * MT-Safe
1166 *
1167 * @param master the thread master with the relevant event / task
1168 * @param thread pointer to thread to cancel
1169 * @param eventobj the event
1170 */
1171 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1172 void *eventobj)
1173 {
1174 assert(!(thread && eventobj) && (thread || eventobj));
1175 assert(master->owner != pthread_self());
1176
1177 frr_with_mutex(&master->mtx) {
1178 master->canceled = false;
1179
1180 if (thread) {
1181 struct cancel_req *cr =
1182 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1183 cr->threadref = thread;
1184 listnode_add(master->cancel_req, cr);
1185 } else if (eventobj) {
1186 struct cancel_req *cr =
1187 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1188 cr->eventobj = eventobj;
1189 listnode_add(master->cancel_req, cr);
1190 }
1191 AWAKEN(master);
1192
1193 while (!master->canceled)
1194 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1195 }
1196 }
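
/*
 * Cross-pthread cancellation sketch (the ctx naming is hypothetical): the
 * owning pthread schedules with a back-reference, another pthread cancels
 * through it:
 *
 *     // on the pthread that owns 'master'
 *     thread_add_read(master, my_read_handler, ctx, fd, &ctx->t_read);
 *
 *     // on any other pthread
 *     thread_cancel_async(master, &ctx->t_read, NULL);
 *
 * The call blocks until do_thread_cancel() has serviced the request on the
 * owning pthread, after which ctx->t_read has been set back to NULL.
 */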
1197 /* ------------------------------------------------------------------------- */
1198
1199 static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
1200 struct timeval *timer_val)
1201 {
1202 if (!thread_timer_list_count(timers))
1203 return NULL;
1204
1205 struct thread *next_timer = thread_timer_list_first(timers);
1206 monotime_until(&next_timer->u.sands, timer_val);
1207 return timer_val;
1208 }
1209
1210 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1211 struct thread *fetch)
1212 {
1213 *fetch = *thread;
1214 thread_add_unuse(m, thread);
1215 return fetch;
1216 }
1217
1218 static int thread_process_io_helper(struct thread_master *m,
1219 struct thread *thread, short state,
1220 short actual_state, int pos)
1221 {
1222 struct thread **thread_array;
1223
1224 /*
1225 * poll() clears the .events field, but the pollfd array we
1226 * pass to poll() is a copy of the one used to schedule threads.
1227 * We need to synchronize state between the two here by applying
1228 * the same changes poll() made on the copy of the "real" pollfd
1229 * array.
1230 *
1231 * This cleans up a possible infinite loop where we refuse
1232 * to respond to a poll event but poll is insistent that
1233 * we should.
1234 */
1235 m->handler.pfds[pos].events &= ~(state);
1236
1237 if (!thread) {
1238 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1239 flog_err(EC_LIB_NO_THREAD,
1240 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1241 m->handler.pfds[pos].fd, actual_state);
1242 return 0;
1243 }
1244
1245 if (thread->type == THREAD_READ)
1246 thread_array = m->read;
1247 else
1248 thread_array = m->write;
1249
1250 thread_array[thread->u.fd] = NULL;
1251 thread_list_add_tail(&m->ready, thread);
1252 thread->type = THREAD_READY;
1253
1254 return 1;
1255 }
1256
1257 /**
1258 * Process I/O events.
1259 *
1260 * Walks through file descriptor array looking for those pollfds whose .revents
1261 * field has something interesting. Deletes any invalid file descriptors.
1262 *
1263 * @param m the thread master
1264 * @param num the number of active file descriptors (return value of poll())
1265 */
1266 static void thread_process_io(struct thread_master *m, unsigned int num)
1267 {
1268 unsigned int ready = 0;
1269 struct pollfd *pfds = m->handler.copy;
1270
1271 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1272 /* no event for current fd? immediately continue */
1273 if (pfds[i].revents == 0)
1274 continue;
1275
1276 ready++;
1277
1278 /* Unless someone has called thread_cancel from another
1279 * pthread, the only thing that could have changed in
1280 * m->handler.pfds while we were asleep is the .events
1281 * field in a given pollfd.
1282 *
1283 * Barring thread_cancel(), that value should be a superset
1284 * of the values we have in our copy, so there's no need
1285 * to update it.
1286 *
1287 * Similarly, barring deletion, the fd should still be a
1288 * valid index into the master's pfds. */
1289 if (pfds[i].revents & (POLLIN | POLLHUP)) {
1290 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1291 pfds[i].revents, i);
1292 }
1293 if (pfds[i].revents & POLLOUT)
1294 thread_process_io_helper(m, m->write[pfds[i].fd],
1295 POLLOUT, pfds[i].revents, i);
1296
1297 /* if one of our file descriptors is garbage, remove the same
1298 * from
1299 * both pfds + update sizes and index */
1300 if (pfds[i].revents & POLLNVAL) {
1301 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1302 (m->handler.pfdcount - i - 1)
1303 * sizeof(struct pollfd));
1304 m->handler.pfdcount--;
1305 m->handler.pfds[m->handler.pfdcount].fd = 0;
1306 m->handler.pfds[m->handler.pfdcount].events = 0;
1307
1308 memmove(pfds + i, pfds + i + 1,
1309 (m->handler.copycount - i - 1)
1310 * sizeof(struct pollfd));
1311 m->handler.copycount--;
1312 m->handler.copy[m->handler.copycount].fd = 0;
1313 m->handler.copy[m->handler.copycount].events = 0;
1314
1315 i--;
1316 }
1317 }
1318 }
1319
1320 /* Add all timers that have popped to the ready list. */
1321 static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
1322 struct timeval *timenow)
1323 {
1324 struct thread *thread;
1325 unsigned int ready = 0;
1326
1327 while ((thread = thread_timer_list_first(timers))) {
1328 if (timercmp(timenow, &thread->u.sands, <))
1329 return ready;
1330 thread_timer_list_pop(timers);
1331 thread->type = THREAD_READY;
1332 thread_list_add_tail(&thread->master->ready, thread);
1333 ready++;
1334 }
1335 return ready;
1336 }
1337
1338 /* process a list en masse, e.g. for event thread lists */
1339 static unsigned int thread_process(struct thread_list_head *list)
1340 {
1341 struct thread *thread;
1342 unsigned int ready = 0;
1343
1344 while ((thread = thread_list_pop(list))) {
1345 thread->type = THREAD_READY;
1346 thread_list_add_tail(&thread->master->ready, thread);
1347 ready++;
1348 }
1349 return ready;
1350 }
1351
1352
1353 /* Fetch next ready thread. */
1354 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1355 {
1356 struct thread *thread = NULL;
1357 struct timeval now;
1358 struct timeval zerotime = {0, 0};
1359 struct timeval tv;
1360 struct timeval *tw = NULL;
1361
1362 int num = 0;
1363
1364 do {
1365 /* Handle signals if any */
1366 if (m->handle_signals)
1367 quagga_sigevent_process();
1368
1369 pthread_mutex_lock(&m->mtx);
1370
1371 /* Process any pending cancellation requests */
1372 do_thread_cancel(m);
1373
1374 /*
1375 * Attempt to flush ready queue before going into poll().
1376 * This is performance-critical. Think twice before modifying.
1377 */
1378 if ((thread = thread_list_pop(&m->ready))) {
1379 fetch = thread_run(m, thread, fetch);
1380 if (fetch->ref)
1381 *fetch->ref = NULL;
1382 pthread_mutex_unlock(&m->mtx);
1383 break;
1384 }
1385
1386 /* otherwise, tick through scheduling sequence */
1387
1388 /*
1389 * Post events to ready queue. This must come before the
1390 * following block since events should occur immediately
1391 */
1392 thread_process(&m->event);
1393
1394 /*
1395 * If there are no tasks on the ready queue, we will poll()
1396 * until a timer expires or we receive I/O, whichever comes
1397 * first. The strategy for doing this is:
1398 *
1399 * - If there are events pending, set the poll() timeout to zero.
1400 * - If there are no events pending, but there are timers
1401 * pending, set the timeout to the smallest remaining time
1402 * on any timer.
1403 * - If there are neither timers nor events pending, but there
1404 * are file descriptors pending, block indefinitely in poll().
1405 * - If nothing is pending, it's time for the application to die.
1406 *
1407 * In every case except the last, we need to hit poll()
1408 * at least once per loop to avoid starvation by
1409 * events.
1410 */
1411 if (!thread_list_count(&m->ready))
1412 tw = thread_timer_wait(&m->timer, &tv);
1413
1414 if (thread_list_count(&m->ready) ||
1415 (tw && !timercmp(tw, &zerotime, >)))
1416 tw = &zerotime;
1417
1418 if (!tw && m->handler.pfdcount == 0) { /* die */
1419 pthread_mutex_unlock(&m->mtx);
1420 fetch = NULL;
1421 break;
1422 }
1423
1424 /*
1425 * Copy pollfd array + # active pollfds in it. Not necessary to
1426 * copy the array size as this is fixed.
1427 */
1428 m->handler.copycount = m->handler.pfdcount;
1429 memcpy(m->handler.copy, m->handler.pfds,
1430 m->handler.copycount * sizeof(struct pollfd));
1431
1432 pthread_mutex_unlock(&m->mtx);
1433 {
1434 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1435 m->handler.copycount, tw);
1436 }
1437 pthread_mutex_lock(&m->mtx);
1438
1439 /* Handle any errors received in poll() */
1440 if (num < 0) {
1441 if (errno == EINTR) {
1442 pthread_mutex_unlock(&m->mtx);
1443 /* loop around to signal handler */
1444 continue;
1445 }
1446
1447 /* else die */
1448 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1449 safe_strerror(errno));
1450 pthread_mutex_unlock(&m->mtx);
1451 fetch = NULL;
1452 break;
1453 }
1454
1455 /* Post timers to ready queue. */
1456 monotime(&now);
1457 thread_process_timers(&m->timer, &now);
1458
1459 /* Post I/O to ready queue. */
1460 if (num > 0)
1461 thread_process_io(m, num);
1462
1463 pthread_mutex_unlock(&m->mtx);
1464
1465 } while (!thread && m->spin);
1466
1467 return fetch;
1468 }
1469
1470 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1471 {
1472 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1473 + (a.tv_usec - b.tv_usec));
1474 }
1475
1476 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1477 unsigned long *cputime)
1478 {
1479 /* This is 'user + sys' time. */
1480 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1481 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1482 return timeval_elapsed(now->real, start->real);
1483 }
1484
1485 /* We should aim to yield after yield milliseconds, which defaults
1486 to THREAD_YIELD_TIME_SLOT.
1487 Note: we are using real (wall clock) time for this calculation.
1488 It could be argued that CPU time may make more sense in certain
1489 contexts. The things to consider are whether the thread may have
1490 blocked (in which case wall time increases, but CPU time does not),
1491 or whether the system is heavily loaded with other processes competing
1492 for CPU time. On balance, wall clock time seems to make sense.
1493 Plus it has the added benefit that gettimeofday should be faster
1494 than calling getrusage. */
1495 int thread_should_yield(struct thread *thread)
1496 {
1497 int result;
1498 frr_with_mutex(&thread->mtx) {
1499 result = monotime_since(&thread->real, NULL)
1500 > (int64_t)thread->yield;
1501 }
1502 return result;
1503 }
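
/*
 * thread_should_yield() supports cooperative slicing of long-running jobs.
 * A hedged sketch (my_work_fn, my_ctx and the work helpers are
 * hypothetical):
 *
 *     static int my_work_fn(struct thread *t)
 *     {
 *             struct my_ctx *ctx = THREAD_ARG(t);
 *
 *             while (more_work_to_do(ctx)) {
 *                     do_one_unit(ctx);
 *                     if (thread_should_yield(t)) {
 *                             thread_add_event(t->master, my_work_fn,
 *                                              ctx, 0, NULL);
 *                             return 0;
 *                     }
 *             }
 *             return 0;
 *     }
 */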
1504
1505 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1506 {
1507 frr_with_mutex(&thread->mtx) {
1508 thread->yield = yield_time;
1509 }
1510 }
1511
1512 void thread_getrusage(RUSAGE_T *r)
1513 {
1514 #if defined RUSAGE_THREAD
1515 #define FRR_RUSAGE RUSAGE_THREAD
1516 #else
1517 #define FRR_RUSAGE RUSAGE_SELF
1518 #endif
1519 monotime(&r->real);
1520 #ifndef EXCLUDE_CPU_TIME
1521 getrusage(FRR_RUSAGE, &(r->cpu));
1522 #endif
1523 }
1524
1525 /*
1526 * Call a thread.
1527 *
1528 * This function will atomically update the thread's usage history. At present
1529 * this is the only spot where usage history is written. Nevertheless the code
1530 * has been written such that the introduction of writers in the future should
1531 * not need to update it provided the writers atomically perform only the
1532 * operations done here, i.e. updating the total and maximum times. In
1533 * particular, the maximum real and cpu times must be monotonically increasing
1534 * or this code is not correct.
1535 */
1536 void thread_call(struct thread *thread)
1537 {
1538 #ifndef EXCLUDE_CPU_TIME
1539 _Atomic unsigned long realtime, cputime;
1540 unsigned long exp;
1541 unsigned long helper;
1542 #endif
1543 RUSAGE_T before, after;
1544
1545 GETRUSAGE(&before);
1546 thread->real = before.real;
1547
1548 pthread_setspecific(thread_current, thread);
1549 (*thread->func)(thread);
1550 pthread_setspecific(thread_current, NULL);
1551
1552 GETRUSAGE(&after);
1553
1554 #ifndef EXCLUDE_CPU_TIME
1555 realtime = thread_consumed_time(&after, &before, &helper);
1556 cputime = helper;
1557
1558 /* update realtime */
1559 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1560 memory_order_seq_cst);
1561 exp = atomic_load_explicit(&thread->hist->real.max,
1562 memory_order_seq_cst);
1563 while (exp < realtime
1564 && !atomic_compare_exchange_weak_explicit(
1565 &thread->hist->real.max, &exp, realtime,
1566 memory_order_seq_cst, memory_order_seq_cst))
1567 ;
1568
1569 /* update cputime */
1570 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1571 memory_order_seq_cst);
1572 exp = atomic_load_explicit(&thread->hist->cpu.max,
1573 memory_order_seq_cst);
1574 while (exp < cputime
1575 && !atomic_compare_exchange_weak_explicit(
1576 &thread->hist->cpu.max, &exp, cputime,
1577 memory_order_seq_cst, memory_order_seq_cst))
1578 ;
1579
1580 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1581 memory_order_seq_cst);
1582 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1583 memory_order_seq_cst);
1584
1585 #ifdef CONSUMED_TIME_CHECK
1586 if (realtime > CONSUMED_TIME_CHECK) {
1587 /*
1588 * We have a CPU Hog on our hands.
1589 * Whinge about it now, so we're aware this is yet another task
1590 * to fix.
1591 */
1592 flog_warn(
1593 EC_LIB_SLOW_THREAD,
1594 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1595 thread->funcname, (unsigned long)thread->func,
1596 realtime / 1000, cputime / 1000);
1597 }
1598 #endif /* CONSUMED_TIME_CHECK */
1599 #endif /* Exclude CPU Time */
1600 }
1601
1602 /* Execute thread */
1603 void funcname_thread_execute(struct thread_master *m,
1604 int (*func)(struct thread *), void *arg, int val,
1605 debugargdef)
1606 {
1607 struct thread *thread;
1608
1609 /* Get or allocate new thread to execute. */
1610 frr_with_mutex(&m->mtx) {
1611 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1612
1613 /* Set its event value. */
1614 frr_with_mutex(&thread->mtx) {
1615 thread->add_type = THREAD_EXECUTE;
1616 thread->u.val = val;
1617 thread->ref = &thread;
1618 }
1619 }
1620
1621 /* Execute thread doing all accounting. */
1622 thread_call(thread);
1623
1624 /* Give back or free thread. */
1625 thread_add_unuse(m, thread);
1626 }