/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "frrcu.h"
#include "log.h"
#include "hash.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
DECLARE_LIST(thread_list, struct thread, threaditem)
static int thread_timer_cmp(const struct thread *a, const struct thread *b)
{
	if (a->u.sands.tv_sec < b->u.sands.tv_sec)
		return -1;
	if (a->u.sands.tv_sec > b->u.sands.tv_sec)
		return 1;
	if (a->u.sands.tv_usec < b->u.sands.tv_usec)
		return -1;
	if (a->u.sands.tv_usec > b->u.sands.tv_usec)
		return 1;
	return 0;
}
DECLARE_HEAP(thread_timer_list, struct thread, timeritem, thread_timer_cmp)
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
/* control variable for initializer */
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;

	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
		(size_t)a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000,
		(size_t)a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof(tmp));
	tmp.funcname = "TOTAL";

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
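/* A quick example of the mapping: parse_filter("rt") returns
 * (1 << THREAD_READ) | (1 << THREAD_TIMER).  Unrecognized characters are
 * simply skipped, so a string with no valid characters yields 0, which the
 * CLI handlers below treat as an invalid filter. */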
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	struct thread *thread;
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
		m->fd_limit);
	for (i = 0; i < m->handler.pfdcount; i++) {
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
			m->handler.pfds[i].fd, m->handler.pfds[i].events,
			m->handler.pfds[i].revents);

		if (m->handler.pfds[i].events & POLLIN) {
			thread = m->read[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR ");
			else
				vty_out(vty, "%s ", thread->funcname);
		} else
			vty_out(vty, " ");

		if (m->handler.pfds[i].events & POLLOUT) {
			thread = m->write[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR\n");
			else
				vty_out(vty, "%s\n", thread->funcname);
		} else
			vty_out(vty, "\n");
	}
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
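/* Once thread_cmd_init() has run, vtysh exposes the commands registered
 * above: "show thread cpu [FILTER]", "show thread poll", and
 * "clear thread cpu [FILTER]", where FILTER is a string of the type
 * letters accepted by parse_filter(). */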
/* CLI end ------------------------------------------------------------------ */
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(const void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	thread_list_init(&rv->event);
	thread_list_init(&rv->ready);
	thread_list_init(&rv->unuse);
	thread_timer_list_init(&rv->timer);

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
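/* Usage sketch (illustrative only, not part of this file): the canonical
 * fetch/call main loop that drives a master created above; scheduling of
 * the initial tasks is elided. */
#if 0
	struct thread_master *master = thread_master_create("main");
	struct thread thread;

	/* ... schedule initial tasks against 'master' here ... */

	while (thread_fetch(master, &thread))
		thread_call(&thread);

	thread_master_free(master);
#endif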
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
		thread_list_add_tail(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m,
			     struct thread_list_head *list)
{
	struct thread *t;

	while ((t = thread_list_pop(list)))
		thread_free(m, t);
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;

		while ((t = thread_list_pop(&m->unuse)))
			thread_free(m, t);
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	struct thread *t;

	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	while ((t = thread_timer_list_pop(&m->timer)))
		thread_free(m, t);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;

	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_list_pop(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated.  We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * type of thread.
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event
	 * is detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* poll() blocks, so leave the RCU read-side critical section while
	 * we are asleep. */
	rcu_read_unlock();
	rcu_assert_read_unlocked();

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	rcu_read_lock();

	return num;
}
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;
	struct thread **thread_array;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	/* default to a new pollfd */
	nfds_t queuepos = m->handler.pfdcount;

	if (dir == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	/* if we already have a pollfd for our file descriptor, find and
	 * use it */
	for (nfds_t i = 0; i < m->handler.pfdcount; i++)
		if (m->handler.pfds[i].fd == fd) {
			queuepos = i;

			/*
			 * What happens if we have a thread already
			 * created for this event?
			 */
			if (thread_array[fd])
				assert(!"Thread already scheduled for file descriptor");

			break;
		}

	/* make sure we have room for this fd + pipe poker fd */
	assert(queuepos + 1 < m->handler.pfdsize);

	thread = thread_get(m, dir, func, arg, debugargpass);

	m->handler.pfds[queuepos].fd = fd;
	m->handler.pfds[queuepos].events |=
		(dir == THREAD_READ ? POLLIN : POLLOUT);

	if (queuepos == m->handler.pfdcount)
		m->handler.pfdcount++;

	if (thread) {
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.fd = fd;
			thread_array[thread->u.fd] = thread;
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}

	AWAKEN(m);

	pthread_mutex_unlock(&m->mtx);

	return thread;
}
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	thread = thread_get(m, type, func, arg, debugargpass);

	pthread_mutex_lock(&thread->mtx);
	{
		monotime(&thread->u.sands);
		timeradd(&thread->u.sands, time_relative,
			 &thread->u.sands);
		thread_timer_list_add(&m->timer, thread);
		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}
	pthread_mutex_unlock(&thread->mtx);

	AWAKEN(m);
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
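/* Usage sketch (illustrative only): the thread_add_timer* macros in thread.h
 * wrap the funcname_* functions above.  my_expire_handler and t are
 * hypothetical.  Timers are one-shot: the handler must re-add itself to get
 * periodic behavior. */
#if 0
	static struct thread *t;

	/* 2.5 seconds from now, via millisecond resolution ... */
	thread_add_timer_msec(master, my_expire_handler, NULL, 2500, &t);

	/* ... or equivalently via a struct timeval.  Note that passing &t
	 * again while t is still non-NULL is a no-op: the t_ptr guard above
	 * prevents double-scheduling. */
	struct timeval tv = {.tv_sec = 2, .tv_usec = 500000};
	thread_add_timer_tv(master, my_expire_handler, NULL, &tv, &t);
#endif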
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
	pthread_mutex_lock(&thread->mtx);
	{
		thread->u.val = val;
		thread_list_add_tail(&m->event, thread);
	}
	pthread_mutex_unlock(&thread->mtx);

	if (t_ptr) {
		*t_ptr = thread;
		thread->ref = t_ptr;
	}

	AWAKEN(m);
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
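/* Usage sketch (illustrative only): events bypass the timer/poll machinery
 * and are picked up on the next thread_fetch() pass.  my_event_handler and
 * arg are hypothetical; passing &t gives the caller a cancelable
 * back-reference and suppresses duplicate scheduling while t is non-NULL. */
#if 0
	static struct thread *t;

	thread_add_event(master, my_event_handler, arg, 0, &t);
#endif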
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following: POLLIN, POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list_head *list = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument; we also need to check every thread
		 * in the ready queue. */
		if (cr->eventobj) {
			struct thread *t;

			frr_each_safe(thread_list, &master->event, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->event, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}

			frr_each_safe(thread_list, &master->ready, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->ready, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			thread_timer_list_del(&master->timer, thread);
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (list) {
			thread_list_del(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
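/* Usage sketch (illustrative only): from a pthread other than the one
 * running 'master', cancel a task that was scheduled with a back-reference.
 * my_read_handler and fd are hypothetical.  The call blocks until
 * do_thread_cancel() has run on the owning pthread, after which the
 * back-reference has been set to NULL. */
#if 0
	static struct thread *t;

	thread_add_read(master, my_read_handler, NULL, fd, &t);
	/* ... later, from another pthread: */
	thread_cancel_async(master, &t, NULL);
#endif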
/* ------------------------------------------------------------------------- */
static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
					 struct timeval *timer_val)
{
	if (!thread_timer_list_count(timers))
		return NULL;

	struct thread *next_timer = thread_timer_list_first(timers);
	monotime_until(&next_timer->u.sands, timer_val);
	return timer_val;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state,
				    short actual_state, int pos)
{
	struct thread **thread_array;

	/*
	 * poll() clears the .events field, but the pollfd array we
	 * pass to poll() is a copy of the one used to schedule threads.
	 * We need to synchronize state between the two here by applying
	 * the same changes poll() made on the copy of the "real" pollfd
	 * array.
	 *
	 * This cleans up a possible infinite loop where we refuse
	 * to respond to a poll event but poll is insistent that
	 * we should.
	 */
	m->handler.pfds[pos].events &= ~(state);

	if (!thread) {
		if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
			flog_err(EC_LIB_NO_THREAD,
				 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
				 m->handler.pfds[pos].fd, actual_state);
		return 0;
	}

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add_tail(&m->ready, thread);
	thread->type = THREAD_READY;

	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events
		 * field in a given pollfd. Barring thread_cancel(),
		 * that value should be a superset of the values we have in our
		 * copy, so there's no need to update it. Similarly, barring
		 * deletion, the fd should still be a valid index into the
		 * master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP)) {
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 pfds[i].revents, i);
		}
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, pfds[i].revents, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_timer_list_first(timers))) {
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		thread_timer_list_pop(timers);
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list_head *list)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_list_pop(list))) {
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_list_pop(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (!thread_list_count(&m->ready))
			tw = thread_timer_wait(&m->timer, &tv);

		if (thread_list_count(&m->ready) ||
		    (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(&m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT .
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;

	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);

	return result;
}
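/* Usage sketch (illustrative only): a long-running task checks
 * thread_should_yield() inside its work loop and reschedules itself so other
 * tasks are not starved.  work_state, work_remains() and do_work_chunk() are
 * hypothetical helpers. */
#if 0
	static int work_fn(struct thread *t)
	{
		struct work_state *ws = THREAD_ARG(t);

		while (work_remains(ws) && !thread_should_yield(t))
			do_work_chunk(ws);
		if (work_remains(ws))
			thread_add_event(t->master, work_fn, ws, 0, NULL);
		return 0;
	}
#endif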
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
	getrusage(FRR_RUSAGE, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->real.max, &exp, realtime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->cpu.max, &exp, cputime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	pthread_mutex_lock(&m->mtx);
	{
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		pthread_mutex_lock(&thread->mtx);
		{
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
		pthread_mutex_unlock(&thread->mtx);
	}
	pthread_mutex_unlock(&m->mtx);

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}
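/* Usage sketch (illustrative only): the thread_execute() macro in thread.h
 * wraps funcname_thread_execute().  Unlike thread_add_event(), the handler
 * runs synchronously, right here, but still goes through thread_call() so
 * its runtime shows up in 'show thread cpu'.  my_event_handler and arg are
 * hypothetical. */
#if 0
	thread_execute(master, my_event_handler, arg, 0);
#endif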