/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;

/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
			       const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;

	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
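
/* A quick illustration (not from the original source): parse_filter("rw")
 * returns (1 << THREAD_READ) | (1 << THREAD_WRITE), while a string with no
 * recognized characters, e.g. parse_filter("q"), returns 0, which the CLI
 * handlers below treat as an invalid filter.
 */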
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}

static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}

/* initializer, only ever called once */
static void initializer()
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
	if (rv == NULL)
		return NULL;

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(int (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
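
/* Usage sketch (illustrative, not part of this file): a minimal daemon event
 * loop built on this API, following the pattern FRR daemons use. The callback
 * name my_read_cb and the descriptor sock_fd are hypothetical; thread_add_read
 * is one of the convenience macros from thread.h.
 *
 *	struct thread_master *master = thread_master_create("mydaemon");
 *	struct thread fetch;
 *
 *	thread_add_read(master, my_read_cb, NULL, sock_fd, NULL);
 *	while (thread_fetch(master, &fetch))
 *		thread_call(&fetch);
 *	thread_master_free(master);
 */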
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		if (master->name)
			XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}

/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}

/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}
#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	if (m->unuse.count < THREAD_UNUSED_DEPTH)
		thread_list_add(&m->unuse, thread);
	else
		XFREE(MTYPE_THREAD, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		XFREE(MTYPE_THREAD, t);
		list->count--;
		m->alloc--;
	}
}

static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			XFREE(MTYPE_THREAD, t);
			m->alloc--;
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}

static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		XFREE(MTYPE_THREAD, queue->array[i]);

	m->alloc -= queue->size;
	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads finish, they are put on the unuse list for later reuse.
 * If we are shutting down, free the unused threads so we can see
 * whether we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			pthread_mutex_destroy(&t->mtx);
			XFREE(MTYPE_THREAD, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete_and_null(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete_and_null(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	if (m->name)
		XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
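
/* These two macros thread the scheduling site's identity through the
 * funcname_* functions below. The public thread_add_* wrappers in thread.h
 * are expected to supply something like #func, __FILE__, __LINE__, which is
 * what lets "show thread cpu" attribute statistics to a function name and
 * lets thread->schedfrom point at the call site.
 */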
struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);
	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated. We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * one.
	 * This hopefully saves us some serious
	 * lookup time.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event
	 * is detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
/* Add new read or write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
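
/* Scheduling sketch (illustrative only): callers normally reach this function
 * through the thread_add_read()/thread_add_write() macros from thread.h
 * rather than calling it directly. A hypothetical handler that re-arms
 * itself after each wakeup:
 *
 *	static int my_read_cb(struct thread *t)
 *	{
 *		int fd = THREAD_FD(t);
 *
 *		// ... consume data from fd ...
 *		thread_add_read(t->master, my_read_cb, NULL, fd, NULL);
 *		return 0;
 *	}
 */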
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;

		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "struct timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
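
/* Scheduling sketch (illustrative only): timers and events likewise go
 * through the thread.h wrapper macros. When the back-reference argument is
 * non-NULL it receives the new task and is cleared once the task runs or is
 * cancelled; my_timer_cb and my_event_cb are hypothetical callbacks.
 *
 *	static struct thread *t_timeout;
 *
 *	thread_add_timer(master, my_timer_cb, NULL, 30, &t_timeout);
 *	thread_add_timer_msec(master, my_timer_cb, NULL, 500, NULL);
 *	thread_add_event(master, my_event_cb, NULL, 0, NULL);
 */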
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list, deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
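
/* Cancellation sketch (illustrative only): since the asynchronous path
 * dereferences the back-reference and treats NULL as "task already gone",
 * it only works for tasks scheduled with such a reference. t_bg and
 * bg_timer_cb are hypothetical.
 *
 *	static struct thread *t_bg;
 *
 *	// on the pthread that owns 'master':
 *	thread_add_timer(master, bg_timer_cb, NULL, 5, &t_bg);
 *
 *	// later, from a different pthread:
 *	thread_cancel_async(master, &t_bg, NULL);
 */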
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we're
	 * responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel(), that value
		 * should be a superset of the values we have in our copy, so
		 * there's no need to update it. Similarly, barring deletion,
		 * the fd should still be a valid index into the master's
		 * pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
1354 static unsigned int thread_process_timers(struct pqueue
*queue
,
1355 struct timeval
*timenow
)
1357 struct thread
*thread
;
1358 unsigned int ready
= 0;
1360 while (queue
->size
) {
1361 thread
= queue
->array
[0];
1362 if (timercmp(timenow
, &thread
->u
.sands
, <))
1364 pqueue_dequeue(queue
);
1365 thread
->type
= THREAD_READY
;
1366 thread_list_add(&thread
->master
->ready
, thread
);
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 * pending, set the timeout to the smallest remaining time on
		 * any timer
		 * - If there are neither timers nor events pending, but there
		 * are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(LIB_ERR_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts. The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time. On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);
	return result;
}
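
/* Yield sketch (illustrative only): a long-running task can use this check to
 * split its work across scheduling cycles instead of starving other tasks.
 * work_remains() and do_some_work() are hypothetical helpers.
 *
 *	static int my_worker(struct thread *t)
 *	{
 *		while (work_remains()) {
 *			do_some_work();
 *			if (thread_should_yield(t)) {
 *				thread_add_event(t->master, my_worker,
 *						 NULL, 0, NULL);
 *				break;
 *			}
 *		}
 *		return 0;
 *	}
 */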
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
	monotime(&r->real);
	getrusage(RUSAGE_SELF, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			LIB_WARN_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct cpu_thread_history tmp;
	struct thread dummy;

	memset(&dummy, 0, sizeof(struct thread));

	pthread_mutex_init(&dummy.mtx, NULL);
	dummy.type = THREAD_EVENT;
	dummy.add_type = THREAD_EXECUTE;
	dummy.master = NULL;
	dummy.arg = arg;
	dummy.u.val = val;

	tmp.func = dummy.func = func;
	tmp.funcname = dummy.funcname = funcname;
	dummy.hist = hash_get(m->cpu_record, &tmp,
			      (void *(*)(void *))cpu_record_hash_alloc);

	dummy.schedfrom = schedfrom;
	dummy.schedfrom_line = fromln;

	thread_call(&dummy);
}
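
/* Execution sketch (illustrative only): the thread_execute() macro in
 * thread.h wraps this function. The callback runs synchronously on a
 * stack-allocated dummy task, so nothing is scheduled and nothing needs to
 * be cancelled afterwards:
 *
 *	thread_execute(master, my_cb, NULL, 0);
 */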