1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "frrcu.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "frr_pthread.h"
37 #include "lib_errors.h"
38
39 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
42 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
43
44 DECLARE_LIST(thread_list, struct thread, threaditem)
45
46 static int thread_timer_cmp(const struct thread *a, const struct thread *b)
47 {
48 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
49 return -1;
50 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
51 return 1;
52 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
53 return -1;
54 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
55 return 1;
56 return 0;
57 }
58
59 DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
60 thread_timer_cmp)
61
62 #if defined(__APPLE__)
63 #include <mach/mach.h>
64 #include <mach/mach_time.h>
65 #endif
66
67 #define AWAKEN(m) \
68 do { \
69 static unsigned char wakebyte = 0x01; \
70 write(m->io_pipe[1], &wakebyte, 1); \
71 	} while (0)
72
73 /* control variable for initializer */
74 static pthread_once_t init_once = PTHREAD_ONCE_INIT;
75 pthread_key_t thread_current;
76
77 static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
78 static struct list *masters;
79
80 static void thread_free(struct thread_master *master, struct thread *thread);
81
82 /* CLI start ---------------------------------------------------------------- */
83 static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
84 {
85 int size = sizeof(a->func);
86
87 return jhash(&a->func, size, 0);
88 }
89
90 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
91 const struct cpu_thread_history *b)
92 {
93 return a->func == b->func;
94 }
95
96 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
97 {
98 struct cpu_thread_history *new;
99 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
100 new->func = a->func;
101 new->funcname = a->funcname;
102 return new;
103 }
104
105 static void cpu_record_hash_free(void *a)
106 {
107 struct cpu_thread_history *hist = a;
108
109 XFREE(MTYPE_THREAD_STATS, hist);
110 }
111
112 static void vty_out_cpu_thread_history(struct vty *vty,
113 struct cpu_thread_history *a)
114 {
115 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
116 (size_t)a->total_active,
117 a->cpu.total / 1000, a->cpu.total % 1000,
118 (size_t)a->total_calls,
119 a->cpu.total / a->total_calls, a->cpu.max,
120 a->real.total / a->total_calls, a->real.max);
121 vty_out(vty, " %c%c%c%c%c %s\n",
122 a->types & (1 << THREAD_READ) ? 'R' : ' ',
123 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
124 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
125 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
126 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
127 }
128
129 static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
130 {
131 struct cpu_thread_history *totals = args[0];
132 struct cpu_thread_history copy;
133 struct vty *vty = args[1];
134 uint8_t *filter = args[2];
135
136 struct cpu_thread_history *a = bucket->data;
137
138 copy.total_active =
139 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
140 copy.total_calls =
141 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
142 copy.cpu.total =
143 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
144 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
145 copy.real.total =
146 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
147 copy.real.max =
148 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
149 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
150 copy.funcname = a->funcname;
151
152 if (!(copy.types & *filter))
153 return;
154
155 vty_out_cpu_thread_history(vty, &copy);
156 totals->total_active += copy.total_active;
157 totals->total_calls += copy.total_calls;
158 totals->real.total += copy.real.total;
159 if (totals->real.max < copy.real.max)
160 totals->real.max = copy.real.max;
161 totals->cpu.total += copy.cpu.total;
162 if (totals->cpu.max < copy.cpu.max)
163 totals->cpu.max = copy.cpu.max;
164 }
165
166 static void cpu_record_print(struct vty *vty, uint8_t filter)
167 {
168 struct cpu_thread_history tmp;
169 void *args[3] = {&tmp, vty, &filter};
170 struct thread_master *m;
171 struct listnode *ln;
172
173 memset(&tmp, 0, sizeof tmp);
174 tmp.funcname = "TOTAL";
175 tmp.types = filter;
176
177 frr_with_mutex(&masters_mtx) {
178 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
179 const char *name = m->name ? m->name : "main";
180
181 char underline[strlen(name) + 1];
182 memset(underline, '-', sizeof(underline));
183 underline[sizeof(underline) - 1] = '\0';
184
185 vty_out(vty, "\n");
186 vty_out(vty, "Showing statistics for pthread %s\n",
187 name);
188 vty_out(vty, "-------------------------------%s\n",
189 underline);
190 vty_out(vty, "%21s %18s %18s\n", "",
191 "CPU (user+system):", "Real (wall-clock):");
192 vty_out(vty,
193 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (m->cpu_record->count)
198 hash_iterate(
199 m->cpu_record,
200 (void (*)(struct hash_bucket *,
201 void *))cpu_record_hash_print,
202 args);
203 else
204 vty_out(vty, "No data to display yet.\n");
205
206 vty_out(vty, "\n");
207 }
208 }
209
210 vty_out(vty, "\n");
211 vty_out(vty, "Total thread statistics\n");
212 vty_out(vty, "-------------------------\n");
213 vty_out(vty, "%21s %18s %18s\n", "",
214 "CPU (user+system):", "Real (wall-clock):");
215 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
216 vty_out(vty, " Avg uSec Max uSecs");
217 vty_out(vty, " Type Thread\n");
218
219 if (tmp.total_calls > 0)
220 vty_out_cpu_thread_history(vty, &tmp);
221 }
222
223 static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
224 {
225 uint8_t *filter = args[0];
226 struct hash *cpu_record = args[1];
227
228 struct cpu_thread_history *a = bucket->data;
229
230 if (!(a->types & *filter))
231 return;
232
233 hash_release(cpu_record, bucket->data);
234 }
235
236 static void cpu_record_clear(uint8_t filter)
237 {
238 uint8_t *tmp = &filter;
239 struct thread_master *m;
240 struct listnode *ln;
241
242 frr_with_mutex(&masters_mtx) {
243 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
244 frr_with_mutex(&m->mtx) {
245 void *args[2] = {tmp, m->cpu_record};
246 hash_iterate(
247 m->cpu_record,
248 (void (*)(struct hash_bucket *,
249 void *))cpu_record_hash_clear,
250 args);
251 }
252 }
253 }
254 }
255
256 static uint8_t parse_filter(const char *filterstr)
257 {
258 int i = 0;
259 int filter = 0;
260
261 while (filterstr[i] != '\0') {
262 switch (filterstr[i]) {
263 case 'r':
264 case 'R':
265 filter |= (1 << THREAD_READ);
266 break;
267 case 'w':
268 case 'W':
269 filter |= (1 << THREAD_WRITE);
270 break;
271 case 't':
272 case 'T':
273 filter |= (1 << THREAD_TIMER);
274 break;
275 case 'e':
276 case 'E':
277 filter |= (1 << THREAD_EVENT);
278 break;
279 case 'x':
280 case 'X':
281 filter |= (1 << THREAD_EXECUTE);
282 break;
283 default:
284 break;
285 }
286 ++i;
287 }
288 return filter;
289 }
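/*
 * For illustration only: given the filter string "rw", the function
 * above returns a bitmask selecting read and write tasks:
 *
 *   uint8_t filter = parse_filter("rw");
 *   assert(filter == ((1 << THREAD_READ) | (1 << THREAD_WRITE)));
 *
 * A zero result is treated as an invalid filter by the CLI handlers
 * below.
 */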
290
291 DEFUN (show_thread_cpu,
292 show_thread_cpu_cmd,
293 "show thread cpu [FILTER]",
294 SHOW_STR
295 "Thread information\n"
296 "Thread CPU usage\n"
297 "Display filter (rwtex)\n")
298 {
299 uint8_t filter = (uint8_t)-1U;
300 int idx = 0;
301
302 if (argv_find(argv, argc, "FILTER", &idx)) {
303 filter = parse_filter(argv[idx]->arg);
304 if (!filter) {
305 vty_out(vty,
306 "Invalid filter \"%s\" specified; must contain at least"
307 				" one of 'RWTEX'\n",
308 argv[idx]->arg);
309 return CMD_WARNING;
310 }
311 }
312
313 cpu_record_print(vty, filter);
314 return CMD_SUCCESS;
315 }
316
317 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
318 {
319 const char *name = m->name ? m->name : "main";
320 char underline[strlen(name) + 1];
321 struct thread *thread;
322 uint32_t i;
323
324 memset(underline, '-', sizeof(underline));
325 underline[sizeof(underline) - 1] = '\0';
326
327 vty_out(vty, "\nShowing poll FD's for %s\n", name);
328 vty_out(vty, "----------------------%s\n", underline);
329 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
330 m->fd_limit);
331 for (i = 0; i < m->handler.pfdcount; i++) {
332 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
333 m->handler.pfds[i].fd, m->handler.pfds[i].events,
334 m->handler.pfds[i].revents);
335
336 if (m->handler.pfds[i].events & POLLIN) {
337 thread = m->read[m->handler.pfds[i].fd];
338
339 if (!thread)
340 vty_out(vty, "ERROR ");
341 else
342 vty_out(vty, "%s ", thread->funcname);
343 } else
344 vty_out(vty, " ");
345
346 if (m->handler.pfds[i].events & POLLOUT) {
347 thread = m->write[m->handler.pfds[i].fd];
348
349 if (!thread)
350 vty_out(vty, "ERROR\n");
351 else
352 vty_out(vty, "%s\n", thread->funcname);
353 } else
354 vty_out(vty, "\n");
355 }
356 }
357
358 DEFUN (show_thread_poll,
359 show_thread_poll_cmd,
360 "show thread poll",
361 SHOW_STR
362 "Thread information\n"
363 "Show poll FD's and information\n")
364 {
365 struct listnode *node;
366 struct thread_master *m;
367
368 frr_with_mutex(&masters_mtx) {
369 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
370 show_thread_poll_helper(vty, m);
371 }
372 }
373
374 return CMD_SUCCESS;
375 }
376
377
378 DEFUN (clear_thread_cpu,
379 clear_thread_cpu_cmd,
380 "clear thread cpu [FILTER]",
381 "Clear stored data in all pthreads\n"
382 "Thread information\n"
383 "Thread CPU usage\n"
384        "Display filter (rwtex)\n")
385 {
386 uint8_t filter = (uint8_t)-1U;
387 int idx = 0;
388
389 if (argv_find(argv, argc, "FILTER", &idx)) {
390 filter = parse_filter(argv[idx]->arg);
391 if (!filter) {
392 vty_out(vty,
393 "Invalid filter \"%s\" specified; must contain at least"
394 				" one of 'RWTEX'\n",
395 argv[idx]->arg);
396 return CMD_WARNING;
397 }
398 }
399
400 cpu_record_clear(filter);
401 return CMD_SUCCESS;
402 }
403
404 void thread_cmd_init(void)
405 {
406 install_element(VIEW_NODE, &show_thread_cpu_cmd);
407 install_element(VIEW_NODE, &show_thread_poll_cmd);
408 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
409 }
410 /* CLI end ------------------------------------------------------------------ */
411
412
413 static void cancelreq_del(void *cr)
414 {
415 XFREE(MTYPE_TMP, cr);
416 }
417
418 /* initializer, only ever called once */
419 static void initializer(void)
420 {
421 pthread_key_create(&thread_current, NULL);
422 }
423
424 struct thread_master *thread_master_create(const char *name)
425 {
426 struct thread_master *rv;
427 struct rlimit limit;
428
429 pthread_once(&init_once, &initializer);
430
431 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
432
433 /* Initialize master mutex */
434 pthread_mutex_init(&rv->mtx, NULL);
435 pthread_cond_init(&rv->cancel_cond, NULL);
436
437 /* Set name */
438 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
439
440 /* Initialize I/O task data structures */
441 getrlimit(RLIMIT_NOFILE, &limit);
442 rv->fd_limit = (int)limit.rlim_cur;
443 rv->read = XCALLOC(MTYPE_THREAD_POLL,
444 sizeof(struct thread *) * rv->fd_limit);
445
446 rv->write = XCALLOC(MTYPE_THREAD_POLL,
447 sizeof(struct thread *) * rv->fd_limit);
448
449 rv->cpu_record = hash_create_size(
450 8, (unsigned int (*)(const void *))cpu_record_hash_key,
451 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
452 "Thread Hash");
453
454 thread_list_init(&rv->event);
455 thread_list_init(&rv->ready);
456 thread_list_init(&rv->unuse);
457 thread_timer_list_init(&rv->timer);
458
459 /* Initialize thread_fetch() settings */
460 rv->spin = true;
461 rv->handle_signals = true;
462
463 /* Set pthread owner, should be updated by actual owner */
464 rv->owner = pthread_self();
465 rv->cancel_req = list_new();
466 rv->cancel_req->del = cancelreq_del;
467 rv->canceled = true;
468
469 /* Initialize pipe poker */
470 pipe(rv->io_pipe);
471 set_nonblocking(rv->io_pipe[0]);
472 set_nonblocking(rv->io_pipe[1]);
473
474 /* Initialize data structures for poll() */
475 rv->handler.pfdsize = rv->fd_limit;
476 rv->handler.pfdcount = 0;
477 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
478 sizeof(struct pollfd) * rv->handler.pfdsize);
479 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
480 sizeof(struct pollfd) * rv->handler.pfdsize);
481
482 /* add to list of threadmasters */
483 frr_with_mutex(&masters_mtx) {
484 if (!masters)
485 masters = list_new();
486
487 listnode_add(masters, rv);
488 }
489
490 return rv;
491 }
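/*
 * Usage sketch (not part of the library): a daemon typically creates one
 * thread_master per pthread, schedules work on it, and tears it down on
 * shutdown. The name below is arbitrary.
 *
 *   struct thread_master *master = thread_master_create("example");
 *
 *   ... schedule I/O, timer and event tasks on 'master' ...
 *
 *   thread_master_free_unused(master);
 *   thread_master_free(master);
 */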
492
493 void thread_master_set_name(struct thread_master *master, const char *name)
494 {
495 frr_with_mutex(&master->mtx) {
496 XFREE(MTYPE_THREAD_MASTER, master->name);
497 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
498 }
499 }
500
501 #define THREAD_UNUSED_DEPTH 10
502
503 /* Move thread to unuse list. */
504 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
505 {
506 pthread_mutex_t mtxc = thread->mtx;
507
508 assert(m != NULL && thread != NULL);
509
510 thread->hist->total_active--;
511 memset(thread, 0, sizeof(struct thread));
512 thread->type = THREAD_UNUSED;
513
514 /* Restore the thread mutex context. */
515 thread->mtx = mtxc;
516
517 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
518 thread_list_add_tail(&m->unuse, thread);
519 return;
520 }
521
522 thread_free(m, thread);
523 }
524
525 /* Free all threads on the given list. */
526 static void thread_list_free(struct thread_master *m,
527 struct thread_list_head *list)
528 {
529 struct thread *t;
530
531 while ((t = thread_list_pop(list)))
532 thread_free(m, t);
533 }
534
535 static void thread_array_free(struct thread_master *m,
536 struct thread **thread_array)
537 {
538 struct thread *t;
539 int index;
540
541 for (index = 0; index < m->fd_limit; ++index) {
542 t = thread_array[index];
543 if (t) {
544 thread_array[index] = NULL;
545 thread_free(m, t);
546 }
547 }
548 XFREE(MTYPE_THREAD_POLL, thread_array);
549 }
550
551 /*
552  * thread_master_free_unused
553  *
554  * As threads are finished with, they are put on the
555  * unuse list for later reuse.
556  * If we are shutting down, free up the unused threads
557  * so we can see if we forgot to shut anything off.
558  */
559 void thread_master_free_unused(struct thread_master *m)
560 {
561 frr_with_mutex(&m->mtx) {
562 struct thread *t;
563 while ((t = thread_list_pop(&m->unuse)))
564 thread_free(m, t);
565 }
566 }
567
568 /* Stop thread scheduler. */
569 void thread_master_free(struct thread_master *m)
570 {
571 struct thread *t;
572
573 frr_with_mutex(&masters_mtx) {
574 listnode_delete(masters, m);
575 if (masters->count == 0) {
576 list_delete(&masters);
577 }
578 }
579
580 thread_array_free(m, m->read);
581 thread_array_free(m, m->write);
582 while ((t = thread_timer_list_pop(&m->timer)))
583 thread_free(m, t);
584 thread_list_free(m, &m->event);
585 thread_list_free(m, &m->ready);
586 thread_list_free(m, &m->unuse);
587 pthread_mutex_destroy(&m->mtx);
588 pthread_cond_destroy(&m->cancel_cond);
589 close(m->io_pipe[0]);
590 close(m->io_pipe[1]);
591 list_delete(&m->cancel_req);
592 m->cancel_req = NULL;
593
594 hash_clean(m->cpu_record, cpu_record_hash_free);
595 hash_free(m->cpu_record);
596 m->cpu_record = NULL;
597
598 XFREE(MTYPE_THREAD_MASTER, m->name);
599 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
600 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
601 XFREE(MTYPE_THREAD_MASTER, m);
602 }
603
604 /* Return the remaining time in milliseconds. */
605 unsigned long thread_timer_remain_msec(struct thread *thread)
606 {
607 int64_t remain;
608
609 frr_with_mutex(&thread->mtx) {
610 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
611 }
612
613 return remain < 0 ? 0 : remain;
614 }
615
616 /* Return the remaining time in seconds. */
617 unsigned long thread_timer_remain_second(struct thread *thread)
618 {
619 return thread_timer_remain_msec(thread) / 1000LL;
620 }
621
622 #define debugargdef const char *funcname, const char *schedfrom, int fromln
623 #define debugargpass funcname, schedfrom, fromln
624
625 struct timeval thread_timer_remain(struct thread *thread)
626 {
627 struct timeval remain;
628 frr_with_mutex(&thread->mtx) {
629 monotime_until(&thread->u.sands, &remain);
630 }
631 return remain;
632 }
633
634 /* Get new thread. */
635 static struct thread *thread_get(struct thread_master *m, uint8_t type,
636 int (*func)(struct thread *), void *arg,
637 debugargdef)
638 {
639 struct thread *thread = thread_list_pop(&m->unuse);
640 struct cpu_thread_history tmp;
641
642 if (!thread) {
643 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
644 /* mutex only needs to be initialized at struct creation. */
645 pthread_mutex_init(&thread->mtx, NULL);
646 m->alloc++;
647 }
648
649 thread->type = type;
650 thread->add_type = type;
651 thread->master = m;
652 thread->arg = arg;
653 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
654 thread->ref = NULL;
655
656 	/*
657 	 * So if the passed-in funcname is not what we have
658 	 * stored, that means thread->hist needs to be
659 	 * updated. We keep the last one around on the unuse
660 	 * list under the assumption that we are probably
661 	 * going to immediately allocate the same
662 	 * type of thread.
663 	 * This hopefully saves us some serious
664 	 * hash_get lookups.
665 	 */
666 if (thread->funcname != funcname || thread->func != func) {
667 tmp.func = func;
668 tmp.funcname = funcname;
669 thread->hist =
670 hash_get(m->cpu_record, &tmp,
671 (void *(*)(void *))cpu_record_hash_alloc);
672 }
673 thread->hist->total_active++;
674 thread->func = func;
675 thread->funcname = funcname;
676 thread->schedfrom = schedfrom;
677 thread->schedfrom_line = fromln;
678
679 return thread;
680 }
681
682 static void thread_free(struct thread_master *master, struct thread *thread)
683 {
684 /* Update statistics. */
685 assert(master->alloc > 0);
686 master->alloc--;
687
688 /* Free allocated resources. */
689 pthread_mutex_destroy(&thread->mtx);
690 XFREE(MTYPE_THREAD, thread);
691 }
692
693 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
694 nfds_t count, const struct timeval *timer_wait)
695 {
696 	/* If timer_wait is null here, that means poll() should block
697 	 * indefinitely, unless the thread_master has overridden it by
698 	 * setting ->selectpoll_timeout.
699 	 *
700 	 * If that value is positive, it specifies the maximum number of
701 	 * milliseconds to wait. If it is -1, it specifies that we should
702 	 * never wait and always return immediately, even if no event is
703 	 * detected.
704 	 *
705 	 * If the value is zero, the default behavior applies: block for
706 	 * up to timer_wait. */
707 int timeout = -1;
708
709 /* number of file descriptors with events */
710 int num;
711
712 if (timer_wait != NULL
713 && m->selectpoll_timeout == 0) // use the default value
714 timeout = (timer_wait->tv_sec * 1000)
715 + (timer_wait->tv_usec / 1000);
716 else if (m->selectpoll_timeout > 0) // use the user's timeout
717 timeout = m->selectpoll_timeout;
718 else if (m->selectpoll_timeout
719 < 0) // effect a poll (return immediately)
720 timeout = 0;
721
722 rcu_read_unlock();
723 rcu_assert_read_unlocked();
724
725 /* add poll pipe poker */
726 assert(count + 1 < pfdsize);
727 pfds[count].fd = m->io_pipe[0];
728 pfds[count].events = POLLIN;
729 pfds[count].revents = 0x00;
730
731 num = poll(pfds, count + 1, timeout);
732
733 unsigned char trash[64];
734 if (num > 0 && pfds[count].revents != 0 && num--)
735 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
736 ;
737
738 rcu_read_lock();
739
740 return num;
741 }
742
743 /* Add new read or write thread. */
744 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
745 int (*func)(struct thread *),
746 void *arg, int fd,
747 struct thread **t_ptr,
748 debugargdef)
749 {
750 struct thread *thread = NULL;
751 struct thread **thread_array;
752
753 assert(fd >= 0 && fd < m->fd_limit);
754 frr_with_mutex(&m->mtx) {
755 if (t_ptr && *t_ptr)
756 // thread is already scheduled; don't reschedule
757 break;
758
759 /* default to a new pollfd */
760 nfds_t queuepos = m->handler.pfdcount;
761
762 if (dir == THREAD_READ)
763 thread_array = m->read;
764 else
765 thread_array = m->write;
766
767 /* if we already have a pollfd for our file descriptor, find and
768 * use it */
769 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
770 if (m->handler.pfds[i].fd == fd) {
771 queuepos = i;
772
773 #ifdef DEV_BUILD
774 /*
775 * What happens if we have a thread already
776 * created for this event?
777 */
778 if (thread_array[fd])
779 assert(!"Thread already scheduled for file descriptor");
780 #endif
781 break;
782 }
783
784 /* make sure we have room for this fd + pipe poker fd */
785 assert(queuepos + 1 < m->handler.pfdsize);
786
787 thread = thread_get(m, dir, func, arg, debugargpass);
788
789 m->handler.pfds[queuepos].fd = fd;
790 m->handler.pfds[queuepos].events |=
791 (dir == THREAD_READ ? POLLIN : POLLOUT);
792
793 if (queuepos == m->handler.pfdcount)
794 m->handler.pfdcount++;
795
796 if (thread) {
797 frr_with_mutex(&thread->mtx) {
798 thread->u.fd = fd;
799 thread_array[thread->u.fd] = thread;
800 }
801
802 if (t_ptr) {
803 *t_ptr = thread;
804 thread->ref = t_ptr;
805 }
806 }
807
808 AWAKEN(m);
809 }
810
811 return thread;
812 }
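/*
 * Usage sketch (hypothetical handler, context and fd): schedule a read
 * task on a socket. Real callers normally use the thread_add_read() /
 * thread_add_write() wrappers from thread.h, which supply the debug
 * arguments; the direct call is shown here only to make them explicit.
 *
 *   static int my_read_cb(struct thread *t)
 *   {
 *           int fd = t->u.fd;
 *           struct my_ctx *ctx = t->arg;
 *           ... read from fd, re-arm if more data is expected ...
 *           return 0;
 *   }
 *
 *   struct thread *t_read = NULL;
 *   funcname_thread_add_read_write(THREAD_READ, master, my_read_cb, ctx,
 *                                  sock, &t_read, "my_read_cb",
 *                                  __FILE__, __LINE__);
 *
 * Because &t_read is passed as t_ptr, a second call while the task is
 * still pending is a no-op, and t_read is reset to NULL when the task
 * runs or is cancelled.
 */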
813
814 static struct thread *
815 funcname_thread_add_timer_timeval(struct thread_master *m,
816 int (*func)(struct thread *), int type,
817 void *arg, struct timeval *time_relative,
818 struct thread **t_ptr, debugargdef)
819 {
820 struct thread *thread;
821
822 assert(m != NULL);
823
824 assert(type == THREAD_TIMER);
825 assert(time_relative);
826
827 frr_with_mutex(&m->mtx) {
828 if (t_ptr && *t_ptr)
829 // thread is already scheduled; don't reschedule
830 return NULL;
831
832 thread = thread_get(m, type, func, arg, debugargpass);
833
834 frr_with_mutex(&thread->mtx) {
835 monotime(&thread->u.sands);
836 timeradd(&thread->u.sands, time_relative,
837 &thread->u.sands);
838 thread_timer_list_add(&m->timer, thread);
839 if (t_ptr) {
840 *t_ptr = thread;
841 thread->ref = t_ptr;
842 }
843 }
844
845 AWAKEN(m);
846 }
847
848 return thread;
849 }
850
851
852 /* Add timer event thread. */
853 struct thread *funcname_thread_add_timer(struct thread_master *m,
854 int (*func)(struct thread *),
855 void *arg, long timer,
856 struct thread **t_ptr, debugargdef)
857 {
858 struct timeval trel;
859
860 assert(m != NULL);
861
862 trel.tv_sec = timer;
863 trel.tv_usec = 0;
864
865 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
866 &trel, t_ptr, debugargpass);
867 }
868
869 /* Add timer event thread with "millisecond" resolution */
870 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
871 int (*func)(struct thread *),
872 void *arg, long timer,
873 struct thread **t_ptr,
874 debugargdef)
875 {
876 struct timeval trel;
877
878 assert(m != NULL);
879
880 trel.tv_sec = timer / 1000;
881 trel.tv_usec = 1000 * (timer % 1000);
882
883 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
884 &trel, t_ptr, debugargpass);
885 }
886
887 /* Add timer event thread, taking the relative time as a struct timeval */
888 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
889 int (*func)(struct thread *),
890 void *arg, struct timeval *tv,
891 struct thread **t_ptr, debugargdef)
892 {
893 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
894 t_ptr, debugargpass);
895 }
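/*
 * Usage sketch (hypothetical callback and context): timers here are
 * one-shot, so a periodic task re-arms itself from its own handler.
 *
 *   static int my_periodic_cb(struct thread *t)
 *   {
 *           struct my_ctx *ctx = t->arg;
 *
 *           ... do the periodic work ...
 *
 *           funcname_thread_add_timer_msec(t->master, my_periodic_cb,
 *                                          ctx, 500, &ctx->t_periodic,
 *                                          "my_periodic_cb", __FILE__,
 *                                          __LINE__);
 *           return 0;
 *   }
 *
 * thread_timer_remain_msec() above reports how long until the next pop,
 * provided the back-reference (ctx->t_periodic) is still non-NULL.
 */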
896
897 /* Add simple event thread. */
898 struct thread *funcname_thread_add_event(struct thread_master *m,
899 int (*func)(struct thread *),
900 void *arg, int val,
901 struct thread **t_ptr, debugargdef)
902 {
903 struct thread *thread = NULL;
904
905 assert(m != NULL);
906
907 frr_with_mutex(&m->mtx) {
908 if (t_ptr && *t_ptr)
909 // thread is already scheduled; don't reschedule
910 break;
911
912 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
913 frr_with_mutex(&thread->mtx) {
914 thread->u.val = val;
915 thread_list_add_tail(&m->event, thread);
916 }
917
918 if (t_ptr) {
919 *t_ptr = thread;
920 thread->ref = t_ptr;
921 }
922
923 AWAKEN(m);
924 }
925
926 return thread;
927 }
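/*
 * Usage sketch (hypothetical handler and session object): events behave
 * like timers that expire immediately; thread_fetch() drains them to the
 * ready queue before it considers poll().
 *
 *   struct thread *t_ev = NULL;
 *   funcname_thread_add_event(master, my_event_cb, session, 0, &t_ev,
 *                             "my_event_cb", __FILE__, __LINE__);
 *
 * All pending events carrying 'session' as their argument can later be
 * dropped in one go with thread_cancel_event(master, session).
 */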
928
929 /* Thread cancellation ------------------------------------------------------ */
930
931 /**
932 * NOT's out the .events field of pollfd corresponding to the given file
933 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
934 *
935 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
936 * implementation for details.
937 *
938 * @param master
939 * @param fd
940 * @param state the event to cancel. One or more (OR'd together) of the
941 * following:
942 * - POLLIN
943 * - POLLOUT
944 */
945 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
946 {
947 bool found = false;
948
949 /* Cancel POLLHUP too just in case some bozo set it */
950 state |= POLLHUP;
951
952 /* find the index of corresponding pollfd */
953 nfds_t i;
954
955 for (i = 0; i < master->handler.pfdcount; i++)
956 if (master->handler.pfds[i].fd == fd) {
957 found = true;
958 break;
959 }
960
961 if (!found) {
962 zlog_debug(
963 "[!] Received cancellation request for nonexistent rw job");
964 zlog_debug("[!] threadmaster: %s | fd: %d",
965 master->name ? master->name : "", fd);
966 return;
967 }
968
969 /* NOT out event. */
970 master->handler.pfds[i].events &= ~(state);
971
972 /* If all events are canceled, delete / resize the pollfd array. */
973 if (master->handler.pfds[i].events == 0) {
974 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
975 (master->handler.pfdcount - i - 1)
976 * sizeof(struct pollfd));
977 master->handler.pfdcount--;
978 master->handler.pfds[master->handler.pfdcount].fd = 0;
979 master->handler.pfds[master->handler.pfdcount].events = 0;
980 }
981
982 /* If we have the same pollfd in the copy, perform the same operations,
983 * otherwise return. */
984 if (i >= master->handler.copycount)
985 return;
986
987 master->handler.copy[i].events &= ~(state);
988
989 if (master->handler.copy[i].events == 0) {
990 memmove(master->handler.copy + i, master->handler.copy + i + 1,
991 (master->handler.copycount - i - 1)
992 * sizeof(struct pollfd));
993 master->handler.copycount--;
994 master->handler.copy[master->handler.copycount].fd = 0;
995 master->handler.copy[master->handler.copycount].events = 0;
996 }
997 }
998
999 /**
1000 * Process cancellation requests.
1001 *
1002 * This may only be run from the pthread which owns the thread_master.
1003 *
1004 * @param master the thread master to process
1005 * @REQUIRE master->mtx
1006 */
1007 static void do_thread_cancel(struct thread_master *master)
1008 {
1009 struct thread_list_head *list = NULL;
1010 struct thread **thread_array = NULL;
1011 struct thread *thread;
1012
1013 struct cancel_req *cr;
1014 struct listnode *ln;
1015 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1016 		/* If this is an event object cancellation, linear search
1017 		 * through the event list, deleting any events which have
1018 		 * the specified argument. We also need to check every
1019 		 * thread in the ready queue.
1020 		 */
1021 if (cr->eventobj) {
1022 struct thread *t;
1023
1024 frr_each_safe(thread_list, &master->event, t) {
1025 if (t->arg != cr->eventobj)
1026 continue;
1027 thread_list_del(&master->event, t);
1028 if (t->ref)
1029 *t->ref = NULL;
1030 thread_add_unuse(master, t);
1031 }
1032
1033 frr_each_safe(thread_list, &master->ready, t) {
1034 if (t->arg != cr->eventobj)
1035 continue;
1036 thread_list_del(&master->ready, t);
1037 if (t->ref)
1038 *t->ref = NULL;
1039 thread_add_unuse(master, t);
1040 }
1041 continue;
1042 }
1043
1044 		/* The pointer varies depending on whether the cancellation
1045 		 * request was made asynchronously or not. If it was, we
1046 		 * need to check whether the thread even exists anymore
1047 		 * before cancelling it.
1048 		 */
1049 thread = (cr->thread) ? cr->thread : *cr->threadref;
1050
1051 if (!thread)
1052 continue;
1053
1054 /* Determine the appropriate queue to cancel the thread from */
1055 switch (thread->type) {
1056 case THREAD_READ:
1057 thread_cancel_rw(master, thread->u.fd, POLLIN);
1058 thread_array = master->read;
1059 break;
1060 case THREAD_WRITE:
1061 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1062 thread_array = master->write;
1063 break;
1064 case THREAD_TIMER:
1065 thread_timer_list_del(&master->timer, thread);
1066 break;
1067 case THREAD_EVENT:
1068 list = &master->event;
1069 break;
1070 case THREAD_READY:
1071 list = &master->ready;
1072 break;
1073 default:
1074 continue;
1075 break;
1076 }
1077
1078 if (list) {
1079 thread_list_del(list, thread);
1080 } else if (thread_array) {
1081 thread_array[thread->u.fd] = NULL;
1082 }
1083
1084 if (thread->ref)
1085 *thread->ref = NULL;
1086
1087 thread_add_unuse(thread->master, thread);
1088 }
1089
1090 /* Delete and free all cancellation requests */
1091 list_delete_all_node(master->cancel_req);
1092
1093 /* Wake up any threads which may be blocked in thread_cancel_async() */
1094 master->canceled = true;
1095 pthread_cond_broadcast(&master->cancel_cond);
1096 }
1097
1098 /**
1099 * Cancel any events which have the specified argument.
1100 *
1101 * MT-Unsafe
1102 *
1103  * @param master the thread_master to cancel from
1104 * @param arg the argument passed when creating the event
1105 */
1106 void thread_cancel_event(struct thread_master *master, void *arg)
1107 {
1108 assert(master->owner == pthread_self());
1109
1110 frr_with_mutex(&master->mtx) {
1111 struct cancel_req *cr =
1112 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1113 cr->eventobj = arg;
1114 listnode_add(master->cancel_req, cr);
1115 do_thread_cancel(master);
1116 }
1117 }
1118
1119 /**
1120 * Cancel a specific task.
1121 *
1122 * MT-Unsafe
1123 *
1124 * @param thread task to cancel
1125 */
1126 void thread_cancel(struct thread *thread)
1127 {
1128 struct thread_master *master = thread->master;
1129
1130 assert(master->owner == pthread_self());
1131
1132 frr_with_mutex(&master->mtx) {
1133 struct cancel_req *cr =
1134 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1135 cr->thread = thread;
1136 listnode_add(master->cancel_req, cr);
1137 do_thread_cancel(master);
1138 }
1139 }
1140
1141 /**
1142 * Asynchronous cancellation.
1143 *
1144 * Called with either a struct thread ** or void * to an event argument,
1145 * this function posts the correct cancellation request and blocks until it is
1146 * serviced.
1147 *
1148 * If the thread is currently running, execution blocks until it completes.
1149 *
1150 * The last two parameters are mutually exclusive, i.e. if you pass one the
1151 * other must be NULL.
1152 *
1153 * When the cancellation procedure executes on the target thread_master, the
1154 * thread * provided is checked for nullity. If it is null, the thread is
1155 * assumed to no longer exist and the cancellation request is a no-op. Thus
1156 * users of this API must pass a back-reference when scheduling the original
1157 * task.
1158 *
1159 * MT-Safe
1160 *
1161 * @param master the thread master with the relevant event / task
1162 * @param thread pointer to thread to cancel
1163 * @param eventobj the event
1164 */
1165 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1166 void *eventobj)
1167 {
1168 assert(!(thread && eventobj) && (thread || eventobj));
1169 assert(master->owner != pthread_self());
1170
1171 frr_with_mutex(&master->mtx) {
1172 master->canceled = false;
1173
1174 if (thread) {
1175 struct cancel_req *cr =
1176 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1177 cr->threadref = thread;
1178 listnode_add(master->cancel_req, cr);
1179 } else if (eventobj) {
1180 struct cancel_req *cr =
1181 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1182 cr->eventobj = eventobj;
1183 listnode_add(master->cancel_req, cr);
1184 }
1185 AWAKEN(master);
1186
1187 while (!master->canceled)
1188 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1189 }
1190 }
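/*
 * Usage sketch for the back-reference requirement described above (names
 * are hypothetical): the owning pthread schedules the task with a
 * persistent struct thread * that the library keeps up to date, and a
 * different pthread uses that same pointer to cancel safely.
 *
 *   On the owning pthread:
 *
 *     funcname_thread_add_read_write(THREAD_READ, peer->master,
 *                                    peer_read_cb, peer, peer->fd,
 *                                    &peer->t_read, "peer_read_cb",
 *                                    __FILE__, __LINE__);
 *
 *   On another pthread, during teardown:
 *
 *     thread_cancel_async(peer->master, &peer->t_read, NULL);
 *
 * If the task already ran or was cancelled, peer->t_read is NULL by the
 * time the request is serviced and the cancellation is a no-op, exactly
 * as described above.
 */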
1191 /* ------------------------------------------------------------------------- */
1192
1193 static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
1194 struct timeval *timer_val)
1195 {
1196 if (!thread_timer_list_count(timers))
1197 return NULL;
1198
1199 struct thread *next_timer = thread_timer_list_first(timers);
1200 monotime_until(&next_timer->u.sands, timer_val);
1201 return timer_val;
1202 }
1203
1204 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1205 struct thread *fetch)
1206 {
1207 *fetch = *thread;
1208 thread_add_unuse(m, thread);
1209 return fetch;
1210 }
1211
1212 static int thread_process_io_helper(struct thread_master *m,
1213 struct thread *thread, short state,
1214 short actual_state, int pos)
1215 {
1216 struct thread **thread_array;
1217
1218 /*
1219 * poll() clears the .events field, but the pollfd array we
1220 * pass to poll() is a copy of the one used to schedule threads.
1221 * We need to synchronize state between the two here by applying
1222 * the same changes poll() made on the copy of the "real" pollfd
1223 * array.
1224 *
1225 * This cleans up a possible infinite loop where we refuse
1226 * to respond to a poll event but poll is insistent that
1227 * we should.
1228 */
1229 m->handler.pfds[pos].events &= ~(state);
1230
1231 if (!thread) {
1232 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1233 flog_err(EC_LIB_NO_THREAD,
1234 				 "Attempting to process an I/O event, but no thread to handle it for fd %d (revents %d)",
1235 m->handler.pfds[pos].fd, actual_state);
1236 return 0;
1237 }
1238
1239 if (thread->type == THREAD_READ)
1240 thread_array = m->read;
1241 else
1242 thread_array = m->write;
1243
1244 thread_array[thread->u.fd] = NULL;
1245 thread_list_add_tail(&m->ready, thread);
1246 thread->type = THREAD_READY;
1247
1248 return 1;
1249 }
1250
1251 /**
1252 * Process I/O events.
1253 *
1254 * Walks through file descriptor array looking for those pollfds whose .revents
1255 * field has something interesting. Deletes any invalid file descriptors.
1256 *
1257 * @param m the thread master
1258 * @param num the number of active file descriptors (return value of poll())
1259 */
1260 static void thread_process_io(struct thread_master *m, unsigned int num)
1261 {
1262 unsigned int ready = 0;
1263 struct pollfd *pfds = m->handler.copy;
1264
1265 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1266 /* no event for current fd? immediately continue */
1267 if (pfds[i].revents == 0)
1268 continue;
1269
1270 ready++;
1271
1272 		/* Unless someone has called thread_cancel from another
1273 		 * pthread, the only thing that could have changed in
1274 		 * m->handler.pfds while we were asleep is the .events
1275 		 * field in a given pollfd.
1276 		 *
1277 		 * Barring thread_cancel(), that value should be a superset
1278 		 * of the values we have in our copy, so there's no need to
1279 		 * update it.
1280 		 *
1281 		 * Similarly, barring deletion, the fd should still be a
1282 		 * valid index into the master's pfds. */
1283 if (pfds[i].revents & (POLLIN | POLLHUP)) {
1284 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1285 pfds[i].revents, i);
1286 }
1287 if (pfds[i].revents & POLLOUT)
1288 thread_process_io_helper(m, m->write[pfds[i].fd],
1289 POLLOUT, pfds[i].revents, i);
1290
1291 		/* if one of our file descriptors is garbage, remove it from
1292 		 * both pfds arrays and update the sizes and the index
1293 		 */
1294 if (pfds[i].revents & POLLNVAL) {
1295 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1296 (m->handler.pfdcount - i - 1)
1297 * sizeof(struct pollfd));
1298 m->handler.pfdcount--;
1299 m->handler.pfds[m->handler.pfdcount].fd = 0;
1300 m->handler.pfds[m->handler.pfdcount].events = 0;
1301
1302 memmove(pfds + i, pfds + i + 1,
1303 (m->handler.copycount - i - 1)
1304 * sizeof(struct pollfd));
1305 m->handler.copycount--;
1306 m->handler.copy[m->handler.copycount].fd = 0;
1307 m->handler.copy[m->handler.copycount].events = 0;
1308
1309 i--;
1310 }
1311 }
1312 }
1313
1314 /* Add all timers that have popped to the ready list. */
1315 static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
1316 struct timeval *timenow)
1317 {
1318 struct thread *thread;
1319 unsigned int ready = 0;
1320
1321 while ((thread = thread_timer_list_first(timers))) {
1322 if (timercmp(timenow, &thread->u.sands, <))
1323 return ready;
1324 thread_timer_list_pop(timers);
1325 thread->type = THREAD_READY;
1326 thread_list_add_tail(&thread->master->ready, thread);
1327 ready++;
1328 }
1329 return ready;
1330 }
1331
1332 /* process a list en masse, e.g. for event thread lists */
1333 static unsigned int thread_process(struct thread_list_head *list)
1334 {
1335 struct thread *thread;
1336 unsigned int ready = 0;
1337
1338 while ((thread = thread_list_pop(list))) {
1339 thread->type = THREAD_READY;
1340 thread_list_add_tail(&thread->master->ready, thread);
1341 ready++;
1342 }
1343 return ready;
1344 }
1345
1346
1347 /* Fetch next ready thread. */
1348 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1349 {
1350 struct thread *thread = NULL;
1351 struct timeval now;
1352 struct timeval zerotime = {0, 0};
1353 struct timeval tv;
1354 struct timeval *tw = NULL;
1355
1356 int num = 0;
1357
1358 do {
1359 /* Handle signals if any */
1360 if (m->handle_signals)
1361 quagga_sigevent_process();
1362
1363 pthread_mutex_lock(&m->mtx);
1364
1365 /* Process any pending cancellation requests */
1366 do_thread_cancel(m);
1367
1368 /*
1369 * Attempt to flush ready queue before going into poll().
1370 * This is performance-critical. Think twice before modifying.
1371 */
1372 if ((thread = thread_list_pop(&m->ready))) {
1373 fetch = thread_run(m, thread, fetch);
1374 if (fetch->ref)
1375 *fetch->ref = NULL;
1376 pthread_mutex_unlock(&m->mtx);
1377 break;
1378 }
1379
1380 /* otherwise, tick through scheduling sequence */
1381
1382 /*
1383 * Post events to ready queue. This must come before the
1384 * following block since events should occur immediately
1385 */
1386 thread_process(&m->event);
1387
1388 /*
1389 * If there are no tasks on the ready queue, we will poll()
1390 * until a timer expires or we receive I/O, whichever comes
1391 * first. The strategy for doing this is:
1392 *
1393 * - If there are events pending, set the poll() timeout to zero
1394 		 * - If there are no events pending, but there are timers
1395 		 *   pending, set the timeout to the smallest remaining
1396 		 *   time on any timer
1397 		 * - If there are neither timers nor events pending, but
1398 		 *   there are file descriptors pending, block indefinitely
1399 		 *   in poll()
1400 * - If nothing is pending, it's time for the application to die
1401 *
1402 * In every case except the last, we need to hit poll() at least
1403 * once per loop to avoid starvation by events
1404 */
1405 if (!thread_list_count(&m->ready))
1406 tw = thread_timer_wait(&m->timer, &tv);
1407
1408 if (thread_list_count(&m->ready) ||
1409 (tw && !timercmp(tw, &zerotime, >)))
1410 tw = &zerotime;
1411
1412 if (!tw && m->handler.pfdcount == 0) { /* die */
1413 pthread_mutex_unlock(&m->mtx);
1414 fetch = NULL;
1415 break;
1416 }
1417
1418 /*
1419 * Copy pollfd array + # active pollfds in it. Not necessary to
1420 * copy the array size as this is fixed.
1421 */
1422 m->handler.copycount = m->handler.pfdcount;
1423 memcpy(m->handler.copy, m->handler.pfds,
1424 m->handler.copycount * sizeof(struct pollfd));
1425
1426 pthread_mutex_unlock(&m->mtx);
1427 {
1428 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1429 m->handler.copycount, tw);
1430 }
1431 pthread_mutex_lock(&m->mtx);
1432
1433 /* Handle any errors received in poll() */
1434 if (num < 0) {
1435 if (errno == EINTR) {
1436 pthread_mutex_unlock(&m->mtx);
1437 /* loop around to signal handler */
1438 continue;
1439 }
1440
1441 /* else die */
1442 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1443 safe_strerror(errno));
1444 pthread_mutex_unlock(&m->mtx);
1445 fetch = NULL;
1446 break;
1447 }
1448
1449 /* Post timers to ready queue. */
1450 monotime(&now);
1451 thread_process_timers(&m->timer, &now);
1452
1453 /* Post I/O to ready queue. */
1454 if (num > 0)
1455 thread_process_io(m, num);
1456
1457 pthread_mutex_unlock(&m->mtx);
1458
1459 } while (!thread && m->spin);
1460
1461 return fetch;
1462 }
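/*
 * Usage sketch: the fetch/call pair is the daemon's main loop. In FRR
 * daemons this loop is normally driven by frr_run(); a minimal version
 * (master creation and task setup omitted) looks like:
 *
 *   struct thread thread;
 *
 *   while (thread_fetch(master, &thread))
 *           thread_call(&thread);
 *
 * thread_fetch() returns NULL once there is nothing left to schedule (no
 * timers and no file descriptors) or after a fatal poll() error, which
 * ends the loop.
 */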
1463
1464 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1465 {
1466 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1467 + (a.tv_usec - b.tv_usec));
1468 }
1469
1470 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1471 unsigned long *cputime)
1472 {
1473 /* This is 'user + sys' time. */
1474 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1475 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1476 return timeval_elapsed(now->real, start->real);
1477 }
1478
1479 /* We should aim to yield after yield milliseconds, which defaults
1480 to THREAD_YIELD_TIME_SLOT .
1481 Note: we are using real (wall clock) time for this calculation.
1482 It could be argued that CPU time may make more sense in certain
1483 contexts. The things to consider are whether the thread may have
1484 blocked (in which case wall time increases, but CPU time does not),
1485 or whether the system is heavily loaded with other processes competing
1486 for CPU time. On balance, wall clock time seems to make sense.
1487 Plus it has the added benefit that gettimeofday should be faster
1488 than calling getrusage. */
1489 int thread_should_yield(struct thread *thread)
1490 {
1491 int result;
1492 frr_with_mutex(&thread->mtx) {
1493 result = monotime_since(&thread->real, NULL)
1494 > (int64_t)thread->yield;
1495 }
1496 return result;
1497 }
1498
1499 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1500 {
1501 frr_with_mutex(&thread->mtx) {
1502 thread->yield = yield_time;
1503 }
1504 }
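/*
 * Usage sketch (hypothetical work queue): a long-running task should
 * check thread_should_yield() periodically and reschedule itself as an
 * event so other tasks are not starved.
 *
 *   static int my_bulk_worker(struct thread *t)
 *   {
 *           struct my_queue *q = t->arg;
 *
 *           while (my_queue_more(q)) {
 *                   my_queue_process_one(q);
 *                   if (thread_should_yield(t)) {
 *                           funcname_thread_add_event(
 *                                   t->master, my_bulk_worker, q, 0,
 *                                   &q->t_worker, "my_bulk_worker",
 *                                   __FILE__, __LINE__);
 *                           return 0;
 *                   }
 *           }
 *           return 0;
 *   }
 *
 * thread_set_yield_time() can raise or lower the per-invocation budget
 * from its default of THREAD_YIELD_TIME_SLOT.
 */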
1505
1506 void thread_getrusage(RUSAGE_T *r)
1507 {
1508 #if defined RUSAGE_THREAD
1509 #define FRR_RUSAGE RUSAGE_THREAD
1510 #else
1511 #define FRR_RUSAGE RUSAGE_SELF
1512 #endif
1513 monotime(&r->real);
1514 getrusage(FRR_RUSAGE, &(r->cpu));
1515 }
1516
1517 /*
1518 * Call a thread.
1519 *
1520 * This function will atomically update the thread's usage history. At present
1521 * this is the only spot where usage history is written. Nevertheless the code
1522 * has been written such that the introduction of writers in the future should
1523 * not need to update it provided the writers atomically perform only the
1524 * operations done here, i.e. updating the total and maximum times. In
1525 * particular, the maximum real and cpu times must be monotonically increasing
1526 * or this code is not correct.
1527 */
1528 void thread_call(struct thread *thread)
1529 {
1530 _Atomic unsigned long realtime, cputime;
1531 unsigned long exp;
1532 unsigned long helper;
1533 RUSAGE_T before, after;
1534
1535 GETRUSAGE(&before);
1536 thread->real = before.real;
1537
1538 pthread_setspecific(thread_current, thread);
1539 (*thread->func)(thread);
1540 pthread_setspecific(thread_current, NULL);
1541
1542 GETRUSAGE(&after);
1543
1544 realtime = thread_consumed_time(&after, &before, &helper);
1545 cputime = helper;
1546
1547 /* update realtime */
1548 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1549 memory_order_seq_cst);
1550 exp = atomic_load_explicit(&thread->hist->real.max,
1551 memory_order_seq_cst);
1552 while (exp < realtime
1553 && !atomic_compare_exchange_weak_explicit(
1554 &thread->hist->real.max, &exp, realtime,
1555 memory_order_seq_cst, memory_order_seq_cst))
1556 ;
1557
1558 /* update cputime */
1559 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1560 memory_order_seq_cst);
1561 exp = atomic_load_explicit(&thread->hist->cpu.max,
1562 memory_order_seq_cst);
1563 while (exp < cputime
1564 && !atomic_compare_exchange_weak_explicit(
1565 &thread->hist->cpu.max, &exp, cputime,
1566 memory_order_seq_cst, memory_order_seq_cst))
1567 ;
1568
1569 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1570 memory_order_seq_cst);
1571 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1572 memory_order_seq_cst);
1573
1574 #ifdef CONSUMED_TIME_CHECK
1575 if (realtime > CONSUMED_TIME_CHECK) {
1576 /*
1577 * We have a CPU Hog on our hands.
1578 * Whinge about it now, so we're aware this is yet another task
1579 * to fix.
1580 */
1581 flog_warn(
1582 EC_LIB_SLOW_THREAD,
1583 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1584 thread->funcname, (unsigned long)thread->func,
1585 realtime / 1000, cputime / 1000);
1586 }
1587 #endif /* CONSUMED_TIME_CHECK */
1588 }
1589
1590 /* Execute thread */
1591 void funcname_thread_execute(struct thread_master *m,
1592 int (*func)(struct thread *), void *arg, int val,
1593 debugargdef)
1594 {
1595 struct thread *thread;
1596
1597 /* Get or allocate new thread to execute. */
1598 frr_with_mutex(&m->mtx) {
1599 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1600
1601 /* Set its event value. */
1602 frr_with_mutex(&thread->mtx) {
1603 thread->add_type = THREAD_EXECUTE;
1604 thread->u.val = val;
1605 thread->ref = &thread;
1606 }
1607 }
1608
1609 /* Execute thread doing all accounting. */
1610 thread_call(thread);
1611
1612 /* Give back or free thread. */
1613 thread_add_unuse(m, thread);
1614 }
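/*
 * Usage sketch (hypothetical handler and context): this entry point is
 * normally reached through the thread_execute() wrapper in thread.h. It
 * runs the handler synchronously while still recording it in the
 * 'show thread cpu' statistics.
 *
 *   funcname_thread_execute(master, my_oneshot_cb, ctx, 0,
 *                           "my_oneshot_cb", __FILE__, __LINE__);
 *
 * Unlike the thread_add_* schedulers above, which merely queue work, the
 * call returns only after my_oneshot_cb() has finished.
 */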