/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
			       const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}

static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}

/* initializer, only ever called once */
static void initializer()
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
	if (rv == NULL)
		return NULL;

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read =
		XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
	if (rv->read == NULL) {
		XFREE(MTYPE_THREAD_MASTER, rv);
		return NULL;
	}
	rv->write =
		XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
	if (rv->write == NULL) {
		XFREE(MTYPE_THREAD, rv->read);
		XFREE(MTYPE_THREAD_MASTER, rv);
		return NULL;
	}

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(int (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
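
/* Usage sketch (not part of this file): the typical lifecycle of a
 * thread_master as seen from a daemon's main(), assuming the wrapper
 * macros from thread.h.
 *
 *	struct thread_master *master = thread_master_create("example");
 *	...schedule tasks, run the fetch/call loop (see thread_fetch())...
 *	thread_master_free_unused(master);
 *	thread_master_free(master);
 */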
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		if (master->name)
			XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}

/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}
/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}
/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);
	thread->ref = NULL;

	thread->type = THREAD_UNUSED;
	thread->hist->total_active--;
	thread_list_add(&m->unuse, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		XFREE(MTYPE_THREAD, t);
		list->count--;
		m->alloc--;
	}
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			XFREE(MTYPE_THREAD, t);
			m->alloc--;
		}
	}
	XFREE(MTYPE_THREAD, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		XFREE(MTYPE_THREAD, queue->array[i]);

	m->alloc -= queue->size;
	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see whether we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			pthread_mutex_destroy(&t->mtx);
			XFREE(MTYPE_THREAD, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete_and_null(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete_and_null(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	if (m->name)
		XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);
	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated.  We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * type of thread.
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event is
	 * detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
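
/* Usage sketch: daemons normally reach this through the thread_add_read()
 * and thread_add_write() wrapper macros in thread.h, which fill in the
 * debugargpass arguments. The callback and fd below are hypothetical.
 *
 *	static int sock_read(struct thread *t)
 *	{
 *		int fd = THREAD_FD(t);
 *		...consume data; reschedule with thread_add_read() if
 *		   more is expected...
 *		return 0;
 *	}
 *
 *	struct thread *t_read = NULL;
 *	thread_add_read(master, sock_read, NULL, sock_fd, &t_read);
 */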
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;
		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
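
/* Usage sketch: a periodic timer is just a one-shot timer that
 * reschedules itself from its own callback. The names here are
 * hypothetical; thread_add_timer_msec() is the wrapper macro from
 * thread.h.
 *
 *	static int periodic(struct thread *t)
 *	{
 *		struct thread_master *m = THREAD_ARG(t);
 *		thread_add_timer_msec(m, periodic, m, 250, NULL);
 *		return 0;
 *	}
 */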
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
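
/* Usage sketch: events run on the next scheduling pass; 'val' rides
 * along in thread->u.val (THREAD_VAL in thread.h). The callback and
 * object below are hypothetical.
 *
 *	thread_add_event(master, handle_change, changed_obj, 0, NULL);
 */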
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
			break;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	assert(thread->master->owner == pthread_self());

	pthread_mutex_lock(&thread->master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(thread->master->cancel_req, cr);
		do_thread_cancel(thread->master);
	}
	pthread_mutex_unlock(&thread->master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
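
/* Usage sketch: cancelling from another pthread requires the same
 * struct thread ** back-reference that was passed at scheduling time,
 * so the request degrades to a no-op if the task already ran. Names
 * are hypothetical.
 *
 *	struct thread *t_read = NULL;
 *	thread_add_read(master, sock_read, NULL, sock_fd, &t_read);
 *	...later, from a pthread that does not own 'master'...
 *	thread_cancel_async(master, &t_read, NULL);
 */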
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we're
	 * responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another pthread,
		 * the only thing that could have changed in m->handler.pfds
		 * while we were asleep is the .events field in a given pollfd.
		 * Barring thread_cancel() that value should be a superset of
		 * the values we have in our copy, so there's no need to update
		 * it. Similarly, barring deletion, the fd should still be a
		 * valid index into the master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			zlog_warn("poll() error: %s", safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
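
/* Usage sketch: the canonical consumer loop. thread_fetch() blocks until
 * a task is ready and returns NULL when nothing remains pending, at
 * which point the application exits.
 *
 *	struct thread task;
 *	while (thread_fetch(master, &task))
 *		thread_call(&task);
 */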
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);
	return result;
}
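
/* Usage sketch: a long-running job can check thread_should_yield()
 * periodically and reschedule itself as an event so that it does not
 * starve other tasks. The work helpers below are hypothetical.
 *
 *	static int bulk_work(struct thread *t)
 *	{
 *		struct work *w = THREAD_ARG(t);
 *		while (work_remains(w) && !thread_should_yield(t))
 *			do_one_unit(w);
 *		if (work_remains(w))
 *			thread_add_event(t->master, bulk_work, w, 0, NULL);
 *		return 0;
 *	}
 */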
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
	monotime(&r->real);
	getrusage(RUSAGE_SELF, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		zlog_warn(
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct cpu_thread_history tmp;
	struct thread dummy;

	memset(&dummy, 0, sizeof(struct thread));

	pthread_mutex_init(&dummy.mtx, NULL);
	dummy.type = THREAD_EVENT;
	dummy.add_type = THREAD_EXECUTE;
	dummy.master = NULL;
	dummy.arg = arg;
	dummy.u.val = val;

	tmp.func = dummy.func = func;
	tmp.funcname = dummy.funcname = funcname;
	dummy.hist = hash_get(m->cpu_record, &tmp,
			      (void *(*)(void *))cpu_record_hash_alloc);

	dummy.schedfrom = schedfrom;
	dummy.schedfrom_line = fromln;

	thread_call(&dummy);
}