1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with GNU Zebra; see the file COPYING. If not, write to the Free
18 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
19 * 02111-1307, USA.
20 */
21
22 /* #define DEBUG */
23
24 #include <zebra.h>
25 #include <sys/resource.h>
26
27 #include "thread.h"
28 #include "memory.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "pqueue.h"
32 #include "command.h"
33 #include "sigevent.h"
34
35 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
36 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
37 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
38
39 #if defined(__APPLE__)
40 #include <mach/mach.h>
41 #include <mach/mach_time.h>
42 #endif
43
44 /* Per-function CPU usage statistics, keyed by the thread's function */
45 static struct hash *cpu_record = NULL;
46
47 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
48 {
49 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
50 + (a.tv_usec - b.tv_usec));
51 }
52
53 static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
54 {
55 return (uintptr_t)a->func;
56 }
57
58 static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
59 const struct cpu_thread_history *b)
60 {
61 return a->func == b->func;
62 }
63
64 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
65 {
66 struct cpu_thread_history *new;
67 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
68 new->func = a->func;
69 new->funcname = a->funcname;
70 return new;
71 }
72
73 static void cpu_record_hash_free(void *a)
74 {
75 struct cpu_thread_history *hist = a;
76
77 XFREE(MTYPE_THREAD_STATS, hist);
78 }
79
80 static void vty_out_cpu_thread_history(struct vty *vty,
81 struct cpu_thread_history *a)
82 {
83 vty_out(vty, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld", a->total_active,
84 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
85 a->cpu.total / a->total_calls, a->cpu.max,
86 a->real.total / a->total_calls, a->real.max);
87 vty_out(vty, " %c%c%c%c%c%c %s%s",
88 a->types & (1 << THREAD_READ) ? 'R' : ' ',
89 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
90 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
91 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
92 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ',
93 a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ', a->funcname,
94 VTY_NEWLINE);
95 }
96
97 static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
98 {
99 struct cpu_thread_history *totals = args[0];
100 struct vty *vty = args[1];
101 thread_type *filter = args[2];
102 struct cpu_thread_history *a = bucket->data;
103
104 if (!(a->types & *filter))
105 return;
106 vty_out_cpu_thread_history(vty, a);
107 totals->total_active += a->total_active;
108 totals->total_calls += a->total_calls;
109 totals->real.total += a->real.total;
110 if (totals->real.max < a->real.max)
111 totals->real.max = a->real.max;
112 totals->cpu.total += a->cpu.total;
113 if (totals->cpu.max < a->cpu.max)
114 totals->cpu.max = a->cpu.max;
115 }
116
117 static void cpu_record_print(struct vty *vty, thread_type filter)
118 {
119 struct cpu_thread_history tmp;
120 void *args[3] = {&tmp, vty, &filter};
121
122 memset(&tmp, 0, sizeof tmp);
123 tmp.funcname = "TOTAL";
124 tmp.types = filter;
125
126 vty_out(vty, "%21s %18s %18s%s", "",
127 "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
128 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
129 vty_out(vty, " Avg uSec Max uSecs");
130 vty_out(vty, " Type Thread%s", VTY_NEWLINE);
131 hash_iterate(cpu_record, (void (*)(struct hash_backet *,
132 void *))cpu_record_hash_print,
133 args);
134
135 if (tmp.total_calls > 0)
136 vty_out_cpu_thread_history(vty, &tmp);
137 }
138
139 DEFUN (show_thread_cpu,
140 show_thread_cpu_cmd,
141 "show thread cpu [FILTER]",
142 SHOW_STR
143 "Thread information\n"
144 "Thread CPU usage\n"
145 "Display filter (rwtexb)\n")
146 {
147 int idx_filter = 3;
148 int i = 0;
149 thread_type filter = (thread_type)-1U;
150
151 if (argc > 3) {
152 filter = 0;
153 while (argv[idx_filter]->arg[i] != '\0') {
154 switch (argv[idx_filter]->arg[i]) {
155 case 'r':
156 case 'R':
157 filter |= (1 << THREAD_READ);
158 break;
159 case 'w':
160 case 'W':
161 filter |= (1 << THREAD_WRITE);
162 break;
163 case 't':
164 case 'T':
165 filter |= (1 << THREAD_TIMER);
166 break;
167 case 'e':
168 case 'E':
169 filter |= (1 << THREAD_EVENT);
170 break;
171 case 'x':
172 case 'X':
173 filter |= (1 << THREAD_EXECUTE);
174 break;
175 case 'b':
176 case 'B':
177 filter |= (1 << THREAD_BACKGROUND);
178 break;
179 default:
180 break;
181 }
182 ++i;
183 }
184 if (filter == 0) {
185 vty_out(vty,
186 "Invalid filter \"%s\" specified,"
187 " must contain at least one of 'RWTEXB'%s",
188 argv[idx_filter]->arg, VTY_NEWLINE);
189 return CMD_WARNING;
190 }
191 }
192
193 cpu_record_print(vty, filter);
194 return CMD_SUCCESS;
195 }
196
197 static void cpu_record_hash_clear(struct hash_backet *bucket, void *args)
198 {
199 thread_type *filter = args;
200 struct cpu_thread_history *a = bucket->data;
201
202 if (!(a->types & *filter))
203 return;
204
205 hash_release(cpu_record, bucket->data);
206 }
207
208 static void cpu_record_clear(thread_type filter)
209 {
210 thread_type *tmp = &filter;
211 hash_iterate(cpu_record, (void (*)(struct hash_backet *,
212 void *))cpu_record_hash_clear,
213 tmp);
214 }
215
216 DEFUN (clear_thread_cpu,
217 clear_thread_cpu_cmd,
218 "clear thread cpu [FILTER]",
219 "Clear stored data\n"
220 "Thread information\n"
221 "Thread CPU usage\n"
222 "Display filter (rwtexb)\n")
223 {
224 int idx_filter = 3;
225 int i = 0;
226 thread_type filter = (thread_type)-1U;
227
228 if (argc > 3) {
229 filter = 0;
230 while (argv[idx_filter]->arg[i] != '\0') {
231 switch (argv[idx_filter]->arg[i]) {
232 case 'r':
233 case 'R':
234 filter |= (1 << THREAD_READ);
235 break;
236 case 'w':
237 case 'W':
238 filter |= (1 << THREAD_WRITE);
239 break;
240 case 't':
241 case 'T':
242 filter |= (1 << THREAD_TIMER);
243 break;
244 case 'e':
245 case 'E':
246 filter |= (1 << THREAD_EVENT);
247 break;
248 case 'x':
249 case 'X':
250 filter |= (1 << THREAD_EXECUTE);
251 break;
252 case 'b':
253 case 'B':
254 filter |= (1 << THREAD_BACKGROUND);
255 break;
256 default:
257 break;
258 }
259 ++i;
260 }
261 if (filter == 0) {
262 vty_out(vty,
263 "Invalid filter \"%s\" specified,"
264 " must contain at least one of 'RWTEXB'%s",
265 argv[idx_filter]->arg, VTY_NEWLINE);
266 return CMD_WARNING;
267 }
268 }
269
270 cpu_record_clear(filter);
271 return CMD_SUCCESS;
272 }
273
274 void thread_cmd_init(void)
275 {
276 install_element(VIEW_NODE, &show_thread_cpu_cmd);
277 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
278 }
279
280 static int thread_timer_cmp(void *a, void *b)
281 {
282 struct thread *thread_a = a;
283 struct thread *thread_b = b;
284
285 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
286 return -1;
287 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
288 return 1;
289 return 0;
290 }
291
292 static void thread_timer_update(void *node, int actual_position)
293 {
294 struct thread *thread = node;
295
296 thread->index = actual_position;
297 }
298
299 /* Allocate new thread master. */
300 struct thread_master *thread_master_create(void)
301 {
302 struct thread_master *rv;
303 struct rlimit limit;
304
305 getrlimit(RLIMIT_NOFILE, &limit);
306
307 if (cpu_record == NULL)
308 cpu_record = hash_create(
309 (unsigned int (*)(void *))cpu_record_hash_key,
310 (int (*)(const void *,
311 const void *))cpu_record_hash_cmp);
312
313 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
314 if (rv == NULL) {
315 return NULL;
316 }
317
318 rv->fd_limit = (int)limit.rlim_cur;
319 rv->read =
320 XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
321 if (rv->read == NULL) {
322 XFREE(MTYPE_THREAD_MASTER, rv);
323 return NULL;
324 }
325
326 rv->write =
327 XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
328 if (rv->write == NULL) {
329 XFREE(MTYPE_THREAD, rv->read);
330 XFREE(MTYPE_THREAD_MASTER, rv);
331 return NULL;
332 }
333
334 /* Initialize the timer queues */
335 rv->timer = pqueue_create();
336 rv->background = pqueue_create();
337 rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
338 rv->timer->update = rv->background->update = thread_timer_update;
339
340 #if defined(HAVE_POLL_CALL)
341 rv->handler.pfdsize = rv->fd_limit;
342 rv->handler.pfdcount = 0;
343 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
344 sizeof(struct pollfd) * rv->handler.pfdsize);
345 #endif
346 return rv;
347 }
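/*
 * Illustrative sketch (editorial addition, not compiled): how a daemon
 * typically obtains a thread master and releases it again, based on the
 * functions defined in this file and the prototypes in thread.h.  The
 * name "example_startup" is hypothetical.
 */
#if 0
static int example_startup(void)
{
	struct thread_master *master;

	master = thread_master_create();
	if (master == NULL)
		return -1;

	/* Make "show thread cpu" and "clear thread cpu" available. */
	thread_cmd_init();

	/* ... schedule the initial reads, timers and events here ... */

	/* At shutdown, release everything the master still owns. */
	thread_master_free(master);
	return 0;
}
#endif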
348
349 /* Add a new thread to the list. */
350 static void thread_list_add(struct thread_list *list, struct thread *thread)
351 {
352 thread->next = NULL;
353 thread->prev = list->tail;
354 if (list->tail)
355 list->tail->next = thread;
356 else
357 list->head = thread;
358 list->tail = thread;
359 list->count++;
360 }
361
362 /* Delete a thread from the list. */
363 static struct thread *thread_list_delete(struct thread_list *list,
364 struct thread *thread)
365 {
366 if (thread->next)
367 thread->next->prev = thread->prev;
368 else
369 list->tail = thread->prev;
370 if (thread->prev)
371 thread->prev->next = thread->next;
372 else
373 list->head = thread->next;
374 thread->next = thread->prev = NULL;
375 list->count--;
376 return thread;
377 }
378
379 static void thread_delete_fd(struct thread **thread_array,
380 struct thread *thread)
381 {
382 thread_array[thread->u.fd] = NULL;
383 }
384
385 static void thread_add_fd(struct thread **thread_array, struct thread *thread)
386 {
387 thread_array[thread->u.fd] = thread;
388 }
389
390 /* Return whether the thread list is empty. */
391 static int thread_empty(struct thread_list *list)
392 {
393 return list->head ? 0 : 1;
394 }
395
396 /* Delete top of the list and return it. */
397 static struct thread *thread_trim_head(struct thread_list *list)
398 {
399 if (!thread_empty(list))
400 return thread_list_delete(list, list->head);
401 return NULL;
402 }
403
404 /* Move thread to unuse list. */
405 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
406 {
407 assert(m != NULL && thread != NULL);
408 assert(thread->next == NULL);
409 assert(thread->prev == NULL);
410
411 thread->type = THREAD_UNUSED;
412 thread->hist->total_active--;
413 thread_list_add(&m->unuse, thread);
414 }
415
416 /* Free all threads on the given list. */
417 static void thread_list_free(struct thread_master *m, struct thread_list *list)
418 {
419 struct thread *t;
420 struct thread *next;
421
422 for (t = list->head; t; t = next) {
423 next = t->next;
424 XFREE(MTYPE_THREAD, t);
425 list->count--;
426 m->alloc--;
427 }
428 }
429
430 static void thread_array_free(struct thread_master *m,
431 struct thread **thread_array)
432 {
433 struct thread *t;
434 int index;
435
436 for (index = 0; index < m->fd_limit; ++index) {
437 t = thread_array[index];
438 if (t) {
439 thread_array[index] = NULL;
440 XFREE(MTYPE_THREAD, t);
441 m->alloc--;
442 }
443 }
444 XFREE(MTYPE_THREAD, thread_array);
445 }
446
447 static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
448 {
449 int i;
450
451 for (i = 0; i < queue->size; i++)
452 XFREE(MTYPE_THREAD, queue->array[i]);
453
454 m->alloc -= queue->size;
455 pqueue_delete(queue);
456 }
457
458 /*
459 * thread_master_free_unused
460 *
461 * As threads are finished with, they are put on the
462 * unuse list for later reuse.
463 * When shutting down, free the unused threads so that
464 * anything we forgot to shut off stands out.
465 */
466 void thread_master_free_unused(struct thread_master *m)
467 {
468 struct thread *t;
469 while ((t = thread_trim_head(&m->unuse)) != NULL) {
470 XFREE(MTYPE_THREAD, t);
471 }
472 }
473
474 /* Free the thread master and all of its resources. */
475 void thread_master_free(struct thread_master *m)
476 {
477 thread_array_free(m, m->read);
478 thread_array_free(m, m->write);
479 thread_queue_free(m, m->timer);
480 thread_list_free(m, &m->event);
481 thread_list_free(m, &m->ready);
482 thread_list_free(m, &m->unuse);
483 thread_queue_free(m, m->background);
484
485 #if defined(HAVE_POLL_CALL)
486 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
487 #endif
488 XFREE(MTYPE_THREAD_MASTER, m);
489
490 if (cpu_record) {
491 hash_clean(cpu_record, cpu_record_hash_free);
492 hash_free(cpu_record);
493 cpu_record = NULL;
494 }
495 }
496
497 /* Return remaining time in seconds. */
498 unsigned long thread_timer_remain_second(struct thread *thread)
499 {
500 int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
501 return remain < 0 ? 0 : remain;
502 }
503
504 #define debugargdef const char *funcname, const char *schedfrom, int fromln
505 #define debugargpass funcname, schedfrom, fromln
506
507 struct timeval thread_timer_remain(struct thread *thread)
508 {
509 struct timeval remain;
510 monotime_until(&thread->u.sands, &remain);
511 return remain;
512 }
513
514 /* Get new thread. */
515 static struct thread *thread_get(struct thread_master *m, u_char type,
516 int (*func)(struct thread *), void *arg,
517 debugargdef)
518 {
519 struct thread *thread = thread_trim_head(&m->unuse);
520 struct cpu_thread_history tmp;
521
522 if (!thread) {
523 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
524 m->alloc++;
525 }
526 thread->type = type;
527 thread->add_type = type;
528 thread->master = m;
529 thread->arg = arg;
530 thread->index = -1;
531 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
532
533 /*
534 * So if the passed in funcname is not what we have
535 * stored that means the thread->hist needs to be
536 * updated. We keep the last one around in unused
537 * under the assumption that we are probably
538 * going to immediately allocate the same
539 * type of thread.
540 * This hopefully saves us some serious
541 * hash_get lookups.
542 */
543 if (thread->funcname != funcname || thread->func != func) {
544 tmp.func = func;
545 tmp.funcname = funcname;
546 thread->hist =
547 hash_get(cpu_record, &tmp,
548 (void *(*)(void *))cpu_record_hash_alloc);
549 }
550 thread->hist->total_active++;
551 thread->func = func;
552 thread->funcname = funcname;
553 thread->schedfrom = schedfrom;
554 thread->schedfrom_line = fromln;
555
556 return thread;
557 }
558
559 #if defined(HAVE_POLL_CALL)
560
561 #define fd_copy_fd_set(X) (X)
562
563 /* generic add thread function */
564 static struct thread *generic_thread_add(struct thread_master *m,
565 int (*func)(struct thread *),
566 void *arg, int fd, int dir,
567 debugargdef)
568 {
569 struct thread *thread;
570
571 u_char type;
572 short int event;
573
574 if (dir == THREAD_READ) {
575 event = (POLLIN | POLLHUP);
576 type = THREAD_READ;
577 } else {
578 event = (POLLOUT | POLLHUP);
579 type = THREAD_WRITE;
580 }
581
582 nfds_t queuepos = m->handler.pfdcount;
583 nfds_t i = 0;
584 for (i = 0; i < m->handler.pfdcount; i++)
585 if (m->handler.pfds[i].fd == fd) {
586 queuepos = i;
587 break;
588 }
589
590 /* is there enough space for a new fd? */
591 assert(queuepos < m->handler.pfdsize);
592
593 thread = thread_get(m, type, func, arg, debugargpass);
594 m->handler.pfds[queuepos].fd = fd;
595 m->handler.pfds[queuepos].events |= event;
596 if (queuepos == m->handler.pfdcount)
597 m->handler.pfdcount++;
598
599 return thread;
600 }
601 #else
602
603 #define fd_copy_fd_set(X) (X)
604 #endif
605
606 static int fd_select(struct thread_master *m, int size, thread_fd_set *read,
607 thread_fd_set *write, thread_fd_set *except,
608 struct timeval *timer_wait)
609 {
610 int num;
611 #if defined(HAVE_POLL_CALL)
612 /* Recalculate the timeout for poll(): for select() a NULL pointer
613 means no timeout, whereas for poll() no timeout is -1 */
614 int timeout = -1;
615 if (timer_wait != NULL)
616 timeout = (timer_wait->tv_sec * 1000)
617 + (timer_wait->tv_usec / 1000);
618
619 num = poll(m->handler.pfds,
620 m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
621 #else
622 num = select(size, read, write, except, timer_wait);
623 #endif
624
625 return num;
626 }
627
628 static int fd_is_set(struct thread *thread, thread_fd_set *fdset, int pos)
629 {
630 #if defined(HAVE_POLL_CALL)
631 return 1;
632 #else
633 return FD_ISSET(THREAD_FD(thread), fdset);
634 #endif
635 }
636
637 static int fd_clear_read_write(struct thread *thread)
638 {
639 #if !defined(HAVE_POLL_CALL)
640 thread_fd_set *fdset = NULL;
641 int fd = THREAD_FD(thread);
642
643 if (thread->type == THREAD_READ)
644 fdset = &thread->master->handler.readfd;
645 else
646 fdset = &thread->master->handler.writefd;
647
648 if (!FD_ISSET(fd, fdset))
649 return 0;
650
651 FD_CLR(fd, fdset);
652 #endif
653 return 1;
654 }
655
656 /* Add a new read or write thread. */
657 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
658 int (*func)(struct thread *),
659 void *arg, int fd, debugargdef)
660 {
661 struct thread *thread = NULL;
662
663 #if !defined(HAVE_POLL_CALL)
664 thread_fd_set *fdset = NULL;
665 if (dir == THREAD_READ)
666 fdset = &m->handler.readfd;
667 else
668 fdset = &m->handler.writefd;
669 #endif
670
671 #if defined(HAVE_POLL_CALL)
672 thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
673
674 if (thread == NULL)
675 return NULL;
676 #else
677 if (FD_ISSET(fd, fdset)) {
678 zlog_warn("There is already %s fd [%d]",
679 (dir == THREAD_READ) ? "read" : "write", fd);
680 return NULL;
681 }
682
683 FD_SET(fd, fdset);
684 thread = thread_get(m, dir, func, arg, debugargpass);
685 #endif
686
687 thread->u.fd = fd;
688 if (dir == THREAD_READ)
689 thread_add_fd(m->read, thread);
690 else
691 thread_add_fd(m->write, thread);
692
693 return thread;
694 }
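/*
 * Illustrative sketch (editorial addition, not compiled): scheduling a
 * read handler on a file descriptor.  Callers normally go through the
 * usual thread_add_read()/thread_add_write() wrapper macros in thread.h,
 * which expand to this function; the direct call below just spells out
 * the debug arguments.  A thread fires only once, so the handler must
 * re-register itself for further readiness events.  Names prefixed
 * "example_" are hypothetical.
 */
#if 0
static int example_read(struct thread *t)
{
	struct thread_master *m = t->master;
	void *ctx = t->arg;
	int fd = t->u.fd;

	/* ... read from fd, act on ctx ... */

	/* Re-arm for the next readable event. */
	funcname_thread_add_read_write(THREAD_READ, m, example_read, ctx,
				       fd, "example_read", __FILE__,
				       __LINE__);
	return 0;
}
#endif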
695
696 static struct thread *funcname_thread_add_timer_timeval(
697 struct thread_master *m, int (*func)(struct thread *), int type,
698 void *arg, struct timeval *time_relative, debugargdef)
699 {
700 struct thread *thread;
701 struct pqueue *queue;
702
703 assert(m != NULL);
704
705 assert(type == THREAD_TIMER || type == THREAD_BACKGROUND);
706 assert(time_relative);
707
708 queue = ((type == THREAD_TIMER) ? m->timer : m->background);
709 thread = thread_get(m, type, func, arg, debugargpass);
710
711 monotime(&thread->u.sands);
712 timeradd(&thread->u.sands, time_relative, &thread->u.sands);
713
714 pqueue_enqueue(thread, queue);
715 return thread;
716 }
717
718
719 /* Add timer event thread. */
720 struct thread *funcname_thread_add_timer(struct thread_master *m,
721 int (*func)(struct thread *),
722 void *arg, long timer, debugargdef)
723 {
724 struct timeval trel;
725
726 assert(m != NULL);
727
728 trel.tv_sec = timer;
729 trel.tv_usec = 0;
730
731 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
732 &trel, debugargpass);
733 }
734
735 /* Add timer event thread with "millisecond" resolution */
736 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
737 int (*func)(struct thread *),
738 void *arg, long timer,
739 debugargdef)
740 {
741 struct timeval trel;
742
743 assert(m != NULL);
744
745 trel.tv_sec = timer / 1000;
746 trel.tv_usec = 1000 * (timer % 1000);
747
748 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
749 &trel, debugargpass);
750 }
751
752 /* Add timer event thread with a struct timeval expiration time */
753 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
754 int (*func)(struct thread *),
755 void *arg, struct timeval *tv,
756 debugargdef)
757 {
758 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
759 debugargpass);
760 }
761
762 /* Add a background thread, with an optional millisec delay */
763 struct thread *funcname_thread_add_background(struct thread_master *m,
764 int (*func)(struct thread *),
765 void *arg, long delay,
766 debugargdef)
767 {
768 struct timeval trel;
769
770 assert(m != NULL);
771
772 if (delay) {
773 trel.tv_sec = delay / 1000;
774 trel.tv_usec = 1000 * (delay % 1000);
775 } else {
776 trel.tv_sec = 0;
777 trel.tv_usec = 0;
778 }
779
780 return funcname_thread_add_timer_timeval(m, func, THREAD_BACKGROUND,
781 arg, &trel, debugargpass);
782 }
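/*
 * Illustrative sketch (editorial addition, not compiled): the timer
 * variants above differ only in how the expiration is expressed (whole
 * seconds, milliseconds, a struct timeval, or a low-priority background
 * timer).  The usual entry points are the thread_add_timer*() macros in
 * thread.h; the direct calls below spell out the debug arguments.
 * "example_expire" and the 5/250 values are made up for illustration.
 */
#if 0
static int example_expire(struct thread *t)
{
	void *ctx = t->arg;

	/* ... timer fired, do the periodic work on ctx ... */
	return 0;
}

static void example_arm_timers(struct thread_master *m, void *ctx)
{
	/* Fire once, five seconds from now. */
	funcname_thread_add_timer(m, example_expire, ctx, 5,
				  "example_expire", __FILE__, __LINE__);

	/* Fire once, 250 milliseconds from now. */
	funcname_thread_add_timer_msec(m, example_expire, ctx, 250,
				       "example_expire", __FILE__,
				       __LINE__);

	/* Low-priority background work, 250 ms from now. */
	funcname_thread_add_background(m, example_expire, ctx, 250,
				       "example_expire", __FILE__,
				       __LINE__);
}
#endif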
783
784 /* Add simple event thread. */
785 struct thread *funcname_thread_add_event(struct thread_master *m,
786 int (*func)(struct thread *),
787 void *arg, int val, debugargdef)
788 {
789 struct thread *thread;
790
791 assert(m != NULL);
792
793 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
794 thread->u.val = val;
795 thread_list_add(&m->event, thread);
796
797 return thread;
798 }
799
800 static void thread_cancel_read_or_write(struct thread *thread, short int state)
801 {
802 #if defined(HAVE_POLL_CALL)
803 nfds_t i;
804
805 for (i = 0; i < thread->master->handler.pfdcount; ++i)
806 if (thread->master->handler.pfds[i].fd == thread->u.fd) {
807 thread->master->handler.pfds[i].events &= ~(state);
808
809 /* remove thread fds from pfd list */
810 if (thread->master->handler.pfds[i].events == 0) {
811 memmove(thread->master->handler.pfds + i,
812 thread->master->handler.pfds + i + 1,
813 (thread->master->handler.pfdsize - i
814 - 1) * sizeof(struct pollfd));
815 thread->master->handler.pfdcount--;
816 return;
817 }
818 }
819 #endif
820
821 fd_clear_read_write(thread);
822 }
823
824 /* Cancel thread from scheduler. */
825 void thread_cancel(struct thread *thread)
826 {
827 struct thread_list *list = NULL;
828 struct pqueue *queue = NULL;
829 struct thread **thread_array = NULL;
830
831 switch (thread->type) {
832 case THREAD_READ:
833 #if defined(HAVE_POLL_CALL)
834 thread_cancel_read_or_write(thread, POLLIN | POLLHUP);
835 #else
836 thread_cancel_read_or_write(thread, 0);
837 #endif
838 thread_array = thread->master->read;
839 break;
840 case THREAD_WRITE:
841 #if defined(HAVE_POLL_CALL)
842 thread_cancel_read_or_write(thread, POLLOUT | POLLHUP);
843 #else
844 thread_cancel_read_or_write(thread, 0);
845 #endif
846 thread_array = thread->master->write;
847 break;
848 case THREAD_TIMER:
849 queue = thread->master->timer;
850 break;
851 case THREAD_EVENT:
852 list = &thread->master->event;
853 break;
854 case THREAD_READY:
855 list = &thread->master->ready;
856 break;
857 case THREAD_BACKGROUND:
858 queue = thread->master->background;
859 break;
860 default:
861 return;
862 break;
863 }
864
865 if (queue) {
866 assert(thread->index >= 0);
867 assert(thread == queue->array[thread->index]);
868 pqueue_remove_at(thread->index, queue);
869 } else if (list) {
870 thread_list_delete(list, thread);
871 } else if (thread_array) {
872 thread_delete_fd(thread_array, thread);
873 } else {
874 assert(!"Thread should be either in queue or list or array!");
875 }
876
877 thread_add_unuse(thread->master, thread);
878 }
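/*
 * Illustrative sketch (editorial addition, not compiled): cancelling
 * scheduled threads.  The owner keeps the pointer returned by the
 * thread_add_*() call and cancels it before the state it references goes
 * away; daemons usually wrap this in the THREAD_OFF()-style macros from
 * thread.h, and the plain calls below show the underlying operations.
 * "struct example_peer" is hypothetical.
 */
#if 0
struct example_peer {
	struct thread *t_read;
	struct thread *t_timer;
};

static void example_peer_stop(struct example_peer *peer)
{
	if (peer->t_read) {
		thread_cancel(peer->t_read);
		peer->t_read = NULL;
	}
	if (peer->t_timer) {
		thread_cancel(peer->t_timer);
		peer->t_timer = NULL;
	}
}
#endif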
879
880 /* Delete all events which have argument value arg. */
881 unsigned int thread_cancel_event(struct thread_master *m, void *arg)
882 {
883 unsigned int ret = 0;
884 struct thread *thread;
885
886 thread = m->event.head;
887 while (thread) {
888 struct thread *t;
889
890 t = thread;
891 thread = t->next;
892
893 if (t->arg == arg) {
894 ret++;
895 thread_list_delete(&m->event, t);
896 thread_add_unuse(m, t);
897 }
898 }
899
900 /* thread can be on the ready list too */
901 thread = m->ready.head;
902 while (thread) {
903 struct thread *t;
904
905 t = thread;
906 thread = t->next;
907
908 if (t->arg == arg) {
909 ret++;
910 thread_list_delete(&m->ready, t);
911 thread_add_unuse(m, t);
912 }
913 }
914 return ret;
915 }
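/*
 * Illustrative sketch (editorial addition, not compiled): when an object
 * is being deleted, thread_cancel_event() sweeps the event and ready
 * lists for every thread whose argument points at that object, so no
 * stale callback can fire on freed memory.  "example_peer_free" is
 * hypothetical.
 */
#if 0
static void example_peer_free(struct thread_master *m, void *peer)
{
	/* Drop any queued events that still reference this peer. */
	thread_cancel_event(m, peer);

	/* ... now it is safe to free the peer itself ... */
}
#endif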
916
917 static struct timeval *thread_timer_wait(struct pqueue *queue,
918 struct timeval *timer_val)
919 {
920 if (queue->size) {
921 struct thread *next_timer = queue->array[0];
922 monotime_until(&next_timer->u.sands, timer_val);
923 return timer_val;
924 }
925 return NULL;
926 }
927
928 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
929 struct thread *fetch)
930 {
931 *fetch = *thread;
932 thread_add_unuse(m, thread);
933 return fetch;
934 }
935
936 static int thread_process_fds_helper(struct thread_master *m,
937 struct thread *thread,
938 thread_fd_set *fdset, short int state,
939 int pos)
940 {
941 struct thread **thread_array;
942
943 if (!thread)
944 return 0;
945
946 if (thread->type == THREAD_READ)
947 thread_array = m->read;
948 else
949 thread_array = m->write;
950
951 if (fd_is_set(thread, fdset, pos)) {
952 fd_clear_read_write(thread);
953 thread_delete_fd(thread_array, thread);
954 thread_list_add(&m->ready, thread);
955 thread->type = THREAD_READY;
956 #if defined(HAVE_POLL_CALL)
957 thread->master->handler.pfds[pos].events &= ~(state);
958 #endif
959 return 1;
960 }
961 return 0;
962 }
963
964 #if defined(HAVE_POLL_CALL)
965
966 /* check poll events */
967 static void check_pollfds(struct thread_master *m, fd_set *readfd, int num)
968 {
969 nfds_t i = 0;
970 int ready = 0;
971 for (i = 0; i < m->handler.pfdcount && ready < num; ++i) {
972 /* no event for current fd? immediately continue */
973 if (m->handler.pfds[i].revents == 0)
974 continue;
975
976 ready++;
977
978 /* POLLIN / POLLOUT process event */
979 if (m->handler.pfds[i].revents & (POLLIN | POLLHUP))
980 thread_process_fds_helper(
981 m, m->read[m->handler.pfds[i].fd], NULL, POLLIN,
982 i);
983 if (m->handler.pfds[i].revents & POLLOUT)
984 thread_process_fds_helper(
985 m, m->write[m->handler.pfds[i].fd], NULL,
986 POLLOUT, i);
987
988 /* remove fd from list on POLLNVAL */
989 if (m->handler.pfds[i].revents & POLLNVAL) {
990 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
991 (m->handler.pfdsize - i - 1)
992 * sizeof(struct pollfd));
993 m->handler.pfdcount--;
994 i--;
995 } else
996 m->handler.pfds[i].revents = 0;
997 }
998 }
999 #endif
1000
1001 static void thread_process_fds(struct thread_master *m, thread_fd_set *rset,
1002 thread_fd_set *wset, int num)
1003 {
1004 #if defined(HAVE_POLL_CALL)
1005 check_pollfds(m, rset, num);
1006 #else
1007 int ready = 0, index;
1008
1009 for (index = 0; index < m->fd_limit && ready < num; ++index) {
1010 ready += thread_process_fds_helper(m, m->read[index], rset, 0,
1011 0);
1012 ready += thread_process_fds_helper(m, m->write[index], wset, 0,
1013 0);
1014 }
1015 #endif
1016 }
1017
1018 /* Add all timers that have popped to the ready list. */
1019 static unsigned int thread_timer_process(struct pqueue *queue,
1020 struct timeval *timenow)
1021 {
1022 struct thread *thread;
1023 unsigned int ready = 0;
1024
1025 while (queue->size) {
1026 thread = queue->array[0];
1027 if (timercmp(timenow, &thread->u.sands, <))
1028 return ready;
1029 pqueue_dequeue(queue);
1030 thread->type = THREAD_READY;
1031 thread_list_add(&thread->master->ready, thread);
1032 ready++;
1033 }
1034 return ready;
1035 }
1036
1037 /* process a list en masse, e.g. for event thread lists */
1038 static unsigned int thread_process(struct thread_list *list)
1039 {
1040 struct thread *thread;
1041 struct thread *next;
1042 unsigned int ready = 0;
1043
1044 for (thread = list->head; thread; thread = next) {
1045 next = thread->next;
1046 thread_list_delete(list, thread);
1047 thread->type = THREAD_READY;
1048 thread_list_add(&thread->master->ready, thread);
1049 ready++;
1050 }
1051 return ready;
1052 }
1053
1054
1055 /* Fetch next ready thread. */
1056 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1057 {
1058 struct thread *thread;
1059 thread_fd_set readfd;
1060 thread_fd_set writefd;
1061 thread_fd_set exceptfd;
1062 struct timeval now;
1063 struct timeval timer_val = {.tv_sec = 0, .tv_usec = 0};
1064 struct timeval timer_val_bg;
1065 struct timeval *timer_wait = &timer_val;
1066 struct timeval *timer_wait_bg;
1067
1068 while (1) {
1069 int num = 0;
1070
1071 /* Signals pre-empt everything */
1072 quagga_sigevent_process();
1073
1074 /* Drain the ready queue of already scheduled jobs, before
1075 * scheduling
1076 * more.
1077 */
1078 if ((thread = thread_trim_head(&m->ready)) != NULL)
1079 return thread_run(m, thread, fetch);
1080
1081 /* To be fair to all kinds of threads, and avoid starvation, we
1082 * need to be careful to consider all thread types for
1083 * scheduling
1084 * in each quantum. I.e. we should not return early from here on.
1085 */
1086
1087 /* Normal events are the next highest priority. */
1088 thread_process(&m->event);
1089
1090 /* Structure copy. */
1091 #if !defined(HAVE_POLL_CALL)
1092 readfd = fd_copy_fd_set(m->handler.readfd);
1093 writefd = fd_copy_fd_set(m->handler.writefd);
1094 exceptfd = fd_copy_fd_set(m->handler.exceptfd);
1095 #endif
1096
1097 /* Calculate select wait timer if nothing else to do */
1098 if (m->ready.count == 0) {
1099 timer_wait = thread_timer_wait(m->timer, &timer_val);
1100 timer_wait_bg =
1101 thread_timer_wait(m->background, &timer_val_bg);
1102
1103 if (timer_wait_bg
1104 && (!timer_wait
1105 || (timercmp(timer_wait, timer_wait_bg, >))))
1106 timer_wait = timer_wait_bg;
1107 }
1108
1109 if (timer_wait && timer_wait->tv_sec < 0) {
1110 timerclear(&timer_val);
1111 timer_wait = &timer_val;
1112 }
1113
1114 num = fd_select(m, FD_SETSIZE, &readfd, &writefd, &exceptfd,
1115 timer_wait);
1116
1117 /* Signals should get quick treatment */
1118 if (num < 0) {
1119 if (errno == EINTR)
1120 continue; /* signal received - process it */
1121 zlog_warn("select() error: %s", safe_strerror(errno));
1122 return NULL;
1123 }
1124
1125 /* Check foreground timers. Historically, they have had higher
1126 priority than I/O threads, so let's push them onto the ready
1127 list in front of the I/O threads. */
1128 monotime(&now);
1129 thread_timer_process(m->timer, &now);
1130
1131 /* Got IO, process it */
1132 if (num > 0)
1133 thread_process_fds(m, &readfd, &writefd, num);
1134
1135 #if 0
1136 /* If any threads were made ready above (I/O or foreground timer),
1137 perhaps we should avoid adding background timers to the ready
1138 list at this time. If this code is uncommented, then background
1139 timer threads will not run unless there is nothing else to do. */
1140 if ((thread = thread_trim_head (&m->ready)) != NULL)
1141 return thread_run (m, thread, fetch);
1142 #endif
1143
1144 /* Background timer/events, lowest priority */
1145 thread_timer_process(m->background, &now);
1146
1147 if ((thread = thread_trim_head(&m->ready)) != NULL)
1148 return thread_run(m, thread, fetch);
1149 }
1150 }
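/*
 * Illustrative sketch (editorial addition, not compiled): the standard
 * main loop a daemon runs on top of thread_fetch()/thread_call().
 * thread_fetch() blocks until something is runnable and copies it into
 * the caller-provided storage; thread_call() then executes it and
 * accounts the CPU and wall-clock time.  "example_main_loop" is a
 * hypothetical name.
 */
#if 0
static void example_main_loop(struct thread_master *master)
{
	struct thread fetch;
	struct thread *t;

	while ((t = thread_fetch(master, &fetch)) != NULL)
		thread_call(t);
}
#endif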
1151
1152 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1153 unsigned long *cputime)
1154 {
1155 /* This is 'user + sys' time. */
1156 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1157 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1158 return timeval_elapsed(now->real, start->real);
1159 }
1160
1161 /* We should aim to yield after yield milliseconds, which defaults
1162 to THREAD_YIELD_TIME_SLOT.
1163 Note: we are using real (wall clock) time for this calculation.
1164 It could be argued that CPU time may make more sense in certain
1165 contexts. The things to consider are whether the thread may have
1166 blocked (in which case wall time increases, but CPU time does not),
1167 or whether the system is heavily loaded with other processes competing
1168 for CPU time. On balance, wall clock time seems to make sense.
1169 Plus it has the added benefit that gettimeofday should be faster
1170 than calling getrusage. */
1171 int thread_should_yield(struct thread *thread)
1172 {
1173 return monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
1174 }
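/*
 * Illustrative sketch (editorial addition, not compiled): a long-running
 * job that cooperatively yields.  It checks thread_should_yield()
 * periodically and, once the time slot is exhausted, reschedules itself
 * as an event so other threads get a turn.  "example_walk" and its
 * bookkeeping are hypothetical.
 */
#if 0
static int example_walk(struct thread *t)
{
	struct thread_master *m = t->master;
	long done = (long)t->arg;	/* items processed so far */

	for (; done < 1000000L; done++) {
		/* ... process item number "done" ... */

		if (thread_should_yield(t)) {
			/* Pick up where we left off in a fresh event. */
			funcname_thread_add_event(m, example_walk,
						  (void *)done, 0,
						  "example_walk", __FILE__,
						  __LINE__);
			return 0;
		}
	}
	return 0;
}
#endif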
1175
1176 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1177 {
1178 thread->yield = yield_time;
1179 }
1180
1181 void thread_getrusage(RUSAGE_T *r)
1182 {
1183 monotime(&r->real);
1184 getrusage(RUSAGE_SELF, &(r->cpu));
1185 }
1186
1187 struct thread *thread_current = NULL;
1188
1189 /* We check thread consumed time. If the system has getrusage, we'll
1190 use that to get in-depth stats on the performance of the thread in addition
1191 to wall clock time stats from gettimeofday. */
1192 void thread_call(struct thread *thread)
1193 {
1194 unsigned long realtime, cputime;
1195 RUSAGE_T before, after;
1196
1197 GETRUSAGE(&before);
1198 thread->real = before.real;
1199
1200 thread_current = thread;
1201 (*thread->func)(thread);
1202 thread_current = NULL;
1203
1204 GETRUSAGE(&after);
1205
1206 realtime = thread_consumed_time(&after, &before, &cputime);
1207 thread->hist->real.total += realtime;
1208 if (thread->hist->real.max < realtime)
1209 thread->hist->real.max = realtime;
1210 thread->hist->cpu.total += cputime;
1211 if (thread->hist->cpu.max < cputime)
1212 thread->hist->cpu.max = cputime;
1213
1214 ++(thread->hist->total_calls);
1215 thread->hist->types |= (1 << thread->add_type);
1216
1217 #ifdef CONSUMED_TIME_CHECK
1218 if (realtime > CONSUMED_TIME_CHECK) {
1219 /*
1220 * We have a CPU Hog on our hands.
1221 * Whinge about it now, so we're aware this is yet another task
1222 * to fix.
1223 */
1224 zlog_warn(
1225 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1226 thread->funcname, (unsigned long)thread->func,
1227 realtime / 1000, cputime / 1000);
1228 }
1229 #endif /* CONSUMED_TIME_CHECK */
1230 }
1231
1232 /* Execute thread */
1233 struct thread *funcname_thread_execute(struct thread_master *m,
1234 int (*func)(struct thread *), void *arg,
1235 int val, debugargdef)
1236 {
1237 struct cpu_thread_history tmp;
1238 struct thread dummy;
1239
1240 memset(&dummy, 0, sizeof(struct thread));
1241
1242 dummy.type = THREAD_EVENT;
1243 dummy.add_type = THREAD_EXECUTE;
1244 dummy.master = NULL;
1245 dummy.arg = arg;
1246 dummy.u.val = val;
1247
1248 tmp.func = dummy.func = func;
1249 tmp.funcname = dummy.funcname = funcname;
1250 dummy.hist = hash_get(cpu_record, &tmp,
1251 (void *(*)(void *))cpu_record_hash_alloc);
1252
1253 dummy.schedfrom = schedfrom;
1254 dummy.schedfrom_line = fromln;
1255
1256 thread_call(&dummy);
1257
1258 return NULL;
1259 }
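/*
 * Illustrative sketch (editorial addition, not compiled):
 * funcname_thread_execute(), normally reached via the thread_execute()
 * wrapper in thread.h, runs a function synchronously through the same
 * CPU-accounting path as scheduled threads, using a dummy thread on the
 * stack.  Nothing is queued and nothing needs to be cancelled afterwards.
 * "example_report" and "example_run_now" are hypothetical.
 */
#if 0
static int example_report(struct thread *t)
{
	/* t->arg and t->u.val carry the caller-supplied values. */
	return 0;
}

static void example_run_now(struct thread_master *m, void *ctx)
{
	funcname_thread_execute(m, example_report, ctx, 0,
				"example_report", __FILE__, __LINE__);
}
#endif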