1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
4 * This file is part of GNU Zebra.
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 #include <sys/resource.h>
36 DEFINE_MTYPE_STATIC(LIB
, THREAD
, "Thread")
37 DEFINE_MTYPE_STATIC(LIB
, THREAD_MASTER
, "Thread master")
38 DEFINE_MTYPE_STATIC(LIB
, THREAD_STATS
, "Thread stats")
40 #if defined(__APPLE__)
41 #include <mach/mach.h>
42 #include <mach/mach_time.h>
47 static unsigned char wakebyte = 0x01; \
48 write(m->io_pipe[1], &wakebyte, 1); \
51 /* control variable for initializer */
52 pthread_once_t init_once
= PTHREAD_ONCE_INIT
;
53 pthread_key_t thread_current
;
55 pthread_mutex_t masters_mtx
= PTHREAD_MUTEX_INITIALIZER
;
56 static struct list
*masters
;
59 /* CLI start ---------------------------------------------------------------- */
60 static unsigned int cpu_record_hash_key(struct cpu_thread_history
*a
)
62 int size
= sizeof (&a
->func
);
64 return jhash(&a
->func
, size
, 0);
67 static int cpu_record_hash_cmp(const struct cpu_thread_history
*a
,
68 const struct cpu_thread_history
*b
)
70 return a
->func
== b
->func
;
73 static void *cpu_record_hash_alloc(struct cpu_thread_history
*a
)
75 struct cpu_thread_history
*new;
76 new = XCALLOC(MTYPE_THREAD_STATS
, sizeof(struct cpu_thread_history
));
78 new->funcname
= a
->funcname
;
82 static void cpu_record_hash_free(void *a
)
84 struct cpu_thread_history
*hist
= a
;
86 XFREE(MTYPE_THREAD_STATS
, hist
);
89 static void vty_out_cpu_thread_history(struct vty
*vty
,
90 struct cpu_thread_history
*a
)
92 vty_out(vty
, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld", a
->total_active
,
93 a
->cpu
.total
/ 1000, a
->cpu
.total
% 1000, a
->total_calls
,
94 a
->cpu
.total
/ a
->total_calls
, a
->cpu
.max
,
95 a
->real
.total
/ a
->total_calls
, a
->real
.max
);
96 vty_out(vty
, " %c%c%c%c%c %s\n",
97 a
->types
& (1 << THREAD_READ
) ? 'R' : ' ',
98 a
->types
& (1 << THREAD_WRITE
) ? 'W' : ' ',
99 a
->types
& (1 << THREAD_TIMER
) ? 'T' : ' ',
100 a
->types
& (1 << THREAD_EVENT
) ? 'E' : ' ',
101 a
->types
& (1 << THREAD_EXECUTE
) ? 'X' : ' ', a
->funcname
);
104 static void cpu_record_hash_print(struct hash_backet
*bucket
, void *args
[])
106 struct cpu_thread_history
*totals
= args
[0];
107 struct vty
*vty
= args
[1];
108 thread_type
*filter
= args
[2];
110 struct cpu_thread_history
*a
= bucket
->data
;
112 if (!(a
->types
& *filter
))
114 vty_out_cpu_thread_history(vty
, a
);
115 totals
->total_active
+= a
->total_active
;
116 totals
->total_calls
+= a
->total_calls
;
117 totals
->real
.total
+= a
->real
.total
;
118 if (totals
->real
.max
< a
->real
.max
)
119 totals
->real
.max
= a
->real
.max
;
120 totals
->cpu
.total
+= a
->cpu
.total
;
121 if (totals
->cpu
.max
< a
->cpu
.max
)
122 totals
->cpu
.max
= a
->cpu
.max
;
125 static void cpu_record_print(struct vty
*vty
, thread_type filter
)
127 struct cpu_thread_history tmp
;
128 void *args
[3] = {&tmp
, vty
, &filter
};
129 struct thread_master
*m
;
132 memset(&tmp
, 0, sizeof tmp
);
133 tmp
.funcname
= "TOTAL";
136 pthread_mutex_lock(&masters_mtx
);
138 for (ALL_LIST_ELEMENTS_RO(masters
, ln
, m
)) {
139 const char *name
= m
->name
? m
->name
: "main";
141 char underline
[strlen(name
) + 1];
142 memset(underline
, '-', sizeof(underline
));
143 underline
[sizeof(underline
)] = '\0';
146 vty_out(vty
, "Showing statistics for pthread %s\n",
148 vty_out(vty
, "-------------------------------%s\n",
150 vty_out(vty
, "%21s %18s %18s\n", "",
151 "CPU (user+system):", "Real (wall-clock):");
153 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
154 vty_out(vty
, " Avg uSec Max uSecs");
155 vty_out(vty
, " Type Thread\n");
157 if (m
->cpu_record
->count
)
160 (void (*)(struct hash_backet
*,
161 void *))cpu_record_hash_print
,
164 vty_out(vty
, "No data to display yet.\n");
169 pthread_mutex_unlock(&masters_mtx
);
172 vty_out(vty
, "Total thread statistics\n");
173 vty_out(vty
, "-------------------------\n");
174 vty_out(vty
, "%21s %18s %18s\n", "",
175 "CPU (user+system):", "Real (wall-clock):");
176 vty_out(vty
, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
177 vty_out(vty
, " Avg uSec Max uSecs");
178 vty_out(vty
, " Type Thread\n");
180 if (tmp
.total_calls
> 0)
181 vty_out_cpu_thread_history(vty
, &tmp
);
184 static void cpu_record_hash_clear(struct hash_backet
*bucket
, void *args
[])
186 thread_type
*filter
= args
[0];
187 struct hash
*cpu_record
= args
[1];
189 struct cpu_thread_history
*a
= bucket
->data
;
191 if (!(a
->types
& *filter
))
194 hash_release(cpu_record
, bucket
->data
);
197 static void cpu_record_clear(thread_type filter
)
199 thread_type
*tmp
= &filter
;
200 struct thread_master
*m
;
203 pthread_mutex_lock(&masters_mtx
);
205 for (ALL_LIST_ELEMENTS_RO(masters
, ln
, m
)) {
206 pthread_mutex_lock(&m
->mtx
);
208 void *args
[2] = {tmp
, m
->cpu_record
};
211 (void (*)(struct hash_backet
*,
212 void *))cpu_record_hash_clear
,
215 pthread_mutex_unlock(&m
->mtx
);
218 pthread_mutex_unlock(&masters_mtx
);
221 static thread_type
parse_filter(const char *filterstr
)
226 while (filterstr
[i
] != '\0') {
227 switch (filterstr
[i
]) {
230 filter
|= (1 << THREAD_READ
);
234 filter
|= (1 << THREAD_WRITE
);
238 filter
|= (1 << THREAD_TIMER
);
242 filter
|= (1 << THREAD_EVENT
);
246 filter
|= (1 << THREAD_EXECUTE
);
256 DEFUN (show_thread_cpu
,
258 "show thread cpu [FILTER]",
260 "Thread information\n"
262 "Display filter (rwtexb)\n")
264 thread_type filter
= (thread_type
)-1U;
267 if (argv_find(argv
, argc
, "FILTER", &idx
)) {
268 filter
= parse_filter(argv
[idx
]->arg
);
271 "Invalid filter \"%s\" specified; must contain at least"
278 cpu_record_print(vty
, filter
);
282 DEFUN (clear_thread_cpu
,
283 clear_thread_cpu_cmd
,
284 "clear thread cpu [FILTER]",
285 "Clear stored data in all pthreads\n"
286 "Thread information\n"
288 "Display filter (rwtexb)\n")
290 thread_type filter
= (thread_type
)-1U;
293 if (argv_find(argv
, argc
, "FILTER", &idx
)) {
294 filter
= parse_filter(argv
[idx
]->arg
);
297 "Invalid filter \"%s\" specified; must contain at least"
304 cpu_record_clear(filter
);
308 void thread_cmd_init(void)
310 install_element(VIEW_NODE
, &show_thread_cpu_cmd
);
311 install_element(ENABLE_NODE
, &clear_thread_cpu_cmd
);
313 /* CLI end ------------------------------------------------------------------ */
316 static int thread_timer_cmp(void *a
, void *b
)
318 struct thread
*thread_a
= a
;
319 struct thread
*thread_b
= b
;
321 if (timercmp(&thread_a
->u
.sands
, &thread_b
->u
.sands
, <))
323 if (timercmp(&thread_a
->u
.sands
, &thread_b
->u
.sands
, >))
328 static void thread_timer_update(void *node
, int actual_position
)
330 struct thread
*thread
= node
;
332 thread
->index
= actual_position
;
335 static void cancelreq_del(void *cr
)
337 XFREE(MTYPE_TMP
, cr
);
340 /* initializer, only ever called once */
341 static void initializer()
343 pthread_key_create(&thread_current
, NULL
);
346 /* Allocate new thread master. */
347 struct thread_master
*thread_master_create(const char *name
)
349 struct thread_master
*rv
;
352 pthread_once(&init_once
, &initializer
);
354 rv
= XCALLOC(MTYPE_THREAD_MASTER
, sizeof(struct thread_master
));
358 /* Initialize master mutex */
359 pthread_mutex_init(&rv
->mtx
, NULL
);
360 pthread_cond_init(&rv
->cancel_cond
, NULL
);
363 rv
->name
= name
? XSTRDUP(MTYPE_THREAD_MASTER
, name
) : NULL
;
365 /* Initialize I/O task data structures */
366 getrlimit(RLIMIT_NOFILE
, &limit
);
367 rv
->fd_limit
= (int)limit
.rlim_cur
;
369 XCALLOC(MTYPE_THREAD
, sizeof(struct thread
*) * rv
->fd_limit
);
370 if (rv
->read
== NULL
) {
371 XFREE(MTYPE_THREAD_MASTER
, rv
);
375 XCALLOC(MTYPE_THREAD
, sizeof(struct thread
*) * rv
->fd_limit
);
376 if (rv
->write
== NULL
) {
377 XFREE(MTYPE_THREAD
, rv
->read
);
378 XFREE(MTYPE_THREAD_MASTER
, rv
);
382 rv
->cpu_record
= hash_create_size(
384 (unsigned int (*)(void *))cpu_record_hash_key
,
385 (int (*)(const void *, const void *))cpu_record_hash_cmp
,
389 /* Initialize the timer queues */
390 rv
->timer
= pqueue_create();
391 rv
->timer
->cmp
= thread_timer_cmp
;
392 rv
->timer
->update
= thread_timer_update
;
394 /* Initialize thread_fetch() settings */
396 rv
->handle_signals
= true;
398 /* Set pthread owner, should be updated by actual owner */
399 rv
->owner
= pthread_self();
400 rv
->cancel_req
= list_new();
401 rv
->cancel_req
->del
= cancelreq_del
;
404 /* Initialize pipe poker */
406 set_nonblocking(rv
->io_pipe
[0]);
407 set_nonblocking(rv
->io_pipe
[1]);
409 /* Initialize data structures for poll() */
410 rv
->handler
.pfdsize
= rv
->fd_limit
;
411 rv
->handler
.pfdcount
= 0;
412 rv
->handler
.pfds
= XCALLOC(MTYPE_THREAD_MASTER
,
413 sizeof(struct pollfd
) * rv
->handler
.pfdsize
);
414 rv
->handler
.copy
= XCALLOC(MTYPE_THREAD_MASTER
,
415 sizeof(struct pollfd
) * rv
->handler
.pfdsize
);
417 /* add to list of threadmasters */
418 pthread_mutex_lock(&masters_mtx
);
421 masters
= list_new();
423 listnode_add(masters
, rv
);
425 pthread_mutex_unlock(&masters_mtx
);
430 /* Add a new thread to the list. */
431 static void thread_list_add(struct thread_list
*list
, struct thread
*thread
)
434 thread
->prev
= list
->tail
;
436 list
->tail
->next
= thread
;
443 /* Delete a thread from the list. */
444 static struct thread
*thread_list_delete(struct thread_list
*list
,
445 struct thread
*thread
)
448 thread
->next
->prev
= thread
->prev
;
450 list
->tail
= thread
->prev
;
452 thread
->prev
->next
= thread
->next
;
454 list
->head
= thread
->next
;
455 thread
->next
= thread
->prev
= NULL
;
460 /* Thread list is empty or not. */
461 static int thread_empty(struct thread_list
*list
)
463 return list
->head
? 0 : 1;
466 /* Delete top of the list and return it. */
467 static struct thread
*thread_trim_head(struct thread_list
*list
)
469 if (!thread_empty(list
))
470 return thread_list_delete(list
, list
->head
);
474 /* Move thread to unuse list. */
475 static void thread_add_unuse(struct thread_master
*m
, struct thread
*thread
)
477 assert(m
!= NULL
&& thread
!= NULL
);
478 assert(thread
->next
== NULL
);
479 assert(thread
->prev
== NULL
);
482 thread
->type
= THREAD_UNUSED
;
483 thread
->hist
->total_active
--;
484 thread_list_add(&m
->unuse
, thread
);
487 /* Free all unused thread. */
488 static void thread_list_free(struct thread_master
*m
, struct thread_list
*list
)
493 for (t
= list
->head
; t
; t
= next
) {
495 XFREE(MTYPE_THREAD
, t
);
501 static void thread_array_free(struct thread_master
*m
,
502 struct thread
**thread_array
)
507 for (index
= 0; index
< m
->fd_limit
; ++index
) {
508 t
= thread_array
[index
];
510 thread_array
[index
] = NULL
;
511 XFREE(MTYPE_THREAD
, t
);
515 XFREE(MTYPE_THREAD
, thread_array
);
518 static void thread_queue_free(struct thread_master
*m
, struct pqueue
*queue
)
522 for (i
= 0; i
< queue
->size
; i
++)
523 XFREE(MTYPE_THREAD
, queue
->array
[i
]);
525 m
->alloc
-= queue
->size
;
526 pqueue_delete(queue
);
530 * thread_master_free_unused
532 * As threads are finished with they are put on the
533 * unuse list for later reuse.
534 * If we are shutting down, Free up unused threads
535 * So we can see if we forget to shut anything off
537 void thread_master_free_unused(struct thread_master
*m
)
539 pthread_mutex_lock(&m
->mtx
);
542 while ((t
= thread_trim_head(&m
->unuse
)) != NULL
) {
543 pthread_mutex_destroy(&t
->mtx
);
544 XFREE(MTYPE_THREAD
, t
);
547 pthread_mutex_unlock(&m
->mtx
);
550 /* Stop thread scheduler. */
551 void thread_master_free(struct thread_master
*m
)
553 pthread_mutex_lock(&masters_mtx
);
555 listnode_delete(masters
, m
);
556 if (masters
->count
== 0) {
561 pthread_mutex_unlock(&masters_mtx
);
563 thread_array_free(m
, m
->read
);
564 thread_array_free(m
, m
->write
);
565 thread_queue_free(m
, m
->timer
);
566 thread_list_free(m
, &m
->event
);
567 thread_list_free(m
, &m
->ready
);
568 thread_list_free(m
, &m
->unuse
);
569 pthread_mutex_destroy(&m
->mtx
);
570 pthread_cond_destroy(&m
->cancel_cond
);
571 close(m
->io_pipe
[0]);
572 close(m
->io_pipe
[1]);
573 list_delete(m
->cancel_req
);
574 m
->cancel_req
= NULL
;
576 hash_clean(m
->cpu_record
, cpu_record_hash_free
);
577 hash_free(m
->cpu_record
);
578 m
->cpu_record
= NULL
;
581 XFREE(MTYPE_THREAD_MASTER
, m
->name
);
582 XFREE(MTYPE_THREAD_MASTER
, m
->handler
.pfds
);
583 XFREE(MTYPE_THREAD_MASTER
, m
->handler
.copy
);
584 XFREE(MTYPE_THREAD_MASTER
, m
);
587 /* Return remain time in second. */
588 unsigned long thread_timer_remain_second(struct thread
*thread
)
592 pthread_mutex_lock(&thread
->mtx
);
594 remain
= monotime_until(&thread
->u
.sands
, NULL
) / 1000000LL;
596 pthread_mutex_unlock(&thread
->mtx
);
598 return remain
< 0 ? 0 : remain
;
601 #define debugargdef const char *funcname, const char *schedfrom, int fromln
602 #define debugargpass funcname, schedfrom, fromln
604 struct timeval
thread_timer_remain(struct thread
*thread
)
606 struct timeval remain
;
607 pthread_mutex_lock(&thread
->mtx
);
609 monotime_until(&thread
->u
.sands
, &remain
);
611 pthread_mutex_unlock(&thread
->mtx
);
615 /* Get new thread. */
616 static struct thread
*thread_get(struct thread_master
*m
, u_char type
,
617 int (*func
)(struct thread
*), void *arg
,
620 struct thread
*thread
= thread_trim_head(&m
->unuse
);
621 struct cpu_thread_history tmp
;
624 thread
= XCALLOC(MTYPE_THREAD
, sizeof(struct thread
));
625 /* mutex only needs to be initialized at struct creation. */
626 pthread_mutex_init(&thread
->mtx
, NULL
);
631 thread
->add_type
= type
;
635 thread
->yield
= THREAD_YIELD_TIME_SLOT
; /* default */
639 * So if the passed in funcname is not what we have
640 * stored that means the thread->hist needs to be
641 * updated. We keep the last one around in unused
642 * under the assumption that we are probably
643 * going to immediately allocate the same
645 * This hopefully saves us some serious
648 if (thread
->funcname
!= funcname
|| thread
->func
!= func
) {
650 tmp
.funcname
= funcname
;
652 hash_get(m
->cpu_record
, &tmp
,
653 (void *(*)(void *))cpu_record_hash_alloc
);
655 thread
->hist
->total_active
++;
657 thread
->funcname
= funcname
;
658 thread
->schedfrom
= schedfrom
;
659 thread
->schedfrom_line
= fromln
;
664 static int fd_poll(struct thread_master
*m
, struct pollfd
*pfds
, nfds_t pfdsize
,
665 nfds_t count
, const struct timeval
*timer_wait
)
667 /* If timer_wait is null here, that means poll() should block
669 * unless the thread_master has overridden it by setting
670 * ->selectpoll_timeout.
671 * If the value is positive, it specifies the maximum number of
673 * to wait. If the timeout is -1, it specifies that we should never wait
675 * always return immediately even if no event is detected. If the value
677 * zero, the behavior is default. */
680 /* number of file descriptors with events */
683 if (timer_wait
!= NULL
684 && m
->selectpoll_timeout
== 0) // use the default value
685 timeout
= (timer_wait
->tv_sec
* 1000)
686 + (timer_wait
->tv_usec
/ 1000);
687 else if (m
->selectpoll_timeout
> 0) // use the user's timeout
688 timeout
= m
->selectpoll_timeout
;
689 else if (m
->selectpoll_timeout
690 < 0) // effect a poll (return immediately)
693 /* add poll pipe poker */
694 assert(count
+ 1 < pfdsize
);
695 pfds
[count
].fd
= m
->io_pipe
[0];
696 pfds
[count
].events
= POLLIN
;
697 pfds
[count
].revents
= 0x00;
699 num
= poll(pfds
, count
+ 1, timeout
);
701 unsigned char trash
[64];
702 if (num
> 0 && pfds
[count
].revents
!= 0 && num
--)
703 while (read(m
->io_pipe
[0], &trash
, sizeof(trash
)) > 0)
709 /* Add new read thread. */
710 struct thread
*funcname_thread_add_read_write(int dir
, struct thread_master
*m
,
711 int (*func
)(struct thread
*),
713 struct thread
**t_ptr
,
716 struct thread
*thread
= NULL
;
718 pthread_mutex_lock(&m
->mtx
);
721 && *t_ptr
) // thread is already scheduled; don't reschedule
723 pthread_mutex_unlock(&m
->mtx
);
727 /* default to a new pollfd */
728 nfds_t queuepos
= m
->handler
.pfdcount
;
730 /* if we already have a pollfd for our file descriptor, find and
732 for (nfds_t i
= 0; i
< m
->handler
.pfdcount
; i
++)
733 if (m
->handler
.pfds
[i
].fd
== fd
) {
738 /* make sure we have room for this fd + pipe poker fd */
739 assert(queuepos
+ 1 < m
->handler
.pfdsize
);
741 thread
= thread_get(m
, dir
, func
, arg
, debugargpass
);
743 m
->handler
.pfds
[queuepos
].fd
= fd
;
744 m
->handler
.pfds
[queuepos
].events
|=
745 (dir
== THREAD_READ
? POLLIN
: POLLOUT
);
747 if (queuepos
== m
->handler
.pfdcount
)
748 m
->handler
.pfdcount
++;
751 pthread_mutex_lock(&thread
->mtx
);
754 if (dir
== THREAD_READ
)
755 m
->read
[thread
->u
.fd
] = thread
;
757 m
->write
[thread
->u
.fd
] = thread
;
759 pthread_mutex_unlock(&thread
->mtx
);
769 pthread_mutex_unlock(&m
->mtx
);
774 static struct thread
*
775 funcname_thread_add_timer_timeval(struct thread_master
*m
,
776 int (*func
)(struct thread
*), int type
,
777 void *arg
, struct timeval
*time_relative
,
778 struct thread
**t_ptr
, debugargdef
)
780 struct thread
*thread
;
781 struct pqueue
*queue
;
785 assert(type
== THREAD_TIMER
);
786 assert(time_relative
);
788 pthread_mutex_lock(&m
->mtx
);
791 && *t_ptr
) // thread is already scheduled; don't reschedule
793 pthread_mutex_unlock(&m
->mtx
);
798 thread
= thread_get(m
, type
, func
, arg
, debugargpass
);
800 pthread_mutex_lock(&thread
->mtx
);
802 monotime(&thread
->u
.sands
);
803 timeradd(&thread
->u
.sands
, time_relative
,
805 pqueue_enqueue(thread
, queue
);
811 pthread_mutex_unlock(&thread
->mtx
);
815 pthread_mutex_unlock(&m
->mtx
);
821 /* Add timer event thread. */
822 struct thread
*funcname_thread_add_timer(struct thread_master
*m
,
823 int (*func
)(struct thread
*),
824 void *arg
, long timer
,
825 struct thread
**t_ptr
, debugargdef
)
834 return funcname_thread_add_timer_timeval(m
, func
, THREAD_TIMER
, arg
,
835 &trel
, t_ptr
, debugargpass
);
838 /* Add timer event thread with "millisecond" resolution */
839 struct thread
*funcname_thread_add_timer_msec(struct thread_master
*m
,
840 int (*func
)(struct thread
*),
841 void *arg
, long timer
,
842 struct thread
**t_ptr
,
849 trel
.tv_sec
= timer
/ 1000;
850 trel
.tv_usec
= 1000 * (timer
% 1000);
852 return funcname_thread_add_timer_timeval(m
, func
, THREAD_TIMER
, arg
,
853 &trel
, t_ptr
, debugargpass
);
856 /* Add timer event thread with "millisecond" resolution */
857 struct thread
*funcname_thread_add_timer_tv(struct thread_master
*m
,
858 int (*func
)(struct thread
*),
859 void *arg
, struct timeval
*tv
,
860 struct thread
**t_ptr
, debugargdef
)
862 return funcname_thread_add_timer_timeval(m
, func
, THREAD_TIMER
, arg
, tv
,
863 t_ptr
, debugargpass
);
866 /* Add simple event thread. */
867 struct thread
*funcname_thread_add_event(struct thread_master
*m
,
868 int (*func
)(struct thread
*),
870 struct thread
**t_ptr
, debugargdef
)
872 struct thread
*thread
;
876 pthread_mutex_lock(&m
->mtx
);
879 && *t_ptr
) // thread is already scheduled; don't reschedule
881 pthread_mutex_unlock(&m
->mtx
);
885 thread
= thread_get(m
, THREAD_EVENT
, func
, arg
, debugargpass
);
886 pthread_mutex_lock(&thread
->mtx
);
889 thread_list_add(&m
->event
, thread
);
891 pthread_mutex_unlock(&thread
->mtx
);
900 pthread_mutex_unlock(&m
->mtx
);
905 /* Thread cancellation ------------------------------------------------------ */
908 * NOT's out the .events field of pollfd corresponding to the given file
909 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
911 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
912 * implementation for details.
916 * @param state the event to cancel. One or more (OR'd together) of the
921 static void thread_cancel_rw(struct thread_master
*master
, int fd
, short state
)
923 /* Cancel POLLHUP too just in case some bozo set it */
926 /* find the index of corresponding pollfd */
929 for (i
= 0; i
< master
->handler
.pfdcount
; i
++)
930 if (master
->handler
.pfds
[i
].fd
== fd
)
934 master
->handler
.pfds
[i
].events
&= ~(state
);
936 /* If all events are canceled, delete / resize the pollfd array. */
937 if (master
->handler
.pfds
[i
].events
== 0) {
938 memmove(master
->handler
.pfds
+ i
, master
->handler
.pfds
+ i
+ 1,
939 (master
->handler
.pfdcount
- i
- 1)
940 * sizeof(struct pollfd
));
941 master
->handler
.pfdcount
--;
944 /* If we have the same pollfd in the copy, perform the same operations,
945 * otherwise return. */
946 if (i
>= master
->handler
.copycount
)
949 master
->handler
.copy
[i
].events
&= ~(state
);
951 if (master
->handler
.copy
[i
].events
== 0) {
952 memmove(master
->handler
.copy
+ i
, master
->handler
.copy
+ i
+ 1,
953 (master
->handler
.copycount
- i
- 1)
954 * sizeof(struct pollfd
));
955 master
->handler
.copycount
--;
960 * Process cancellation requests.
962 * This may only be run from the pthread which owns the thread_master.
964 * @param master the thread master to process
965 * @REQUIRE master->mtx
967 static void do_thread_cancel(struct thread_master
*master
)
969 struct thread_list
*list
= NULL
;
970 struct pqueue
*queue
= NULL
;
971 struct thread
**thread_array
= NULL
;
972 struct thread
*thread
;
974 struct cancel_req
*cr
;
976 for (ALL_LIST_ELEMENTS_RO(master
->cancel_req
, ln
, cr
)) {
977 /* If this is an event object cancellation, linear search
979 * list deleting any events which have the specified argument.
981 * need to check every thread in the ready queue. */
984 thread
= master
->event
.head
;
990 if (t
->arg
== cr
->eventobj
) {
991 thread_list_delete(&master
->event
, t
);
994 thread_add_unuse(master
, t
);
998 thread
= master
->ready
.head
;
1003 if (t
->arg
== cr
->eventobj
) {
1004 thread_list_delete(&master
->ready
, t
);
1007 thread_add_unuse(master
, t
);
1013 /* The pointer varies depending on whether the cancellation
1015 * made asynchronously or not. If it was, we need to check
1017 * thread even exists anymore before cancelling it. */
1018 thread
= (cr
->thread
) ? cr
->thread
: *cr
->threadref
;
1023 /* Determine the appropriate queue to cancel the thread from */
1024 switch (thread
->type
) {
1026 thread_cancel_rw(master
, thread
->u
.fd
, POLLIN
);
1027 thread_array
= master
->read
;
1030 thread_cancel_rw(master
, thread
->u
.fd
, POLLOUT
);
1031 thread_array
= master
->write
;
1034 queue
= master
->timer
;
1037 list
= &master
->event
;
1040 list
= &master
->ready
;
1048 assert(thread
->index
>= 0);
1049 pqueue_remove(thread
, queue
);
1051 thread_list_delete(list
, thread
);
1052 } else if (thread_array
) {
1053 thread_array
[thread
->u
.fd
] = NULL
;
1055 assert(!"Thread should be either in queue or list or array!");
1059 *thread
->ref
= NULL
;
1061 thread_add_unuse(thread
->master
, thread
);
1064 /* Delete and free all cancellation requests */
1065 list_delete_all_node(master
->cancel_req
);
1067 /* Wake up any threads which may be blocked in thread_cancel_async() */
1068 master
->canceled
= true;
1069 pthread_cond_broadcast(&master
->cancel_cond
);
1073 * Cancel any events which have the specified argument.
1077 * @param m the thread_master to cancel from
1078 * @param arg the argument passed when creating the event
1080 void thread_cancel_event(struct thread_master
*master
, void *arg
)
1082 assert(master
->owner
== pthread_self());
1084 pthread_mutex_lock(&master
->mtx
);
1086 struct cancel_req
*cr
=
1087 XCALLOC(MTYPE_TMP
, sizeof(struct cancel_req
));
1089 listnode_add(master
->cancel_req
, cr
);
1090 do_thread_cancel(master
);
1092 pthread_mutex_unlock(&master
->mtx
);
1096 * Cancel a specific task.
1100 * @param thread task to cancel
1102 void thread_cancel(struct thread
*thread
)
1104 assert(thread
->master
->owner
== pthread_self());
1106 pthread_mutex_lock(&thread
->master
->mtx
);
1108 struct cancel_req
*cr
=
1109 XCALLOC(MTYPE_TMP
, sizeof(struct cancel_req
));
1110 cr
->thread
= thread
;
1111 listnode_add(thread
->master
->cancel_req
, cr
);
1112 do_thread_cancel(thread
->master
);
1114 pthread_mutex_unlock(&thread
->master
->mtx
);
1118 * Asynchronous cancellation.
1120 * Called with either a struct thread ** or void * to an event argument,
1121 * this function posts the correct cancellation request and blocks until it is
1124 * If the thread is currently running, execution blocks until it completes.
1126 * The last two parameters are mutually exclusive, i.e. if you pass one the
1127 * other must be NULL.
1129 * When the cancellation procedure executes on the target thread_master, the
1130 * thread * provided is checked for nullity. If it is null, the thread is
1131 * assumed to no longer exist and the cancellation request is a no-op. Thus
1132 * users of this API must pass a back-reference when scheduling the original
1137 * @param master the thread master with the relevant event / task
1138 * @param thread pointer to thread to cancel
1139 * @param eventobj the event
1141 void thread_cancel_async(struct thread_master
*master
, struct thread
**thread
,
1144 assert(!(thread
&& eventobj
) && (thread
|| eventobj
));
1145 assert(master
->owner
!= pthread_self());
1147 pthread_mutex_lock(&master
->mtx
);
1149 master
->canceled
= false;
1152 struct cancel_req
*cr
=
1153 XCALLOC(MTYPE_TMP
, sizeof(struct cancel_req
));
1154 cr
->threadref
= thread
;
1155 listnode_add(master
->cancel_req
, cr
);
1156 } else if (eventobj
) {
1157 struct cancel_req
*cr
=
1158 XCALLOC(MTYPE_TMP
, sizeof(struct cancel_req
));
1159 cr
->eventobj
= eventobj
;
1160 listnode_add(master
->cancel_req
, cr
);
1164 while (!master
->canceled
)
1165 pthread_cond_wait(&master
->cancel_cond
, &master
->mtx
);
1167 pthread_mutex_unlock(&master
->mtx
);
1169 /* ------------------------------------------------------------------------- */
1171 static struct timeval
*thread_timer_wait(struct pqueue
*queue
,
1172 struct timeval
*timer_val
)
1175 struct thread
*next_timer
= queue
->array
[0];
1176 monotime_until(&next_timer
->u
.sands
, timer_val
);
1182 static struct thread
*thread_run(struct thread_master
*m
, struct thread
*thread
,
1183 struct thread
*fetch
)
1186 thread_add_unuse(m
, thread
);
1190 static int thread_process_io_helper(struct thread_master
*m
,
1191 struct thread
*thread
, short state
, int pos
)
1193 struct thread
**thread_array
;
1198 if (thread
->type
== THREAD_READ
)
1199 thread_array
= m
->read
;
1201 thread_array
= m
->write
;
1203 thread_array
[thread
->u
.fd
] = NULL
;
1204 thread_list_add(&m
->ready
, thread
);
1205 thread
->type
= THREAD_READY
;
1206 /* if another pthread scheduled this file descriptor for the event we're
1207 * responding to, no problem; we're getting to it now */
1208 thread
->master
->handler
.pfds
[pos
].events
&= ~(state
);
1213 * Process I/O events.
1215 * Walks through file descriptor array looking for those pollfds whose .revents
1216 * field has something interesting. Deletes any invalid file descriptors.
1218 * @param m the thread master
1219 * @param num the number of active file descriptors (return value of poll())
1221 static void thread_process_io(struct thread_master
*m
, unsigned int num
)
1223 unsigned int ready
= 0;
1224 struct pollfd
*pfds
= m
->handler
.copy
;
1226 for (nfds_t i
= 0; i
< m
->handler
.copycount
&& ready
< num
; ++i
) {
1227 /* no event for current fd? immediately continue */
1228 if (pfds
[i
].revents
== 0)
1233 /* Unless someone has called thread_cancel from another pthread,
1235 * thing that could have changed in m->handler.pfds while we
1237 * asleep is the .events field in a given pollfd. Barring
1239 * that value should be a superset of the values we have in our
1241 * there's no need to update it. Similarly, barring deletion,
1243 * should still be a valid index into the master's pfds. */
1244 if (pfds
[i
].revents
& (POLLIN
| POLLHUP
))
1245 thread_process_io_helper(m
, m
->read
[pfds
[i
].fd
], POLLIN
,
1247 if (pfds
[i
].revents
& POLLOUT
)
1248 thread_process_io_helper(m
, m
->write
[pfds
[i
].fd
],
1251 /* if one of our file descriptors is garbage, remove the same
1253 * both pfds + update sizes and index */
1254 if (pfds
[i
].revents
& POLLNVAL
) {
1255 memmove(m
->handler
.pfds
+ i
, m
->handler
.pfds
+ i
+ 1,
1256 (m
->handler
.pfdcount
- i
- 1)
1257 * sizeof(struct pollfd
));
1258 m
->handler
.pfdcount
--;
1260 memmove(pfds
+ i
, pfds
+ i
+ 1,
1261 (m
->handler
.copycount
- i
- 1)
1262 * sizeof(struct pollfd
));
1263 m
->handler
.copycount
--;
1270 /* Add all timers that have popped to the ready list. */
1271 static unsigned int thread_process_timers(struct pqueue
*queue
,
1272 struct timeval
*timenow
)
1274 struct thread
*thread
;
1275 unsigned int ready
= 0;
1277 while (queue
->size
) {
1278 thread
= queue
->array
[0];
1279 if (timercmp(timenow
, &thread
->u
.sands
, <))
1281 pqueue_dequeue(queue
);
1282 thread
->type
= THREAD_READY
;
1283 thread_list_add(&thread
->master
->ready
, thread
);
1289 /* process a list en masse, e.g. for event thread lists */
1290 static unsigned int thread_process(struct thread_list
*list
)
1292 struct thread
*thread
;
1293 struct thread
*next
;
1294 unsigned int ready
= 0;
1296 for (thread
= list
->head
; thread
; thread
= next
) {
1297 next
= thread
->next
;
1298 thread_list_delete(list
, thread
);
1299 thread
->type
= THREAD_READY
;
1300 thread_list_add(&thread
->master
->ready
, thread
);
1307 /* Fetch next ready thread. */
1308 struct thread
*thread_fetch(struct thread_master
*m
, struct thread
*fetch
)
1310 struct thread
*thread
= NULL
;
1312 struct timeval zerotime
= {0, 0};
1314 struct timeval
*tw
= NULL
;
1319 /* Handle signals if any */
1320 if (m
->handle_signals
)
1321 quagga_sigevent_process();
1323 pthread_mutex_lock(&m
->mtx
);
1325 /* Process any pending cancellation requests */
1326 do_thread_cancel(m
);
1329 * Attempt to flush ready queue before going into poll().
1330 * This is performance-critical. Think twice before modifying.
1332 if ((thread
= thread_trim_head(&m
->ready
))) {
1333 fetch
= thread_run(m
, thread
, fetch
);
1336 pthread_mutex_unlock(&m
->mtx
);
1340 /* otherwise, tick through scheduling sequence */
1343 * Post events to ready queue. This must come before the
1344 * following block since events should occur immediately
1346 thread_process(&m
->event
);
1349 * If there are no tasks on the ready queue, we will poll()
1350 * until a timer expires or we receive I/O, whichever comes
1351 * first. The strategy for doing this is:
1353 * - If there are events pending, set the poll() timeout to zero
1354 * - If there are no events pending, but there are timers
1356 * timeout to the smallest remaining time on any timer
1357 * - If there are neither timers nor events pending, but there
1359 * descriptors pending, block indefinitely in poll()
1360 * - If nothing is pending, it's time for the application to die
1362 * In every case except the last, we need to hit poll() at least
1363 * once per loop to avoid starvation by events
1365 if (m
->ready
.count
== 0)
1366 tw
= thread_timer_wait(m
->timer
, &tv
);
1368 if (m
->ready
.count
!= 0 || (tw
&& !timercmp(tw
, &zerotime
, >)))
1371 if (!tw
&& m
->handler
.pfdcount
== 0) { /* die */
1372 pthread_mutex_unlock(&m
->mtx
);
1378 * Copy pollfd array + # active pollfds in it. Not necessary to
1379 * copy the array size as this is fixed.
1381 m
->handler
.copycount
= m
->handler
.pfdcount
;
1382 memcpy(m
->handler
.copy
, m
->handler
.pfds
,
1383 m
->handler
.copycount
* sizeof(struct pollfd
));
1385 pthread_mutex_unlock(&m
->mtx
);
1387 num
= fd_poll(m
, m
->handler
.copy
, m
->handler
.pfdsize
,
1388 m
->handler
.copycount
, tw
);
1390 pthread_mutex_lock(&m
->mtx
);
1392 /* Handle any errors received in poll() */
1394 if (errno
== EINTR
) {
1395 pthread_mutex_unlock(&m
->mtx
);
1396 /* loop around to signal handler */
1401 zlog_warn("poll() error: %s", safe_strerror(errno
));
1402 pthread_mutex_unlock(&m
->mtx
);
1407 /* Post timers to ready queue. */
1409 thread_process_timers(m
->timer
, &now
);
1411 /* Post I/O to ready queue. */
1413 thread_process_io(m
, num
);
1415 pthread_mutex_unlock(&m
->mtx
);
1417 } while (!thread
&& m
->spin
);
1422 static unsigned long timeval_elapsed(struct timeval a
, struct timeval b
)
1424 return (((a
.tv_sec
- b
.tv_sec
) * TIMER_SECOND_MICRO
)
1425 + (a
.tv_usec
- b
.tv_usec
));
1428 unsigned long thread_consumed_time(RUSAGE_T
*now
, RUSAGE_T
*start
,
1429 unsigned long *cputime
)
1431 /* This is 'user + sys' time. */
1432 *cputime
= timeval_elapsed(now
->cpu
.ru_utime
, start
->cpu
.ru_utime
)
1433 + timeval_elapsed(now
->cpu
.ru_stime
, start
->cpu
.ru_stime
);
1434 return timeval_elapsed(now
->real
, start
->real
);
1437 /* We should aim to yield after yield milliseconds, which defaults
1438 to THREAD_YIELD_TIME_SLOT .
1439 Note: we are using real (wall clock) time for this calculation.
1440 It could be argued that CPU time may make more sense in certain
1441 contexts. The things to consider are whether the thread may have
1442 blocked (in which case wall time increases, but CPU time does not),
1443 or whether the system is heavily loaded with other processes competing
1444 for CPU time. On balance, wall clock time seems to make sense.
1445 Plus it has the added benefit that gettimeofday should be faster
1446 than calling getrusage. */
1447 int thread_should_yield(struct thread
*thread
)
1450 pthread_mutex_lock(&thread
->mtx
);
1452 result
= monotime_since(&thread
->real
, NULL
)
1453 > (int64_t)thread
->yield
;
1455 pthread_mutex_unlock(&thread
->mtx
);
1459 void thread_set_yield_time(struct thread
*thread
, unsigned long yield_time
)
1461 pthread_mutex_lock(&thread
->mtx
);
1463 thread
->yield
= yield_time
;
1465 pthread_mutex_unlock(&thread
->mtx
);
/*
 * thread_getrusage: take a resource-usage snapshot into *r.
 *
 * As visible here, r->cpu is filled by getrusage(RUSAGE_SELF, ...); the
 * return value of getrusage() is not checked.
 *
 * NOTE(review): thread_consumed_time() also reads the `real` member of
 * these snapshots, but no statement writing r->real is present in this
 * text -- confirm against the complete file where it is populated.
 */
1468 void thread_getrusage(RUSAGE_T
*r
)
1471 getrusage(RUSAGE_SELF
, &(r
->cpu
));
1474 /* We check thread consumed time. If the system has getrusage, we'll
1475 use that to get in-depth stats on the performance of the thread in addition
1476 to wall clock time stats from gettimeofday. */
/*
 * thread_call: run a task's callback and fold the time it took into the
 * task's statistics record (thread->hist).
 *
 * Steps visible in this text:
 *  - thread->real is set from before.real, the starting timestamp;
 *  - `thread` is stored under the thread_current key while
 *    (*thread->func)(thread) runs, and the key is reset to NULL afterwards;
 *  - thread_consumed_time(&after, &before, &cputime) yields the elapsed
 *    real time (its return value) and the elapsed CPU time (cputime);
 *  - hist->real and hist->cpu have `total` increased and `max` raised when
 *    the new sample is larger; hist->total_calls is incremented and the
 *    bit for thread->add_type is set in hist->types;
 *  - when CONSUMED_TIME_CHECK is defined and the real time exceeds it, a
 *    "SLOW THREAD" message is produced with both times divided by 1000.
 *
 * The return value of the callback is discarded.
 *
 * NOTE(review): the statements that fill `before` and `after`, and the
 * head of the logging call, are not present in this text -- confirm them
 * against the complete file before changing this function.
 */
1477 void thread_call(struct thread
*thread
)
1479 unsigned long realtime
, cputime
;
1480 RUSAGE_T before
, after
;
1483 thread
->real
= before
.real
;
1485 pthread_setspecific(thread_current
, thread
);
1486 (*thread
->func
)(thread
);
1487 pthread_setspecific(thread_current
, NULL
);
1491 realtime
= thread_consumed_time(&after
, &before
, &cputime
);
1492 thread
->hist
->real
.total
+= realtime
;
1493 if (thread
->hist
->real
.max
< realtime
)
1494 thread
->hist
->real
.max
= realtime
;
1495 thread
->hist
->cpu
.total
+= cputime
;
1496 if (thread
->hist
->cpu
.max
< cputime
)
1497 thread
->hist
->cpu
.max
= cputime
;
1499 ++(thread
->hist
->total_calls
);
1500 thread
->hist
->types
|= (1 << thread
->add_type
);
1502 #ifdef CONSUMED_TIME_CHECK
1503 if (realtime
> CONSUMED_TIME_CHECK
) {
1505 * We have a CPU Hog on our hands.
1506 * Whinge about it now, so we're aware this is yet another task
1510 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1511 thread
->funcname
, (unsigned long)thread
->func
,
1512 realtime
/ 1000, cputime
/ 1000);
1514 #endif /* CONSUMED_TIME_CHECK */
1517 /* Execute thread */
1518 void funcname_thread_execute(struct thread_master
*m
,
1519 int (*func
)(struct thread
*), void *arg
, int val
,
1522 struct cpu_thread_history tmp
;
1523 struct thread dummy
;
1525 memset(&dummy
, 0, sizeof(struct thread
));
1527 pthread_mutex_init(&dummy
.mtx
, NULL
);
1528 dummy
.type
= THREAD_EVENT
;
1529 dummy
.add_type
= THREAD_EXECUTE
;
1530 dummy
.master
= NULL
;
1534 tmp
.func
= dummy
.func
= func
;
1535 tmp
.funcname
= dummy
.funcname
= funcname
;
1536 dummy
.hist
= hash_get(m
->cpu_record
, &tmp
,
1537 (void *(*)(void *))cpu_record_hash_alloc
);
1539 dummy
.schedfrom
= schedfrom
;
1540 dummy
.schedfrom_line
= fromln
;
1542 thread_call(&dummy
);