/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);

/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;

	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5"PRIdFAST32" %10lu.%03lu %9"PRIuFAST32
		" %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}

/* CLI end ------------------------------------------------------------------ */
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		if (master->name)
			XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}
/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}
/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}
/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}

#define THREAD_UNUSED_DEPTH 10
/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (m->unuse.count < THREAD_UNUSED_DEPTH) {
		thread_list_add(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		thread_free(m, t);
		list->count--;
	}
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		thread_free(m, queue->array[i]);

	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			pthread_mutex_destroy(&t->mtx);
			XFREE(MTYPE_THREAD, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);
	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated.  We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * one up again.
	 *
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event
	 * is detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find
		 * and use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;
		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
998 * NOT's out the .events field of pollfd corresponding to the given file
999 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
1001 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1002 * implementation for details.
1006 * @param state the event to cancel. One or more (OR'd together) of the
1011 static void thread_cancel_rw(struct thread_master
*master
, int fd
, short state
)
1015 /* Cancel POLLHUP too just in case some bozo set it */
1018 /* find the index of corresponding pollfd */
1021 for (i
= 0; i
< master
->handler
.pfdcount
; i
++)
1022 if (master
->handler
.pfds
[i
].fd
== fd
) {
1029 "[!] Received cancellation request for nonexistent rw job");
1030 zlog_debug("[!] threadmaster: %s | fd: %d",
1031 master
->name
? master
->name
: "", fd
);
1035 /* NOT out event. */
1036 master
->handler
.pfds
[i
].events
&= ~(state
);
1038 /* If all events are canceled, delete / resize the pollfd array. */
1039 if (master
->handler
.pfds
[i
].events
== 0) {
1040 memmove(master
->handler
.pfds
+ i
, master
->handler
.pfds
+ i
+ 1,
1041 (master
->handler
.pfdcount
- i
- 1)
1042 * sizeof(struct pollfd
));
1043 master
->handler
.pfdcount
--;
1046 /* If we have the same pollfd in the copy, perform the same operations,
1047 * otherwise return. */
1048 if (i
>= master
->handler
.copycount
)
1051 master
->handler
.copy
[i
].events
&= ~(state
);
1053 if (master
->handler
.copy
[i
].events
== 0) {
1054 memmove(master
->handler
.copy
+ i
, master
->handler
.copy
+ i
+ 1,
1055 (master
->handler
.copycount
- i
- 1)
1056 * sizeof(struct pollfd
));
1057 master
->handler
.copycount
--;
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread
		 * in the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we
		 * need to check whether the thread even exists anymore
		 * before cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
			break;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
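/*
 * Cancellation sketch (illustration only): because the cancellation code
 * nulls out the scheduler's back-reference, a cooperating pthread can
 * cancel safely through that reference. "t_read" matches the scheduling
 * sketch earlier in this file and is hypothetical.
 */
#if 0
	/* on the owning pthread: */
	thread_add_read(master, on_readable, NULL, sock_fd, &t_read);

	/* on any other pthread: */
	thread_cancel_async(master, &t_read, NULL);
	/* on return, t_read == NULL and the task is guaranteed not to run */
#endif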
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we're
	 * responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events
		 * field in a given pollfd. Barring thread_cancel() that
		 * value should be a superset of the values we have in our
		 * copy, so there's no need to update it. Similarly,
		 * barring deletion, the fd should still be a valid index
		 * into the master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
1376 static unsigned int thread_process_timers(struct pqueue
*queue
,
1377 struct timeval
*timenow
)
1379 struct thread
*thread
;
1380 unsigned int ready
= 0;
1382 while (queue
->size
) {
1383 thread
= queue
->array
[0];
1384 if (timercmp(timenow
, &thread
->u
.sands
, <))
1386 pqueue_dequeue(queue
);
1387 thread
->type
= THREAD_READY
;
1388 thread_list_add(&thread
->master
->ready
, thread
);
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);

		num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
			      m->handler.copycount, tw);

		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT .
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);
	return result;
}
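/*
 * Cooperative-yield sketch (illustration only): a long-running task can
 * split its work into chunks and reschedule itself once its time slot is
 * used up. "struct work", work_remains() and do_a_chunk() are
 * hypothetical.
 */
#if 0
	static int heavy_task(struct thread *t)
	{
		struct work *w = THREAD_ARG(t);

		while (work_remains(w)) {
			do_a_chunk(w);
			if (thread_should_yield(t)) {
				/* give other tasks a chance; resume later */
				thread_add_event(t->master, heavy_task, w, 0,
						 NULL);
				return 0;
			}
		}
		return 0;
	}
#endif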
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
	getrusage(FRR_RUSAGE, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	pthread_mutex_lock(&m->mtx);
	{
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		pthread_mutex_lock(&thread->mtx);
		{
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
		pthread_mutex_unlock(&thread->mtx);
	}
	pthread_mutex_unlock(&m->mtx);

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}