/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>

#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
/* control variable for initializer */
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;

	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5"PRIdFAST32" %10lu.%03lu %9"PRIuFAST32
		" %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
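/*
 * For orientation, the two vty_out() calls above combine into a single
 * history row. The values and the task name below are made-up examples; the
 * real output lines up under the header printed by cpu_record_print():
 *
 *     1      1234.567      1000     1234      5678     1300      6000  T     example_timer_func
 */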
static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	uint8_t filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
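/*
 * Usage sketch: a daemon typically creates one thread_master per event-loop
 * pthread; the name shows up in the "show thread cpu" / "show thread poll"
 * CLI above. The name "bgpd" is just an example here.
 *
 *     struct thread_master *master = thread_master_create("bgpd");
 *     ...schedule tasks, then drive the loop with thread_fetch()...
 *     thread_master_free(master);
 */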
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}
/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}
/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}
/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}
#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (m->unuse.count < THREAD_UNUSED_DEPTH) {
		thread_list_add(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		thread_free(m, t);
		list->count--;
	}
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		thread_free(m, queue->array[i]);

	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the unuse list for later
 * reuse. If we are shutting down, free up the unused threads so we can see
 * if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			thread_free(m, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;

	pthread_mutex_lock(&thread->mtx);
	monotime_until(&thread->u.sands, &remain);
	pthread_mutex_unlock(&thread->mtx);

	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * If the passed-in funcname is not what we have stored, the
	 * thread->hist needs to be updated. We keep the last one around in
	 * unuse under the assumption that we are probably going to
	 * immediately allocate the same one again.
	 *
	 * This hopefully saves us some serious hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event is
	 * detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
/* Add new read/write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	/* default to a new pollfd */
	nfds_t queuepos = m->handler.pfdcount;

	/* if we already have a pollfd for our file descriptor, find and
	 * use it */
	for (nfds_t i = 0; i < m->handler.pfdcount; i++)
		if (m->handler.pfds[i].fd == fd) {
			queuepos = i;
			break;
		}

	/* make sure we have room for this fd + pipe poker fd */
	assert(queuepos + 1 < m->handler.pfdsize);

	thread = thread_get(m, dir, func, arg, debugargpass);

	m->handler.pfds[queuepos].fd = fd;
	m->handler.pfds[queuepos].events |=
		(dir == THREAD_READ ? POLLIN : POLLOUT);

	if (queuepos == m->handler.pfdcount)
		m->handler.pfdcount++;

	if (thread) {
		pthread_mutex_lock(&thread->mtx);
		thread->u.fd = fd;
		if (dir == THREAD_READ)
			m->read[thread->u.fd] = thread;
		else
			m->write[thread->u.fd] = thread;
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}

	AWAKEN(m);

	pthread_mutex_unlock(&m->mtx);

	return thread;
}
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	queue = m->timer;
	thread = thread_get(m, type, func, arg, debugargpass);

	pthread_mutex_lock(&thread->mtx);
	{
		monotime(&thread->u.sands);
		timeradd(&thread->u.sands, time_relative,
			 &thread->u.sands);
		pqueue_enqueue(thread, queue);
		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}
	pthread_mutex_unlock(&thread->mtx);

	AWAKEN(m);

	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 tv, t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
	pthread_mutex_lock(&thread->mtx);
	{
		thread->u.val = val;
		thread_list_add(&m->event, thread);
	}
	pthread_mutex_unlock(&thread->mtx);

	if (t_ptr) {
		*t_ptr = thread;
		thread->ref = t_ptr;
	}

	AWAKEN(m);

	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following: POLLIN, POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list, deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;

			thread = master->event.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we
		 * need to check whether the thread even exists anymore
		 * before cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
*thread_run(struct thread_master
*m
, struct thread
*thread
,
1284 struct thread
*fetch
)
1287 thread_add_unuse(m
, thread
);
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we
	 * are responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel(), that value
		 * should be a superset of the values we have in our copy, so
		 * there's no need to update it. Similarly, barring deletion,
		 * the fd should still be a valid index into the master's
		 * pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
1409 struct thread
*thread_fetch(struct thread_master
*m
, struct thread
*fetch
)
1411 struct thread
*thread
= NULL
;
1413 struct timeval zerotime
= {0, 0};
1415 struct timeval
*tw
= NULL
;
1420 /* Handle signals if any */
1421 if (m
->handle_signals
)
1422 quagga_sigevent_process();
1424 pthread_mutex_lock(&m
->mtx
);
1426 /* Process any pending cancellation requests */
1427 do_thread_cancel(m
);
1430 * Attempt to flush ready queue before going into poll().
1431 * This is performance-critical. Think twice before modifying.
1433 if ((thread
= thread_trim_head(&m
->ready
))) {
1434 fetch
= thread_run(m
, thread
, fetch
);
1437 pthread_mutex_unlock(&m
->mtx
);
1441 /* otherwise, tick through scheduling sequence */
1444 * Post events to ready queue. This must come before the
1445 * following block since events should occur immediately
1447 thread_process(&m
->event
);
1450 * If there are no tasks on the ready queue, we will poll()
1451 * until a timer expires or we receive I/O, whichever comes
1452 * first. The strategy for doing this is:
1454 * - If there are events pending, set the poll() timeout to zero
1455 * - If there are no events pending, but there are timers
1457 * timeout to the smallest remaining time on any timer
1458 * - If there are neither timers nor events pending, but there
1460 * descriptors pending, block indefinitely in poll()
1461 * - If nothing is pending, it's time for the application to die
1463 * In every case except the last, we need to hit poll() at least
1464 * once per loop to avoid starvation by events
1466 if (m
->ready
.count
== 0)
1467 tw
= thread_timer_wait(m
->timer
, &tv
);
1469 if (m
->ready
.count
!= 0 || (tw
&& !timercmp(tw
, &zerotime
, >)))
1472 if (!tw
&& m
->handler
.pfdcount
== 0) { /* die */
1473 pthread_mutex_unlock(&m
->mtx
);
1479 * Copy pollfd array + # active pollfds in it. Not necessary to
1480 * copy the array size as this is fixed.
1482 m
->handler
.copycount
= m
->handler
.pfdcount
;
1483 memcpy(m
->handler
.copy
, m
->handler
.pfds
,
1484 m
->handler
.copycount
* sizeof(struct pollfd
));
1486 pthread_mutex_unlock(&m
->mtx
);
1488 num
= fd_poll(m
, m
->handler
.copy
, m
->handler
.pfdsize
,
1489 m
->handler
.copycount
, tw
);
1491 pthread_mutex_lock(&m
->mtx
);
1493 /* Handle any errors received in poll() */
1495 if (errno
== EINTR
) {
1496 pthread_mutex_unlock(&m
->mtx
);
1497 /* loop around to signal handler */
1502 flog_err(EC_LIB_SYSTEM_CALL
, "poll() error: %s",
1503 safe_strerror(errno
));
1504 pthread_mutex_unlock(&m
->mtx
);
1509 /* Post timers to ready queue. */
1511 thread_process_timers(m
->timer
, &now
);
1513 /* Post I/O to ready queue. */
1515 thread_process_io(m
, num
);
1517 pthread_mutex_unlock(&m
->mtx
);
1519 } while (!thread
&& m
->spin
);
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;

	pthread_mutex_lock(&thread->mtx);
	result = monotime_since(&thread->real, NULL)
		 > (int64_t)thread->yield;
	pthread_mutex_unlock(&thread->mtx);

	return result;
}
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	thread->yield = yield_time;
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
	getrusage(FRR_RUSAGE, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another
		 * task to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	pthread_mutex_lock(&m->mtx);
	{
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		pthread_mutex_lock(&thread->mtx);
		{
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
		pthread_mutex_unlock(&thread->mtx);
	}
	pthread_mutex_unlock(&m->mtx);

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}