/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
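/*
 * AWAKEN() is how one pthread nudges another's event loop: the scheduling
 * functions below call it after queueing work, so a thread_master blocked
 * in poll() wakes up via io_pipe[0] instead of sleeping out its timeout.
 */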
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	return (uintptr_t)a->func;
}
static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
			       const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct vty *vty = args[1];
	thread_type *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;
	vty_out_cpu_thread_history(vty, a);
	totals->total_active += a->total_active;
	totals->total_calls += a->total_calls;
	totals->real.total += a->real.total;
	if (totals->real.max < a->real.max)
		totals->real.max = a->real.max;
	totals->cpu.total += a->cpu.total;
	if (totals->cpu.max < a->cpu.max)
		totals->cpu.max = a->cpu.max;
}
static void cpu_record_print(struct vty *vty, thread_type filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
	thread_type *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(thread_type filter)
{
	thread_type *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static thread_type parse_filter(const char *filterstr)
{
	int i = 0;
	thread_type filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
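/* For example, parse_filter("rw") returns a mask with the THREAD_READ and
 * THREAD_WRITE bits set; a string containing no recognized letters returns 0,
 * which the CLI handlers below reject as an invalid filter. */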
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	thread_type filter = (thread_type)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	thread_type filter = (thread_type)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
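/* Daemons call thread_cmd_init() once during startup to register the two
 * commands above with the vty command tree. */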
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer(void)
{
	masters = list_new();
	pthread_key_create(&thread_current, NULL);
}
/* Allocate new thread master.  */
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
	if (rv == NULL)
		return NULL;

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read =
		XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
	if (rv->read == NULL) {
		XFREE(MTYPE_THREAD_MASTER, rv);
		return NULL;
	}
	rv->write =
		XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
	if (rv->write == NULL) {
		XFREE(MTYPE_THREAD, rv->read);
		XFREE(MTYPE_THREAD_MASTER, rv);
		return NULL;
	}

	rv->cpu_record = hash_create(
		(unsigned int (*)(void *))cpu_record_hash_key,
		(int (*)(const void *, const void *))cpu_record_hash_cmp, NULL);

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
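/*
 * Typical lifecycle pairing for the above (illustrative sketch, not code
 * from this file):
 *
 *   struct thread_master *master = thread_master_create("main");
 *   ...schedule tasks, run the thread_fetch()/thread_call() loop...
 *   thread_master_free_unused(master);
 *   thread_master_free(master);
 */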
/* Add a new thread to the list.  */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}
/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}
/* Thread list is empty or not.  */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}
/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}
/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);

	thread->type = THREAD_UNUSED;
	thread->hist->total_active--;
	thread_list_add(&m->unuse, thread);
}
/* Free all unused thread. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		XFREE(MTYPE_THREAD, t);
		list->count--;
		m->alloc--;
	}
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			XFREE(MTYPE_THREAD, t);
			m->alloc--;
		}
	}
	XFREE(MTYPE_THREAD, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		XFREE(MTYPE_THREAD, queue->array[i]);

	m->alloc -= queue->size;
	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			pthread_mutex_destroy(&t->mtx);
			XFREE(MTYPE_THREAD, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
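/* These two macros thread the scheduling call site (function name, source
 * file, line) from the thread_add_* wrapper macros in thread.h down into the
 * funcname_* implementations below, so 'show thread cpu' can attribute
 * statistics to the code that scheduled each task. */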
struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);
	return remain;
}
/* Get new thread.  */
static struct thread *thread_get(struct thread_master *m, u_char type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated.  We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * one up again.
	 *
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event
	 * is detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
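/* Timeout resolution above, in summary:
 *   timer_wait != NULL and selectpoll_timeout == 0 -> timer_wait, in msec
 *   selectpoll_timeout > 0                         -> that many msec
 *   selectpoll_timeout < 0                         -> nonblocking poll (0)
 *   otherwise                                      -> block indefinitely (-1)
 */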
/* Add new read/write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
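/* Note the |= on .events above: when the fd already has a pollfd (say a read
 * task exists and a write task is added), the new direction is OR'd into the
 * existing slot rather than appending a duplicate entry for the same fd. */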
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;

		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "struct timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd)
			break;

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (queue) {
			assert(thread->index >= 0);
			pqueue_remove(thread, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	assert(thread->master->owner == pthread_self());

	pthread_mutex_lock(&thread->master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(thread->master->cancel_req, cr);
		do_thread_cancel(thread->master);
	}
	pthread_mutex_unlock(&thread->master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
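/*
 * Illustrative cross-pthread cancellation (a sketch; 'handler', 'ctx' and
 * 'fd' are hypothetical):
 *
 *   static struct thread *t_read;    // back-reference set when scheduling
 *   thread_add_read(master, handler, ctx, fd, &t_read);
 *   ...later, from a pthread that does not own 'master':
 *   thread_cancel_async(master, &t_read, NULL);  // blocks until serviced
 */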
/* ------------------------------------------------------------------------- */
static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we
	 * are responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another pthread,
		 * the only thing that could have changed in m->handler.pfds
		 * while we were asleep is the .events field in a given pollfd.
		 * Barring thread_cancel() that value should be a superset of
		 * the values we have in our copy, so there's no need to update
		 * it. Similarly, barring deletion, the fd should still be a
		 * valid index into the master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/* Post events to ready queue. This must come before the
		 * following block since events should occur immediately */
		thread_process(&m->event);

		/* If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events */

		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed. */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				continue; /* loop around to signal handler */
			}

			/* else die */
			zlog_warn("poll() error: %s", safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Since we could have received more cancellation requests
		 * during poll(), process those */
		do_thread_cancel(m);

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		/* If we have a ready task, break the loop and return it to the
		 * caller */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
		}

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time.  */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);
	return result;
}
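/* Long-running callbacks use this to cooperate with the scheduler: check
 * thread_should_yield() periodically and, if it returns nonzero, reschedule
 * the remaining work (e.g. with thread_add_event()) instead of monopolizing
 * the event loop. */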
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
	monotime(&r->real);
	getrusage(RUSAGE_SELF, &(r->cpu));
}
/* We check thread consumed time. If the system has getrusage, we'll
   use that to get in-depth stats on the performance of the thread in addition
   to wall clock time stats from gettimeofday. */
void thread_call(struct thread *thread)
{
	unsigned long realtime, cputime;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &cputime);
	thread->hist->real.total += realtime;
	if (thread->hist->real.max < realtime)
		thread->hist->real.max = realtime;
	thread->hist->cpu.total += cputime;
	if (thread->hist->cpu.max < cputime)
		thread->hist->cpu.max = cputime;

	++(thread->hist->total_calls);
	thread->hist->types |= (1 << thread->add_type);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		zlog_warn(
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct cpu_thread_history tmp;
	struct thread dummy;

	memset(&dummy, 0, sizeof(struct thread));

	pthread_mutex_init(&dummy.mtx, NULL);
	dummy.type = THREAD_EVENT;
	dummy.add_type = THREAD_EXECUTE;
	dummy.master = NULL;
	dummy.arg = arg;
	dummy.u.val = val;

	tmp.func = dummy.func = func;
	tmp.funcname = dummy.funcname = funcname;
	dummy.hist = hash_get(m->cpu_record, &tmp,
			      (void *(*)(void *))cpu_record_hash_alloc);

	dummy.schedfrom = schedfrom;
	dummy.schedfrom_line = fromln;

	thread_call(&dummy);
}