/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <sys/resource.h>

#include "frratomic.h"
#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

DECLARE_LIST(thread_list, struct thread, threaditem)
#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0)
/* control variable for initializer */
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
static void thread_free(struct thread_master *master, struct thread *thread);
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}
static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}
static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;

	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}
static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
		(size_t)a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000,
		(size_t)a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}
static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}
static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}
static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}
static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}

	return filter;
}
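
/*
 * Worked example (illustrative): with the reconstruction above,
 * parse_filter("rw") returns (1 << THREAD_READ) | (1 << THREAD_WRITE), so
 * "show thread cpu rw" restricts the statistics to read and write tasks.
 * Characters outside the filter alphabet are skipped, so an unrecognized
 * string yields 0, which the callers treat as an invalid filter.
 */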
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	struct thread *thread;
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
		m->fd_limit);
	for (i = 0; i < m->handler.pfdcount; i++) {
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
			m->handler.pfds[i].fd, m->handler.pfds[i].events,
			m->handler.pfds[i].revents);

		if (m->handler.pfds[i].events & POLLIN) {
			thread = m->read[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR ");
			else
				vty_out(vty, "%s ", thread->funcname);
		} else
			vty_out(vty, " ");

		if (m->handler.pfds[i].events & POLLOUT) {
			thread = m->write[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR\n");
			else
				vty_out(vty, "%s\n", thread->funcname);
		} else
			vty_out(vty, "\n");
	}
}
DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}
void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}
static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}
static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}
struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(const void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	thread_list_init(&rv->event);
	thread_list_init(&rv->ready);
	thread_list_init(&rv->unuse);

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
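
/*
 * Usage sketch (illustrative only, kept out of compilation): the typical
 * event loop built on this API, using only functions defined in this file.
 * Error handling and daemon setup are omitted.
 */
#if 0
static void event_loop_example(void)
{
	struct thread_master *master = thread_master_create("example");
	struct thread thread;

	/* thread_fetch() blocks until a task is ready (or returns NULL on
	 * shutdown); thread_call() runs it and records its CPU and
	 * wall-clock usage in master->cpu_record. */
	while (thread_fetch(master, &thread))
		thread_call(&thread);

	thread_master_free(master);
}
#endif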
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}
#define THREAD_UNUSED_DEPTH 10
/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
		thread_list_add_tail(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}
/* Free all unused threads. */
static void thread_list_free(struct thread_master *m,
			     struct thread_list_head *list)
{
	struct thread *t;

	while ((t = thread_list_pop(list)))
		thread_free(m, t);
}
static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		thread_free(m, queue->array[i]);

	pqueue_delete(queue);
}
/*
 * thread_master_free_unused
 *
 * As threads are finished with they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up unused threads
 * so we can see if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_list_pop(&m->unuse)))
			thread_free(m, t);
	}
	pthread_mutex_unlock(&m->mtx);
}
/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}
/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}
/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;

	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain;
}
/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_list_pop(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated. We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * one again.
	 *
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}
static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that we
	 * should never wait and always return immediately even if no event is
	 * detected. If the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}
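
/*
 * Worked example of the timeout computation above (illustrative): with
 * m->selectpoll_timeout == 0 and *timer_wait == {2, 500000}, poll() gets
 * 2 * 1000 + 500000 / 1000 = 2500 milliseconds; with timer_wait == NULL it
 * stays at -1 and poll() blocks until I/O arrives or the pipe poker fires.
 */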
/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;
	struct thread **thread_array;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	/* default to a new pollfd */
	nfds_t queuepos = m->handler.pfdcount;

	if (dir == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	/* if we already have a pollfd for our file descriptor, find and
	 * use it */
	for (nfds_t i = 0; i < m->handler.pfdcount; i++)
		if (m->handler.pfds[i].fd == fd) {
			queuepos = i;

			/*
			 * What happens if we have a thread already
			 * created for this event?
			 */
			if (thread_array[fd])
				assert(!"Thread already scheduled for file descriptor");

			break;
		}

	/* make sure we have room for this fd + pipe poker fd */
	assert(queuepos + 1 < m->handler.pfdsize);

	thread = thread_get(m, dir, func, arg, debugargpass);

	m->handler.pfds[queuepos].fd = fd;
	m->handler.pfds[queuepos].events |=
		(dir == THREAD_READ ? POLLIN : POLLOUT);

	if (queuepos == m->handler.pfdcount)
		m->handler.pfdcount++;

	if (thread) {
		pthread_mutex_lock(&thread->mtx);

		thread->u.fd = fd;
		thread_array[thread->u.fd] = thread;

		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}

	AWAKEN(m);

	pthread_mutex_unlock(&m->mtx);

	return thread;
}
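
/*
 * Usage sketch (illustrative only, kept out of compilation): callers
 * normally reach the function above through the thread_add_read()/
 * thread_add_write() convenience macros from thread.h. The handler and
 * 'listen_sock' below are hypothetical names for this example.
 */
#if 0
static int example_read_handler(struct thread *t)
{
	int fd = THREAD_FD(t);

	/* ... consume data on fd, then reschedule to keep listening,
	 * since a read task fires only once ... */
	thread_add_read(t->master, example_read_handler, NULL, fd, NULL);
	return 0;
}

static void example_schedule(struct thread_master *master, int listen_sock)
{
	thread_add_read(master, example_read_handler, NULL, listen_sock, NULL);
}
#endif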
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	queue = m->timer;
	thread = thread_get(m, type, func, arg, debugargpass);

	pthread_mutex_lock(&thread->mtx);
	{
		monotime(&thread->u.sands);
		timeradd(&thread->u.sands, time_relative,
			 &thread->u.sands);
		pqueue_enqueue(thread, queue);
		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}
	}
	pthread_mutex_unlock(&thread->mtx);

	AWAKEN(m);
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}
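
/*
 * Worked example (illustrative): timer == 1500 above becomes
 * trel = { .tv_sec = 1, .tv_usec = 500000 }, i.e. a timer 1.5 seconds
 * from now.
 */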
/* Add timer event thread with "timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	if (t_ptr
	    && *t_ptr) // thread is already scheduled; don't reschedule
	{
		pthread_mutex_unlock(&m->mtx);
		return NULL;
	}

	thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
	pthread_mutex_lock(&thread->mtx);
	{
		thread->u.val = val;
		thread_list_add_tail(&m->event, thread);
	}
	pthread_mutex_unlock(&thread->mtx);

	if (t_ptr) {
		*t_ptr = thread;
		thread->ref = t_ptr;
	}

	AWAKEN(m);
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
/* Thread cancellation ------------------------------------------------------ */
/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}
/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list_head *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread
		 * in the ready queue. */
		if (cr->eventobj) {
			struct thread *t;

			frr_each_safe(thread_list, &master->event, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->event, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}

			frr_each_safe(thread_list, &master->ready, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->ready, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we
		 * need to check whether the thread even exists anymore
		 * before cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_del(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}
/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}
/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
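
/*
 * Usage sketch (illustrative only, kept out of compilation): per the
 * contract above, a task must have been scheduled with a back-reference
 * for asynchronous cancellation to find it. The names below are
 * hypothetical.
 */
#if 0
static struct thread *t_read;
static int example_read_handler(struct thread *t);

static void example_async_cancel(struct thread_master *other_master, int fd)
{
	/* scheduled with &t_read as the required back-reference */
	thread_add_read(other_master, example_read_handler, NULL, fd, &t_read);

	/* later, from a pthread that does NOT own other_master: blocks
	 * until the owning pthread services the request; t_read is set to
	 * NULL once the task is gone */
	thread_cancel_async(other_master, &t_read, NULL);
}
#endif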
/* ------------------------------------------------------------------------- */
static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}
static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}
static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state,
				    short actual_state, int pos)
{
	struct thread **thread_array;

	/*
	 * poll() clears the .events field, but the pollfd array we
	 * pass to poll() is a copy of the one used to schedule threads.
	 * We need to synchronize state between the two here by applying
	 * the same changes poll() made on the copy of the "real" pollfd
	 * array.
	 *
	 * This cleans up a possible infinite loop where we refuse
	 * to respond to a poll event but poll is insistent that
	 * we should.
	 */
	m->handler.pfds[pos].events &= ~(state);

	if (!thread) {
		if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
			flog_err(EC_LIB_NO_THREAD,
				 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!",
				 m->handler.pfds[pos].fd, actual_state);
		return 0;
	}

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add_tail(&m->ready, thread);
	thread->type = THREAD_READY;

	return 1;
}
/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel() that value
		 * should be a superset of the values we have in our copy, so
		 * there's no need to update it. Similarly, barring deletion,
		 * the fd should still be a valid index into the master's
		 * pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP)) {
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 pfds[i].revents, i);
		}
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, pfds[i].revents, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}
/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list_head *list)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_list_pop(list))) {
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}
/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_list_pop(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to
		 *   zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to
		 *   die
		 *
		 * In every case except the last, we need to hit poll() at
		 * least once per loop to avoid starvation by events
		 */
		if (!thread_list_count(&m->ready))
			tw = thread_timer_wait(m->timer, &tv);

		if (thread_list_count(&m->ready) ||
		    (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}
unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}
/* We should aim to yield after 'yield' milliseconds, which defaults
 * to THREAD_YIELD_TIME_SLOT.
 * Note: we are using real (wall clock) time for this calculation.
 * It could be argued that CPU time may make more sense in certain
 * contexts. The things to consider are whether the thread may have
 * blocked (in which case wall time increases, but CPU time does not),
 * or whether the system is heavily loaded with other processes competing
 * for CPU time. On balance, wall clock time seems to make sense.
 * Plus it has the added benefit that gettimeofday should be faster
 * than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;

	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);

	return result;
}
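
/*
 * Usage sketch (illustrative only, kept out of compilation): the intended
 * pattern for a long-running task is to poll thread_should_yield() and
 * reschedule itself so other tasks are not starved. The work_* names are
 * hypothetical.
 */
#if 0
static int example_background_work(struct thread *t)
{
	struct work_state *ws = THREAD_ARG(t);

	while (work_remaining(ws)) {
		work_do_one_item(ws);

		if (thread_should_yield(t)) {
			/* pick up where we left off on a later pass */
			thread_add_event(t->master, example_background_work,
					 ws, 0, NULL);
			break;
		}
	}
	return 0;
}
#endif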
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}
void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
	getrusage(FRR_RUSAGE, &(r->cpu));
}
/*
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->real.max, &exp, realtime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->cpu.max, &exp, cputime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}
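
/*
 * Note on the update loops above (added explanation): a failed
 * compare-exchange reloads 'exp' with the current maximum, so the loop
 * retries only while 'exp' is still smaller than our sample. A concurrent
 * writer that races us to a larger maximum simply terminates the loop,
 * which is exactly the monotonicity property the comment preceding
 * thread_call() relies on.
 */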
/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	pthread_mutex_lock(&m->mtx);
	{
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		pthread_mutex_lock(&thread->mtx);
		{
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
		pthread_mutex_unlock(&thread->mtx);
	}
	pthread_mutex_unlock(&m->mtx);

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}
);