/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Zebra; see the file COPYING.  If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

/* #define DEBUG */

#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"

DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

/* Recent absolute time of day */
struct timeval recent_time;
/* Relative time, since startup */
static struct timeval relative_time;

static struct hash *cpu_record = NULL;

/* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO).
   And change negative values to 0. */
static struct timeval
timeval_adjust (struct timeval a)
{
  while (a.tv_usec >= TIMER_SECOND_MICRO)
    {
      a.tv_usec -= TIMER_SECOND_MICRO;
      a.tv_sec++;
    }

  while (a.tv_usec < 0)
    {
      a.tv_usec += TIMER_SECOND_MICRO;
      a.tv_sec--;
    }

  if (a.tv_sec < 0)
    /* Change negative timeouts to 0. */
    a.tv_sec = a.tv_usec = 0;

  return a;
}

static struct timeval
timeval_subtract (struct timeval a, struct timeval b)
{
  struct timeval ret;

  ret.tv_usec = a.tv_usec - b.tv_usec;
  ret.tv_sec = a.tv_sec - b.tv_sec;

  return timeval_adjust (ret);
}

static long
timeval_cmp (struct timeval a, struct timeval b)
{
  return (a.tv_sec == b.tv_sec
          ? a.tv_usec - b.tv_usec : a.tv_sec - b.tv_sec);
}

unsigned long
timeval_elapsed (struct timeval a, struct timeval b)
{
  return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
          + (a.tv_usec - b.tv_usec));
}

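/* A quick worked example of the helpers above: with
 * a = { .tv_sec = 2, .tv_usec = 500000 } and
 * b = { .tv_sec = 1, .tv_usec = 700000 },
 * timeval_elapsed (a, b) returns 800000 (microseconds), while
 * timeval_subtract (a, b) first computes { 1, -200000 }, which
 * timeval_adjust normalises to { 0, 800000 }. */
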
/* gettimeofday wrapper, to keep recent_time updated */
static int
quagga_gettimeofday (struct timeval *tv)
{
  int ret;

  assert (tv);

  if (!(ret = gettimeofday (&recent_time, NULL)))
    {
      /* avoid copy if user passed recent_time pointer.. */
      if (tv != &recent_time)
        *tv = recent_time;
      return 0;
    }
  return ret;
}

static int
quagga_get_relative (struct timeval *tv)
{
  int ret;

#ifdef HAVE_CLOCK_MONOTONIC
  {
    struct timespec tp;
    if (!(ret = clock_gettime (CLOCK_MONOTONIC, &tp)))
      {
        relative_time.tv_sec = tp.tv_sec;
        relative_time.tv_usec = tp.tv_nsec / 1000;
      }
  }
#elif defined(__APPLE__)
  {
    uint64_t ticks;
    uint64_t useconds;
    static mach_timebase_info_data_t timebase_info;

    ticks = mach_absolute_time();
    if (timebase_info.denom == 0)
      mach_timebase_info(&timebase_info);

    useconds = ticks * timebase_info.numer / timebase_info.denom / 1000;
    relative_time.tv_sec = useconds / 1000000;
    relative_time.tv_usec = useconds % 1000000;

    /* Fall through so *tv is filled in below; returning here early
       would leave the caller's timeval untouched. */
    ret = 0;
  }
#else /* !HAVE_CLOCK_MONOTONIC && !__APPLE__ */
#error no monotonic clock on this system
#endif /* HAVE_CLOCK_MONOTONIC */

  if (tv)
    *tv = relative_time;

  return ret;
}

/* Exported Quagga timestamp function.
 * Modelled on POSIX clock_gettime.
 */
int
quagga_gettime (enum quagga_clkid clkid, struct timeval *tv)
{
  switch (clkid)
    {
    case QUAGGA_CLK_MONOTONIC:
      return quagga_get_relative (tv);
    default:
      errno = EINVAL;
      return -1;
    }
}

time_t
quagga_monotime (void)
{
  struct timeval tv;
  quagga_get_relative(&tv);
  return tv.tv_sec;
}

/* Public export of recent_relative_time by value */
struct timeval
recent_relative_time (void)
{
  return relative_time;
}

static unsigned int
cpu_record_hash_key (struct cpu_thread_history *a)
{
  return (uintptr_t) a->func;
}

static int
cpu_record_hash_cmp (const struct cpu_thread_history *a,
                     const struct cpu_thread_history *b)
{
  return a->func == b->func;
}

static void *
cpu_record_hash_alloc (struct cpu_thread_history *a)
{
  struct cpu_thread_history *new;
  new = XCALLOC (MTYPE_THREAD_STATS, sizeof (struct cpu_thread_history));
  new->func = a->func;
  new->funcname = a->funcname;
  return new;
}

static void
cpu_record_hash_free (void *a)
{
  struct cpu_thread_history *hist = a;

  XFREE (MTYPE_THREAD_STATS, hist);
}

static void
vty_out_cpu_thread_history(struct vty* vty,
                           struct cpu_thread_history *a)
{
#ifdef HAVE_RUSAGE
  vty_out(vty, "%7ld.%03ld %9d %8ld %9ld %8ld %9ld",
          a->cpu.total/1000, a->cpu.total%1000, a->total_calls,
          a->cpu.total/a->total_calls, a->cpu.max,
          a->real.total/a->total_calls, a->real.max);
#else
  vty_out(vty, "%7ld.%03ld %9d %8ld %9ld",
          a->real.total/1000, a->real.total%1000, a->total_calls,
          a->real.total/a->total_calls, a->real.max);
#endif
  vty_out(vty, " %c%c%c%c%c%c %s%s",
          a->types & (1 << THREAD_READ) ? 'R':' ',
          a->types & (1 << THREAD_WRITE) ? 'W':' ',
          a->types & (1 << THREAD_TIMER) ? 'T':' ',
          a->types & (1 << THREAD_EVENT) ? 'E':' ',
          a->types & (1 << THREAD_EXECUTE) ? 'X':' ',
          a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ',
          a->funcname, VTY_NEWLINE);
}

static void
cpu_record_hash_print(struct hash_backet *bucket,
                      void *args[])
{
  struct cpu_thread_history *totals = args[0];
  struct vty *vty = args[1];
  thread_type *filter = args[2];
  struct cpu_thread_history *a = bucket->data;

  if ( !(a->types & *filter) )
    return;
  vty_out_cpu_thread_history(vty,a);
  totals->total_calls += a->total_calls;
  totals->real.total += a->real.total;
  if (totals->real.max < a->real.max)
    totals->real.max = a->real.max;
#ifdef HAVE_RUSAGE
  totals->cpu.total += a->cpu.total;
  if (totals->cpu.max < a->cpu.max)
    totals->cpu.max = a->cpu.max;
#endif
}

static void
cpu_record_print(struct vty *vty, thread_type filter)
{
  struct cpu_thread_history tmp;
  void *args[3] = {&tmp, vty, &filter};

  memset(&tmp, 0, sizeof tmp);
  tmp.funcname = "TOTAL";
  tmp.types = filter;

#ifdef HAVE_RUSAGE
  vty_out(vty, "%21s %18s %18s%s",
          "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
#endif
  vty_out(vty, "Runtime(ms)    Invoked Avg uSec Max uSecs");
#ifdef HAVE_RUSAGE
  vty_out(vty, " Avg uSec Max uSecs");
#endif
  vty_out(vty, "  Type  Thread%s", VTY_NEWLINE);
  hash_iterate(cpu_record,
               (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
               args);

  if (tmp.total_calls > 0)
    vty_out_cpu_thread_history(vty, &tmp);
}

DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
  int i = 0;
  thread_type filter = (thread_type) -1U;

  if (argc > 0)
    {
      filter = 0;
      while (argv[0]->arg[i] != '\0')
        {
          switch ( argv[0]->arg[i] )
            {
            case 'r':
            case 'R':
              filter |= (1 << THREAD_READ);
              break;
            case 'w':
            case 'W':
              filter |= (1 << THREAD_WRITE);
              break;
            case 't':
            case 'T':
              filter |= (1 << THREAD_TIMER);
              break;
            case 'e':
            case 'E':
              filter |= (1 << THREAD_EVENT);
              break;
            case 'x':
            case 'X':
              filter |= (1 << THREAD_EXECUTE);
              break;
            case 'b':
            case 'B':
              filter |= (1 << THREAD_BACKGROUND);
              break;
            default:
              break;
            }
          ++i;
        }
      if (filter == 0)
        {
          vty_out(vty, "Invalid filter \"%s\" specified,"
                  " must contain at least one of 'RWTEXB'%s",
                  argv[0]->arg, VTY_NEWLINE);
          return CMD_WARNING;
        }
    }

  cpu_record_print(vty, filter);
  return CMD_SUCCESS;
}

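/* Example vtysh session (sketch; the prompt and exact column layout
 * depend on the daemon -- see cpu_record_print above for the header):
 *
 *   router# show thread cpu        -- statistics for all thread types
 *   router# show thread cpu t      -- timer threads only
 *   router# show thread cpu rwb    -- read, write and background threads
 */
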
static void
cpu_record_hash_clear (struct hash_backet *bucket,
                       void *args)
{
  thread_type *filter = args;
  struct cpu_thread_history *a = bucket->data;

  if ( !(a->types & *filter) )
    return;

  hash_release (cpu_record, bucket->data);
}

static void
cpu_record_clear (thread_type filter)
{
  thread_type *tmp = &filter;
  hash_iterate (cpu_record,
                (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
                tmp);
}

DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
  int i = 0;
  thread_type filter = (thread_type) -1U;

  if (argc > 0)
    {
      filter = 0;
      while (argv[0]->arg[i] != '\0')
        {
          switch ( argv[0]->arg[i] )
            {
            case 'r':
            case 'R':
              filter |= (1 << THREAD_READ);
              break;
            case 'w':
            case 'W':
              filter |= (1 << THREAD_WRITE);
              break;
            case 't':
            case 'T':
              filter |= (1 << THREAD_TIMER);
              break;
            case 'e':
            case 'E':
              filter |= (1 << THREAD_EVENT);
              break;
            case 'x':
            case 'X':
              filter |= (1 << THREAD_EXECUTE);
              break;
            case 'b':
            case 'B':
              filter |= (1 << THREAD_BACKGROUND);
              break;
            default:
              break;
            }
          ++i;
        }
      if (filter == 0)
        {
          vty_out(vty, "Invalid filter \"%s\" specified,"
                  " must contain at least one of 'RWTEXB'%s",
                  argv[0]->arg, VTY_NEWLINE);
          return CMD_WARNING;
        }
    }

  cpu_record_clear (filter);
  return CMD_SUCCESS;
}

static int
thread_timer_cmp(void *a, void *b)
{
  struct thread *thread_a = a;
  struct thread *thread_b = b;

  long cmp = timeval_cmp(thread_a->u.sands, thread_b->u.sands);

  if (cmp < 0)
    return -1;
  if (cmp > 0)
    return 1;
  return 0;
}

static void
thread_timer_update(void *node, int actual_position)
{
  struct thread *thread = node;

  thread->index = actual_position;
}

/* Allocate new thread master. */
struct thread_master *
thread_master_create (void)
{
  struct thread_master *rv;
  struct rlimit limit;

  getrlimit(RLIMIT_NOFILE, &limit);

  if (cpu_record == NULL)
    cpu_record
      = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
                     (int (*) (const void *, const void *))cpu_record_hash_cmp);

  rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
  if (rv == NULL)
    {
      return NULL;
    }

  rv->fd_limit = (int)limit.rlim_cur;
  rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
  if (rv->read == NULL)
    {
      XFREE (MTYPE_THREAD_MASTER, rv);
      return NULL;
    }

  rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
  if (rv->write == NULL)
    {
      XFREE (MTYPE_THREAD, rv->read);
      XFREE (MTYPE_THREAD_MASTER, rv);
      return NULL;
    }

  /* Initialize the timer queues */
  rv->timer = pqueue_create();
  rv->background = pqueue_create();
  rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
  rv->timer->update = rv->background->update = thread_timer_update;

#if defined(HAVE_POLL)
  rv->handler.pfdsize = rv->fd_limit;
  rv->handler.pfdcount = 0;
  /* Allocate through the MTYPE allocator so that thread_master_free's
     XFREE (MTYPE_THREAD_MASTER, m->handler.pfds) matches; XCALLOC also
     zeroes the array, replacing the separate memset. */
  rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
                              sizeof (struct pollfd) * rv->handler.pfdsize);
#endif
  return rv;
}

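/* A typical daemon wires this together roughly as follows (sketch only;
 * scheduling normally goes through the wrapper macros from thread.h
 * rather than the funcname_* functions directly):
 *
 *   struct thread_master *master = thread_master_create ();
 *   struct thread thread;
 *
 *   ... schedule initial events, timers and I/O watchers here ...
 *
 *   while (thread_fetch (master, &thread))
 *     thread_call (&thread);
 */
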
/* Add a new thread to the list. */
static void
thread_list_add (struct thread_list *list, struct thread *thread)
{
  thread->next = NULL;
  thread->prev = list->tail;
  if (list->tail)
    list->tail->next = thread;
  else
    list->head = thread;
  list->tail = thread;
  list->count++;
}

/* Delete a thread from the list. */
static struct thread *
thread_list_delete (struct thread_list *list, struct thread *thread)
{
  if (thread->next)
    thread->next->prev = thread->prev;
  else
    list->tail = thread->prev;
  if (thread->prev)
    thread->prev->next = thread->next;
  else
    list->head = thread->next;
  thread->next = thread->prev = NULL;
  list->count--;
  return thread;
}

static void
thread_delete_fd (struct thread **thread_array, struct thread *thread)
{
  thread_array[thread->u.fd] = NULL;
}

static void
thread_add_fd (struct thread **thread_array, struct thread *thread)
{
  thread_array[thread->u.fd] = thread;
}

/* Thread list is empty or not. */
static int
thread_empty (struct thread_list *list)
{
  return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *
thread_trim_head (struct thread_list *list)
{
  if (!thread_empty (list))
    return thread_list_delete (list, list->head);
  return NULL;
}

/* Move thread to unuse list. */
static void
thread_add_unuse (struct thread_master *m, struct thread *thread)
{
  assert (m != NULL && thread != NULL);
  assert (thread->next == NULL);
  assert (thread->prev == NULL);
  assert (thread->type == THREAD_UNUSED);
  thread_list_add (&m->unuse, thread);
}

/* Free all threads on the given list. */
static void
thread_list_free (struct thread_master *m, struct thread_list *list)
{
  struct thread *t;
  struct thread *next;

  for (t = list->head; t; t = next)
    {
      next = t->next;
      XFREE (MTYPE_THREAD, t);
      list->count--;
      m->alloc--;
    }
}

static void
thread_array_free (struct thread_master *m, struct thread **thread_array)
{
  struct thread *t;
  int index;

  for (index = 0; index < m->fd_limit; ++index)
    {
      t = thread_array[index];
      if (t)
        {
          thread_array[index] = NULL;
          XFREE (MTYPE_THREAD, t);
          m->alloc--;
        }
    }
  XFREE (MTYPE_THREAD, thread_array);
}

static void
thread_queue_free (struct thread_master *m, struct pqueue *queue)
{
  int i;

  for (i = 0; i < queue->size; i++)
    XFREE(MTYPE_THREAD, queue->array[i]);

  m->alloc -= queue->size;
  pqueue_delete(queue);
}

/*
 * thread_master_free_unused
 *
 * As threads finish, they are placed on the unuse list for later reuse.
 * When shutting down, free the unused threads, so that anything we
 * forgot to shut off shows up as a leak.
 */
void
thread_master_free_unused (struct thread_master *m)
{
  struct thread *t;
  while ((t = thread_trim_head(&m->unuse)) != NULL)
    {
      XFREE(MTYPE_THREAD, t);
    }
}

/* Stop thread scheduler. */
void
thread_master_free (struct thread_master *m)
{
  thread_array_free (m, m->read);
  thread_array_free (m, m->write);
  thread_queue_free (m, m->timer);
  thread_list_free (m, &m->event);
  thread_list_free (m, &m->ready);
  thread_list_free (m, &m->unuse);
  thread_queue_free (m, m->background);

#if defined(HAVE_POLL)
  XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
#endif
  XFREE (MTYPE_THREAD_MASTER, m);

  if (cpu_record)
    {
      hash_clean (cpu_record, cpu_record_hash_free);
      hash_free (cpu_record);
      cpu_record = NULL;
    }
}

/* Return remaining time in seconds. */
unsigned long
thread_timer_remain_second (struct thread *thread)
{
  quagga_get_relative (NULL);

  if (thread->u.sands.tv_sec - relative_time.tv_sec > 0)
    return thread->u.sands.tv_sec - relative_time.tv_sec;
  else
    return 0;
}

#define debugargdef  const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

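/* The funcname_* scheduling functions below are normally reached via
 * wrapper macros in thread.h, which fill in these debug arguments --
 * approximately (sketch; see thread.h for the real definitions):
 *
 *   #define thread_add_timer(m, f, a, t) \
 *     funcname_thread_add_timer (m, f, a, t, #f, __FILE__, __LINE__)
 *
 * so every scheduled thread records what was scheduled and from where,
 * feeding the 'show thread cpu' statistics and the slow-thread warning
 * in thread_call. */
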
struct timeval
thread_timer_remain(struct thread *thread)
{
  quagga_get_relative(NULL);

  return timeval_subtract(thread->u.sands, relative_time);
}

/* Get new thread. */
static struct thread *
thread_get (struct thread_master *m, u_char type,
            int (*func) (struct thread *), void *arg, debugargdef)
{
  struct thread *thread = thread_trim_head (&m->unuse);

  if (! thread)
    {
      thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
      m->alloc++;
    }
  thread->type = type;
  thread->add_type = type;
  thread->master = m;
  thread->func = func;
  thread->arg = arg;
  thread->index = -1;
  thread->yield = THREAD_YIELD_TIME_SLOT; /* default */

  thread->funcname = funcname;
  thread->schedfrom = schedfrom;
  thread->schedfrom_line = fromln;

  return thread;
}

#if defined (HAVE_POLL)

#define fd_copy_fd_set(X) (X)

/* generic add thread function */
static struct thread *
generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
                   void *arg, int fd, int dir, debugargdef)
{
  struct thread *thread;

  u_char type;
  short int event;

  if (dir == THREAD_READ)
    {
      event = (POLLIN | POLLHUP);
      type = THREAD_READ;
    }
  else
    {
      event = (POLLOUT | POLLHUP);
      type = THREAD_WRITE;
    }

  nfds_t queuepos = m->handler.pfdcount;
  nfds_t i=0;
  for (i=0; i<m->handler.pfdcount; i++)
    if (m->handler.pfds[i].fd == fd)
      {
        queuepos = i;
        break;
      }

  /* is there enough space for a new fd? */
  assert (queuepos < m->handler.pfdsize);

  thread = thread_get (m, type, func, arg, debugargpass);
  m->handler.pfds[queuepos].fd = fd;
  m->handler.pfds[queuepos].events |= event;
  if (queuepos == m->handler.pfdcount)
    m->handler.pfdcount++;

  return thread;
}
#else

#define fd_copy_fd_set(X) (X)
#endif

static int
fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
{
  int num;
#if defined(HAVE_POLL)
  /* Recalculate the timeout for poll: with select, a NULL pointer means
     "no timeout", whereas poll expresses "no timeout" as -1. */
  int timeout = -1;
  if (timer_wait != NULL)
    timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);

  num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
#else
  num = select (size, read, write, except, timer_wait);
#endif

  return num;
}

static int
fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
{
#if defined(HAVE_POLL)
  return 1;
#else
  return FD_ISSET (THREAD_FD (thread), fdset);
#endif
}

static int
fd_clear_read_write (struct thread *thread)
{
#if !defined(HAVE_POLL)
  thread_fd_set *fdset = NULL;
  int fd = THREAD_FD (thread);

  if (thread->type == THREAD_READ)
    fdset = &thread->master->handler.readfd;
  else
    fdset = &thread->master->handler.writefd;

  if (!FD_ISSET (fd, fdset))
    return 0;

  FD_CLR (fd, fdset);
#endif
  return 1;
}

/* Add new read or write thread. */
struct thread *
funcname_thread_add_read_write (int dir, struct thread_master *m,
                                int (*func) (struct thread *), void *arg, int fd,
                                debugargdef)
{
  struct thread *thread = NULL;

#if !defined(HAVE_POLL)
  thread_fd_set *fdset = NULL;
  if (dir == THREAD_READ)
    fdset = &m->handler.readfd;
  else
    fdset = &m->handler.writefd;
#endif

#if defined (HAVE_POLL)
  thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);

  if (thread == NULL)
    return NULL;
#else
  if (FD_ISSET (fd, fdset))
    {
      zlog (NULL, LOG_WARNING, "There is already %s fd [%d]",
            (dir == THREAD_READ) ? "read" : "write", fd);
      return NULL;
    }

  FD_SET (fd, fdset);
  thread = thread_get (m, dir, func, arg, debugargpass);
#endif

  thread->u.fd = fd;
  if (dir == THREAD_READ)
    thread_add_fd (m->read, thread);
  else
    thread_add_fd (m->write, thread);

  return thread;
}

static struct thread *
funcname_thread_add_timer_timeval (struct thread_master *m,
                                   int (*func) (struct thread *),
                                   int type,
                                   void *arg,
                                   struct timeval *time_relative,
                                   debugargdef)
{
  struct thread *thread;
  struct pqueue *queue;
  struct timeval alarm_time;

  assert (m != NULL);

  assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
  assert (time_relative);

  queue = ((type == THREAD_TIMER) ? m->timer : m->background);
  thread = thread_get (m, type, func, arg, debugargpass);

  /* Do we need jitter here? */
  quagga_get_relative (NULL);
  alarm_time.tv_sec = relative_time.tv_sec + time_relative->tv_sec;
  alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec;
  thread->u.sands = timeval_adjust(alarm_time);

  pqueue_enqueue(thread, queue);
  return thread;
}


/* Add timer event thread. */
struct thread *
funcname_thread_add_timer (struct thread_master *m,
                           int (*func) (struct thread *),
                           void *arg, long timer,
                           debugargdef)
{
  struct timeval trel;

  assert (m != NULL);

  trel.tv_sec = timer;
  trel.tv_usec = 0;

  return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg,
                                            &trel, debugargpass);
}

/* Add timer event thread with "millisecond" resolution */
struct thread *
funcname_thread_add_timer_msec (struct thread_master *m,
                                int (*func) (struct thread *),
                                void *arg, long timer,
                                debugargdef)
{
  struct timeval trel;

  assert (m != NULL);

  trel.tv_sec = timer / 1000;
  trel.tv_usec = 1000*(timer % 1000);

  return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
                                            arg, &trel, debugargpass);
}

/* Add timer event thread with a struct timeval specifying the expiry */
struct thread *
funcname_thread_add_timer_tv (struct thread_master *m,
                              int (*func) (struct thread *),
                              void *arg, struct timeval *tv,
                              debugargdef)
{
  return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
                                            arg, tv, debugargpass);
}

/* Add a background thread, with an optional millisec delay */
struct thread *
funcname_thread_add_background (struct thread_master *m,
                                int (*func) (struct thread *),
                                void *arg, long delay,
                                debugargdef)
{
  struct timeval trel;

  assert (m != NULL);

  if (delay)
    {
      trel.tv_sec = delay / 1000;
      trel.tv_usec = 1000*(delay % 1000);
    }
  else
    {
      trel.tv_sec = 0;
      trel.tv_usec = 0;
    }

  return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND,
                                            arg, &trel, debugargpass);
}

/* Add simple event thread. */
struct thread *
funcname_thread_add_event (struct thread_master *m,
                           int (*func) (struct thread *), void *arg, int val,
                           debugargdef)
{
  struct thread *thread;

  assert (m != NULL);

  thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
  thread->u.val = val;
  thread_list_add (&m->event, thread);

  return thread;
}

static void
thread_cancel_read_or_write (struct thread *thread, short int state)
{
#if defined(HAVE_POLL)
  nfds_t i;

  for (i=0;i<thread->master->handler.pfdcount;++i)
    if (thread->master->handler.pfds[i].fd == thread->u.fd)
      {
        thread->master->handler.pfds[i].events &= ~(state);

        /* remove thread fds from pfd list */
        if (thread->master->handler.pfds[i].events == 0)
          {
            memmove(thread->master->handler.pfds+i,
                    thread->master->handler.pfds+i+1,
                    (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
            thread->master->handler.pfdcount--;
            return;
          }
      }
#endif

  fd_clear_read_write (thread);
}

/* Cancel thread from scheduler. */
void
thread_cancel (struct thread *thread)
{
  struct thread_list *list = NULL;
  struct pqueue *queue = NULL;
  struct thread **thread_array = NULL;

  switch (thread->type)
    {
    case THREAD_READ:
#if defined (HAVE_POLL)
      thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
#else
      thread_cancel_read_or_write (thread, 0);
#endif
      thread_array = thread->master->read;
      break;
    case THREAD_WRITE:
#if defined (HAVE_POLL)
      thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
#else
      thread_cancel_read_or_write (thread, 0);
#endif
      thread_array = thread->master->write;
      break;
    case THREAD_TIMER:
      queue = thread->master->timer;
      break;
    case THREAD_EVENT:
      list = &thread->master->event;
      break;
    case THREAD_READY:
      list = &thread->master->ready;
      break;
    case THREAD_BACKGROUND:
      queue = thread->master->background;
      break;
    default:
      return;
    }

  if (queue)
    {
      assert(thread->index >= 0);
      assert(thread == queue->array[thread->index]);
      pqueue_remove_at(thread->index, queue);
    }
  else if (list)
    {
      thread_list_delete (list, thread);
    }
  else if (thread_array)
    {
      thread_delete_fd (thread_array, thread);
    }
  else
    {
      assert(!"Thread should be either in queue or list or array!");
    }

  thread->type = THREAD_UNUSED;
  thread_add_unuse (thread->master, thread);
}

/* Delete all events which have argument value arg. */
unsigned int
thread_cancel_event (struct thread_master *m, void *arg)
{
  unsigned int ret = 0;
  struct thread *thread;

  thread = m->event.head;
  while (thread)
    {
      struct thread *t;

      t = thread;
      thread = t->next;

      if (t->arg == arg)
        {
          ret++;
          thread_list_delete (&m->event, t);
          t->type = THREAD_UNUSED;
          thread_add_unuse (m, t);
        }
    }

  /* thread can be on the ready list too */
  thread = m->ready.head;
  while (thread)
    {
      struct thread *t;

      t = thread;
      thread = t->next;

      if (t->arg == arg)
        {
          ret++;
          thread_list_delete (&m->ready, t);
          t->type = THREAD_UNUSED;
          thread_add_unuse (m, t);
        }
    }
  return ret;
}

static struct timeval *
thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
{
  if (queue->size)
    {
      struct thread *next_timer = queue->array[0];
      *timer_val = timeval_subtract (next_timer->u.sands, relative_time);
      return timer_val;
    }
  return NULL;
}

static struct thread *
thread_run (struct thread_master *m, struct thread *thread,
            struct thread *fetch)
{
  *fetch = *thread;
  thread->type = THREAD_UNUSED;
  thread_add_unuse (m, thread);
  return fetch;
}

static int
thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
{
  struct thread **thread_array;

  if (!thread)
    return 0;

  if (thread->type == THREAD_READ)
    thread_array = m->read;
  else
    thread_array = m->write;

  if (fd_is_set (thread, fdset, pos))
    {
      fd_clear_read_write (thread);
      thread_delete_fd (thread_array, thread);
      thread_list_add (&m->ready, thread);
      thread->type = THREAD_READY;
#if defined(HAVE_POLL)
      thread->master->handler.pfds[pos].events &= ~(state);
#endif
      return 1;
    }
  return 0;
}

#if defined(HAVE_POLL)

#if defined(HAVE_SNMP)
/* add snmp fds to poll set */
static void
add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
{
  int i;
  m->handler.pfdcountsnmp = m->handler.pfdcount;
  /* cycle through fds and add the necessary ones to the poll set */
  for (i=0;i<fdsetsize;++i)
    {
      if (FD_ISSET(i, snmpfds))
        {
          assert (m->handler.pfdcountsnmp <= m->handler.pfdsize);

          m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
          m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
          m->handler.pfdcountsnmp++;
        }
    }
}
#endif

/* check poll events */
static void
check_pollfds(struct thread_master *m, fd_set *readfd, int num)
{
  nfds_t i = 0;
  int ready = 0;
  for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
    {
      /* no event for current fd? immediately continue */
      if(m->handler.pfds[i].revents == 0)
        continue;

      ready++;

      /* POLLIN / POLLOUT process event */
      if (m->handler.pfds[i].revents & POLLIN)
        thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
      if (m->handler.pfds[i].revents & POLLOUT)
        thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);

      /* remove fd from list on POLLNVAL */
      if (m->handler.pfds[i].revents & POLLNVAL ||
          m->handler.pfds[i].revents & POLLHUP)
        {
          memmove(m->handler.pfds+i,
                  m->handler.pfds+i+1,
                  (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
          m->handler.pfdcount--;
          i--;
        }
      else
        m->handler.pfds[i].revents = 0;
    }
}
#endif

static void
thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
{
#if defined (HAVE_POLL)
  check_pollfds (m, rset, num);
#else
  int ready = 0, index;

  for (index = 0; index < m->fd_limit && ready < num; ++index)
    {
      ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
      ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
    }
#endif
}

/* Add all timers that have popped to the ready list. */
static unsigned int
thread_timer_process (struct pqueue *queue, struct timeval *timenow)
{
  struct thread *thread;
  unsigned int ready = 0;

  while (queue->size)
    {
      thread = queue->array[0];
      if (timeval_cmp (*timenow, thread->u.sands) < 0)
        return ready;
      pqueue_dequeue(queue);
      thread->type = THREAD_READY;
      thread_list_add (&thread->master->ready, thread);
      ready++;
    }
  return ready;
}

/* process a list en masse, e.g. for event thread lists */
static unsigned int
thread_process (struct thread_list *list)
{
  struct thread *thread;
  struct thread *next;
  unsigned int ready = 0;

  for (thread = list->head; thread; thread = next)
    {
      next = thread->next;
      thread_list_delete (list, thread);
      thread->type = THREAD_READY;
      thread_list_add (&thread->master->ready, thread);
      ready++;
    }
  return ready;
}


/* Fetch next ready thread. */
struct thread *
thread_fetch (struct thread_master *m, struct thread *fetch)
{
  struct thread *thread;
  thread_fd_set readfd;
  thread_fd_set writefd;
  thread_fd_set exceptfd;
  struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
  struct timeval timer_val_bg;
  struct timeval *timer_wait = &timer_val;
  struct timeval *timer_wait_bg;

  while (1)
    {
      int num = 0;

      /* Signals pre-empt everything */
      quagga_sigevent_process ();

      /* Drain the ready queue of already scheduled jobs, before scheduling
       * more.
       */
      if ((thread = thread_trim_head (&m->ready)) != NULL)
        return thread_run (m, thread, fetch);

      /* To be fair to all kinds of threads, and avoid starvation, we
       * need to be careful to consider all thread types for scheduling
       * in each quantum.  I.e. we should not return early from here on.
       */

      /* Normal events are the next highest priority.  */
      thread_process (&m->event);

      /* Structure copy.  */
#if !defined(HAVE_POLL)
      readfd = fd_copy_fd_set(m->handler.readfd);
      writefd = fd_copy_fd_set(m->handler.writefd);
      exceptfd = fd_copy_fd_set(m->handler.exceptfd);
#endif

      /* Calculate select wait timer if nothing else to do */
      if (m->ready.count == 0)
        {
          quagga_get_relative (NULL);
          timer_wait = thread_timer_wait (m->timer, &timer_val);
          timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);

          if (timer_wait_bg &&
              (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0)))
            timer_wait = timer_wait_bg;
        }

      num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);

      /* Signals should get quick treatment */
      if (num < 0)
        {
          if (errno == EINTR)
            continue; /* signal received - process it */
          zlog_warn ("select() error: %s", safe_strerror (errno));
          return NULL;
        }

      /* Check foreground timers.  Historically, they have had higher
         priority than I/O threads, so let's push them onto the ready
         list in front of the I/O threads. */
      quagga_get_relative (NULL);
      thread_timer_process (m->timer, &relative_time);

      /* Got IO, process it */
      if (num > 0)
        thread_process_fds (m, &readfd, &writefd, num);

#if 0
      /* If any threads were made ready above (I/O or foreground timer),
         perhaps we should avoid adding background timers to the ready
         list at this time.  If this code is uncommented, then background
         timer threads will not run unless there is nothing else to do. */
      if ((thread = thread_trim_head (&m->ready)) != NULL)
        return thread_run (m, thread, fetch);
#endif

      /* Background timer/events, lowest priority */
      thread_timer_process (m->background, &relative_time);

      if ((thread = thread_trim_head (&m->ready)) != NULL)
        return thread_run (m, thread, fetch);
    }
}

unsigned long
thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
{
#ifdef HAVE_RUSAGE
  /* This is 'user + sys' time.  */
  *cputime = timeval_elapsed (now->cpu.ru_utime, start->cpu.ru_utime) +
             timeval_elapsed (now->cpu.ru_stime, start->cpu.ru_stime);
#else
  *cputime = 0;
#endif /* HAVE_RUSAGE */
  return timeval_elapsed (now->real, start->real);
}

/* We should aim to yield after yield microseconds, which defaults
   to THREAD_YIELD_TIME_SLOT (timeval_elapsed returns microseconds).
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int
thread_should_yield (struct thread *thread)
{
  quagga_get_relative (NULL);
  return (timeval_elapsed(relative_time, thread->real) >
          thread->yield);
}

void
thread_set_yield_time (struct thread *thread, unsigned long yield_time)
{
  thread->yield = yield_time;
}

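/* Example of cooperative yielding from a long-running task (sketch;
 * work_remains() and do_one_unit() are hypothetical stand-ins for a
 * daemon's own work loop, and thread_add_background is the thread.h
 * wrapper around funcname_thread_add_background above):
 *
 *   static int some_task (struct thread *t)
 *   {
 *     while (work_remains ())
 *       {
 *         do_one_unit ();
 *         if (thread_should_yield (t))
 *           {
 *             // reschedule ourselves and let other threads run
 *             thread_add_background (t->master, some_task, t->arg, 0);
 *             return 0;
 *           }
 *       }
 *     return 0;
 *   }
 */
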
void
thread_getrusage (RUSAGE_T *r)
{
  quagga_get_relative (NULL);
#ifdef HAVE_RUSAGE
  getrusage(RUSAGE_SELF, &(r->cpu));
#endif
  r->real = relative_time;

#ifdef HAVE_CLOCK_MONOTONIC
  /* quagga_get_relative() only updates recent_time if gettimeofday
   * based, not when using CLOCK_MONOTONIC. As we export recent_time
   * and guarantee to update it before threads are run...
   */
  quagga_gettimeofday(&recent_time);
#endif /* HAVE_CLOCK_MONOTONIC */
}

struct thread *thread_current = NULL;

/* We check thread consumed time. If the system has getrusage, we'll
   use that to get in-depth stats on the performance of the thread in addition
   to wall clock time stats from gettimeofday. */
void
thread_call (struct thread *thread)
{
  unsigned long realtime, cputime;
  RUSAGE_T before, after;

  /* Cache a pointer to the relevant cpu history thread, if the thread
   * does not have it yet.
   *
   * Callers submitting 'dummy threads' hence must take care that
   * thread->hist is NULL
   */
  if (!thread->hist)
    {
      struct cpu_thread_history tmp;

      tmp.func = thread->func;
      tmp.funcname = thread->funcname;

      thread->hist = hash_get (cpu_record, &tmp,
                               (void * (*) (void *))cpu_record_hash_alloc);
    }

  GETRUSAGE (&before);
  thread->real = before.real;

  thread_current = thread;
  (*thread->func) (thread);
  thread_current = NULL;

  GETRUSAGE (&after);

  realtime = thread_consumed_time (&after, &before, &cputime);
  thread->hist->real.total += realtime;
  if (thread->hist->real.max < realtime)
    thread->hist->real.max = realtime;
#ifdef HAVE_RUSAGE
  thread->hist->cpu.total += cputime;
  if (thread->hist->cpu.max < cputime)
    thread->hist->cpu.max = cputime;
#endif

  ++(thread->hist->total_calls);
  thread->hist->types |= (1 << thread->add_type);

#ifdef CONSUMED_TIME_CHECK
  if (realtime > CONSUMED_TIME_CHECK)
    {
      /*
       * We have a CPU Hog on our hands.
       * Whinge about it now, so we're aware this is yet another task
       * to fix.
       */
      zlog_warn ("SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
                 thread->funcname,
                 (unsigned long) thread->func,
                 realtime/1000, cputime/1000);
    }
#endif /* CONSUMED_TIME_CHECK */
}

/* Execute thread */
struct thread *
funcname_thread_execute (struct thread_master *m,
                         int (*func)(struct thread *),
                         void *arg,
                         int val,
                         debugargdef)
{
  struct thread dummy;

  memset (&dummy, 0, sizeof (struct thread));

  dummy.type = THREAD_EVENT;
  dummy.add_type = THREAD_EXECUTE;
  dummy.master = NULL;
  dummy.func = func;
  dummy.arg = arg;
  dummy.u.val = val;

  dummy.funcname = funcname;
  dummy.schedfrom = schedfrom;
  dummy.schedfrom_line = fromln;

  thread_call (&dummy);

  return NULL;
}