1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with GNU Zebra; see the file COPYING. If not, write to the Free
18 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
19 * 02111-1307, USA.
20 */
21
22 /* #define DEBUG */
23
24 #include <zebra.h>
25 #include <sys/resource.h>
26
27 #include "thread.h"
28 #include "memory.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "pqueue.h"
32 #include "command.h"
33 #include "sigevent.h"
34
35 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
36 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
37 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
38
39 #if defined(__APPLE__)
40 #include <mach/mach.h>
41 #include <mach/mach_time.h>
42 #endif
43
44 /* Recent absolute time of day */
45 struct timeval recent_time;
46 /* Relative time, since startup */
47 static struct timeval relative_time;
48
49 static struct hash *cpu_record = NULL;
50
51 /* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO).
52 And change negative values to 0. */
53 static struct timeval
54 timeval_adjust (struct timeval a)
55 {
56 while (a.tv_usec >= TIMER_SECOND_MICRO)
57 {
58 a.tv_usec -= TIMER_SECOND_MICRO;
59 a.tv_sec++;
60 }
61
62 while (a.tv_usec < 0)
63 {
64 a.tv_usec += TIMER_SECOND_MICRO;
65 a.tv_sec--;
66 }
67
68 if (a.tv_sec < 0)
69 /* Change negative timeouts to 0. */
70 a.tv_sec = a.tv_usec = 0;
71
72 return a;
73 }
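/*
 * Worked example (illustrative values only): { 1, 1500000 } normalizes to
 * { 2, 500000 }, while { 0, -250000 } first becomes { -1, 750000 } and is
 * then clamped to { 0, 0 } because negative timeouts are changed to zero.
 */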
74
75 static struct timeval
76 timeval_subtract (struct timeval a, struct timeval b)
77 {
78 struct timeval ret;
79
80 ret.tv_usec = a.tv_usec - b.tv_usec;
81 ret.tv_sec = a.tv_sec - b.tv_sec;
82
83 return timeval_adjust (ret);
84 }
85
86 static long
87 timeval_cmp (struct timeval a, struct timeval b)
88 {
89 return (a.tv_sec == b.tv_sec
90 ? a.tv_usec - b.tv_usec : a.tv_sec - b.tv_sec);
91 }
92
93 unsigned long
94 timeval_elapsed (struct timeval a, struct timeval b)
95 {
96 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
97 + (a.tv_usec - b.tv_usec));
98 }
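/*
 * Note that timeval_elapsed() returns microseconds. For example
 * (illustrative values): a = { 2, 250000 }, b = { 1, 750000 } gives
 * (2 - 1) * 1000000 + (250000 - 750000) = 500000 usec, i.e. half a second.
 */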
99
100 /* gettimeofday wrapper, to keep recent_time updated */
101 static int
102 quagga_gettimeofday (struct timeval *tv)
103 {
104 int ret;
105
106 assert (tv);
107
108 if (!(ret = gettimeofday (&recent_time, NULL)))
109 {
110 /* avoid copy if user passed recent_time pointer.. */
111 if (tv != &recent_time)
112 *tv = recent_time;
113 return 0;
114 }
115 return ret;
116 }
117
118 static int
119 quagga_get_relative (struct timeval *tv)
120 {
121 int ret;
122
123 #ifdef HAVE_CLOCK_MONOTONIC
124 {
125 struct timespec tp;
126 if (!(ret = clock_gettime (CLOCK_MONOTONIC, &tp)))
127 {
128 relative_time.tv_sec = tp.tv_sec;
129 relative_time.tv_usec = tp.tv_nsec / 1000;
130 }
131 }
132 #elif defined(__APPLE__)
133 {
134 uint64_t ticks;
135 uint64_t useconds;
136 static mach_timebase_info_data_t timebase_info;
137
138 ticks = mach_absolute_time();
139 if (timebase_info.denom == 0)
140 mach_timebase_info(&timebase_info);
141
142 useconds = ticks * timebase_info.numer / timebase_info.denom / 1000;
143 relative_time.tv_sec = useconds / 1000000;
144 relative_time.tv_usec = useconds % 1000000;
145
146 return 0;
147 }
148 #else /* !HAVE_CLOCK_MONOTONIC && !__APPLE__ */
149 #error no monotonic clock on this system
150 #endif /* HAVE_CLOCK_MONOTONIC */
151
152 if (tv)
153 *tv = relative_time;
154
155 return ret;
156 }
157
158 /* Exported Quagga timestamp function.
159 * Modelled on POSIX clock_gettime.
160 */
161 int
162 quagga_gettime (enum quagga_clkid clkid, struct timeval *tv)
163 {
164 switch (clkid)
165 {
166 case QUAGGA_CLK_MONOTONIC:
167 return quagga_get_relative (tv);
168 default:
169 errno = EINVAL;
170 return -1;
171 }
172 }
173
174 time_t
175 quagga_monotime (void)
176 {
177 struct timeval tv;
178 quagga_get_relative(&tv);
179 return tv.tv_sec;
180 }
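/*
 * Usage sketch (illustrative only): callers that need the monotonic clock
 * as a struct timeval go through the exported wrappers above, e.g.
 *
 *   struct timeval now;
 *   quagga_gettime (QUAGGA_CLK_MONOTONIC, &now);
 *
 * while quagga_monotime() is the shorthand when only whole seconds matter.
 */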
181
182 /* Public export of recent_relative_time by value */
183 struct timeval
184 recent_relative_time (void)
185 {
186 return relative_time;
187 }
188
189 static unsigned int
190 cpu_record_hash_key (struct cpu_thread_history *a)
191 {
192 return (uintptr_t) a->func;
193 }
194
195 static int
196 cpu_record_hash_cmp (const struct cpu_thread_history *a,
197 const struct cpu_thread_history *b)
198 {
199 return a->func == b->func;
200 }
201
202 static void *
203 cpu_record_hash_alloc (struct cpu_thread_history *a)
204 {
205 struct cpu_thread_history *new;
206 new = XCALLOC (MTYPE_THREAD_STATS, sizeof (struct cpu_thread_history));
207 new->func = a->func;
208 new->funcname = a->funcname;
209 return new;
210 }
211
212 static void
213 cpu_record_hash_free (void *a)
214 {
215 struct cpu_thread_history *hist = a;
216
217 XFREE (MTYPE_THREAD_STATS, hist);
218 }
219
220 static void
221 vty_out_cpu_thread_history(struct vty* vty,
222 struct cpu_thread_history *a)
223 {
224 vty_out(vty, "%10ld.%03ld %9d %8ld %9ld %8ld %9ld",
225 a->cpu.total/1000, a->cpu.total%1000, a->total_calls,
226 a->cpu.total/a->total_calls, a->cpu.max,
227 a->real.total/a->total_calls, a->real.max);
228 vty_out(vty, " %c%c%c%c%c%c %s%s",
229 a->types & (1 << THREAD_READ) ? 'R':' ',
230 a->types & (1 << THREAD_WRITE) ? 'W':' ',
231 a->types & (1 << THREAD_TIMER) ? 'T':' ',
232 a->types & (1 << THREAD_EVENT) ? 'E':' ',
233 a->types & (1 << THREAD_EXECUTE) ? 'X':' ',
234 a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ',
235 a->funcname, VTY_NEWLINE);
236 }
237
238 static void
239 cpu_record_hash_print(struct hash_backet *bucket,
240 void *args[])
241 {
242 struct cpu_thread_history *totals = args[0];
243 struct vty *vty = args[1];
244 thread_type *filter = args[2];
245 struct cpu_thread_history *a = bucket->data;
246
247 if ( !(a->types & *filter) )
248 return;
249 vty_out_cpu_thread_history(vty,a);
250 totals->total_calls += a->total_calls;
251 totals->real.total += a->real.total;
252 if (totals->real.max < a->real.max)
253 totals->real.max = a->real.max;
254 totals->cpu.total += a->cpu.total;
255 if (totals->cpu.max < a->cpu.max)
256 totals->cpu.max = a->cpu.max;
257 }
258
259 static void
260 cpu_record_print(struct vty *vty, thread_type filter)
261 {
262 struct cpu_thread_history tmp;
263 void *args[3] = {&tmp, vty, &filter};
264
265 memset(&tmp, 0, sizeof tmp);
266 tmp.funcname = "TOTAL";
267 tmp.types = filter;
268
269 vty_out(vty, "%21s %18s %18s%s",
270 "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
271 vty_out(vty, " Runtime(ms) Invoked Avg uSec Max uSecs");
272 vty_out(vty, " Avg uSec Max uSecs");
273 vty_out(vty, " Type Thread%s", VTY_NEWLINE);
274 hash_iterate(cpu_record,
275 (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
276 args);
277
278 if (tmp.total_calls > 0)
279 vty_out_cpu_thread_history(vty, &tmp);
280 }
281
282 DEFUN (show_thread_cpu,
283 show_thread_cpu_cmd,
284 "show thread cpu [FILTER]",
285 SHOW_STR
286 "Thread information\n"
287 "Thread CPU usage\n"
288 "Display filter (rwtexb)\n")
289 {
290 int i = 0;
291 thread_type filter = (thread_type) -1U;
292
293 if (argc > 0)
294 {
295 filter = 0;
296 while (argv[0][i] != '\0')
297 {
298 switch ( argv[0][i] )
299 {
300 case 'r':
301 case 'R':
302 filter |= (1 << THREAD_READ);
303 break;
304 case 'w':
305 case 'W':
306 filter |= (1 << THREAD_WRITE);
307 break;
308 case 't':
309 case 'T':
310 filter |= (1 << THREAD_TIMER);
311 break;
312 case 'e':
313 case 'E':
314 filter |= (1 << THREAD_EVENT);
315 break;
316 case 'x':
317 case 'X':
318 filter |= (1 << THREAD_EXECUTE);
319 break;
320 case 'b':
321 case 'B':
322 filter |= (1 << THREAD_BACKGROUND);
323 break;
324 default:
325 break;
326 }
327 ++i;
328 }
329 if (filter == 0)
330 {
331 vty_out(vty, "Invalid filter \"%s\" specified,"
332 " must contain at least one of 'RWTEXB'%s",
333 argv[0], VTY_NEWLINE);
334 return CMD_WARNING;
335 }
336 }
337
338 cpu_record_print(vty, filter);
339 return CMD_SUCCESS;
340 }
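/*
 * Example vty usage (hypothetical session): "show thread cpu" reports all
 * thread types, while "show thread cpu rt" restricts the report to read
 * and timer threads. Characters outside 'RWTEXB' are ignored, and a filter
 * that selects nothing is rejected with CMD_WARNING.
 */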
341
342 static void
343 cpu_record_hash_clear (struct hash_backet *bucket,
344 void *args)
345 {
346 thread_type *filter = args;
347 struct cpu_thread_history *a = bucket->data;
348
349 if ( !(a->types & *filter) )
350 return;
351
352 hash_release (cpu_record, bucket->data);
353 }
354
355 static void
356 cpu_record_clear (thread_type filter)
357 {
358 thread_type *tmp = &filter;
359 hash_iterate (cpu_record,
360 (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
361 tmp);
362 }
363
364 DEFUN (clear_thread_cpu,
365 clear_thread_cpu_cmd,
366 "clear thread cpu [FILTER]",
367 "Clear stored data\n"
368 "Thread information\n"
369 "Thread CPU usage\n"
370 "Display filter (rwtexb)\n")
371 {
372 int i = 0;
373 thread_type filter = (thread_type) -1U;
374
375 if (argc > 0)
376 {
377 filter = 0;
378 while (argv[0][i] != '\0')
379 {
380 switch ( argv[0][i] )
381 {
382 case 'r':
383 case 'R':
384 filter |= (1 << THREAD_READ);
385 break;
386 case 'w':
387 case 'W':
388 filter |= (1 << THREAD_WRITE);
389 break;
390 case 't':
391 case 'T':
392 filter |= (1 << THREAD_TIMER);
393 break;
394 case 'e':
395 case 'E':
396 filter |= (1 << THREAD_EVENT);
397 break;
398 case 'x':
399 case 'X':
400 filter |= (1 << THREAD_EXECUTE);
401 break;
402 case 'b':
403 case 'B':
404 filter |= (1 << THREAD_BACKGROUND);
405 break;
406 default:
407 break;
408 }
409 ++i;
410 }
411 if (filter == 0)
412 {
413 vty_out(vty, "Invalid filter \"%s\" specified,"
414 " must contain at least one of 'RWTEXB'%s",
415 argv[0], VTY_NEWLINE);
416 return CMD_WARNING;
417 }
418 }
419
420 cpu_record_clear (filter);
421 return CMD_SUCCESS;
422 }
423
424 static int
425 thread_timer_cmp(void *a, void *b)
426 {
427 struct thread *thread_a = a;
428 struct thread *thread_b = b;
429
430 long cmp = timeval_cmp(thread_a->u.sands, thread_b->u.sands);
431
432 if (cmp < 0)
433 return -1;
434 if (cmp > 0)
435 return 1;
436 return 0;
437 }
438
439 static void
440 thread_timer_update(void *node, int actual_position)
441 {
442 struct thread *thread = node;
443
444 thread->index = actual_position;
445 }
446
447 /* Allocate new thread master. */
448 struct thread_master *
449 thread_master_create (void)
450 {
451 struct thread_master *rv;
452 struct rlimit limit;
453
454 getrlimit(RLIMIT_NOFILE, &limit);
455
456 if (cpu_record == NULL)
457 cpu_record
458 = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
459 (int (*) (const void *, const void *))cpu_record_hash_cmp);
460
461 rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
462 if (rv == NULL)
463 {
464 return NULL;
465 }
466
467 rv->fd_limit = (int)limit.rlim_cur;
468 rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
469 if (rv->read == NULL)
470 {
471 XFREE (MTYPE_THREAD_MASTER, rv);
472 return NULL;
473 }
474
475 rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
476 if (rv->write == NULL)
477 {
478 XFREE (MTYPE_THREAD, rv->read);
479 XFREE (MTYPE_THREAD_MASTER, rv);
480 return NULL;
481 }
482
483 /* Initialize the timer queues */
484 rv->timer = pqueue_create();
485 rv->background = pqueue_create();
486 rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
487 rv->timer->update = rv->background->update = thread_timer_update;
488
489 #if defined(HAVE_POLL)
490 rv->handler.pfdsize = rv->fd_limit;
491 rv->handler.pfdcount = 0;
492 rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct pollfd) * rv->handler.pfdsize);
493 memset (rv->handler.pfds, 0, sizeof (struct pollfd) * rv->handler.pfdsize);
494 #endif
495 return rv;
496 }
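/*
 * Lifecycle sketch (illustrative; each daemon does this from its own
 * startup and shutdown paths):
 *
 *   struct thread_master *master;
 *
 *   master = thread_master_create ();
 *   ...schedule read/write/timer/event threads and run the fetch loop...
 *   thread_master_free (master);
 */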
497
498 /* Add a new thread to the list. */
499 static void
500 thread_list_add (struct thread_list *list, struct thread *thread)
501 {
502 thread->next = NULL;
503 thread->prev = list->tail;
504 if (list->tail)
505 list->tail->next = thread;
506 else
507 list->head = thread;
508 list->tail = thread;
509 list->count++;
510 }
511
512 /* Delete a thread from the list. */
513 static struct thread *
514 thread_list_delete (struct thread_list *list, struct thread *thread)
515 {
516 if (thread->next)
517 thread->next->prev = thread->prev;
518 else
519 list->tail = thread->prev;
520 if (thread->prev)
521 thread->prev->next = thread->next;
522 else
523 list->head = thread->next;
524 thread->next = thread->prev = NULL;
525 list->count--;
526 return thread;
527 }
528
529 static void
530 thread_delete_fd (struct thread **thread_array, struct thread *thread)
531 {
532 thread_array[thread->u.fd] = NULL;
533 }
534
535 static void
536 thread_add_fd (struct thread **thread_array, struct thread *thread)
537 {
538 thread_array[thread->u.fd] = thread;
539 }
540
541 /* Return whether the thread list is empty. */
542 static int
543 thread_empty (struct thread_list *list)
544 {
545 return list->head ? 0 : 1;
546 }
547
548 /* Delete top of the list and return it. */
549 static struct thread *
550 thread_trim_head (struct thread_list *list)
551 {
552 if (!thread_empty (list))
553 return thread_list_delete (list, list->head);
554 return NULL;
555 }
556
557 /* Move thread to unuse list. */
558 static void
559 thread_add_unuse (struct thread_master *m, struct thread *thread)
560 {
561 assert (m != NULL && thread != NULL);
562 assert (thread->next == NULL);
563 assert (thread->prev == NULL);
564 assert (thread->type == THREAD_UNUSED);
565 thread_list_add (&m->unuse, thread);
566 }
567
568 /* Free all threads on the given list. */
569 static void
570 thread_list_free (struct thread_master *m, struct thread_list *list)
571 {
572 struct thread *t;
573 struct thread *next;
574
575 for (t = list->head; t; t = next)
576 {
577 next = t->next;
578 XFREE (MTYPE_THREAD, t);
579 list->count--;
580 m->alloc--;
581 }
582 }
583
584 static void
585 thread_array_free (struct thread_master *m, struct thread **thread_array)
586 {
587 struct thread *t;
588 int index;
589
590 for (index = 0; index < m->fd_limit; ++index)
591 {
592 t = thread_array[index];
593 if (t)
594 {
595 thread_array[index] = NULL;
596 XFREE (MTYPE_THREAD, t);
597 m->alloc--;
598 }
599 }
600 XFREE (MTYPE_THREAD, thread_array);
601 }
602
603 static void
604 thread_queue_free (struct thread_master *m, struct pqueue *queue)
605 {
606 int i;
607
608 for (i = 0; i < queue->size; i++)
609 XFREE(MTYPE_THREAD, queue->array[i]);
610
611 m->alloc -= queue->size;
612 pqueue_delete(queue);
613 }
614
615 /*
616 * thread_master_free_unused
617 *
618 * As threads finish, they are put on the unuse list so they can be
619 * reused later.
620 * When we are shutting down, free the unused threads here so we can
621 * see whether anything was forgotten and never shut off.
622 */
623 void
624 thread_master_free_unused (struct thread_master *m)
625 {
626 struct thread *t;
627 while ((t = thread_trim_head(&m->unuse)) != NULL)
628 {
629 XFREE(MTYPE_THREAD, t);
630 }
631 }
632
633 /* Stop thread scheduler. */
634 void
635 thread_master_free (struct thread_master *m)
636 {
637 thread_array_free (m, m->read);
638 thread_array_free (m, m->write);
639 thread_queue_free (m, m->timer);
640 thread_list_free (m, &m->event);
641 thread_list_free (m, &m->ready);
642 thread_list_free (m, &m->unuse);
643 thread_queue_free (m, m->background);
644
645 #if defined(HAVE_POLL)
646 XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
647 #endif
648 XFREE (MTYPE_THREAD_MASTER, m);
649
650 if (cpu_record)
651 {
652 hash_clean (cpu_record, cpu_record_hash_free);
653 hash_free (cpu_record);
654 cpu_record = NULL;
655 }
656 }
657
658 /* Return remaining time in seconds. */
659 unsigned long
660 thread_timer_remain_second (struct thread *thread)
661 {
662 quagga_get_relative (NULL);
663
664 if (thread->u.sands.tv_sec - relative_time.tv_sec > 0)
665 return thread->u.sands.tv_sec - relative_time.tv_sec;
666 else
667 return 0;
668 }
669
670 #define debugargdef const char *funcname, const char *schedfrom, int fromln
671 #define debugargpass funcname, schedfrom, fromln
672
673 struct timeval
674 thread_timer_remain(struct thread *thread)
675 {
676 quagga_get_relative(NULL);
677
678 return timeval_subtract(thread->u.sands, relative_time);
679 }
680
681 /* Get new thread. */
682 static struct thread *
683 thread_get (struct thread_master *m, u_char type,
684 int (*func) (struct thread *), void *arg, debugargdef)
685 {
686 struct thread *thread = thread_trim_head (&m->unuse);
687
688 if (! thread)
689 {
690 thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
691 m->alloc++;
692 }
693 thread->type = type;
694 thread->add_type = type;
695 thread->master = m;
696 thread->func = func;
697 thread->arg = arg;
698 thread->index = -1;
699 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
700
701 thread->funcname = funcname;
702 thread->schedfrom = schedfrom;
703 thread->schedfrom_line = fromln;
704
705 return thread;
706 }
707
708 #if defined (HAVE_POLL)
709
710 #define fd_copy_fd_set(X) (X)
711
712 /* generic add thread function */
713 static struct thread *
714 generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
715 void *arg, int fd, int dir, debugargdef)
716 {
717 struct thread *thread;
718
719 u_char type;
720 short int event;
721
722 if (dir == THREAD_READ)
723 {
724 event = (POLLIN | POLLHUP);
725 type = THREAD_READ;
726 }
727 else
728 {
729 event = (POLLOUT | POLLHUP);
730 type = THREAD_WRITE;
731 }
732
733 nfds_t queuepos = m->handler.pfdcount;
734 nfds_t i=0;
735 for (i=0; i<m->handler.pfdcount; i++)
736 if (m->handler.pfds[i].fd == fd)
737 {
738 queuepos = i;
739 break;
740 }
741
742 /* is there enough space for a new fd? */
743 assert (queuepos < m->handler.pfdsize);
744
745 thread = thread_get (m, type, func, arg, debugargpass);
746 m->handler.pfds[queuepos].fd = fd;
747 m->handler.pfds[queuepos].events |= event;
748 if (queuepos == m->handler.pfdcount)
749 m->handler.pfdcount++;
750
751 return thread;
752 }
753 #else
754
755 #define fd_copy_fd_set(X) (X)
756 #endif
757
758 static int
759 fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
760 {
761 int num;
762 #if defined(HAVE_POLL)
763 /* Recalculate the timeout for poll(). Note that a NULL pointer means
764 no timeout for select(), whereas for poll() no timeout is -1. */
765 int timeout = -1;
766 if (timer_wait != NULL)
767 timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
768
769 num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
770 #else
771 num = select (size, read, write, except, timer_wait);
772 #endif
773
774 return num;
775 }
776
777 static int
778 fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
779 {
780 #if defined(HAVE_POLL)
781 return 1;
782 #else
783 return FD_ISSET (THREAD_FD (thread), fdset);
784 #endif
785 }
786
787 static int
788 fd_clear_read_write (struct thread *thread)
789 {
790 #if !defined(HAVE_POLL)
791 thread_fd_set *fdset = NULL;
792 int fd = THREAD_FD (thread);
793
794 if (thread->type == THREAD_READ)
795 fdset = &thread->master->handler.readfd;
796 else
797 fdset = &thread->master->handler.writefd;
798
799 if (!FD_ISSET (fd, fdset))
800 return 0;
801
802 FD_CLR (fd, fdset);
803 #endif
804 return 1;
805 }
806
807 /* Add new read or write thread. */
808 struct thread *
809 funcname_thread_add_read_write (int dir, struct thread_master *m,
810 int (*func) (struct thread *), void *arg, int fd,
811 debugargdef)
812 {
813 struct thread *thread = NULL;
814
815 #if !defined(HAVE_POLL)
816 thread_fd_set *fdset = NULL;
817 if (dir == THREAD_READ)
818 fdset = &m->handler.readfd;
819 else
820 fdset = &m->handler.writefd;
821 #endif
822
823 #if defined (HAVE_POLL)
824 thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
825
826 if (thread == NULL)
827 return NULL;
828 #else
829 if (FD_ISSET (fd, fdset))
830 {
831 zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir == THREAD_READ) ? "read" : "write", fd);
832 return NULL;
833 }
834
835 FD_SET (fd, fdset);
836 thread = thread_get (m, dir, func, arg, debugargpass);
837 #endif
838
839 thread->u.fd = fd;
840 if (dir == THREAD_READ)
841 thread_add_fd (m->read, thread);
842 else
843 thread_add_fd (m->write, thread);
844
845 return thread;
846 }
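/*
 * Usage sketch, assuming the thread_add_read()/thread_add_write()
 * convenience macros from thread.h, which supply THREAD_READ/THREAD_WRITE
 * and the debugargdef arguments for the caller (handler and ctx are
 * illustrative names):
 *
 *   ctx->t_read = thread_add_read (master, my_read_handler, ctx, sock);
 */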
847
848 static struct thread *
849 funcname_thread_add_timer_timeval (struct thread_master *m,
850 int (*func) (struct thread *),
851 int type,
852 void *arg,
853 struct timeval *time_relative,
854 debugargdef)
855 {
856 struct thread *thread;
857 struct pqueue *queue;
858 struct timeval alarm_time;
859
860 assert (m != NULL);
861
862 assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
863 assert (time_relative);
864
865 queue = ((type == THREAD_TIMER) ? m->timer : m->background);
866 thread = thread_get (m, type, func, arg, debugargpass);
867
868 /* Do we need jitter here? */
869 quagga_get_relative (NULL);
870 alarm_time.tv_sec = relative_time.tv_sec + time_relative->tv_sec;
871 alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec;
872 thread->u.sands = timeval_adjust(alarm_time);
873
874 pqueue_enqueue(thread, queue);
875 return thread;
876 }
877
878
879 /* Add timer event thread. */
880 struct thread *
881 funcname_thread_add_timer (struct thread_master *m,
882 int (*func) (struct thread *),
883 void *arg, long timer,
884 debugargdef)
885 {
886 struct timeval trel;
887
888 assert (m != NULL);
889
890 trel.tv_sec = timer;
891 trel.tv_usec = 0;
892
893 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg,
894 &trel, debugargpass);
895 }
896
897 /* Add timer event thread with "millisecond" resolution */
898 struct thread *
899 funcname_thread_add_timer_msec (struct thread_master *m,
900 int (*func) (struct thread *),
901 void *arg, long timer,
902 debugargdef)
903 {
904 struct timeval trel;
905
906 assert (m != NULL);
907
908 trel.tv_sec = timer / 1000;
909 trel.tv_usec = 1000*(timer % 1000);
910
911 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
912 arg, &trel, debugargpass);
913 }
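/*
 * Usage sketch, assuming the thread_add_timer()/thread_add_timer_msec()
 * macros from thread.h (my_expire and ctx are illustrative):
 *
 *   thread_add_timer (master, my_expire, ctx, 5);         relative seconds
 *   thread_add_timer_msec (master, my_expire, ctx, 500);  relative msec
 *
 * Both expiry times are relative to the monotonic clock sampled in
 * funcname_thread_add_timer_timeval() above.
 */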
914
915 /* Add timer event thread that expires after the given relative struct timeval */
916 struct thread *
917 funcname_thread_add_timer_tv (struct thread_master *m,
918 int (*func) (struct thread *),
919 void *arg, struct timeval *tv,
920 debugargdef)
921 {
922 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
923 arg, tv, debugargpass);
924 }
925
926 /* Add a background thread, with an optional millisec delay */
927 struct thread *
928 funcname_thread_add_background (struct thread_master *m,
929 int (*func) (struct thread *),
930 void *arg, long delay,
931 debugargdef)
932 {
933 struct timeval trel;
934
935 assert (m != NULL);
936
937 if (delay)
938 {
939 trel.tv_sec = delay / 1000;
940 trel.tv_usec = 1000*(delay % 1000);
941 }
942 else
943 {
944 trel.tv_sec = 0;
945 trel.tv_usec = 0;
946 }
947
948 return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND,
949 arg, &trel, debugargpass);
950 }
951
952 /* Add simple event thread. */
953 struct thread *
954 funcname_thread_add_event (struct thread_master *m,
955 int (*func) (struct thread *), void *arg, int val,
956 debugargdef)
957 {
958 struct thread *thread;
959
960 assert (m != NULL);
961
962 thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
963 thread->u.val = val;
964 thread_list_add (&m->event, thread);
965
966 return thread;
967 }
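/*
 * Usage sketch, assuming the thread_add_event() macro from thread.h:
 *
 *   thread_add_event (master, my_handler, ctx, 0);
 *
 * Events carry no fd or timer; they sit on m->event until thread_fetch()
 * moves them to the ready list and returns them for execution.
 */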
968
969 static void
970 thread_cancel_read_or_write (struct thread *thread, short int state)
971 {
972 #if defined(HAVE_POLL)
973 nfds_t i;
974
975 for (i=0;i<thread->master->handler.pfdcount;++i)
976 if (thread->master->handler.pfds[i].fd == thread->u.fd)
977 {
978 thread->master->handler.pfds[i].events &= ~(state);
979
980 /* remove thread fds from pfd list */
981 if (thread->master->handler.pfds[i].events == 0)
982 {
983 memmove(thread->master->handler.pfds+i,
984 thread->master->handler.pfds+i+1,
985 (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
986 thread->master->handler.pfdcount--;
987 return;
988 }
989 }
990 #endif
991
992 fd_clear_read_write (thread);
993 }
994
995 /* Cancel thread from scheduler. */
996 void
997 thread_cancel (struct thread *thread)
998 {
999 struct thread_list *list = NULL;
1000 struct pqueue *queue = NULL;
1001 struct thread **thread_array = NULL;
1002
1003 switch (thread->type)
1004 {
1005 case THREAD_READ:
1006 #if defined (HAVE_POLL)
1007 thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
1008 #else
1009 thread_cancel_read_or_write (thread, 0);
1010 #endif
1011 thread_array = thread->master->read;
1012 break;
1013 case THREAD_WRITE:
1014 #if defined (HAVE_POLL)
1015 thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
1016 #else
1017 thread_cancel_read_or_write (thread, 0);
1018 #endif
1019 thread_array = thread->master->write;
1020 break;
1021 case THREAD_TIMER:
1022 queue = thread->master->timer;
1023 break;
1024 case THREAD_EVENT:
1025 list = &thread->master->event;
1026 break;
1027 case THREAD_READY:
1028 list = &thread->master->ready;
1029 break;
1030 case THREAD_BACKGROUND:
1031 queue = thread->master->background;
1032 break;
1033 default:
1034 return;
1035 break;
1036 }
1037
1038 if (queue)
1039 {
1040 assert(thread->index >= 0);
1041 assert(thread == queue->array[thread->index]);
1042 pqueue_remove_at(thread->index, queue);
1043 }
1044 else if (list)
1045 {
1046 thread_list_delete (list, thread);
1047 }
1048 else if (thread_array)
1049 {
1050 thread_delete_fd (thread_array, thread);
1051 }
1052 else
1053 {
1054 assert(!"Thread should be either in queue or list or array!");
1055 }
1056
1057 thread->type = THREAD_UNUSED;
1058 thread_add_unuse (thread->master, thread);
1059 }
1060
1061 /* Delete all events which have argument value arg. */
1062 unsigned int
1063 thread_cancel_event (struct thread_master *m, void *arg)
1064 {
1065 unsigned int ret = 0;
1066 struct thread *thread;
1067
1068 thread = m->event.head;
1069 while (thread)
1070 {
1071 struct thread *t;
1072
1073 t = thread;
1074 thread = t->next;
1075
1076 if (t->arg == arg)
1077 {
1078 ret++;
1079 thread_list_delete (&m->event, t);
1080 t->type = THREAD_UNUSED;
1081 thread_add_unuse (m, t);
1082 }
1083 }
1084
1085 /* thread can be on the ready list too */
1086 thread = m->ready.head;
1087 while (thread)
1088 {
1089 struct thread *t;
1090
1091 t = thread;
1092 thread = t->next;
1093
1094 if (t->arg == arg)
1095 {
1096 ret++;
1097 thread_list_delete (&m->ready, t);
1098 t->type = THREAD_UNUSED;
1099 thread_add_unuse (m, t);
1100 }
1101 }
1102 return ret;
1103 }
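/*
 * Usage sketch (illustrative): a module tearing down some context usually
 * cancels both its pending events and any per-fd or timer threads it still
 * holds pointers to, e.g.
 *
 *   thread_cancel_event (master, ctx);
 *   if (ctx->t_read)
 *     {
 *       thread_cancel (ctx->t_read);
 *       ctx->t_read = NULL;
 *     }
 */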
1104
1105 static struct timeval *
1106 thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
1107 {
1108 if (queue->size)
1109 {
1110 struct thread *next_timer = queue->array[0];
1111 *timer_val = timeval_subtract (next_timer->u.sands, relative_time);
1112 return timer_val;
1113 }
1114 return NULL;
1115 }
1116
1117 static struct thread *
1118 thread_run (struct thread_master *m, struct thread *thread,
1119 struct thread *fetch)
1120 {
1121 *fetch = *thread;
1122 thread->type = THREAD_UNUSED;
1123 thread_add_unuse (m, thread);
1124 return fetch;
1125 }
1126
1127 static int
1128 thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
1129 {
1130 struct thread **thread_array;
1131
1132 if (!thread)
1133 return 0;
1134
1135 if (thread->type == THREAD_READ)
1136 thread_array = m->read;
1137 else
1138 thread_array = m->write;
1139
1140 if (fd_is_set (thread, fdset, pos))
1141 {
1142 fd_clear_read_write (thread);
1143 thread_delete_fd (thread_array, thread);
1144 thread_list_add (&m->ready, thread);
1145 thread->type = THREAD_READY;
1146 #if defined(HAVE_POLL)
1147 thread->master->handler.pfds[pos].events &= ~(state);
1148 #endif
1149 return 1;
1150 }
1151 return 0;
1152 }
1153
1154 #if defined(HAVE_POLL)
1155
1156 #if defined(HAVE_SNMP)
1157 /* add snmp fds to poll set */
1158 static void
1159 add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
1160 {
1161 int i;
1162 m->handler.pfdcountsnmp = m->handler.pfdcount;
1163 /* cycle through fds and add the necessary fds to the poll set */
1164 for (i=0;i<fdsetsize;++i)
1165 {
1166 if (FD_ISSET(i, snmpfds))
1167 {
1168 assert (m->handler.pfdcountsnmp <= m->handler.pfdsize);
1169
1170 m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
1171 m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
1172 m->handler.pfdcountsnmp++;
1173 }
1174 }
1175 }
1176 #endif
1177
1178 /* check poll events */
1179 static void
1180 check_pollfds(struct thread_master *m, fd_set *readfd, int num)
1181 {
1182 nfds_t i = 0;
1183 int ready = 0;
1184 for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
1185 {
1186 /* no event for current fd? immediately continue */
1187 if(m->handler.pfds[i].revents == 0)
1188 continue;
1189
1190 ready++;
1191
1192 /* POLLIN / POLLOUT process event */
1193 if (m->handler.pfds[i].revents & POLLIN)
1194 thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
1195 if (m->handler.pfds[i].revents & POLLOUT)
1196 thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
1197
1198 /* remove fd from list on POLLNVAL or POLLHUP */
1199 if (m->handler.pfds[i].revents & POLLNVAL ||
1200 m->handler.pfds[i].revents & POLLHUP)
1201 {
1202 memmove(m->handler.pfds+i,
1203 m->handler.pfds+i+1,
1204 (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
1205 m->handler.pfdcount--;
1206 i--;
1207 }
1208 else
1209 m->handler.pfds[i].revents = 0;
1210 }
1211 }
1212 #endif
1213
1214 static void
1215 thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
1216 {
1217 #if defined (HAVE_POLL)
1218 check_pollfds (m, rset, num);
1219 #else
1220 int ready = 0, index;
1221
1222 for (index = 0; index < m->fd_limit && ready < num; ++index)
1223 {
1224 ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
1225 ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
1226 }
1227 #endif
1228 }
1229
1230 /* Add all timers that have popped to the ready list. */
1231 static unsigned int
1232 thread_timer_process (struct pqueue *queue, struct timeval *timenow)
1233 {
1234 struct thread *thread;
1235 unsigned int ready = 0;
1236
1237 while (queue->size)
1238 {
1239 thread = queue->array[0];
1240 if (timeval_cmp (*timenow, thread->u.sands) < 0)
1241 return ready;
1242 pqueue_dequeue(queue);
1243 thread->type = THREAD_READY;
1244 thread_list_add (&thread->master->ready, thread);
1245 ready++;
1246 }
1247 return ready;
1248 }
1249
1250 /* process a list en masse, e.g. for event thread lists */
1251 static unsigned int
1252 thread_process (struct thread_list *list)
1253 {
1254 struct thread *thread;
1255 struct thread *next;
1256 unsigned int ready = 0;
1257
1258 for (thread = list->head; thread; thread = next)
1259 {
1260 next = thread->next;
1261 thread_list_delete (list, thread);
1262 thread->type = THREAD_READY;
1263 thread_list_add (&thread->master->ready, thread);
1264 ready++;
1265 }
1266 return ready;
1267 }
1268
1269
1270 /* Fetch next ready thread. */
1271 struct thread *
1272 thread_fetch (struct thread_master *m, struct thread *fetch)
1273 {
1274 struct thread *thread;
1275 thread_fd_set readfd;
1276 thread_fd_set writefd;
1277 thread_fd_set exceptfd;
1278 struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
1279 struct timeval timer_val_bg;
1280 struct timeval *timer_wait = &timer_val;
1281 struct timeval *timer_wait_bg;
1282
1283 while (1)
1284 {
1285 int num = 0;
1286
1287 /* Signals pre-empt everything */
1288 quagga_sigevent_process ();
1289
1290 /* Drain the ready queue of already scheduled jobs, before scheduling
1291 * more.
1292 */
1293 if ((thread = thread_trim_head (&m->ready)) != NULL)
1294 return thread_run (m, thread, fetch);
1295
1296 /* To be fair to all kinds of threads, and avoid starvation, we
1297 * need to be careful to consider all thread types for scheduling
1298 * in each quantum, i.e. we should not return early from here on.
1299 */
1300
1301 /* Normal event are the next highest priority. */
1302 thread_process (&m->event);
1303
1304 /* Structure copy. */
1305 #if !defined(HAVE_POLL)
1306 readfd = fd_copy_fd_set(m->handler.readfd);
1307 writefd = fd_copy_fd_set(m->handler.writefd);
1308 exceptfd = fd_copy_fd_set(m->handler.exceptfd);
1309 #endif
1310
1311 /* Calculate select wait timer if nothing else to do */
1312 if (m->ready.count == 0)
1313 {
1314 quagga_get_relative (NULL);
1315 timer_wait = thread_timer_wait (m->timer, &timer_val);
1316 timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);
1317
1318 if (timer_wait_bg &&
1319 (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0)))
1320 timer_wait = timer_wait_bg;
1321 }
1322
1323 num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
1324
1325 /* Signals should get quick treatment */
1326 if (num < 0)
1327 {
1328 if (errno == EINTR)
1329 continue; /* signal received - process it */
1330 zlog_warn ("select() error: %s", safe_strerror (errno));
1331 return NULL;
1332 }
1333
1334 /* Check foreground timers. Historically, they have had higher
1335 priority than I/O threads, so let's push them onto the ready
1336 list in front of the I/O threads. */
1337 quagga_get_relative (NULL);
1338 thread_timer_process (m->timer, &relative_time);
1339
1340 /* Got IO, process it */
1341 if (num > 0)
1342 thread_process_fds (m, &readfd, &writefd, num);
1343
1344 #if 0
1345 /* If any threads were made ready above (I/O or foreground timer),
1346 perhaps we should avoid adding background timers to the ready
1348 list at this time. If this code is uncommented, then background
1348 timer threads will not run unless there is nothing else to do. */
1349 if ((thread = thread_trim_head (&m->ready)) != NULL)
1350 return thread_run (m, thread, fetch);
1351 #endif
1352
1353 /* Background timer/events, lowest priority */
1354 thread_timer_process (m->background, &relative_time);
1355
1356 if ((thread = thread_trim_head (&m->ready)) != NULL)
1357 return thread_run (m, thread, fetch);
1358 }
1359 }
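/*
 * Usage sketch: the classic main loop built on thread_fetch() and
 * thread_call() (illustrative; real daemons add their own initialization
 * before entering it):
 *
 *   struct thread thread;
 *
 *   while (thread_fetch (master, &thread))
 *     thread_call (&thread);
 */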
1360
1361 unsigned long
1362 thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
1363 {
1364 /* This is 'user + sys' time. */
1365 *cputime = timeval_elapsed (now->cpu.ru_utime, start->cpu.ru_utime) +
1366 timeval_elapsed (now->cpu.ru_stime, start->cpu.ru_stime);
1367 return timeval_elapsed (now->real, start->real);
1368 }
1369
1370 /* We should aim to yield after 'yield' microseconds, which defaults
1371 to THREAD_YIELD_TIME_SLOT.
1372 Note: we are using real (wall clock) time for this calculation.
1373 It could be argued that CPU time may make more sense in certain
1374 contexts. The things to consider are whether the thread may have
1375 blocked (in which case wall time increases, but CPU time does not),
1376 or whether the system is heavily loaded with other processes competing
1377 for CPU time. On balance, wall clock time seems to make sense.
1378 Plus it has the added benefit that gettimeofday should be faster
1379 than calling getrusage. */
1380 int
1381 thread_should_yield (struct thread *thread)
1382 {
1383 quagga_get_relative (NULL);
1384 return (timeval_elapsed(relative_time, thread->real) >
1385 thread->yield);
1386 }
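/*
 * Usage sketch (work_remaining() and do_one_unit() are hypothetical, and
 * thread_add_event() is the thread.h macro assumed above): a long-running
 * job checks thread_should_yield() and reschedules itself as an event
 * instead of monopolizing the process:
 *
 *   static int
 *   my_worker (struct thread *t)
 *   {
 *     while (work_remaining (t->arg))
 *       {
 *         do_one_unit (t->arg);
 *         if (thread_should_yield (t))
 *           {
 *             thread_add_event (t->master, my_worker, t->arg, 0);
 *             return 0;
 *           }
 *       }
 *     return 0;
 *   }
 */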
1387
1388 void
1389 thread_set_yield_time (struct thread *thread, unsigned long yield_time)
1390 {
1391 thread->yield = yield_time;
1392 }
1393
1394 void
1395 thread_getrusage (RUSAGE_T *r)
1396 {
1397 quagga_get_relative (NULL);
1398 getrusage(RUSAGE_SELF, &(r->cpu));
1399 r->real = relative_time;
1400
1401 #ifdef HAVE_CLOCK_MONOTONIC
1402 /* quagga_get_relative() only updates recent_time if it is gettimeofday
1403 * based, not when using CLOCK_MONOTONIC. As we export recent_time and
1404 * guarantee to update it before threads are run, refresh it here.
1405 */
1406 quagga_gettimeofday(&recent_time);
1407 #endif /* HAVE_CLOCK_MONOTONIC */
1408 }
1409
1410 struct thread *thread_current = NULL;
1411
1412 /* We check thread consumed time. If the system has getrusage, we'll
1413 use that to get in-depth stats on the performance of the thread in addition
1414 to wall clock time stats from gettimeofday. */
1415 void
1416 thread_call (struct thread *thread)
1417 {
1418 unsigned long realtime, cputime;
1419 RUSAGE_T before, after;
1420
1421 /* Cache a pointer to the relevant cpu history thread, if the thread
1422 * does not have it yet.
1423 *
1424 * Callers submitting 'dummy threads' must therefore take care that
1425 * thread->hist is NULL
1426 */
1427 if (!thread->hist)
1428 {
1429 struct cpu_thread_history tmp;
1430
1431 tmp.func = thread->func;
1432 tmp.funcname = thread->funcname;
1433
1434 thread->hist = hash_get (cpu_record, &tmp,
1435 (void * (*) (void *))cpu_record_hash_alloc);
1436 }
1437
1438 GETRUSAGE (&before);
1439 thread->real = before.real;
1440
1441 thread_current = thread;
1442 (*thread->func) (thread);
1443 thread_current = NULL;
1444
1445 GETRUSAGE (&after);
1446
1447 realtime = thread_consumed_time (&after, &before, &cputime);
1448 thread->hist->real.total += realtime;
1449 if (thread->hist->real.max < realtime)
1450 thread->hist->real.max = realtime;
1451 thread->hist->cpu.total += cputime;
1452 if (thread->hist->cpu.max < cputime)
1453 thread->hist->cpu.max = cputime;
1454
1455 ++(thread->hist->total_calls);
1456 thread->hist->types |= (1 << thread->add_type);
1457
1458 #ifdef CONSUMED_TIME_CHECK
1459 if (realtime > CONSUMED_TIME_CHECK)
1460 {
1461 /*
1462 * We have a CPU Hog on our hands.
1463 * Whinge about it now, so we're aware this is yet another task
1464 * to fix.
1465 */
1466 zlog_warn ("SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1467 thread->funcname,
1468 (unsigned long) thread->func,
1469 realtime/1000, cputime/1000);
1470 }
1471 #endif /* CONSUMED_TIME_CHECK */
1472 }
1473
1474 /* Execute thread */
1475 struct thread *
1476 funcname_thread_execute (struct thread_master *m,
1477 int (*func)(struct thread *),
1478 void *arg,
1479 int val,
1480 debugargdef)
1481 {
1482 struct thread dummy;
1483
1484 memset (&dummy, 0, sizeof (struct thread));
1485
1486 dummy.type = THREAD_EVENT;
1487 dummy.add_type = THREAD_EXECUTE;
1488 dummy.master = NULL;
1489 dummy.func = func;
1490 dummy.arg = arg;
1491 dummy.u.val = val;
1492
1493 dummy.funcname = funcname;
1494 dummy.schedfrom = schedfrom;
1495 dummy.schedfrom_line = fromln;
1496
1497 thread_call (&dummy);
1498
1499 return NULL;
1500 }