/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
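/* A sketch of how the wake pipe ("poker") above is used: any pthread that
 * schedules or cancels a task ends by invoking AWAKEN(m), which writes one
 * byte into m->io_pipe[1]. The owning pthread always polls m->io_pipe[0]
 * alongside its real file descriptors (see fd_poll() below), so that byte
 * forces poll() to return and the owner re-evaluates its queues; fd_poll()
 * drains the pipe before returning. For example, from another pthread:
 *
 *	thread_add_event(m, some_handler, ctx, 0, NULL);
 *	// internally ends with AWAKEN(m), kicking the owner out of poll()
 *
 * (thread_add_event is the thread.h convenience macro wrapping
 * funcname_thread_add_event() below; some_handler/ctx are hypothetical.)
 */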
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);

/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;

	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	uint8_t filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
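/* Example (sketch): from vtysh, "show thread cpu t" restricts the output to
 * timer tasks and "show thread cpu rw" to read/write tasks, per
 * parse_filter() above; "clear thread cpu" resets the counters using the
 * same filter syntax. */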
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}

static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer()
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
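/* Typical lifecycle of a master (a minimal sketch; thread_add_read and the
 * fetch/call loop are the standard wiring from thread.h, while
 * my_read_handler, my_ctx and sock are hypothetical):
 *
 *	struct thread_master *master = thread_master_create("main");
 *	struct thread *t_read = NULL;
 *
 *	thread_add_read(master, my_read_handler, my_ctx, sock, &t_read);
 *
 *	struct thread fetched;
 *	while (thread_fetch(master, &fetched))
 *		thread_call(&fetched);
 *
 *	thread_master_free(master);
 */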
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		if (master->name)
			XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}

/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}
/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}
#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (m->unuse.count < THREAD_UNUSED_DEPTH) {
		thread_list_add(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
/* Free all unused thread. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		thread_free(m, t);
		list->count--;
	}
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		thread_free(m, queue->array[i]);

	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			pthread_mutex_destroy(&t->mtx);
			XFREE(MTYPE_THREAD, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	if (m->name)
		XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;

	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated. We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * type of thread.
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event is
	 * detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
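/* Timeout resolution examples for the rules above: with selectpoll_timeout
 * == 0 and timer_wait == {2, 500000}, poll() blocks for at most
 * 2 * 1000 + 500000 / 1000 = 2500 ms; with selectpoll_timeout == 100 it
 * blocks at most 100 ms regardless of timers; with selectpoll_timeout < 0
 * it returns immediately (timeout 0). */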
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
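/* I/O tasks are one-shot: when the fd fires, the task is pulled out of
 * m->read/m->write and must be re-registered. A sketch using the thread.h
 * macros (my_read_handler and my_ctx are hypothetical):
 *
 *	static int my_read_handler(struct thread *t)
 *	{
 *		int fd = THREAD_FD(t);
 *		struct my_ctx *ctx = THREAD_ARG(t);
 *
 *		...consume from fd...
 *
 *		thread_add_read(t->master, my_read_handler, ctx, fd, NULL);
 *		return 0;
 *	}
 */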
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;
		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
927 /* Add timer event thread with "millisecond" resolution */
928 struct thread
*funcname_thread_add_timer_msec(struct thread_master
*m
,
929 int (*func
)(struct thread
*),
930 void *arg
, long timer
,
931 struct thread
**t_ptr
,
938 trel
.tv_sec
= timer
/ 1000;
939 trel
.tv_usec
= 1000 * (timer
% 1000);
941 return funcname_thread_add_timer_timeval(m
, func
, THREAD_TIMER
, arg
,
942 &trel
, t_ptr
, debugargpass
);
945 /* Add timer event thread with "millisecond" resolution */
946 struct thread
*funcname_thread_add_timer_tv(struct thread_master
*m
,
947 int (*func
)(struct thread
*),
948 void *arg
, struct timeval
*tv
,
949 struct thread
**t_ptr
, debugargdef
)
951 return funcname_thread_add_timer_timeval(m
, func
, THREAD_TIMER
, arg
, tv
,
952 t_ptr
, debugargpass
);
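/* Timers are likewise one-shot, so a periodic task re-arms itself. Sketch
 * using the thread.h macro (my_periodic and my_ctx are hypothetical):
 *
 *	static int my_periodic(struct thread *t)
 *	{
 *		struct my_ctx *ctx = THREAD_ARG(t);
 *
 *		...do work...
 *
 *		thread_add_timer_msec(t->master, my_periodic, ctx, 500, NULL);
 *		return 0;
 *	}
 */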
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
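/* Events are "run as soon as possible" tasks: thread_fetch() drains
 * m->event into the ready list ahead of any poll(). Sketch, via the
 * thread.h macro (my_event_handler/ctx are hypothetical):
 *
 *	thread_add_event(master, my_event_handler, ctx, 0, NULL);
 *
 * The integer argument lands in thread->u.val, mirroring the convention
 * used by funcname_thread_execute() at the end of this file. */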
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following: POLLIN, POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread
		 * in the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
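/* Asynchronous cancellation sketch, from a pthread that does NOT own
 * "master" (my_job and ctx are hypothetical; t_background is the
 * back-reference captured at scheduling time):
 *
 *	struct thread *t_background = NULL;
 *	thread_add_timer(master, my_job, ctx, 10, &t_background);
 *
 *	// later, from another pthread:
 *	thread_cancel_async(master, &t_background, NULL);
 *	// blocks until the owner services the request; if the task still
 *	// existed, t_background has been NULLed through thread->ref
 */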
/* ------------------------------------------------------------------------- */
static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we're
	 * responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another pthread,
		 * the only thing that could have changed in m->handler.pfds
		 * while we were asleep is the .events field in a given pollfd.
		 * Barring thread_cancel() that value should be a superset of
		 * the values we have in our copy, so there's no need to update
		 * it. Similarly, barring deletion, the fd should still be a
		 * valid index into the master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
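/* The canonical consumer loop (sketch): "fetch" is caller-owned storage
 * that thread_run() copies each scheduled task into, and thread_fetch()
 * returns NULL once nothing at all is pending:
 *
 *	struct thread thread;
 *	while (thread_fetch(master, &thread))
 *		thread_call(&thread);
 */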
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;

	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);

	return result;
}
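/* Cooperative-yield pattern (sketch; my_walker and my_ctx are
 * hypothetical): a long-running task checks thread_should_yield() as it
 * works and, once its slot is used up, reschedules itself instead of
 * starving the loop:
 *
 *	static int my_walker(struct thread *t)
 *	{
 *		struct my_ctx *ctx = THREAD_ARG(t);
 *
 *		while (ctx->cursor) {
 *			...process one item, advance ctx->cursor...
 *			if (thread_should_yield(t)) {
 *				thread_add_event(t->master, my_walker, ctx,
 *						 0, NULL);
 *				return 0;
 *			}
 *		}
 *		return 0;
 *	}
 */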
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
	getrusage(FRR_RUSAGE, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	pthread_mutex_lock(&m->mtx);
	{
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		pthread_mutex_lock(&thread->mtx);
		{
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
		pthread_mutex_unlock(&thread->mtx);
	}
	pthread_mutex_unlock(&m->mtx);

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}
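/* thread_execute (the thread.h macro wrapping the function above) runs the
 * callback synchronously while still going through thread_call(), so the
 * invocation is CPU-accounted and shows up in "show thread cpu" flagged
 * 'X'. Sketch (my_handler/ctx are hypothetical):
 *
 *	thread_execute(master, my_handler, ctx, 0);
 */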