/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
        do {                                                                   \
                static unsigned char wakebyte = 0x01;                          \
                write(m->io_pipe[1], &wakebyte, 1);                            \
        } while (0);
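/*
 * A note on AWAKEN: writing to io_pipe[1] is the classic self-pipe trick.
 * fd_poll() below always includes io_pipe[0] in the pollfd set, so a
 * pthread blocked in poll() wakes up as soon as another pthread schedules
 * or cancels a task, instead of waiting out the current poll() timeout.
 */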
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
        int size = sizeof(a->func);

        return jhash(&a->func, size, 0);
}
static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
                               const struct cpu_thread_history *b)
{
        return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
        struct cpu_thread_history *new;
        new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
        new->func = a->func;
        new->funcname = a->funcname;
        return new;
}
static void cpu_record_hash_free(void *a)
{
        struct cpu_thread_history *hist = a;

        XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
                                       struct cpu_thread_history *a)
{
        vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
                a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
                a->cpu.total / a->total_calls, a->cpu.max,
                a->real.total / a->total_calls, a->real.max);
        vty_out(vty, " %c%c%c%c%c %s\n",
                a->types & (1 << THREAD_READ) ? 'R' : ' ',
                a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
                a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
                a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
                a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
        struct cpu_thread_history *totals = args[0];
        struct cpu_thread_history copy;
        struct vty *vty = args[1];
        uint8_t *filter = args[2];

        struct cpu_thread_history *a = bucket->data;

        copy.total_active =
                atomic_load_explicit(&a->total_active, memory_order_seq_cst);
        copy.total_calls =
                atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
        copy.cpu.total =
                atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
        copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
        copy.real.total =
                atomic_load_explicit(&a->real.total, memory_order_seq_cst);
        copy.real.max =
                atomic_load_explicit(&a->real.max, memory_order_seq_cst);
        copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
        copy.funcname = a->funcname;

        if (!(copy.types & *filter))
                return;

        vty_out_cpu_thread_history(vty, &copy);
        totals->total_active += copy.total_active;
        totals->total_calls += copy.total_calls;
        totals->real.total += copy.real.total;
        if (totals->real.max < copy.real.max)
                totals->real.max = copy.real.max;
        totals->cpu.total += copy.cpu.total;
        if (totals->cpu.max < copy.cpu.max)
                totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
        struct cpu_thread_history tmp;
        void *args[3] = {&tmp, vty, &filter};
        struct thread_master *m;
        struct listnode *ln;

        memset(&tmp, 0, sizeof tmp);
        tmp.funcname = "TOTAL";
        tmp.types = filter;

        pthread_mutex_lock(&masters_mtx);
        {
                for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
                        const char *name = m->name ? m->name : "main";

                        char underline[strlen(name) + 1];
                        memset(underline, '-', sizeof(underline));
                        underline[sizeof(underline) - 1] = '\0';

                        vty_out(vty, "\n");
                        vty_out(vty, "Showing statistics for pthread %s\n",
                                name);
                        vty_out(vty, "-------------------------------%s\n",
                                underline);
                        vty_out(vty, "%21s %18s %18s\n", "",
                                "CPU (user+system):", "Real (wall-clock):");
                        vty_out(vty,
                                "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
                        vty_out(vty, " Avg uSec Max uSecs");
                        vty_out(vty, "  Type  Thread\n");

                        if (m->cpu_record->count)
                                hash_iterate(
                                        m->cpu_record,
                                        (void (*)(struct hash_backet *,
                                                  void *))cpu_record_hash_print,
                                        args);
                        else
                                vty_out(vty, "No data to display yet.\n");

                        vty_out(vty, "\n");
                }
        }
        pthread_mutex_unlock(&masters_mtx);

        vty_out(vty, "\n");
        vty_out(vty, "Total thread statistics\n");
        vty_out(vty, "-------------------------\n");
        vty_out(vty, "%21s %18s %18s\n", "",
                "CPU (user+system):", "Real (wall-clock):");
        vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
        vty_out(vty, " Avg uSec Max uSecs");
        vty_out(vty, "  Type  Thread\n");

        if (tmp.total_calls > 0)
                vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
        uint8_t *filter = args[0];
        struct hash *cpu_record = args[1];

        struct cpu_thread_history *a = bucket->data;

        if (!(a->types & *filter))
                return;

        hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
        uint8_t *tmp = &filter;
        struct thread_master *m;
        struct listnode *ln;

        pthread_mutex_lock(&masters_mtx);
        {
                for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
                        pthread_mutex_lock(&m->mtx);
                        {
                                void *args[2] = {tmp, m->cpu_record};
                                hash_iterate(
                                        m->cpu_record,
                                        (void (*)(struct hash_backet *,
                                                  void *))cpu_record_hash_clear,
                                        args);
                        }
                        pthread_mutex_unlock(&m->mtx);
                }
        }
        pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
        int i = 0;
        uint8_t filter = 0;

        while (filterstr[i] != '\0') {
                switch (filterstr[i]) {
                case 'r':
                case 'R':
                        filter |= (1 << THREAD_READ);
                        break;
                case 'w':
                case 'W':
                        filter |= (1 << THREAD_WRITE);
                        break;
                case 't':
                case 'T':
                        filter |= (1 << THREAD_TIMER);
                        break;
                case 'e':
                case 'E':
                        filter |= (1 << THREAD_EVENT);
                        break;
                case 'x':
                case 'X':
                        filter |= (1 << THREAD_EXECUTE);
                        break;
                default:
                        break;
                }
                ++i;
        }

        return filter;
}
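/*
 * For example, parse_filter("rw") returns
 * (1 << THREAD_READ) | (1 << THREAD_WRITE), while a string with no
 * recognized characters returns 0, which the CLI handlers below treat as
 * an invalid filter.
 */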
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
        uint8_t filter = (uint8_t)-1U;
        int idx = 0;

        if (argv_find(argv, argc, "FILTER", &idx)) {
                filter = parse_filter(argv[idx]->arg);
                if (!filter) {
                        vty_out(vty,
                                "Invalid filter \"%s\" specified; must contain at least"
                                " one of 'RWTEXB'\n",
                                argv[idx]->arg);
                        return CMD_WARNING;
                }
        }

        cpu_record_print(vty, filter);
        return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
        const char *name = m->name ? m->name : "main";
        char underline[strlen(name) + 1];
        uint32_t i;

        memset(underline, '-', sizeof(underline));
        underline[sizeof(underline) - 1] = '\0';

        vty_out(vty, "\nShowing poll FD's for %s\n", name);
        vty_out(vty, "----------------------%s\n", underline);
        vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
        for (i = 0; i < m->handler.pfdcount; i++)
                vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
                        m->handler.pfds[i].fd,
                        m->handler.pfds[i].events,
                        m->handler.pfds[i].revents);
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
        struct listnode *node;
        struct thread_master *m;

        pthread_mutex_lock(&masters_mtx);
        {
                for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
                        show_thread_poll_helper(vty, m);
                }
        }
        pthread_mutex_unlock(&masters_mtx);

        return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
        uint8_t filter = (uint8_t)-1U;
        int idx = 0;

        if (argv_find(argv, argc, "FILTER", &idx)) {
                filter = parse_filter(argv[idx]->arg);
                if (!filter) {
                        vty_out(vty,
                                "Invalid filter \"%s\" specified; must contain at least"
                                " one of 'RWTEXB'\n",
                                argv[idx]->arg);
                        return CMD_WARNING;
                }
        }

        cpu_record_clear(filter);
        return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
        install_element(VIEW_NODE, &show_thread_cpu_cmd);
        install_element(VIEW_NODE, &show_thread_poll_cmd);
        install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
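/*
 * Illustrative note (not part of this file): a daemon would normally call
 * thread_cmd_init() once at startup, after its command tables are set up,
 * making "show thread cpu", "show thread poll" and "clear thread cpu"
 * available from the vty.
 */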
static int thread_timer_cmp(void *a, void *b)
{
        struct thread *thread_a = a;
        struct thread *thread_b = b;

        if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
                return -1;
        if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
                return 1;
        return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
        struct thread *thread = node;

        thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
        XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer()
{
        pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
        struct thread_master *rv;
        struct rlimit limit;

        pthread_once(&init_once, &initializer);

        rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
        if (rv == NULL)
                return NULL;

        /* Initialize master mutex */
        pthread_mutex_init(&rv->mtx, NULL);
        pthread_cond_init(&rv->cancel_cond, NULL);

        /* Set name */
        rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

        /* Initialize I/O task data structures */
        getrlimit(RLIMIT_NOFILE, &limit);
        rv->fd_limit = (int)limit.rlim_cur;
        rv->read = XCALLOC(MTYPE_THREAD_POLL,
                           sizeof(struct thread *) * rv->fd_limit);

        rv->write = XCALLOC(MTYPE_THREAD_POLL,
                            sizeof(struct thread *) * rv->fd_limit);

        rv->cpu_record = hash_create_size(
                8, (unsigned int (*)(void *))cpu_record_hash_key,
                (int (*)(const void *, const void *))cpu_record_hash_cmp,
                "Thread Hash");

        /* Initialize the timer queues */
        rv->timer = pqueue_create();
        rv->timer->cmp = thread_timer_cmp;
        rv->timer->update = thread_timer_update;

        /* Initialize thread_fetch() settings */
        rv->spin = true;
        rv->handle_signals = true;

        /* Set pthread owner, should be updated by actual owner */
        rv->owner = pthread_self();
        rv->cancel_req = list_new();
        rv->cancel_req->del = cancelreq_del;
        rv->canceled = true;

        /* Initialize pipe poker */
        pipe(rv->io_pipe);
        set_nonblocking(rv->io_pipe[0]);
        set_nonblocking(rv->io_pipe[1]);

        /* Initialize data structures for poll() */
        rv->handler.pfdsize = rv->fd_limit;
        rv->handler.pfdcount = 0;
        rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
                                   sizeof(struct pollfd) * rv->handler.pfdsize);
        rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
                                   sizeof(struct pollfd) * rv->handler.pfdsize);

        /* add to list of threadmasters */
        pthread_mutex_lock(&masters_mtx);
        {
                if (!masters)
                        masters = list_new();

                listnode_add(masters, rv);
        }
        pthread_mutex_unlock(&masters_mtx);

        return rv;
}
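/*
 * Sketch of the intended lifecycle (illustrative, assuming the
 * thread_add_*() convenience wrappers that thread.h layers over the
 * funcname_* entry points below):
 *
 *   struct thread_master *master = thread_master_create("main");
 *   struct thread fetch;
 *
 *   ... schedule initial tasks with thread_add_read()/thread_add_timer() ...
 *
 *   while (thread_fetch(master, &fetch))
 *           thread_call(&fetch);
 *
 *   thread_master_free(master);
 */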
void thread_master_set_name(struct thread_master *master, const char *name)
{
        pthread_mutex_lock(&master->mtx);
        {
                if (master->name)
                        XFREE(MTYPE_THREAD_MASTER, master->name);
                master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
        }
        pthread_mutex_unlock(&master->mtx);
}
/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
        thread->next = NULL;
        thread->prev = list->tail;
        if (list->tail)
                list->tail->next = thread;
        else
                list->head = thread;
        list->tail = thread;
        list->count++;
}

/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
                                         struct thread *thread)
{
        if (thread->next)
                thread->next->prev = thread->prev;
        else
                list->tail = thread->prev;
        if (thread->prev)
                thread->prev->next = thread->next;
        else
                list->head = thread->next;
        thread->next = thread->prev = NULL;
        list->count--;
        return thread;
}
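/*
 * Both primitives above maintain the same invariants: head/tail bracket
 * the chain, count tracks membership, and a deleted thread leaves with
 * NULL next/prev pointers (thread_add_unuse() below asserts exactly that).
 */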
/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
        return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
        if (!thread_empty(list))
                return thread_list_delete(list, list->head);

        return NULL;
}
#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
        pthread_mutex_t mtxc = thread->mtx;

        assert(m != NULL && thread != NULL);
        assert(thread->next == NULL);
        assert(thread->prev == NULL);

        thread->hist->total_active--;
        memset(thread, 0, sizeof(struct thread));
        thread->type = THREAD_UNUSED;

        /* Restore the thread mutex context. */
        thread->mtx = mtxc;

        if (m->unuse.count < THREAD_UNUSED_DEPTH) {
                thread_list_add(&m->unuse, thread);
                return;
        }

        thread_free(m, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
        struct thread *t;
        struct thread *next;

        for (t = list->head; t; t = next) {
                next = t->next;
                thread_free(m, t);
                list->count--;
        }
}
static void thread_array_free(struct thread_master *m,
                              struct thread **thread_array)
{
        struct thread *t;
        int index;

        for (index = 0; index < m->fd_limit; ++index) {
                t = thread_array[index];
                if (t) {
                        thread_array[index] = NULL;
                        thread_free(m, t);
                }
        }
        XFREE(MTYPE_THREAD_POLL, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
        int i;

        for (i = 0; i < queue->size; i++)
                thread_free(m, queue->array[i]);

        pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
        pthread_mutex_lock(&m->mtx);
        {
                struct thread *t;
                while ((t = thread_trim_head(&m->unuse)) != NULL) {
                        thread_free(m, t);
                }
        }
        pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
        pthread_mutex_lock(&masters_mtx);
        {
                listnode_delete(masters, m);
                if (masters->count == 0) {
                        list_delete_and_null(&masters);
                }
        }
        pthread_mutex_unlock(&masters_mtx);

        thread_array_free(m, m->read);
        thread_array_free(m, m->write);
        thread_queue_free(m, m->timer);
        thread_list_free(m, &m->event);
        thread_list_free(m, &m->ready);
        thread_list_free(m, &m->unuse);
        pthread_mutex_destroy(&m->mtx);
        pthread_cond_destroy(&m->cancel_cond);
        close(m->io_pipe[0]);
        close(m->io_pipe[1]);
        list_delete_and_null(&m->cancel_req);
        m->cancel_req = NULL;

        hash_clean(m->cpu_record, cpu_record_hash_free);
        hash_free(m->cpu_record);
        m->cpu_record = NULL;

        if (m->name)
                XFREE(MTYPE_THREAD_MASTER, m->name);
        XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
        XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
        XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
        int64_t remain;

        pthread_mutex_lock(&thread->mtx);
        {
                remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
        }
        pthread_mutex_unlock(&thread->mtx);

        return remain < 0 ? 0 : remain;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
        struct timeval remain;
        pthread_mutex_lock(&thread->mtx);
        {
                monotime_until(&thread->u.sands, &remain);
        }
        pthread_mutex_unlock(&thread->mtx);
        return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
                                 int (*func)(struct thread *), void *arg,
                                 debugargdef)
{
        struct thread *thread = thread_trim_head(&m->unuse);
        struct cpu_thread_history tmp;

        if (!thread) {
                thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
                /* mutex only needs to be initialized at struct creation. */
                pthread_mutex_init(&thread->mtx, NULL);
                m->alloc++;
        }

        thread->type = type;
        thread->add_type = type;
        thread->master = m;
        thread->arg = arg;
        thread->index = -1;
        thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
        thread->ref = NULL;

        /*
         * So if the passed in funcname is not what we have
         * stored that means the thread->hist needs to be
         * updated. We keep the last one around in unused
         * under the assumption that we are probably
         * going to immediately allocate the same
         * one again.
         *
         * This hopefully saves us some serious
         * hash_get lookups.
         */
        if (thread->funcname != funcname || thread->func != func) {
                tmp.func = func;
                tmp.funcname = funcname;
                thread->hist =
                        hash_get(m->cpu_record, &tmp,
                                 (void *(*)(void *))cpu_record_hash_alloc);
        }
        thread->hist->total_active++;
        thread->func = func;
        thread->funcname = funcname;
        thread->schedfrom = schedfrom;
        thread->schedfrom_line = fromln;

        return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
        /* Update statistics. */
        assert(master->alloc > 0);
        master->alloc--;

        /* Free allocated resources. */
        pthread_mutex_destroy(&thread->mtx);
        XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
                   nfds_t count, const struct timeval *timer_wait)
{
        /* If timer_wait is null here, that means poll() should block
         * indefinitely, unless the thread_master has overridden it by
         * setting ->selectpoll_timeout.
         * If the value is positive, it specifies the maximum number of
         * milliseconds to wait. If the timeout is -1, it specifies that
         * we should never wait and always return immediately even if no
         * event is detected. If the value is zero, the behavior is default.
         */
        int timeout = -1;

        /* number of file descriptors with events */
        int num;

        if (timer_wait != NULL
            && m->selectpoll_timeout == 0) // use the default value
                timeout = (timer_wait->tv_sec * 1000)
                          + (timer_wait->tv_usec / 1000);
        else if (m->selectpoll_timeout > 0) // use the user's timeout
                timeout = m->selectpoll_timeout;
        else if (m->selectpoll_timeout
                 < 0) // effect a poll (return immediately)
                timeout = 0;

        /* add poll pipe poker */
        assert(count + 1 < pfdsize);
        pfds[count].fd = m->io_pipe[0];
        pfds[count].events = POLLIN;
        pfds[count].revents = 0x00;

        num = poll(pfds, count + 1, timeout);

        unsigned char trash[64];
        if (num > 0 && pfds[count].revents != 0 && num--)
                while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
                        ;

        return num;
}
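/*
 * Worked example of the timeout conversion above: a timer_wait of
 * { tv_sec = 1, tv_usec = 500000 } becomes 1 * 1000 + 500000 / 1000 =
 * 1500 ms. A NULL timer_wait leaves timeout at -1, so poll() blocks
 * until I/O arrives or the pipe poker is written by AWAKEN().
 */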
/* Add new read/write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
                                              int (*func)(struct thread *),
                                              void *arg, int fd,
                                              struct thread **t_ptr,
                                              debugargdef)
{
        struct thread *thread = NULL;

        assert(fd >= 0 && fd < m->fd_limit);
        pthread_mutex_lock(&m->mtx);
        {
                if (t_ptr
                    && *t_ptr) // thread is already scheduled; don't reschedule
                {
                        pthread_mutex_unlock(&m->mtx);
                        return NULL;
                }

                /* default to a new pollfd */
                nfds_t queuepos = m->handler.pfdcount;

                /* if we already have a pollfd for our file descriptor, find
                 * and use it */
                for (nfds_t i = 0; i < m->handler.pfdcount; i++)
                        if (m->handler.pfds[i].fd == fd) {
                                queuepos = i;
                                break;
                        }

                /* make sure we have room for this fd + pipe poker fd */
                assert(queuepos + 1 < m->handler.pfdsize);

                thread = thread_get(m, dir, func, arg, debugargpass);

                m->handler.pfds[queuepos].fd = fd;
                m->handler.pfds[queuepos].events |=
                        (dir == THREAD_READ ? POLLIN : POLLOUT);

                if (queuepos == m->handler.pfdcount)
                        m->handler.pfdcount++;

                if (thread) {
                        pthread_mutex_lock(&thread->mtx);
                        {
                                thread->u.fd = fd;
                                if (dir == THREAD_READ)
                                        m->read[thread->u.fd] = thread;
                                else
                                        m->write[thread->u.fd] = thread;
                        }
                        pthread_mutex_unlock(&thread->mtx);

                        if (t_ptr) {
                                *t_ptr = thread;
                                thread->ref = t_ptr;
                        }
                }

                AWAKEN(m);
        }
        pthread_mutex_unlock(&m->mtx);

        return thread;
}

static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
                                  int (*func)(struct thread *), int type,
                                  void *arg, struct timeval *time_relative,
                                  struct thread **t_ptr, debugargdef)
{
        struct thread *thread;
        struct pqueue *queue;

        assert(m != NULL);

        assert(type == THREAD_TIMER);
        assert(time_relative);

        pthread_mutex_lock(&m->mtx);
        {
                if (t_ptr
                    && *t_ptr) // thread is already scheduled; don't reschedule
                {
                        pthread_mutex_unlock(&m->mtx);
                        return NULL;
                }

                queue = m->timer;

                thread = thread_get(m, type, func, arg, debugargpass);

                pthread_mutex_lock(&thread->mtx);
                {
                        monotime(&thread->u.sands);
                        timeradd(&thread->u.sands, time_relative,
                                 &thread->u.sands);
                        pqueue_enqueue(thread, queue);
                        if (t_ptr) {
                                *t_ptr = thread;
                                thread->ref = t_ptr;
                        }
                }
                pthread_mutex_unlock(&thread->mtx);

                AWAKEN(m);
        }
        pthread_mutex_unlock(&m->mtx);

        return thread;
}

/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
                                         int (*func)(struct thread *),
                                         void *arg, long timer,
                                         struct thread **t_ptr, debugargdef)
{
        struct timeval trel;

        assert(m != NULL);

        trel.tv_sec = timer;
        trel.tv_usec = 0;

        return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
                                                 &trel, t_ptr, debugargpass);
}
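/*
 * Illustrative use of the scheduling entry points above (a sketch,
 * assuming the thread_add_read()/thread_add_write()/thread_add_timer()
 * wrappers that thread.h layers over these funcname_* functions; the
 * handler names are hypothetical):
 *
 *   thread_add_read(master, vty_read_handler, vty, vty_fd, &t_read);
 *   thread_add_timer(master, hello_timer_expire, peer, 30, &t_hello);
 *
 * The final back-reference (t_ptr) both suppresses double-scheduling and
 * enables asynchronous cancellation via thread_cancel_async().
 */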
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
                                              int (*func)(struct thread *),
                                              void *arg, long timer,
                                              struct thread **t_ptr,
                                              debugargdef)
{
        struct timeval trel;

        assert(m != NULL);

        trel.tv_sec = timer / 1000;
        trel.tv_usec = 1000 * (timer % 1000);

        return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
                                                 &trel, t_ptr, debugargpass);
}
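/*
 * For example, timer = 1500 above splits into trel = { tv_sec = 1,
 * tv_usec = 500000 } before being handed to
 * funcname_thread_add_timer_timeval().
 */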
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
                                            int (*func)(struct thread *),
                                            void *arg, struct timeval *tv,
                                            struct thread **t_ptr, debugargdef)
{
        return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
                                                 t_ptr, debugargpass);
}

/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
                                         int (*func)(struct thread *),
                                         void *arg, int val,
                                         struct thread **t_ptr, debugargdef)
{
        struct thread *thread;

        assert(m != NULL);

        pthread_mutex_lock(&m->mtx);
        {
                if (t_ptr
                    && *t_ptr) // thread is already scheduled; don't reschedule
                {
                        pthread_mutex_unlock(&m->mtx);
                        return NULL;
                }

                thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
                pthread_mutex_lock(&thread->mtx);
                {
                        thread->u.val = val;
                        thread_list_add(&m->event, thread);
                }
                pthread_mutex_unlock(&thread->mtx);

                if (t_ptr) {
                        *t_ptr = thread;
                        thread->ref = t_ptr;
                }

                AWAKEN(m);
        }
        pthread_mutex_unlock(&m->mtx);

        return thread;
}

/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
        bool found = false;

        /* Cancel POLLHUP too just in case some bozo set it */
        state |= POLLHUP;

        /* find the index of corresponding pollfd */
        nfds_t i;

        for (i = 0; i < master->handler.pfdcount; i++)
                if (master->handler.pfds[i].fd == fd) {
                        found = true;
                        break;
                }

        if (!found) {
                zlog_debug(
                        "[!] Received cancellation request for nonexistent rw job");
                zlog_debug("[!] threadmaster: %s | fd: %d",
                           master->name ? master->name : "", fd);
                return;
        }

        /* NOT out event. */
        master->handler.pfds[i].events &= ~(state);

        /* If all events are canceled, delete / resize the pollfd array. */
        if (master->handler.pfds[i].events == 0) {
                memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
                        (master->handler.pfdcount - i - 1)
                                * sizeof(struct pollfd));
                master->handler.pfdcount--;
        }

        /* If we have the same pollfd in the copy, perform the same operations,
         * otherwise return. */
        if (i >= master->handler.copycount)
                return;

        master->handler.copy[i].events &= ~(state);

        if (master->handler.copy[i].events == 0) {
                memmove(master->handler.copy + i, master->handler.copy + i + 1,
                        (master->handler.copycount - i - 1)
                                * sizeof(struct pollfd));
                master->handler.copycount--;
        }
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
        struct thread_list *list = NULL;
        struct pqueue *queue = NULL;
        struct thread **thread_array = NULL;
        struct thread *thread;

        struct cancel_req *cr;
        struct listnode *ln;
        for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
                /* If this is an event object cancellation, linear search
                 * through the event list deleting any events which have the
                 * specified argument. We also need to check every thread in
                 * the ready queue. */
                if (cr->eventobj) {
                        struct thread *t;
                        thread = master->event.head;

                        while (thread) {
                                t = thread;
                                thread = t->next;

                                if (t->arg == cr->eventobj) {
                                        thread_list_delete(&master->event, t);
                                        if (t->ref)
                                                *t->ref = NULL;
                                        thread_add_unuse(master, t);
                                }
                        }

                        thread = master->ready.head;
                        while (thread) {
                                t = thread;
                                thread = t->next;

                                if (t->arg == cr->eventobj) {
                                        thread_list_delete(&master->ready, t);
                                        if (t->ref)
                                                *t->ref = NULL;
                                        thread_add_unuse(master, t);
                                }
                        }
                        continue;
                }

                /* The pointer varies depending on whether the cancellation
                 * request was made asynchronously or not. If it was, we need
                 * to check whether the thread even exists anymore before
                 * cancelling it. */
                thread = (cr->thread) ? cr->thread : *cr->threadref;

                if (!thread)
                        continue;

                /* Determine the appropriate queue to cancel the thread from */
                switch (thread->type) {
                case THREAD_READ:
                        thread_cancel_rw(master, thread->u.fd, POLLIN);
                        thread_array = master->read;
                        break;
                case THREAD_WRITE:
                        thread_cancel_rw(master, thread->u.fd, POLLOUT);
                        thread_array = master->write;
                        break;
                case THREAD_TIMER:
                        queue = master->timer;
                        break;
                case THREAD_EVENT:
                        list = &master->event;
                        break;
                case THREAD_READY:
                        list = &master->ready;
                        break;
                default:
                        continue;
                        break;
                }

                if (queue) {
                        assert(thread->index >= 0);
                        assert(thread == queue->array[thread->index]);
                        pqueue_remove_at(thread->index, queue);
                } else if (list) {
                        thread_list_delete(list, thread);
                } else if (thread_array) {
                        thread_array[thread->u.fd] = NULL;
                } else {
                        assert(!"Thread should be either in queue or list or array!");
                }

                if (thread->ref)
                        *thread->ref = NULL;

                thread_add_unuse(thread->master, thread);
        }

        /* Delete and free all cancellation requests */
        list_delete_all_node(master->cancel_req);

        /* Wake up any threads which may be blocked in thread_cancel_async() */
        master->canceled = true;
        pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
        assert(master->owner == pthread_self());

        pthread_mutex_lock(&master->mtx);
        {
                struct cancel_req *cr =
                        XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                cr->eventobj = arg;
                listnode_add(master->cancel_req, cr);
                do_thread_cancel(master);
        }
        pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
        struct thread_master *master = thread->master;

        assert(master->owner == pthread_self());

        pthread_mutex_lock(&master->mtx);
        {
                struct cancel_req *cr =
                        XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                cr->thread = thread;
                listnode_add(master->cancel_req, cr);
                do_thread_cancel(master);
        }
        pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
                         void *eventobj)
{
        assert(!(thread && eventobj) && (thread || eventobj));
        assert(master->owner != pthread_self());

        pthread_mutex_lock(&master->mtx);
        {
                master->canceled = false;

                if (thread) {
                        struct cancel_req *cr =
                                XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                        cr->threadref = thread;
                        listnode_add(master->cancel_req, cr);
                } else if (eventobj) {
                        struct cancel_req *cr =
                                XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
                        cr->eventobj = eventobj;
                        listnode_add(master->cancel_req, cr);
                }
                AWAKEN(master);

                while (!master->canceled)
                        pthread_cond_wait(&master->cancel_cond, &master->mtx);
        }
        pthread_mutex_unlock(&master->mtx);
}
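/*
 * Illustrative call from a non-owning pthread (a sketch; 'io_master' and
 * 't_read' are hypothetical names):
 *
 *   thread_cancel_async(io_master, &t_read, NULL);
 *
 * The caller blocks on cancel_cond until the owning pthread services the
 * request in do_thread_cancel(), so on return the task is guaranteed gone.
 */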
/* ------------------------------------------------------------------------- */
static struct timeval *thread_timer_wait(struct pqueue *queue,
                                         struct timeval *timer_val)
{
        if (queue->size) {
                struct thread *next_timer = queue->array[0];
                monotime_until(&next_timer->u.sands, timer_val);
                return timer_val;
        }
        return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
                                 struct thread *fetch)
{
        *fetch = *thread;
        thread_add_unuse(m, thread);
        return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
                                    struct thread *thread, short state, int pos)
{
        struct thread **thread_array;

        if (!thread)
                return 0;

        if (thread->type == THREAD_READ)
                thread_array = m->read;
        else
                thread_array = m->write;

        thread_array[thread->u.fd] = NULL;
        thread_list_add(&m->ready, thread);
        thread->type = THREAD_READY;
        /* if another pthread scheduled this file descriptor for the event we're
         * responding to, no problem; we're getting to it now */
        thread->master->handler.pfds[pos].events &= ~(state);
        return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
        unsigned int ready = 0;
        struct pollfd *pfds = m->handler.copy;

        for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
                /* no event for current fd? immediately continue */
                if (pfds[i].revents == 0)
                        continue;

                ready++;

                /* Unless someone has called thread_cancel from another
                 * pthread, the only thing that could have changed in
                 * m->handler.pfds while we were asleep is the .events field
                 * in a given pollfd. Barring thread_cancel(), that value
                 * should be a superset of the values we have in our copy, so
                 * there's no need to update it. Similarly, barring deletion,
                 * the fd should still be a valid index into the master's
                 * pfds. */
                if (pfds[i].revents & (POLLIN | POLLHUP))
                        thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
                                                 i);
                if (pfds[i].revents & POLLOUT)
                        thread_process_io_helper(m, m->write[pfds[i].fd],
                                                 POLLOUT, i);

                /* if one of our file descriptors is garbage, remove the same
                 * from both pfds + update sizes and index */
                if (pfds[i].revents & POLLNVAL) {
                        memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
                                (m->handler.pfdcount - i - 1)
                                        * sizeof(struct pollfd));
                        m->handler.pfdcount--;

                        memmove(pfds + i, pfds + i + 1,
                                (m->handler.copycount - i - 1)
                                        * sizeof(struct pollfd));
                        m->handler.copycount--;

                        i--;
                }
        }
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
                                          struct timeval *timenow)
{
        struct thread *thread;
        unsigned int ready = 0;

        while (queue->size) {
                thread = queue->array[0];
                if (timercmp(timenow, &thread->u.sands, <))
                        return ready;
                pqueue_dequeue(queue);
                thread->type = THREAD_READY;
                thread_list_add(&thread->master->ready, thread);
                ready++;
        }
        return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
        struct thread *thread;
        struct thread *next;
        unsigned int ready = 0;

        for (thread = list->head; thread; thread = next) {
                next = thread->next;
                thread_list_delete(list, thread);
                thread->type = THREAD_READY;
                thread_list_add(&thread->master->ready, thread);
                ready++;
        }
        return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
        struct thread *thread = NULL;
        struct timeval now;
        struct timeval zerotime = {0, 0};
        struct timeval tv;
        struct timeval *tw = NULL;

        int num = 0;

        do {
                /* Handle signals if any */
                if (m->handle_signals)
                        quagga_sigevent_process();

                pthread_mutex_lock(&m->mtx);

                /* Process any pending cancellation requests */
                do_thread_cancel(m);

                /*
                 * Attempt to flush ready queue before going into poll().
                 * This is performance-critical. Think twice before modifying.
                 */
                if ((thread = thread_trim_head(&m->ready))) {
                        fetch = thread_run(m, thread, fetch);
                        if (fetch->ref)
                                *fetch->ref = NULL;
                        pthread_mutex_unlock(&m->mtx);
                        break;
                }

                /* otherwise, tick through scheduling sequence */

                /*
                 * Post events to ready queue. This must come before the
                 * following block since events should occur immediately
                 */
                thread_process(&m->event);

                /*
                 * If there are no tasks on the ready queue, we will poll()
                 * until a timer expires or we receive I/O, whichever comes
                 * first. The strategy for doing this is:
                 *
                 * - If there are events pending, set the poll() timeout to zero
                 * - If there are no events pending, but there are timers
                 *   pending, set the timeout to the smallest remaining time on
                 *   any timer
                 * - If there are neither timers nor events pending, but there
                 *   are file descriptors pending, block indefinitely in poll()
                 * - If nothing is pending, it's time for the application to die
                 *
                 * In every case except the last, we need to hit poll() at least
                 * once per loop to avoid starvation by events
                 */
                if (m->ready.count == 0)
                        tw = thread_timer_wait(m->timer, &tv);

                if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
                        tw = &zerotime;

                if (!tw && m->handler.pfdcount == 0) { /* die */
                        pthread_mutex_unlock(&m->mtx);
                        fetch = NULL;
                        break;
                }

                /*
                 * Copy pollfd array + # active pollfds in it. Not necessary to
                 * copy the array size as this is fixed.
                 */
                m->handler.copycount = m->handler.pfdcount;
                memcpy(m->handler.copy, m->handler.pfds,
                       m->handler.copycount * sizeof(struct pollfd));

                pthread_mutex_unlock(&m->mtx);
                {
                        num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
                                      m->handler.copycount, tw);
                }
                pthread_mutex_lock(&m->mtx);

                /* Handle any errors received in poll() */
                if (num < 0) {
                        if (errno == EINTR) {
                                pthread_mutex_unlock(&m->mtx);
                                /* loop around to signal handler */
                                continue;
                        }

                        /* else die */
                        zlog_warn("poll() error: %s", safe_strerror(errno));
                        pthread_mutex_unlock(&m->mtx);
                        fetch = NULL;
                        break;
                }

                /* Post timers to ready queue. */
                monotime(&now);
                thread_process_timers(m->timer, &now);

                /* Post I/O to ready queue. */
                if (num > 0)
                        thread_process_io(m, num);

                pthread_mutex_unlock(&m->mtx);

        } while (!thread && m->spin);

        return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
        return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
                + (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
                                   unsigned long *cputime)
{
        /* This is 'user + sys' time. */
        *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
                   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
        return timeval_elapsed(now->real, start->real);
}
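/*
 * Example: with getrusage deltas of ru_utime = 2000 us and ru_stime =
 * 1000 us against a wall-clock delta of 10000 us, *cputime is set to 3000
 * and 10000 is returned; such a task spent most of its time blocked
 * rather than computing.
 */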
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts. The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time. On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
        int result;
        pthread_mutex_lock(&thread->mtx);
        {
                result = monotime_since(&thread->real, NULL)
                         > (int64_t)thread->yield;
        }
        pthread_mutex_unlock(&thread->mtx);
        return result;
}
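/*
 * Typical cooperative-yield pattern inside a long-running task (a sketch,
 * assuming a thread_add_event() wrapper from thread.h; work_left(),
 * do_chunk() and ctx are hypothetical):
 *
 *   while (work_left(ctx)) {
 *           do_chunk(ctx);
 *           if (thread_should_yield(thread)) {
 *                   thread_add_event(master, work_fn, ctx, 0, NULL);
 *                   return 0;    // reschedule ourselves and yield the cpu
 *           }
 *   }
 */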
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
        pthread_mutex_lock(&thread->mtx);
        {
                thread->yield = yield_time;
        }
        pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
        monotime(&r->real);
        getrusage(RUSAGE_SELF, &(r->cpu));
}
/*
 * Call a thread.
 *
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
        _Atomic unsigned long realtime, cputime;
        unsigned long exp;
        unsigned long helper;
        RUSAGE_T before, after;

        GETRUSAGE(&before);
        thread->real = before.real;

        pthread_setspecific(thread_current, thread);
        (*thread->func)(thread);
        pthread_setspecific(thread_current, NULL);

        GETRUSAGE(&after);

        realtime = thread_consumed_time(&after, &before, &helper);
        cputime = helper;

        /* update realtime */
        atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
                                  memory_order_seq_cst);
        exp = atomic_load_explicit(&thread->hist->real.max,
                                   memory_order_seq_cst);
        while (exp < realtime
               && !atomic_compare_exchange_weak_explicit(
                          &thread->hist->real.max, &exp, realtime,
                          memory_order_seq_cst, memory_order_seq_cst))
                ;

        /* update cputime */
        atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
                                  memory_order_seq_cst);
        exp = atomic_load_explicit(&thread->hist->cpu.max,
                                   memory_order_seq_cst);
        while (exp < cputime
               && !atomic_compare_exchange_weak_explicit(
                          &thread->hist->cpu.max, &exp, cputime,
                          memory_order_seq_cst, memory_order_seq_cst))
                ;

        atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
                                  memory_order_seq_cst);
        atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
                                 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
        if (realtime > CONSUMED_TIME_CHECK) {
                /*
                 * We have a CPU Hog on our hands.
                 * Whinge about it now, so we're aware this is yet another task
                 * to fix.
                 */
                zlog_warn(
                        "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
                        thread->funcname, (unsigned long)thread->func,
                        realtime / 1000, cputime / 1000);
        }
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
                             int (*func)(struct thread *), void *arg, int val,
                             debugargdef)
{
        struct thread *thread;

        /* Get or allocate new thread to execute. */
        pthread_mutex_lock(&m->mtx);
        {
                thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

                /* Set its event value. */
                pthread_mutex_lock(&thread->mtx);
                {
                        thread->add_type = THREAD_EXECUTE;
                        thread->u.val = val;
                        thread->ref = &thread;
                }
                pthread_mutex_unlock(&thread->mtx);
        }
        pthread_mutex_unlock(&m->mtx);

        /* Execute thread doing all accounting. */
        thread_call(thread);

        /* Give back or free thread. */
        thread_add_unuse(m, thread);
}