/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "frr_pthread.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
DECLARE_LIST(thread_list, struct thread, threaditem)
static int thread_timer_cmp(const struct thread *a, const struct thread *b)
{
	if (a->u.sands.tv_sec < b->u.sands.tv_sec)
		return -1;
	if (a->u.sands.tv_sec > b->u.sands.tv_sec)
		return 1;
	if (a->u.sands.tv_usec < b->u.sands.tv_usec)
		return -1;
	if (a->u.sands.tv_usec > b->u.sands.tv_usec)
		return 1;
	return 0;
}
DECLARE_HEAP(thread_timer_list, struct thread, timeritem, thread_timer_cmp)
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif
#define AWAKEN(m)                                                              \
	do {                                                                   \
		const unsigned char wakebyte = 0x01;                           \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
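/* AWAKEN() is how other pthreads interrupt a poll() that may otherwise block
 * indefinitely: writing one byte to io_pipe[1] makes io_pipe[0] readable, so
 * fd_poll() below returns at once. A minimal sketch of the pattern (the
 * caller and the mutation are hypothetical):
 *
 *	frr_with_mutex(&master->mtx) {
 *		...mutate scheduling state, e.g. enqueue a cancel request...
 *		AWAKEN(master);
 *	}
 */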
/* control variable for initializer */
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
#ifndef EXCLUDE_CPU_TIME
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
		(size_t)a->total_active, a->cpu.total / 1000,
		a->cpu.total % 1000, (size_t)a->total_calls,
		(size_t)(a->cpu.total / a->total_calls), a->cpu.max,
		(size_t)(a->real.total / a->total_calls), a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof(tmp));
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active Runtime(ms) Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, " Type Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, " Type Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
#endif /* EXCLUDE_CPU_TIME */
static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			frr_with_mutex(&m->mtx) {
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
		}
	}
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
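/* e.g. parse_filter("rt") yields (1 << THREAD_READ) | (1 << THREAD_TIMER);
 * unrecognized characters are ignored, so an all-garbage string yields 0,
 * which the CLI handlers below reject as an invalid filter. */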
#ifndef EXCLUDE_CPU_TIME
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEX'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
#endif /* EXCLUDE_CPU_TIME */
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	struct thread *thread;
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
		m->fd_limit);
	for (i = 0; i < m->handler.pfdcount; i++) {
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
			m->handler.pfds[i].fd, m->handler.pfds[i].events,
			m->handler.pfds[i].revents);

		if (m->handler.pfds[i].events & POLLIN) {
			thread = m->read[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR ");
			else
				vty_out(vty, "%s ", thread->funcname);
		} else
			vty_out(vty, " ");

		if (m->handler.pfds[i].events & POLLOUT) {
			thread = m->write[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR\n");
			else
				vty_out(vty, "%s\n", thread->funcname);
		} else
			vty_out(vty, "\n");
	}
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
#ifndef EXCLUDE_CPU_TIME
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
#endif
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(const void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	thread_list_init(&rv->event);
	thread_list_init(&rv->ready);
	thread_list_init(&rv->unuse);
	thread_timer_list_init(&rv->timer);

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	frr_with_mutex(&masters_mtx) {
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}

	return rv;
}
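/* A minimal usage sketch: a daemon typically creates one master, schedules
 * its initial tasks, and then spins on thread_fetch()/thread_call() (the
 * callback name and fd are hypothetical; the add/fetch/call functions are
 * defined further below and wrapped by macros in thread.h):
 *
 *	struct thread_master *master = thread_master_create("main");
 *	struct thread fetched;
 *
 *	thread_add_read(master, accept_conn, NULL, listen_fd, NULL);
 *	while (thread_fetch(master, &fetched))
 *		thread_call(&fetched);
 *	thread_master_free(master);
 */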
void thread_master_set_name(struct thread_master *master, const char *name)
{
	frr_with_mutex(&master->mtx) {
		XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
}
#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
		thread_list_add_tail(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m,
			     struct thread_list_head *list)
{
	struct thread *t;

	while ((t = thread_list_pop(list)))
		thread_free(m, t);
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	frr_with_mutex(&m->mtx) {
		struct thread *t;
		while ((t = thread_list_pop(&m->unuse)))
			thread_free(m, t);
	}
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	struct thread *t;

	frr_with_mutex(&masters_mtx) {
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	while ((t = thread_timer_list_pop(&m->timer)))
		thread_free(m, t);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	frr_with_mutex(&thread->mtx) {
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}

	return remain < 0 ? 0 : remain;
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	frr_with_mutex(&thread->mtx) {
		monotime_until(&thread->u.sands, &remain);
	}
	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_list_pop(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated.  We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * one up again.
	 *
	 * This hopefully saves us some serious
	 * hash lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that
	 * we should never wait and always return immediately even if no
	 * event is detected. If the value is zero, the behavior is default.
	 */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	zlog_tls_buffer_flush();
	rcu_read_unlock();
	rcu_assert_read_unlocked();

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	rcu_read_lock();

	return num;
}
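/* Worked example of the timeout derivation above: with the default
 * selectpoll_timeout == 0 and a next timer due in { 2, 500000 }, poll() gets
 * 2 * 1000 + 500000 / 1000 = 2500 ms; with selectpoll_timeout == -1 it gets
 * 0 ms and returns immediately; with timer_wait == NULL it gets -1 and
 * blocks until I/O arrives or the pipe poker is poked. */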
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;
	struct thread **thread_array;

	assert(fd >= 0 && fd < m->fd_limit);
	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			// thread is already scheduled; don't reschedule
			break;

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		if (dir == THREAD_READ)
			thread_array = m->read;
		else
			thread_array = m->write;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;

				/*
				 * What happens if we have a thread already
				 * created for this event?
				 */
				if (thread_array[fd])
					assert(!"Thread already scheduled for file descriptor");

				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			frr_with_mutex(&thread->mtx) {
				thread->u.fd = fd;
				thread_array[thread->u.fd] = thread;
			}

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}

	return thread;
}
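/* Tasks are one-shot: once the callback has run, the fd is no longer watched
 * until it is re-added. A hedged sketch of the usual wrapper-based usage
 * (sock_readable, ctx and t_read are hypothetical; thread_add_read() is the
 * macro from thread.h that fills in the debugargs):
 *
 *	static int sock_readable(struct thread *t)
 *	{
 *		struct conn_ctx *ctx = THREAD_ARG(t);
 *		int fd = THREAD_FD(t);
 *
 *		...consume data from fd...
 *		thread_add_read(t->master, sock_readable, ctx, fd,
 *				&ctx->t_read);	// re-arm for more input
 *		return 0;
 *	}
 */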
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread = NULL;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			// thread is already scheduled; don't reschedule
			break;

		thread = thread_get(m, type, func, arg, debugargpass);

		frr_with_mutex(&thread->mtx) {
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			thread_timer_list_add(&m->timer, thread);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread = NULL;

	assert(m != NULL);

	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			// thread is already scheduled; don't reschedule
			break;

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		frr_with_mutex(&thread->mtx) {
			thread->u.val = val;
			thread_list_add_tail(&m->event, thread);
		}

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}

	return thread;
}
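/* Note the t_ptr guard shared by all the add functions above: if the caller
 * passes a back-reference that is still non-NULL, the task is already
 * pending and is not scheduled twice. A hedged sketch (process_queue and
 * ctx are hypothetical):
 *
 *	thread_add_event(master, process_queue, ctx, 0, &ctx->t_process);
 *	thread_add_event(master, process_queue, ctx, 0, &ctx->t_process);
 *	// the second call is a no-op while ctx->t_process is still set
 */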
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
		master->handler.pfds[master->handler.pfdcount].fd = 0;
		master->handler.pfds[master->handler.pfdcount].events = 0;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
		master->handler.copy[master->handler.copycount].fd = 0;
		master->handler.copy[master->handler.copycount].events = 0;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list_head *list = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;

			frr_each_safe(thread_list, &master->event, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->event, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}

			frr_each_safe(thread_list, &master->ready, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->ready, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			thread_timer_list_del(&master->timer, thread);
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (list) {
			thread_list_del(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	frr_with_mutex(&master->mtx) {
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	frr_with_mutex(&master->mtx) {
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	frr_with_mutex(&master->mtx) {
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
}
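/* A hedged sketch of cross-pthread cancellation (ctx and t_read are
 * hypothetical). Because the request is resolved through the back-reference,
 * the task must have been scheduled with one:
 *
 *	thread_add_read(master, sock_readable, ctx, fd, &ctx->t_read);
 *	...
 *	// later, from a different pthread than master->owner:
 *	thread_cancel_async(master, &ctx->t_read, NULL); // blocks until done
 */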
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
					 struct timeval *timer_val)
{
	if (!thread_timer_list_count(timers))
		return NULL;

	struct thread *next_timer = thread_timer_list_first(timers);
	monotime_until(&next_timer->u.sands, timer_val);
	return timer_val;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state,
				    short actual_state, int pos)
{
	struct thread **thread_array;

	/*
	 * poll() clears the .events field, but the pollfd array we
	 * pass to poll() is a copy of the one used to schedule threads.
	 * We need to synchronize state between the two here by applying
	 * the same changes poll() made on the copy of the "real" pollfd
	 * array.
	 *
	 * This cleans up a possible infinite loop where we refuse
	 * to respond to a poll event but poll is insistent that
	 * we should.
	 */
	m->handler.pfds[pos].events &= ~(state);

	if (!thread) {
		if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
			flog_err(EC_LIB_NO_THREAD,
				 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
				 m->handler.pfds[pos].fd, actual_state);
		return 0;
	}

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add_tail(&m->ready, thread);
	thread->type = THREAD_READY;

	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel() that value
		 * should be a superset of the values we have in our copy, so
		 * there's no need to update it. Similarly, barring deletion,
		 * the fd should still be a valid index into the master's
		 * pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 pfds[i].revents, i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, pfds[i].revents, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;
			m->handler.pfds[m->handler.pfdcount].fd = 0;
			m->handler.pfds[m->handler.pfdcount].events = 0;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;
			m->handler.copy[m->handler.copycount].fd = 0;
			m->handler.copy[m->handler.copycount].events = 0;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_timer_list_first(timers))) {
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		thread_timer_list_pop(timers);
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list_head *list)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_list_pop(list))) {
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;
	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_list_pop(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (!thread_list_count(&m->ready))
			tw = thread_timer_wait(&m->timer, &tv);

		if (thread_list_count(&m->ready) ||
		    (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
			      m->handler.copycount, tw);
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(&m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
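/* Worked example: if a task started at ru_utime 0.010s / ru_stime 0.002s and
 * ended at 0.014s / 0.003s, *cputime is 4000 + 1000 = 5000 usec, while the
 * returned wall-clock figure may be larger if the task blocked in between. */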
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	frr_with_mutex(&thread->mtx) {
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	return result;
}
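/* A hedged sketch of cooperative yielding in a long-running task
 * (bulk_worker, work_remains, process_some_work and ctx are hypothetical):
 *
 *	static int bulk_worker(struct thread *t)
 *	{
 *		struct work_ctx *ctx = THREAD_ARG(t);
 *
 *		while (work_remains(ctx)) {
 *			process_some_work(ctx);
 *			if (thread_should_yield(t)) {
 *				// reschedule ourselves, give others a turn
 *				thread_add_event(t->master, bulk_worker,
 *						 ctx, 0, NULL);
 *				return 0;
 *			}
 *		}
 *		return 0;
 *	}
 */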
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	frr_with_mutex(&thread->mtx) {
		thread->yield = yield_time;
	}
}
void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
#ifndef EXCLUDE_CPU_TIME
	getrusage(FRR_RUSAGE, &(r->cpu));
#endif
}
/*
 * Call a thread.
 *
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
#ifndef EXCLUDE_CPU_TIME
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
#endif
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

#ifndef EXCLUDE_CPU_TIME
	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->real.max, &exp, realtime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->cpu.max, &exp, cputime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
#endif /* Exclude CPU Time */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	frr_with_mutex(&m->mtx) {
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		frr_with_mutex(&thread->mtx) {
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
	}

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}
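/* Unlike the add functions above, this runs func synchronously in the caller
 * with full CPU accounting; callers go through the thread_execute() wrapper
 * from thread.h, e.g. (flush_now and ctx are hypothetical):
 *
 *	thread_execute(master, flush_now, ctx, 0);
 */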