/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <sys/resource.h>

#include "frratomic.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
        do {                                                                   \
                static unsigned char wakebyte = 0x01;                          \
                write(m->io_pipe[1], &wakebyte, 1);                            \
        } while (0);
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
        int size = sizeof(&a->func);

        return jhash(&a->func, size, 0);
}
static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
                               const struct cpu_thread_history *b)
{
        return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
        struct cpu_thread_history *new;

        new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
        new->func = a->func;
        new->funcname = a->funcname;
        return new;
}
static void cpu_record_hash_free(void *a)
{
        struct cpu_thread_history *hist = a;

        XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
                                       struct cpu_thread_history *a)
{
        vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
                a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
                a->cpu.total / a->total_calls, a->cpu.max,
                a->real.total / a->total_calls, a->real.max);
        vty_out(vty, " %c%c%c%c%c %s\n",
                a->types & (1 << THREAD_READ) ? 'R' : ' ',
                a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
                a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
                a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
                a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
        struct cpu_thread_history *totals = args[0];
        struct cpu_thread_history copy;
        struct vty *vty = args[1];
        uint8_t *filter = args[2];

        struct cpu_thread_history *a = bucket->data;

        copy.total_active =
                atomic_load_explicit(&a->total_active, memory_order_seq_cst);
        copy.total_calls =
                atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
        copy.cpu.total =
                atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
        copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
        copy.real.total =
                atomic_load_explicit(&a->real.total, memory_order_seq_cst);
        copy.real.max =
                atomic_load_explicit(&a->real.max, memory_order_seq_cst);
        copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
        copy.funcname = a->funcname;

        if (!(copy.types & *filter))
                return;

        vty_out_cpu_thread_history(vty, &copy);
        totals->total_active += copy.total_active;
        totals->total_calls += copy.total_calls;
        totals->real.total += copy.real.total;
        if (totals->real.max < copy.real.max)
                totals->real.max = copy.real.max;
        totals->cpu.total += copy.cpu.total;
        if (totals->cpu.max < copy.cpu.max)
                totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
        struct cpu_thread_history tmp;
        void *args[3] = {&tmp, vty, &filter};
        struct thread_master *m;
        struct listnode *ln;

        memset(&tmp, 0, sizeof tmp);
        tmp.funcname = "TOTAL";

        pthread_mutex_lock(&masters_mtx);
        {
                for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
                        const char *name = m->name ? m->name : "main";

                        char underline[strlen(name) + 1];
                        memset(underline, '-', sizeof(underline));
                        underline[sizeof(underline) - 1] = '\0';

                        vty_out(vty, "\n");
                        vty_out(vty, "Showing statistics for pthread %s\n",
                                name);
                        vty_out(vty, "-------------------------------%s\n",
                                underline);
                        vty_out(vty, "%21s %18s %18s\n", "",
                                "CPU (user+system):", "Real (wall-clock):");
                        vty_out(vty,
                                "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
                        vty_out(vty, " Avg uSec Max uSecs");
                        vty_out(vty, "  Type  Thread\n");

                        if (m->cpu_record->count)
                                hash_iterate(
                                        m->cpu_record,
                                        (void (*)(struct hash_backet *,
                                                  void *))cpu_record_hash_print,
                                        args);
                        else
                                vty_out(vty, "No data to display yet.\n");

                        vty_out(vty, "\n");
                }
        }
        pthread_mutex_unlock(&masters_mtx);

        vty_out(vty, "\n");
        vty_out(vty, "Total thread statistics\n");
        vty_out(vty, "-------------------------\n");
        vty_out(vty, "%21s %18s %18s\n", "",
                "CPU (user+system):", "Real (wall-clock):");
        vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
        vty_out(vty, " Avg uSec Max uSecs");
        vty_out(vty, "  Type  Thread\n");

        if (tmp.total_calls > 0)
                vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
        uint8_t *filter = args[0];
        struct hash *cpu_record = args[1];

        struct cpu_thread_history *a = bucket->data;

        if (!(a->types & *filter))
                return;

        hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
        uint8_t *tmp = &filter;
        struct thread_master *m;
        struct listnode *ln;

        pthread_mutex_lock(&masters_mtx);
        {
                for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
                        pthread_mutex_lock(&m->mtx);
                        {
                                void *args[2] = {tmp, m->cpu_record};
                                hash_iterate(
                                        m->cpu_record,
                                        (void (*)(struct hash_backet *,
                                                  void *))cpu_record_hash_clear,
                                        args);
                        }
                        pthread_mutex_unlock(&m->mtx);
                }
        }
        pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
        int i = 0;
        int filter = 0;

        while (filterstr[i] != '\0') {
                switch (filterstr[i]) {
                case 'r':
                case 'R':
                        filter |= (1 << THREAD_READ);
                        break;
                case 'w':
                case 'W':
                        filter |= (1 << THREAD_WRITE);
                        break;
                case 't':
                case 'T':
                        filter |= (1 << THREAD_TIMER);
                        break;
                case 'e':
                case 'E':
                        filter |= (1 << THREAD_EVENT);
                        break;
                case 'x':
                case 'X':
                        filter |= (1 << THREAD_EXECUTE);
                        break;
                default:
                        break;
                }
                ++i;
        }

        return filter;
}
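
/*
 * Illustrative example (not part of the original file): parse_filter("rt")
 * returns (1 << THREAD_READ) | (1 << THREAD_TIMER), so "show thread cpu rt"
 * limits the statistics display to read and timer tasks. A result of 0 means
 * no valid filter character was given.
 */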
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
        uint8_t filter = (uint8_t)-1U;
        int idx = 0;

        if (argv_find(argv, argc, "FILTER", &idx)) {
                filter = parse_filter(argv[idx]->arg);
                if (!filter) {
                        vty_out(vty,
                                "Invalid filter \"%s\" specified; must contain at least"
                                "one of 'RWTEXB'\n",
                                argv[idx]->arg);
                        return CMD_WARNING;
                }
        }

        cpu_record_print(vty, filter);
        return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
        uint8_t filter = (uint8_t)-1U;
        int idx = 0;

        if (argv_find(argv, argc, "FILTER", &idx)) {
                filter = parse_filter(argv[idx]->arg);
                if (!filter) {
                        vty_out(vty,
                                "Invalid filter \"%s\" specified; must contain at least"
                                "one of 'RWTEXB'\n",
                                argv[idx]->arg);
                        return CMD_WARNING;
                }
        }

        cpu_record_clear(filter);
        return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
        install_element(VIEW_NODE, &show_thread_cpu_cmd);
        install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
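
/*
 * Usage sketch (illustrative; the exact startup ordering is an assumption):
 * thread_cmd_init() is called once while a daemon's CLI is being set up, e.g.
 *
 *   cmd_init(1);
 *   vty_init(master);
 *   thread_cmd_init();
 *
 * after which "show thread cpu" and "clear thread cpu" become available.
 */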
/* CLI end ------------------------------------------------------------------ */
static int thread_timer_cmp(void *a, void *b)
{
        struct thread *thread_a = a;
        struct thread *thread_b = b;

        if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
                return -1;
        if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
                return 1;
        return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
        struct thread *thread = node;

        thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
        XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer()
{
        pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
        struct thread_master *rv;
        struct rlimit limit;

        pthread_once(&init_once, &initializer);

        rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
        if (rv == NULL)
                return NULL;

        /* Initialize master mutex */
        pthread_mutex_init(&rv->mtx, NULL);
        pthread_cond_init(&rv->cancel_cond, NULL);

        rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

        /* Initialize I/O task data structures */
        getrlimit(RLIMIT_NOFILE, &limit);
        rv->fd_limit = (int)limit.rlim_cur;
        rv->read =
                XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
        if (rv->read == NULL) {
                XFREE(MTYPE_THREAD_MASTER, rv);
                return NULL;
        }
        rv->write =
                XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
        if (rv->write == NULL) {
                XFREE(MTYPE_THREAD, rv->read);
                XFREE(MTYPE_THREAD_MASTER, rv);
                return NULL;
        }

        rv->cpu_record = hash_create_size(
                8, (unsigned int (*)(void *))cpu_record_hash_key,
                (int (*)(const void *, const void *))cpu_record_hash_cmp,
                "Thread Hash");

        /* Initialize the timer queues */
        rv->timer = pqueue_create();
        rv->timer->cmp = thread_timer_cmp;
        rv->timer->update = thread_timer_update;

        /* Initialize thread_fetch() settings */
        rv->spin = true;
        rv->handle_signals = true;

        /* Set pthread owner, should be updated by actual owner */
        rv->owner = pthread_self();
        rv->cancel_req = list_new();
        rv->cancel_req->del = cancelreq_del;
        rv->canceled = true;

        /* Initialize pipe poker */
        pipe(rv->io_pipe);
        set_nonblocking(rv->io_pipe[0]);
        set_nonblocking(rv->io_pipe[1]);

        /* Initialize data structures for poll() */
        rv->handler.pfdsize = rv->fd_limit;
        rv->handler.pfdcount = 0;
        rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
                                   sizeof(struct pollfd) * rv->handler.pfdsize);
        rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
                                   sizeof(struct pollfd) * rv->handler.pfdsize);

        /* add to list of threadmasters */
        pthread_mutex_lock(&masters_mtx);
        {
                if (!masters)
                        masters = list_new();

                listnode_add(masters, rv);
        }
        pthread_mutex_unlock(&masters_mtx);

        return rv;
}
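
/*
 * Usage sketch (illustrative): the canonical event loop built on this API.
 * 'read_cb' and 'sock' are hypothetical names.
 *
 *   struct thread_master *master = thread_master_create("main");
 *   struct thread thread;
 *
 *   thread_add_read(master, read_cb, NULL, sock, NULL);
 *   while (thread_fetch(master, &thread))
 *           thread_call(&thread);
 *   thread_master_free(master);
 */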
void thread_master_set_name(struct thread_master *master, const char *name)
{
        pthread_mutex_lock(&master->mtx);
        {
                if (master->name)
                        XFREE(MTYPE_THREAD_MASTER, master->name);
                master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
        }
        pthread_mutex_unlock(&master->mtx);
}
/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
        thread->next = NULL;
        thread->prev = list->tail;
        if (list->tail)
                list->tail->next = thread;
        else
                list->head = thread;
        list->tail = thread;
        list->count++;
}
/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
                                         struct thread *thread)
{
        if (thread->next)
                thread->next->prev = thread->prev;
        else
                list->tail = thread->prev;
        if (thread->prev)
                thread->prev->next = thread->next;
        else
                list->head = thread->next;
        thread->next = thread->prev = NULL;
        list->count--;
        return thread;
}
/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
        return list->head ? 0 : 1;
}
/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
        if (!thread_empty(list))
                return thread_list_delete(list, list->head);
        return NULL;
}
/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
        assert(m != NULL && thread != NULL);
        assert(thread->next == NULL);
        assert(thread->prev == NULL);
        thread->ref = NULL;

        thread->type = THREAD_UNUSED;
        thread->hist->total_active--;
        thread_list_add(&m->unuse, thread);
}
/* Free all unused thread. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
        struct thread *t;
        struct thread *next;

        for (t = list->head; t; t = next) {
                next = t->next;
                XFREE(MTYPE_THREAD, t);
                list->count--;
                m->alloc--;
        }
}
static void thread_array_free(struct thread_master *m,
                              struct thread **thread_array)
{
        struct thread *t;
        int index;

        for (index = 0; index < m->fd_limit; ++index) {
                t = thread_array[index];
                if (t) {
                        thread_array[index] = NULL;
                        XFREE(MTYPE_THREAD, t);
                        m->alloc--;
                }
        }
        XFREE(MTYPE_THREAD, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
        int i;

        for (i = 0; i < queue->size; i++)
                XFREE(MTYPE_THREAD, queue->array[i]);

        m->alloc -= queue->size;
        pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
        pthread_mutex_lock(&m->mtx);
        {
                struct thread *t;
                while ((t = thread_trim_head(&m->unuse)) != NULL) {
                        pthread_mutex_destroy(&t->mtx);
                        XFREE(MTYPE_THREAD, t);
                }
        }
        pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
        pthread_mutex_lock(&masters_mtx);
        {
                listnode_delete(masters, m);
                if (masters->count == 0) {
                        list_delete_and_null(&masters);
                }
        }
        pthread_mutex_unlock(&masters_mtx);

        thread_array_free(m, m->read);
        thread_array_free(m, m->write);
        thread_queue_free(m, m->timer);
        thread_list_free(m, &m->event);
        thread_list_free(m, &m->ready);
        thread_list_free(m, &m->unuse);
        pthread_mutex_destroy(&m->mtx);
        pthread_cond_destroy(&m->cancel_cond);
        close(m->io_pipe[0]);
        close(m->io_pipe[1]);
        list_delete_and_null(&m->cancel_req);
        m->cancel_req = NULL;

        hash_clean(m->cpu_record, cpu_record_hash_free);
        hash_free(m->cpu_record);
        m->cpu_record = NULL;

        if (m->name)
                XFREE(MTYPE_THREAD_MASTER, m->name);
        XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
        XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
        XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
        int64_t remain;

        pthread_mutex_lock(&thread->mtx);
        {
                remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
        }
        pthread_mutex_unlock(&thread->mtx);

        return remain < 0 ? 0 : remain;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
struct timeval thread_timer_remain(struct thread *thread)
{
        struct timeval remain;
        pthread_mutex_lock(&thread->mtx);
        {
                monotime_until(&thread->u.sands, &remain);
        }
        pthread_mutex_unlock(&thread->mtx);
        return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
                                 int (*func)(struct thread *), void *arg,
                                 debugargdef)
{
        struct thread *thread = thread_trim_head(&m->unuse);
        struct cpu_thread_history tmp;

        if (!thread) {
                thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
                /* mutex only needs to be initialized at struct creation. */
                pthread_mutex_init(&thread->mtx, NULL);
                m->alloc++;
        }

        thread->type = type;
        thread->add_type = type;
        thread->master = m;
        thread->arg = arg;
        thread->index = -1;
        thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
        thread->ref = NULL;

        /*
         * So if the passed in funcname is not what we have
         * stored that means the thread->hist needs to be
         * updated.  We keep the last one around in unused
         * under the assumption that we are probably
         * going to immediately allocate the same
         * one up again.
         *
         * This hopefully saves us some serious
         * hash_get lookups.
         */
        if (thread->funcname != funcname || thread->func != func) {
                tmp.func = func;
                tmp.funcname = funcname;
                thread->hist =
                        hash_get(m->cpu_record, &tmp,
                                 (void *(*)(void *))cpu_record_hash_alloc);
        }
        thread->hist->total_active++;
        thread->func = func;
        thread->funcname = funcname;
        thread->schedfrom = schedfrom;
        thread->schedfrom_line = fromln;

        return thread;
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
                   nfds_t count, const struct timeval *timer_wait)
{
        /* If timer_wait is null here, that means poll() should block
         * indefinitely, unless the thread_master has overridden it by setting
         * ->selectpoll_timeout.
         *
         * If the value is positive, it specifies the maximum number of
         * milliseconds to wait. If the timeout is -1, it specifies that
         * we should never wait and always return immediately even if no
         * event is detected. If the value is zero, the behavior is default. */
        int timeout = -1;

        /* number of file descriptors with events */
        int num;

        if (timer_wait != NULL
            && m->selectpoll_timeout == 0) // use the default value
                timeout = (timer_wait->tv_sec * 1000)
                          + (timer_wait->tv_usec / 1000);
        else if (m->selectpoll_timeout > 0) // use the user's timeout
                timeout = m->selectpoll_timeout;
        else if (m->selectpoll_timeout
                 < 0) // effect a poll (return immediately)
                timeout = 0;

        /* add poll pipe poker */
        assert(count + 1 < pfdsize);
        pfds[count].fd = m->io_pipe[0];
        pfds[count].events = POLLIN;
        pfds[count].revents = 0x00;

        num = poll(pfds, count + 1, timeout);

        unsigned char trash[64];
        if (num > 0 && pfds[count].revents != 0 && num--)
                while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
                        ;

        return num;
}
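
/*
 * Worked example for the timeout conversion above: with selectpoll_timeout
 * == 0 and a timer_wait of { .tv_sec = 2, .tv_usec = 500000 }, poll() is
 * given 2 * 1000 + 500000 / 1000 = 2500 milliseconds.
 */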
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
                                              int (*func)(struct thread *),
                                              void *arg, int fd,
                                              struct thread **t_ptr,
                                              debugargdef)
{
        struct thread *thread = NULL;

        pthread_mutex_lock(&m->mtx);
        {
                if (t_ptr
                    && *t_ptr) // thread is already scheduled; don't reschedule
                {
                        pthread_mutex_unlock(&m->mtx);
                        return NULL;
                }

                /* default to a new pollfd */
                nfds_t queuepos = m->handler.pfdcount;

                /* if we already have a pollfd for our file descriptor, find and
                 * use it */
                for (nfds_t i = 0; i < m->handler.pfdcount; i++)
                        if (m->handler.pfds[i].fd == fd) {
                                queuepos = i;
                                break;
                        }

                /* make sure we have room for this fd + pipe poker fd */
                assert(queuepos + 1 < m->handler.pfdsize);

                thread = thread_get(m, dir, func, arg, debugargpass);

                m->handler.pfds[queuepos].fd = fd;
                m->handler.pfds[queuepos].events |=
                        (dir == THREAD_READ ? POLLIN : POLLOUT);

                if (queuepos == m->handler.pfdcount)
                        m->handler.pfdcount++;

                if (thread) {
                        pthread_mutex_lock(&thread->mtx);
                        {
                                thread->u.fd = fd;
                                if (dir == THREAD_READ)
                                        m->read[thread->u.fd] = thread;
                                else
                                        m->write[thread->u.fd] = thread;
                        }
                        pthread_mutex_unlock(&thread->mtx);

                        if (t_ptr) {
                                *t_ptr = thread;
                                thread->ref = t_ptr;
                        }
                }

                AWAKEN(m);
        }
        pthread_mutex_unlock(&m->mtx);

        return thread;
}
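
/*
 * Usage sketch (illustrative): this function is normally reached through the
 * thread_add_read()/thread_add_write() wrapper macros in thread.h. 'my_fd'
 * and 'my_handler' are hypothetical names.
 *
 *   struct thread *t_read = NULL;
 *   thread_add_read(master, my_handler, NULL, my_fd, &t_read);
 *
 * Passing &t_read makes rescheduling idempotent: while *t_read is non-NULL,
 * further thread_add_read() calls for it are no-ops.
 */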
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
                                  int (*func)(struct thread *), int type,
                                  void *arg, struct timeval *time_relative,
                                  struct thread **t_ptr, debugargdef)
{
        struct thread *thread;
        struct pqueue *queue;

        assert(m != NULL);

        assert(type == THREAD_TIMER);
        assert(time_relative);

        pthread_mutex_lock(&m->mtx);
        {
                if (t_ptr
                    && *t_ptr) // thread is already scheduled; don't reschedule
                {
                        pthread_mutex_unlock(&m->mtx);
                        return NULL;
                }

                queue = m->timer;

                thread = thread_get(m, type, func, arg, debugargpass);

                pthread_mutex_lock(&thread->mtx);
                {
                        monotime(&thread->u.sands);
                        timeradd(&thread->u.sands, time_relative,
                                 &thread->u.sands);
                        pqueue_enqueue(thread, queue);
                        if (t_ptr) {
                                *t_ptr = thread;
                                thread->ref = t_ptr;
                        }
                }
                pthread_mutex_unlock(&thread->mtx);

                AWAKEN(m);
        }
        pthread_mutex_unlock(&m->mtx);

        return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
                                         int (*func)(struct thread *),
                                         void *arg, long timer,
                                         struct thread **t_ptr, debugargdef)
{
        struct timeval trel;

        assert(m != NULL);

        trel.tv_sec = timer;
        trel.tv_usec = 0;

        return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
                                                 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
                                              int (*func)(struct thread *),
                                              void *arg, long timer,
                                              struct thread **t_ptr,
                                              debugargdef)
{
        struct timeval trel;

        assert(m != NULL);

        trel.tv_sec = timer / 1000;
        trel.tv_usec = 1000 * (timer % 1000);

        return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
                                                 &trel, t_ptr, debugargpass);
}
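
/*
 * Worked example: timer = 1500 yields trel = { .tv_sec = 1, .tv_usec =
 * 500000 }, i.e. the task fires roughly 1.5 seconds from now.
 */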
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
                                            int (*func)(struct thread *),
                                            void *arg, struct timeval *tv,
                                            struct thread **t_ptr, debugargdef)
{
        return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
                                                 t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
                                         int (*func)(struct thread *),
                                         void *arg, int val,
                                         struct thread **t_ptr, debugargdef)
{
        struct thread *thread;

        assert(m != NULL);

        pthread_mutex_lock(&m->mtx);
        {
                if (t_ptr
                    && *t_ptr) // thread is already scheduled; don't reschedule
                {
                        pthread_mutex_unlock(&m->mtx);
                        return NULL;
                }

                thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
                pthread_mutex_lock(&thread->mtx);
                {
                        thread->u.val = val;
                        thread_list_add(&m->event, thread);
                }
                pthread_mutex_unlock(&thread->mtx);

                if (t_ptr) {
                        *t_ptr = thread;
                        thread->ref = t_ptr;
                }

                AWAKEN(m);
        }
        pthread_mutex_unlock(&m->mtx);

        return thread;
}
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following: POLLIN, POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
        bool found = false;

        /* Cancel POLLHUP too just in case some bozo set it */
        state |= POLLHUP;

        /* find the index of corresponding pollfd */
        nfds_t i;

        for (i = 0; i < master->handler.pfdcount; i++)
                if (master->handler.pfds[i].fd == fd) {
                        found = true;
                        break;
                }

        if (!found) {
                zlog_debug(
                        "[!] Received cancellation request for nonexistent rw job");
                zlog_debug("[!] threadmaster: %s | fd: %d",
                           master->name ? master->name : "", fd);
                return;
        }

        /* NOT out event. */
        master->handler.pfds[i].events &= ~(state);

        /* If all events are canceled, delete / resize the pollfd array. */
        if (master->handler.pfds[i].events == 0) {
                memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
                        (master->handler.pfdcount - i - 1)
                                * sizeof(struct pollfd));
                master->handler.pfdcount--;
        }

        /* If we have the same pollfd in the copy, perform the same operations,
         * otherwise return. */
        if (i >= master->handler.copycount)
                return;

        master->handler.copy[i].events &= ~(state);

        if (master->handler.copy[i].events == 0) {
                memmove(master->handler.copy + i, master->handler.copy + i + 1,
                        (master->handler.copycount - i - 1)
                                * sizeof(struct pollfd));
                master->handler.copycount--;
        }
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
        struct thread_list *list = NULL;
        struct pqueue *queue = NULL;
        struct thread **thread_array = NULL;
        struct thread *thread;

        struct cancel_req *cr;
        struct listnode *ln;
        for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
                /* If this is an event object cancellation, linear search
                 * through the event list deleting any events which have the
                 * specified argument. We also need to check every thread in
                 * the ready queue. */
                if (cr->eventobj) {
                        struct thread *t;
                        thread = master->event.head;

                        while (thread) {
                                t = thread;
                                thread = t->next;

                                if (t->arg == cr->eventobj) {
                                        thread_list_delete(&master->event, t);
                                        if (t->ref)
                                                *t->ref = NULL;
                                        thread_add_unuse(master, t);
                                }
                        }

                        thread = master->ready.head;
                        while (thread) {
                                t = thread;
                                thread = t->next;

                                if (t->arg == cr->eventobj) {
                                        thread_list_delete(&master->ready, t);
                                        if (t->ref)
                                                *t->ref = NULL;
                                        thread_add_unuse(master, t);
                                }
                        }
                        continue;
                }

                /* The pointer varies depending on whether the cancellation
                 * request was made asynchronously or not. If it was, we need
                 * to check whether the thread even exists anymore before
                 * cancelling it. */
                thread = (cr->thread) ? cr->thread : *cr->threadref;

                if (!thread)
                        continue;

                /* Determine the appropriate queue to cancel the thread from */
                switch (thread->type) {
                case THREAD_READ:
                        thread_cancel_rw(master, thread->u.fd, POLLIN);
                        thread_array = master->read;
                        break;
                case THREAD_WRITE:
                        thread_cancel_rw(master, thread->u.fd, POLLOUT);
                        thread_array = master->write;
                        break;
                case THREAD_TIMER:
                        queue = master->timer;
                        break;
                case THREAD_EVENT:
                        list = &master->event;
                        break;
                case THREAD_READY:
                        list = &master->ready;
                        break;
                default:
                        continue;
                        break;
                }

                if (queue) {
                        assert(thread->index >= 0);
                        assert(thread == queue->array[thread->index]);
                        pqueue_remove_at(thread->index, queue);
                } else if (list) {
                        thread_list_delete(list, thread);
                } else if (thread_array) {
                        thread_array[thread->u.fd] = NULL;
                } else {
                        assert(!"Thread should be either in queue or list or array!");
                }

                if (thread->ref)
                        *thread->ref = NULL;

                thread_add_unuse(thread->master, thread);
        }

        /* Delete and free all cancellation requests */
        list_delete_all_node(master->cancel_req);

        /* Wake up any threads which may be blocked in thread_cancel_async() */
        master->canceled = true;
        pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
        assert(master->owner == pthread_self());

        pthread_mutex_lock(&master->mtx);
        {
                struct cancel_req *cr =
                        XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                cr->eventobj = arg;
                listnode_add(master->cancel_req, cr);
                do_thread_cancel(master);
        }
        pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
        assert(thread->master->owner == pthread_self());

        pthread_mutex_lock(&thread->master->mtx);
        {
                struct cancel_req *cr =
                        XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                cr->thread = thread;
                listnode_add(thread->master->cancel_req, cr);
                do_thread_cancel(thread->master);
        }
        pthread_mutex_unlock(&thread->master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
                         void *eventobj)
{
        assert(!(thread && eventobj) && (thread || eventobj));
        assert(master->owner != pthread_self());

        pthread_mutex_lock(&master->mtx);
        {
                master->canceled = false;

                if (thread) {
                        struct cancel_req *cr =
                                XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                        cr->threadref = thread;
                        listnode_add(master->cancel_req, cr);
                } else if (eventobj) {
                        struct cancel_req *cr =
                                XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                        cr->eventobj = eventobj;
                        listnode_add(master->cancel_req, cr);
                }
                AWAKEN(master);

                while (!master->canceled)
                        pthread_cond_wait(&master->cancel_cond, &master->mtx);
        }
        pthread_mutex_unlock(&master->mtx);
}
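
/*
 * Usage sketch (illustrative): cancelling a task that belongs to another
 * pthread. 'bg_master', 'bg_func' and 't_bg' are hypothetical names; the
 * back-reference passed at scheduling time is what makes the asynchronous
 * cancel safe.
 *
 *   // on the owning pthread
 *   struct thread *t_bg = NULL;
 *   thread_add_timer(bg_master, bg_func, NULL, 5, &t_bg);
 *
 *   // on any other pthread; blocks until the request is serviced
 *   thread_cancel_async(bg_master, &t_bg, NULL);
 */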
/* ------------------------------------------------------------------------- */
static struct timeval *thread_timer_wait(struct pqueue *queue,
                                         struct timeval *timer_val)
{
        if (queue->size) {
                struct thread *next_timer = queue->array[0];
                monotime_until(&next_timer->u.sands, timer_val);
                return timer_val;
        }
        return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
                                 struct thread *fetch)
{
        *fetch = *thread;
        thread_add_unuse(m, thread);
        return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
                                    struct thread *thread, short state, int pos)
{
        struct thread **thread_array;

        if (!thread)
                return 0;

        if (thread->type == THREAD_READ)
                thread_array = m->read;
        else
                thread_array = m->write;

        thread_array[thread->u.fd] = NULL;
        thread_list_add(&m->ready, thread);
        thread->type = THREAD_READY;
        /* if another pthread scheduled this file descriptor for the event we're
         * responding to, no problem; we're getting to it now */
        thread->master->handler.pfds[pos].events &= ~(state);
        return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
        unsigned int ready = 0;
        struct pollfd *pfds = m->handler.copy;

        for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
                /* no event for current fd? immediately continue */
                if (pfds[i].revents == 0)
                        continue;

                ready++;

                /* Unless someone has called thread_cancel from another pthread,
                 * the only thing that could have changed in m->handler.pfds
                 * while we were asleep is the .events field in a given pollfd.
                 * Barring thread_cancel, that value should be a superset of the
                 * values we have in our copy, so there's no need to update it.
                 * Similarly, barring deletion, the fd should still be a valid
                 * index into the master's pfds. */
                if (pfds[i].revents & (POLLIN | POLLHUP))
                        thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
                                                 i);
                if (pfds[i].revents & POLLOUT)
                        thread_process_io_helper(m, m->write[pfds[i].fd],
                                                 POLLOUT, i);

                /* if one of our file descriptors is garbage, remove the same
                 * from both pfds + update sizes and index */
                if (pfds[i].revents & POLLNVAL) {
                        memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
                                (m->handler.pfdcount - i - 1)
                                        * sizeof(struct pollfd));
                        m->handler.pfdcount--;

                        memmove(pfds + i, pfds + i + 1,
                                (m->handler.copycount - i - 1)
                                        * sizeof(struct pollfd));
                        m->handler.copycount--;

                        i--;
                }
        }
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
                                          struct timeval *timenow)
{
        struct thread *thread;
        unsigned int ready = 0;

        while (queue->size) {
                thread = queue->array[0];
                if (timercmp(timenow, &thread->u.sands, <))
                        return ready;
                pqueue_dequeue(queue);
                thread->type = THREAD_READY;
                thread_list_add(&thread->master->ready, thread);
                ready++;
        }
        return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
        struct thread *thread;
        struct thread *next;
        unsigned int ready = 0;

        for (thread = list->head; thread; thread = next) {
                next = thread->next;
                thread_list_delete(list, thread);
                thread->type = THREAD_READY;
                thread_list_add(&thread->master->ready, thread);
                ready++;
        }
        return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
        struct thread *thread = NULL;
        struct timeval now;
        struct timeval zerotime = {0, 0};
        struct timeval tv;
        struct timeval *tw = NULL;

        int num = 0;

        do {
                /* Handle signals if any */
                if (m->handle_signals)
                        quagga_sigevent_process();

                pthread_mutex_lock(&m->mtx);

                /* Process any pending cancellation requests */
                do_thread_cancel(m);

                /*
                 * Attempt to flush ready queue before going into poll().
                 * This is performance-critical. Think twice before modifying.
                 */
                if ((thread = thread_trim_head(&m->ready))) {
                        fetch = thread_run(m, thread, fetch);
                        if (fetch->ref)
                                *fetch->ref = NULL;
                        pthread_mutex_unlock(&m->mtx);
                        break;
                }

                /* otherwise, tick through scheduling sequence */

                /*
                 * Post events to ready queue. This must come before the
                 * following block since events should occur immediately
                 */
                thread_process(&m->event);

                /*
                 * If there are no tasks on the ready queue, we will poll()
                 * until a timer expires or we receive I/O, whichever comes
                 * first. The strategy for doing this is:
                 *
                 * - If there are events pending, set the poll() timeout to zero
                 * - If there are no events pending, but there are timers
                 *   pending, set the timeout to the smallest remaining time on
                 *   any timer
                 * - If there are neither timers nor events pending, but there
                 *   are file descriptors pending, block indefinitely in poll()
                 * - If nothing is pending, it's time for the application to die
                 *
                 * In every case except the last, we need to hit poll() at least
                 * once per loop to avoid starvation by events
                 */
                if (m->ready.count == 0)
                        tw = thread_timer_wait(m->timer, &tv);

                if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
                        tw = &zerotime;

                if (!tw && m->handler.pfdcount == 0) { /* die */
                        pthread_mutex_unlock(&m->mtx);
                        fetch = NULL;
                        break;
                }

                /*
                 * Copy pollfd array + # active pollfds in it. Not necessary to
                 * copy the array size as this is fixed.
                 */
                m->handler.copycount = m->handler.pfdcount;
                memcpy(m->handler.copy, m->handler.pfds,
                       m->handler.copycount * sizeof(struct pollfd));

                pthread_mutex_unlock(&m->mtx);

                num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
                              m->handler.copycount, tw);

                pthread_mutex_lock(&m->mtx);

                /* Handle any errors received in poll() */
                if (num < 0) {
                        if (errno == EINTR) {
                                pthread_mutex_unlock(&m->mtx);
                                /* loop around to signal handler */
                                continue;
                        }

                        /* else die */
                        zlog_warn("poll() error: %s", safe_strerror(errno));
                        pthread_mutex_unlock(&m->mtx);
                        fetch = NULL;
                        break;
                }

                /* Post timers to ready queue. */
                monotime(&now);
                thread_process_timers(m->timer, &now);

                /* Post I/O to ready queue. */
                if (num > 0)
                        thread_process_io(m, num);

                pthread_mutex_unlock(&m->mtx);

        } while (!thread && m->spin);

        return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
        return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
                + (a.tv_usec - b.tv_usec));
}
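
/*
 * Worked example: with a = { 5, 200000 } and b = { 4, 900000 },
 * timeval_elapsed() returns (5 - 4) * TIMER_SECOND_MICRO + (200000 - 900000)
 * = 300000 microseconds.
 */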
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
                                   unsigned long *cputime)
{
        /* This is 'user + sys' time. */
        *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
                   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
        return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
        int result;
        pthread_mutex_lock(&thread->mtx);
        {
                result = monotime_since(&thread->real, NULL)
                         > (int64_t)thread->yield;
        }
        pthread_mutex_unlock(&thread->mtx);
        return result;
}
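
/*
 * Usage sketch (illustrative): a long-running task can split its work into
 * slices with thread_should_yield(). 'work_pending', 'do_one_unit' and
 * 'master' are hypothetical names.
 *
 *   static int background_work(struct thread *t)
 *   {
 *           while (work_pending()) {
 *                   do_one_unit();
 *                   if (thread_should_yield(t)) {
 *                           thread_add_event(master, background_work,
 *                                            NULL, 0, NULL);
 *                           break;
 *                   }
 *           }
 *           return 0;
 *   }
 */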
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
        pthread_mutex_lock(&thread->mtx);
        {
                thread->yield = yield_time;
        }
        pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
        monotime(&r->real);
        getrusage(RUSAGE_SELF, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
        _Atomic unsigned long realtime, cputime;
        unsigned long exp;
        unsigned long helper;
        RUSAGE_T before, after;

        GETRUSAGE(&before);
        thread->real = before.real;

        pthread_setspecific(thread_current, thread);
        (*thread->func)(thread);
        pthread_setspecific(thread_current, NULL);

        GETRUSAGE(&after);

        realtime = thread_consumed_time(&after, &before, &helper);
        cputime = helper;

        /* update realtime */
        atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
                                  memory_order_seq_cst);
        exp = atomic_load_explicit(&thread->hist->real.max,
                                   memory_order_seq_cst);
        while (exp < realtime
               && !atomic_compare_exchange_weak_explicit(
                          &thread->hist->real.max, &exp, realtime,
                          memory_order_seq_cst, memory_order_seq_cst))
                ;

        /* update cputime */
        atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
                                  memory_order_seq_cst);
        exp = atomic_load_explicit(&thread->hist->cpu.max,
                                   memory_order_seq_cst);
        while (exp < cputime
               && !atomic_compare_exchange_weak_explicit(
                          &thread->hist->cpu.max, &exp, cputime,
                          memory_order_seq_cst, memory_order_seq_cst))
                ;

        atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
                                  memory_order_seq_cst);
        atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
                                 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
        if (realtime > CONSUMED_TIME_CHECK) {
                /*
                 * We have a CPU Hog on our hands.
                 * Whinge about it now, so we're aware this is yet another task
                 * to fix.
                 */
                zlog_warn(
                        "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
                        thread->funcname, (unsigned long)thread->func,
                        realtime / 1000, cputime / 1000);
        }
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
                             int (*func)(struct thread *), void *arg, int val,
                             debugargdef)
{
        struct cpu_thread_history tmp;
        struct thread dummy;

        memset(&dummy, 0, sizeof(struct thread));

        pthread_mutex_init(&dummy.mtx, NULL);
        dummy.type = THREAD_EVENT;
        dummy.add_type = THREAD_EXECUTE;
        dummy.master = NULL;
        dummy.arg = arg;
        dummy.u.val = val;

        tmp.func = dummy.func = func;
        tmp.funcname = dummy.funcname = funcname;
        dummy.hist = hash_get(m->cpu_record, &tmp,
                              (void *(*)(void *))cpu_record_hash_alloc);

        dummy.schedfrom = schedfrom;
        dummy.schedfrom_line = fromln;

        thread_call(&dummy);
}
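
/*
 * Usage sketch (illustrative): this is normally reached through the
 * thread_execute() wrapper macro in thread.h, which runs 'func' synchronously
 * on a stack-allocated dummy thread while still recording CPU statistics:
 *
 *   thread_execute(master, my_func, my_arg, 0);
 *
 * 'my_func' and 'my_arg' are hypothetical names.
 */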