/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

DECLARE_LIST(thread_list, struct thread, threaditem)
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
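/* Note on AWAKEN(): writing one byte to m->io_pipe[1] makes m->io_pipe[0]
 * readable, and fd_poll() always includes that pipe end in its pollfd set.
 * So a thread_master blocked in poll() wakes immediately when another
 * pthread schedules or cancels a task. */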
/* control variable for initializer */
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;

	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5zu %10zu.%03lu %9zu %8zu %9zu %8lu %9lu",
		a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
		a->total_calls, a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
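/* For example, parse_filter("rt") yields
 * (1 << THREAD_READ) | (1 << THREAD_TIMER); an unrecognized string such as
 * "z" yields 0, which the CLI handlers below reject as an invalid filter. */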
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	thread_list_init(&rv->event);
	thread_list_init(&rv->ready);
	thread_list_init(&rv->unuse);

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
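/* Typical lifecycle, sketched (error handling omitted; "master" and the
 * scheduling calls are illustrative, matching how FRR daemons drive the
 * loop):
 *
 *	struct thread_master *master = thread_master_create("main");
 *	struct thread fetch;
 *
 *	// ... schedule initial tasks via thread_add_read()/_timer()/_event()
 *
 *	while (thread_fetch(master, &fetch))
 *		thread_call(&fetch);
 *
 *	thread_master_free(master);
 */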
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
		thread_list_add_tail(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
/* Free all unused thread. */
static void thread_list_free(struct thread_master *m,
			     struct thread_list_head *list)
{
	struct thread *t;

	while ((t = thread_list_pop(list)))
		thread_free(m, t);
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		thread_free(m, queue->array[i]);

	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if anything was forgotten to be shut off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_list_pop(&m->unuse)))
			thread_free(m, t);
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remain time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
/* Return remain time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}

#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;

	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_list_pop(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated.  We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * one again.
	 *
	 * This hopefully saves us some serious
	 * amount of time searching the hash.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event is
	 * detected. If the value is zero, the behavior is default.
	 */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
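/* Illustrative timeout selection (example values, per the rules above):
 *
 *   timer_wait = {2, 500000}, selectpoll_timeout = 0   =>  timeout = 2500 ms
 *   timer_wait = (anything),  selectpoll_timeout = 10  =>  timeout = 10 ms
 *   timer_wait = (anything),  selectpoll_timeout = -1  =>  timeout = 0 (no wait)
 *   timer_wait = NULL,        selectpoll_timeout = 0   =>  timeout = -1 (block)
 */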
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	/* default to a new pollfd */
	nfds_t queuepos = m->handler.pfdcount;

	/* if we already have a pollfd for our file descriptor, find and
	 * use it */
	for (nfds_t i = 0; i < m->handler.pfdcount; i++)
		if (m->handler.pfds[i].fd == fd) {
			queuepos = i;
			break;
		}

	/* make sure we have room for this fd + pipe poker fd */
	assert(queuepos + 1 < m->handler.pfdsize);

	thread = thread_get(m, dir, func, arg, debugargpass);

	m->handler.pfds[queuepos].fd = fd;
	m->handler.pfds[queuepos].events |=
		(dir == THREAD_READ ? POLLIN : POLLOUT);

	if (queuepos == m->handler.pfdcount)
		m->handler.pfdcount++;

	if (thread) {
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.fd = fd;
			if (dir == THREAD_READ)
				m->read[thread->u.fd] = thread;
			else
				m->write[thread->u.fd] = thread;
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}

	AWAKEN(m);

	pthread_mutex_unlock(&m->mtx);

	return thread;
}
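/* Usage sketch. Daemons normally reach this through the thread_add_read() /
 * thread_add_write() convenience macros in thread.h, which supply the
 * debugargs; the callback, ctx and sock names here are hypothetical:
 *
 *	static int my_readable(struct thread *t)
 *	{
 *		int fd = THREAD_FD(t);
 *
 *		// ... consume data from fd ...
 *
 *		// read tasks fire once; re-add to keep watching the fd
 *		thread_add_read(t->master, my_readable, THREAD_ARG(t), fd,
 *				NULL);
 *		return 0;
 *	}
 *
 *	struct thread *t_read = NULL;
 *	thread_add_read(master, my_readable, ctx, sock, &t_read);
 */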
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	queue = m->timer;
	thread = thread_get(m, type, func, arg, debugargpass);

	pthread_mutex_lock(&thread->mtx);
	{
		monotime(&thread->u.sands);
		timeradd(&thread->u.sands, time_relative,
			 &thread->u.sands);
		pqueue_enqueue(thread, queue);
		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}
	pthread_mutex_unlock(&thread->mtx);

	AWAKEN(m);
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
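/* Usage sketch for the timer variants, via the thread_add_timer() /
 * thread_add_timer_msec() macros from thread.h (names illustrative):
 *
 *	static int my_expire(struct thread *t)
 *	{
 *		// timers are one-shot; re-add here for periodic behavior
 *		thread_add_timer(t->master, my_expire, THREAD_ARG(t), 5,
 *				 NULL);
 *		return 0;
 *	}
 *
 *	struct thread *t_timer = NULL;
 *	thread_add_timer_msec(master, my_expire, ctx, 250, &t_timer);
 */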
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
	pthread_mutex_lock(&thread->mtx);
	{
		thread->u.val = val;
		thread_list_add_tail(&m->event, thread);
	}
	pthread_mutex_unlock(&thread->mtx);

	if (t_ptr) {
		*t_ptr = thread;
		thread->ref = t_ptr;
	}

	AWAKEN(m);
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
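/* Usage sketch (names illustrative). Events run on the next pass through
 * thread_fetch(), ahead of timers and I/O:
 *
 *	struct thread *t_ev = NULL;
 *	thread_add_event(master, my_handler, ctx, 0, &t_ev);
 *
 * Passing &t_ev lets the scheduler NULL the pointer when the task runs or
 * is cancelled; thread_cancel_async() depends on such a back-reference.
 */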
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list_head *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;

			for_each_safe(thread_list, &master->event, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->event, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}

			for_each_safe(thread_list, &master->ready, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->ready, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we
		 * need to check whether the thread even exists anymore
		 * before cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
			break;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_del(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
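/* Usage sketch: because the asynchronous path dereferences the
 * back-reference, the task must have been scheduled with a struct thread **
 * (names illustrative):
 *
 *	// on the owning pthread, at scheduling time
 *	struct thread *t_work = NULL;
 *	thread_add_timer(worker_master, do_work, ctx, 10, &t_work);
 *
 *	// later, from a different pthread
 *	thread_cancel_async(worker_master, &t_work, NULL);
 *
 * To cancel by event argument instead: thread_cancel_async(m, NULL, ctx);
 */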
/* ------------------------------------------------------------------------- */
static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add_tail(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we're
	 * responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another pthread,
		 * the only thing that could have changed in m->handler.pfds
		 * while we were asleep is the .events field in a given pollfd.
		 * Barring thread_cancel() that value should be a superset of
		 * the values we have in our copy, so there's no need to update
		 * it. Similarly, barring deletion, the fd should still be a
		 * valid index into the master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list_head *list)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_list_pop(list))) {
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_list_pop(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (!thread_list_count(&m->ready))
			tw = thread_timer_wait(m->timer, &tv);

		if (thread_list_count(&m->ready) ||
		    (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;

	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);

	return result;
}
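/* Usage sketch for cooperative yielding in a long-running task (pattern
 * illustrative; do_one_chunk() and the work list are hypothetical):
 *
 *	static int process_table(struct thread *t)
 *	{
 *		struct work *w = THREAD_ARG(t);
 *
 *		while (w->next && !thread_should_yield(t))
 *			do_one_chunk(w);
 *
 *		if (w->next)	// out of budget; reschedule the remainder
 *			thread_add_event(t->master, process_table, w, 0,
 *					 NULL);
 *		return 0;
 *	}
 */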
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
	getrusage(FRR_RUSAGE, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	pthread_mutex_lock(&m->mtx);
	{
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		pthread_mutex_lock(&thread->mtx);
		{
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
		pthread_mutex_unlock(&thread->mtx);
	}
	pthread_mutex_unlock(&m->mtx);

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}
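/* Usage sketch: the thread_execute() macro in thread.h wraps this function;
 * the callback runs synchronously on the caller, but still gets CPU
 * accounting via thread_call() (names illustrative):
 *
 *	thread_execute(master, my_handler, ctx, 0);
 */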