1 /* Thread management routines
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with GNU Zebra; see the file COPYING. If not, write to the Free
18 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
19 * 02111-1307, USA.
20 */
21
22 /* #define DEBUG */
23
24 #include <zebra.h>
25 #include <sys/resource.h>
26
27 #include "thread.h"
28 #include "memory.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "pqueue.h"
32 #include "command.h"
33 #include "sigevent.h"
34
35 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
36 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
37 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
38
39 #if defined(__APPLE__)
40 #include <mach/mach.h>
41 #include <mach/mach_time.h>
42 #endif
43
44 /* Recent absolute time of day */
45 struct timeval recent_time;
46 /* Relative time, since startup */
47 static struct timeval relative_time;
48
49 static struct hash *cpu_record = NULL;
50
51 /* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO).
52 And change negative values to 0. */
53 static struct timeval
54 timeval_adjust (struct timeval a)
55 {
56 while (a.tv_usec >= TIMER_SECOND_MICRO)
57 {
58 a.tv_usec -= TIMER_SECOND_MICRO;
59 a.tv_sec++;
60 }
61
62 while (a.tv_usec < 0)
63 {
64 a.tv_usec += TIMER_SECOND_MICRO;
65 a.tv_sec--;
66 }
67
68 if (a.tv_sec < 0)
69 /* Change negative timeouts to 0. */
70 a.tv_sec = a.tv_usec = 0;
71
72 return a;
73 }
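
/* Illustrative sketch (not part of the original source): normalizing an
 * overflowed timeval with timeval_adjust.  Assumes TIMER_SECOND_MICRO is
 * one million, per its definition in thread.h. */
#if 0
static void
timeval_adjust_example (void)
{
  struct timeval tv;

  tv.tv_sec = 1;
  tv.tv_usec = 1500000;
  tv = timeval_adjust (tv);
  /* tv is now { .tv_sec = 2, .tv_usec = 500000 } */
}
#endif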
74
75 static struct timeval
76 timeval_subtract (struct timeval a, struct timeval b)
77 {
78 struct timeval ret;
79
80 ret.tv_usec = a.tv_usec - b.tv_usec;
81 ret.tv_sec = a.tv_sec - b.tv_sec;
82
83 return timeval_adjust (ret);
84 }
85
86 static long
87 timeval_cmp (struct timeval a, struct timeval b)
88 {
89 return (a.tv_sec == b.tv_sec
90 ? a.tv_usec - b.tv_usec : a.tv_sec - b.tv_sec);
91 }
92
93 unsigned long
94 timeval_elapsed (struct timeval a, struct timeval b)
95 {
96 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
97 + (a.tv_usec - b.tv_usec));
98 }
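
/* Illustrative sketch (assumption, not in the original): measuring an
 * interval in microseconds with timeval_elapsed; the later time is passed
 * first.  quagga_get_relative is defined further down in this file. */
#if 0
static unsigned long
elapsed_example (void)
{
  struct timeval start, now;

  quagga_get_relative (&start);
  /* ... do some work ... */
  quagga_get_relative (&now);
  return timeval_elapsed (now, start);  /* microseconds since start */
}
#endif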
99
100 /* gettimeofday wrapper, to keep recent_time updated */
101 static int
102 quagga_gettimeofday (struct timeval *tv)
103 {
104 int ret;
105
106 assert (tv);
107
108 if (!(ret = gettimeofday (&recent_time, NULL)))
109 {
110 /* avoid copy if user passed recent_time pointer.. */
111 if (tv != &recent_time)
112 *tv = recent_time;
113 return 0;
114 }
115 return ret;
116 }
117
118 static int
119 quagga_get_relative (struct timeval *tv)
120 {
121 int ret;
122
123 #ifdef HAVE_CLOCK_MONOTONIC
124 {
125 struct timespec tp;
126 if (!(ret = clock_gettime (CLOCK_MONOTONIC, &tp)))
127 {
128 relative_time.tv_sec = tp.tv_sec;
129 relative_time.tv_usec = tp.tv_nsec / 1000;
130 }
131 }
132 #elif defined(__APPLE__)
133 {
134 uint64_t ticks;
135 uint64_t useconds;
136 static mach_timebase_info_data_t timebase_info;
137
138 ticks = mach_absolute_time();
139 if (timebase_info.denom == 0)
140 mach_timebase_info(&timebase_info);
141
142 useconds = ticks * timebase_info.numer / timebase_info.denom / 1000;
143 relative_time.tv_sec = useconds / 1000000;
144 relative_time.tv_usec = useconds % 1000000;
145
146 return 0;
147 }
148 #else /* !HAVE_CLOCK_MONOTONIC && !__APPLE__ */
149 #error no monotonic clock on this system
150 #endif /* HAVE_CLOCK_MONOTONIC */
151
152 if (tv)
153 *tv = relative_time;
154
155 return ret;
156 }
157
158 /* Exported Quagga timestamp function.
159 * Modelled on POSIX clock_gettime.
160 */
161 int
162 quagga_gettime (enum quagga_clkid clkid, struct timeval *tv)
163 {
164 switch (clkid)
165 {
166 case QUAGGA_CLK_MONOTONIC:
167 return quagga_get_relative (tv);
168 default:
169 errno = EINVAL;
170 return -1;
171 }
172 }
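
/* Illustrative sketch (assumption): fetching the monotonic clock through
 * the exported wrapper, mirroring POSIX clock_gettime usage. */
#if 0
static void
gettime_example (void)
{
  struct timeval mono;

  if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &mono) < 0)
    zlog_warn ("quagga_gettime failed: %s", safe_strerror (errno));
}
#endif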
173
174 time_t
175 quagga_monotime (void)
176 {
177 struct timeval tv;
178 quagga_get_relative(&tv);
179 return tv.tv_sec;
180 }
181
182 /* Public export of recent_relative_time by value */
183 struct timeval
184 recent_relative_time (void)
185 {
186 return relative_time;
187 }
188
189 static unsigned int
190 cpu_record_hash_key (struct cpu_thread_history *a)
191 {
192 return (uintptr_t) a->func;
193 }
194
195 static int
196 cpu_record_hash_cmp (const struct cpu_thread_history *a,
197 const struct cpu_thread_history *b)
198 {
199 return a->func == b->func;
200 }
201
202 static void *
203 cpu_record_hash_alloc (struct cpu_thread_history *a)
204 {
205 struct cpu_thread_history *new;
206 new = XCALLOC (MTYPE_THREAD_STATS, sizeof (struct cpu_thread_history));
207 new->func = a->func;
208 new->funcname = a->funcname;
209 return new;
210 }
211
212 static void
213 cpu_record_hash_free (void *a)
214 {
215 struct cpu_thread_history *hist = a;
216
217 XFREE (MTYPE_THREAD_STATS, hist);
218 }
219
220 static void
221 vty_out_cpu_thread_history(struct vty* vty,
222 struct cpu_thread_history *a)
223 {
224 vty_out(vty, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld",
225 a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls,
226 a->cpu.total/a->total_calls, a->cpu.max,
227 a->real.total/a->total_calls, a->real.max);
228 vty_out(vty, " %c%c%c%c%c%c %s%s",
229 a->types & (1 << THREAD_READ) ? 'R':' ',
230 a->types & (1 << THREAD_WRITE) ? 'W':' ',
231 a->types & (1 << THREAD_TIMER) ? 'T':' ',
232 a->types & (1 << THREAD_EVENT) ? 'E':' ',
233 a->types & (1 << THREAD_EXECUTE) ? 'X':' ',
234 a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ',
235 a->funcname, VTY_NEWLINE);
236 }
237
238 static void
239 cpu_record_hash_print(struct hash_backet *bucket,
240 void *args[])
241 {
242 struct cpu_thread_history *totals = args[0];
243 struct vty *vty = args[1];
244 thread_type *filter = args[2];
245 struct cpu_thread_history *a = bucket->data;
246
247 if ( !(a->types & *filter) )
248 return;
249 vty_out_cpu_thread_history(vty,a);
250 totals->total_active += a->total_active;
251 totals->total_calls += a->total_calls;
252 totals->real.total += a->real.total;
253 if (totals->real.max < a->real.max)
254 totals->real.max = a->real.max;
255 totals->cpu.total += a->cpu.total;
256 if (totals->cpu.max < a->cpu.max)
257 totals->cpu.max = a->cpu.max;
258 }
259
260 static void
261 cpu_record_print(struct vty *vty, thread_type filter)
262 {
263 struct cpu_thread_history tmp;
264 void *args[3] = {&tmp, vty, &filter};
265
266 memset(&tmp, 0, sizeof tmp);
267 tmp.funcname = "TOTAL";
268 tmp.types = filter;
269
270 vty_out(vty, "%21s %18s %18s%s",
271 "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
272 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
273 vty_out(vty, " Avg uSec Max uSecs");
274 vty_out(vty, " Type Thread%s", VTY_NEWLINE);
275 hash_iterate(cpu_record,
276 (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
277 args);
278
279 if (tmp.total_calls > 0)
280 vty_out_cpu_thread_history(vty, &tmp);
281 }
282
283 DEFUN (show_thread_cpu,
284 show_thread_cpu_cmd,
285 "show thread cpu [FILTER]",
286 SHOW_STR
287 "Thread information\n"
288 "Thread CPU usage\n"
289 "Display filter (rwtexb)\n")
290 {
291 int idx_filter = 3;
292 int i = 0;
293 thread_type filter = (thread_type) -1U;
294
295 if (argc > 3)
296 {
297 filter = 0;
298 while (argv[idx_filter]->arg[i] != '\0')
299 {
300 switch ( argv[idx_filter]->arg[i] )
301 {
302 case 'r':
303 case 'R':
304 filter |= (1 << THREAD_READ);
305 break;
306 case 'w':
307 case 'W':
308 filter |= (1 << THREAD_WRITE);
309 break;
310 case 't':
311 case 'T':
312 filter |= (1 << THREAD_TIMER);
313 break;
314 case 'e':
315 case 'E':
316 filter |= (1 << THREAD_EVENT);
317 break;
318 case 'x':
319 case 'X':
320 filter |= (1 << THREAD_EXECUTE);
321 break;
322 case 'b':
323 case 'B':
324 filter |= (1 << THREAD_BACKGROUND);
325 break;
326 default:
327 break;
328 }
329 ++i;
330 }
331 if (filter == 0)
332 {
333 vty_out(vty, "Invalid filter \"%s\" specified,"
334 " must contain at least one of 'RWTEXB'%s",
335 argv[idx_filter]->arg, VTY_NEWLINE);
336 return CMD_WARNING;
337 }
338 }
339
340 cpu_record_print(vty, filter);
341 return CMD_SUCCESS;
342 }
343
344 static void
345 cpu_record_hash_clear (struct hash_backet *bucket,
346 void *args)
347 {
348 thread_type *filter = args;
349 struct cpu_thread_history *a = bucket->data;
350
351 if ( !(a->types & *filter) )
352 return;
353
354 hash_release (cpu_record, bucket->data);
355 }
356
357 static void
358 cpu_record_clear (thread_type filter)
359 {
360 thread_type *tmp = &filter;
361 hash_iterate (cpu_record,
362 (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
363 tmp);
364 }
365
366 DEFUN (clear_thread_cpu,
367 clear_thread_cpu_cmd,
368 "clear thread cpu [FILTER]",
369 "Clear stored data\n"
370 "Thread information\n"
371 "Thread CPU usage\n"
372 "Display filter (rwtexb)\n")
373 {
374 int idx_filter = 3;
375 int i = 0;
376 thread_type filter = (thread_type) -1U;
377
378 if (argc > 3)
379 {
380 filter = 0;
381 while (argv[idx_filter]->arg[i] != '\0')
382 {
383 switch ( argv[idx_filter]->arg[i] )
384 {
385 case 'r':
386 case 'R':
387 filter |= (1 << THREAD_READ);
388 break;
389 case 'w':
390 case 'W':
391 filter |= (1 << THREAD_WRITE);
392 break;
393 case 't':
394 case 'T':
395 filter |= (1 << THREAD_TIMER);
396 break;
397 case 'e':
398 case 'E':
399 filter |= (1 << THREAD_EVENT);
400 break;
401 case 'x':
402 case 'X':
403 filter |= (1 << THREAD_EXECUTE);
404 break;
405 case 'b':
406 case 'B':
407 filter |= (1 << THREAD_BACKGROUND);
408 break;
409 default:
410 break;
411 }
412 ++i;
413 }
414 if (filter == 0)
415 {
416 vty_out(vty, "Invalid filter \"%s\" specified,"
417 " must contain at least one of 'RWTEXB'%s",
418 argv[idx_filter]->arg, VTY_NEWLINE);
419 return CMD_WARNING;
420 }
421 }
422
423 cpu_record_clear (filter);
424 return CMD_SUCCESS;
425 }
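
/* Illustrative sketch (assumption): the filter parsing above is duplicated
 * verbatim between show_thread_cpu and clear_thread_cpu; a shared helper
 * could look like this.  Assumes ctype.h is available via zebra.h. */
#if 0
static thread_type
thread_filter_parse (const char *s)
{
  thread_type filter = 0;

  for (; *s != '\0'; ++s)
    switch (tolower ((unsigned char) *s))
      {
      case 'r': filter |= (1 << THREAD_READ); break;
      case 'w': filter |= (1 << THREAD_WRITE); break;
      case 't': filter |= (1 << THREAD_TIMER); break;
      case 'e': filter |= (1 << THREAD_EVENT); break;
      case 'x': filter |= (1 << THREAD_EXECUTE); break;
      case 'b': filter |= (1 << THREAD_BACKGROUND); break;
      default: break;
      }
  return filter;  /* 0 means no valid flag was present */
}
#endif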
426
427 void
428 thread_cmd_init (void)
429 {
430 install_element (VIEW_NODE, &show_thread_cpu_cmd);
431 install_element (ENABLE_NODE, &clear_thread_cpu_cmd);
432 }
433
434 static int
435 thread_timer_cmp(void *a, void *b)
436 {
437 struct thread *thread_a = a;
438 struct thread *thread_b = b;
439
440 long cmp = timeval_cmp(thread_a->u.sands, thread_b->u.sands);
441
442 if (cmp < 0)
443 return -1;
444 if (cmp > 0)
445 return 1;
446 return 0;
447 }
448
449 static void
450 thread_timer_update(void *node, int actual_position)
451 {
452 struct thread *thread = node;
453
454 thread->index = actual_position;
455 }
456
457 /* Allocate new thread master. */
458 struct thread_master *
459 thread_master_create (void)
460 {
461 struct thread_master *rv;
462 struct rlimit limit;
463
464 getrlimit(RLIMIT_NOFILE, &limit);
465
466 if (cpu_record == NULL)
467 cpu_record
468 = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
469 (int (*) (const void *, const void *))cpu_record_hash_cmp);
470
471 rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
472 if (rv == NULL)
473 {
474 return NULL;
475 }
476
477 rv->fd_limit = (int)limit.rlim_cur;
478 rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
479 if (rv->read == NULL)
480 {
481 XFREE (MTYPE_THREAD_MASTER, rv);
482 return NULL;
483 }
484
485 rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
486 if (rv->write == NULL)
487 {
488 XFREE (MTYPE_THREAD, rv->read);
489 XFREE (MTYPE_THREAD_MASTER, rv);
490 return NULL;
491 }
492
493 /* Initialize the timer queues */
494 rv->timer = pqueue_create();
495 rv->background = pqueue_create();
496 rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
497 rv->timer->update = rv->background->update = thread_timer_update;
498
499 #if defined(HAVE_POLL)
500 rv->handler.pfdsize = rv->fd_limit;
501 rv->handler.pfdcount = 0;
502 rv->handler.pfds = (struct pollfd *) malloc (sizeof (struct pollfd) * rv->handler.pfdsize);
503 memset (rv->handler.pfds, 0, sizeof (struct pollfd) * rv->handler.pfdsize);
504 #endif
505 return rv;
506 }
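
/* Illustrative sketch (assumption): typical lifecycle of a master.  It must
 * exist before any thread is scheduled, and its read/write arrays are sized
 * from RLIMIT_NOFILE as fetched above. */
#if 0
static void
lifecycle_example (void)
{
  struct thread_master *master = thread_master_create ();

  /* ... schedule threads, run the fetch/call loop (see thread_fetch) ... */
  thread_master_free (master);
}
#endif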
507
508 /* Add a new thread to the list. */
509 static void
510 thread_list_add (struct thread_list *list, struct thread *thread)
511 {
512 thread->next = NULL;
513 thread->prev = list->tail;
514 if (list->tail)
515 list->tail->next = thread;
516 else
517 list->head = thread;
518 list->tail = thread;
519 list->count++;
520 }
521
522 /* Delete a thread from the list. */
523 static struct thread *
524 thread_list_delete (struct thread_list *list, struct thread *thread)
525 {
526 if (thread->next)
527 thread->next->prev = thread->prev;
528 else
529 list->tail = thread->prev;
530 if (thread->prev)
531 thread->prev->next = thread->next;
532 else
533 list->head = thread->next;
534 thread->next = thread->prev = NULL;
535 list->count--;
536 return thread;
537 }
538
539 static void
540 thread_delete_fd (struct thread **thread_array, struct thread *thread)
541 {
542 thread_array[thread->u.fd] = NULL;
543 }
544
545 static void
546 thread_add_fd (struct thread **thread_array, struct thread *thread)
547 {
548 thread_array[thread->u.fd] = thread;
549 }
550
551 /* Return 1 if the thread list is empty, 0 otherwise. */
552 static int
553 thread_empty (struct thread_list *list)
554 {
555 return list->head ? 0 : 1;
556 }
557
558 /* Delete top of the list and return it. */
559 static struct thread *
560 thread_trim_head (struct thread_list *list)
561 {
562 if (!thread_empty (list))
563 return thread_list_delete (list, list->head);
564 return NULL;
565 }
566
567 /* Move thread to unuse list. */
568 static void
569 thread_add_unuse (struct thread_master *m, struct thread *thread)
570 {
571 assert (m != NULL && thread != NULL);
572 assert (thread->next == NULL);
573 assert (thread->prev == NULL);
574
575 thread->type = THREAD_UNUSED;
576 thread->hist->total_active--;
577 thread_list_add (&m->unuse, thread);
578 }
579
580 /* Free all threads on the given list. */
581 static void
582 thread_list_free (struct thread_master *m, struct thread_list *list)
583 {
584 struct thread *t;
585 struct thread *next;
586
587 for (t = list->head; t; t = next)
588 {
589 next = t->next;
590 XFREE (MTYPE_THREAD, t);
591 list->count--;
592 m->alloc--;
593 }
594 }
595
596 static void
597 thread_array_free (struct thread_master *m, struct thread **thread_array)
598 {
599 struct thread *t;
600 int index;
601
602 for (index = 0; index < m->fd_limit; ++index)
603 {
604 t = thread_array[index];
605 if (t)
606 {
607 thread_array[index] = NULL;
608 XFREE (MTYPE_THREAD, t);
609 m->alloc--;
610 }
611 }
612 XFREE (MTYPE_THREAD, thread_array);
613 }
614
615 static void
616 thread_queue_free (struct thread_master *m, struct pqueue *queue)
617 {
618 int i;
619
620 for (i = 0; i < queue->size; i++)
621 XFREE(MTYPE_THREAD, queue->array[i]);
622
623 m->alloc -= queue->size;
624 pqueue_delete(queue);
625 }
626
627 /*
628 * thread_master_free_unused
629 *
630  * As threads are finished with, they are put on the
631  * unuse list for later reuse.
632  * When shutting down, free the unused threads so that
633  * anything we forgot to shut off becomes visible.
634 */
635 void
636 thread_master_free_unused (struct thread_master *m)
637 {
638 struct thread *t;
639 while ((t = thread_trim_head(&m->unuse)) != NULL)
640 {
641 XFREE(MTYPE_THREAD, t);
642 }
643 }
644
645 /* Stop thread scheduler. */
646 void
647 thread_master_free (struct thread_master *m)
648 {
649 thread_array_free (m, m->read);
650 thread_array_free (m, m->write);
651 thread_queue_free (m, m->timer);
652 thread_list_free (m, &m->event);
653 thread_list_free (m, &m->ready);
654 thread_list_free (m, &m->unuse);
655 thread_queue_free (m, m->background);
656
657 #if defined(HAVE_POLL)
658 XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
659 #endif
660 XFREE (MTYPE_THREAD_MASTER, m);
661
662 if (cpu_record)
663 {
664 hash_clean (cpu_record, cpu_record_hash_free);
665 hash_free (cpu_record);
666 cpu_record = NULL;
667 }
668 }
669
670 /* Return remaining time in seconds. */
671 unsigned long
672 thread_timer_remain_second (struct thread *thread)
673 {
674 quagga_get_relative (NULL);
675
676 if (thread->u.sands.tv_sec - relative_time.tv_sec > 0)
677 return thread->u.sands.tv_sec - relative_time.tv_sec;
678 else
679 return 0;
680 }
681
682 #define debugargdef const char *funcname, const char *schedfrom, int fromln
683 #define debugargpass funcname, schedfrom, fromln
684
685 struct timeval
686 thread_timer_remain(struct thread *thread)
687 {
688 quagga_get_relative(NULL);
689
690 return timeval_subtract(thread->u.sands, relative_time);
691 }
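
/* Illustrative sketch (assumption): reporting how long until a timer fires,
 * e.g. from a "show" command handler. */
#if 0
static void
remain_example (struct vty *vty, struct thread *timer)
{
  struct timeval left = thread_timer_remain (timer);

  vty_out (vty, "timer fires in %ld.%06ld seconds%s",
           (long) left.tv_sec, (long) left.tv_usec, VTY_NEWLINE);
}
#endif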
692
693 /* Get new thread. */
694 static struct thread *
695 thread_get (struct thread_master *m, u_char type,
696 int (*func) (struct thread *), void *arg, debugargdef)
697 {
698 struct thread *thread = thread_trim_head (&m->unuse);
699 struct cpu_thread_history tmp;
700
701 if (! thread)
702 {
703 thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
704 m->alloc++;
705 }
706 thread->type = type;
707 thread->add_type = type;
708 thread->master = m;
709 thread->arg = arg;
710 thread->index = -1;
711 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
712
713 /*
714 * So if the passed in funcname is not what we have
715 * stored that means the thread->hist needs to be
716 * updated. We keep the last one around in unused
717 * under the assumption that we are probably
718 * going to immediately allocate the same
719 * type of thread.
720 * This hopefully saves us some serious
721 * hash_get lookups.
722 */
723 if (thread->funcname != funcname ||
724 thread->func != func)
725 {
726 tmp.func = func;
727 tmp.funcname = funcname;
728 thread->hist = hash_get (cpu_record, &tmp,
729 (void * (*) (void *))cpu_record_hash_alloc);
730 }
731 thread->hist->total_active++;
732 thread->func = func;
733 thread->funcname = funcname;
734 thread->schedfrom = schedfrom;
735 thread->schedfrom_line = fromln;
736
737 return thread;
738 }
739
740 #if defined (HAVE_POLL)
741
742 #define fd_copy_fd_set(X) (X)
743
744 /* generic add thread function */
745 static struct thread *
746 generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
747 void *arg, int fd, int dir, debugargdef)
748 {
749 struct thread *thread;
750
751 u_char type;
752 short int event;
753
754 if (dir == THREAD_READ)
755 {
756 event = (POLLIN | POLLHUP);
757 type = THREAD_READ;
758 }
759 else
760 {
761 event = (POLLOUT | POLLHUP);
762 type = THREAD_WRITE;
763 }
764
765 nfds_t queuepos = m->handler.pfdcount;
766 nfds_t i=0;
767 for (i=0; i<m->handler.pfdcount; i++)
768 if (m->handler.pfds[i].fd == fd)
769 {
770 queuepos = i;
771 break;
772 }
773
774 /* is there enough space for a new fd? */
775 assert (queuepos < m->handler.pfdsize);
776
777 thread = thread_get (m, type, func, arg, debugargpass);
778 m->handler.pfds[queuepos].fd = fd;
779 m->handler.pfds[queuepos].events |= event;
780 if (queuepos == m->handler.pfdcount)
781 m->handler.pfdcount++;
782
783 return thread;
784 }
785 #else
786
787 #define fd_copy_fd_set(X) (X)
788 #endif
789
790 static int
791 fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
792 {
793 int num;
794 #if defined(HAVE_POLL)
795 /* recalc timeout for poll. Note: with select a NULL pointer means no
796 timeout (block forever), whereas with poll that is a timeout of -1 */
797 int timeout = -1;
798 if (timer_wait != NULL)
799 timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
800
801 num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
802 #else
803 num = select (size, read, write, except, timer_wait);
804 #endif
805
806 return num;
807 }
808
809 static int
810 fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
811 {
812 #if defined(HAVE_POLL)
813 return 1;
814 #else
815 return FD_ISSET (THREAD_FD (thread), fdset);
816 #endif
817 }
818
819 static int
820 fd_clear_read_write (struct thread *thread)
821 {
822 #if !defined(HAVE_POLL)
823 thread_fd_set *fdset = NULL;
824 int fd = THREAD_FD (thread);
825
826 if (thread->type == THREAD_READ)
827 fdset = &thread->master->handler.readfd;
828 else
829 fdset = &thread->master->handler.writefd;
830
831 if (!FD_ISSET (fd, fdset))
832 return 0;
833
834 FD_CLR (fd, fdset);
835 #endif
836 return 1;
837 }
838
839 /* Add a new read or write thread. */
840 struct thread *
841 funcname_thread_add_read_write (int dir, struct thread_master *m,
842 int (*func) (struct thread *), void *arg, int fd,
843 debugargdef)
844 {
845 struct thread *thread = NULL;
846
847 #if !defined(HAVE_POLL)
848 thread_fd_set *fdset = NULL;
849 if (dir == THREAD_READ)
850 fdset = &m->handler.readfd;
851 else
852 fdset = &m->handler.writefd;
853 #endif
854
855 #if defined (HAVE_POLL)
856 thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
857
858 if (thread == NULL)
859 return NULL;
860 #else
861 if (FD_ISSET (fd, fdset))
862 {
863 zlog (NULL, LOG_WARNING, "There is already a %s fd [%d]", (dir == THREAD_READ) ? "read" : "write", fd);
864 return NULL;
865 }
866
867 FD_SET (fd, fdset);
868 thread = thread_get (m, dir, func, arg, debugargpass);
869 #endif
870
871 thread->u.fd = fd;
872 if (dir == THREAD_READ)
873 thread_add_fd (m->read, thread);
874 else
875 thread_add_fd (m->write, thread);
876
877 return thread;
878 }
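
/* Illustrative sketch (assumption): registering a read handler on a socket.
 * Callers normally use the thread_add_read macro from thread.h, which fills
 * in the debug arguments; THREAD_FD/THREAD_ARG are accessor macros there. */
#if 0
static int
reader (struct thread *t)
{
  int fd = THREAD_FD (t);

  /* ... read from fd; re-register with thread_add_read if more data is
   * expected, since the registration is one-shot ... */
  (void) fd;
  return 0;
}

static void
reader_setup (struct thread_master *m, int sock)
{
  thread_add_read (m, reader, NULL, sock);
}
#endif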
879
880 static struct thread *
881 funcname_thread_add_timer_timeval (struct thread_master *m,
882 int (*func) (struct thread *),
883 int type,
884 void *arg,
885 struct timeval *time_relative,
886 debugargdef)
887 {
888 struct thread *thread;
889 struct pqueue *queue;
890 struct timeval alarm_time;
891
892 assert (m != NULL);
893
894 assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
895 assert (time_relative);
896
897 queue = ((type == THREAD_TIMER) ? m->timer : m->background);
898 thread = thread_get (m, type, func, arg, debugargpass);
899
900 /* Do we need jitter here? */
901 quagga_get_relative (NULL);
902 alarm_time.tv_sec = relative_time.tv_sec + time_relative->tv_sec;
903 alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec;
904 thread->u.sands = timeval_adjust(alarm_time);
905
906 pqueue_enqueue(thread, queue);
907 return thread;
908 }
909
910
911 /* Add timer event thread. */
912 struct thread *
913 funcname_thread_add_timer (struct thread_master *m,
914 int (*func) (struct thread *),
915 void *arg, long timer,
916 debugargdef)
917 {
918 struct timeval trel;
919
920 assert (m != NULL);
921
922 trel.tv_sec = timer;
923 trel.tv_usec = 0;
924
925 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg,
926 &trel, debugargpass);
927 }
928
929 /* Add timer event thread with "millisecond" resolution */
930 struct thread *
931 funcname_thread_add_timer_msec (struct thread_master *m,
932 int (*func) (struct thread *),
933 void *arg, long timer,
934 debugargdef)
935 {
936 struct timeval trel;
937
938 assert (m != NULL);
939
940 trel.tv_sec = timer / 1000;
941 trel.tv_usec = 1000*(timer % 1000);
942
943 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
944 arg, &trel, debugargpass);
945 }
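
/* Illustrative sketch (assumption): a periodic 250ms timer that re-arms
 * itself, using the thread_add_timer_msec macro from thread.h. */
#if 0
static int
periodic_timer (struct thread *t)
{
  /* ... do the periodic work, then re-arm, since timers fire only once ... */
  thread_add_timer_msec (t->master, periodic_timer, THREAD_ARG (t), 250);
  return 0;
}
#endif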
946
947 /* Add timer event thread with a struct timeval expiration time */
948 struct thread *
949 funcname_thread_add_timer_tv (struct thread_master *m,
950 int (*func) (struct thread *),
951 void *arg, struct timeval *tv,
952 debugargdef)
953 {
954 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
955 arg, tv, debugargpass);
956 }
957
958 /* Add a background thread, with an optional millisec delay */
959 struct thread *
960 funcname_thread_add_background (struct thread_master *m,
961 int (*func) (struct thread *),
962 void *arg, long delay,
963 debugargdef)
964 {
965 struct timeval trel;
966
967 assert (m != NULL);
968
969 if (delay)
970 {
971 trel.tv_sec = delay / 1000;
972 trel.tv_usec = 1000*(delay % 1000);
973 }
974 else
975 {
976 trel.tv_sec = 0;
977 trel.tv_usec = 0;
978 }
979
980 return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND,
981 arg, &trel, debugargpass);
982 }
983
984 /* Add simple event thread. */
985 struct thread *
986 funcname_thread_add_event (struct thread_master *m,
987 int (*func) (struct thread *), void *arg, int val,
988 debugargdef)
989 {
990 struct thread *thread;
991
992 assert (m != NULL);
993
994 thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
995 thread->u.val = val;
996 thread_list_add (&m->event, thread);
997
998 return thread;
999 }
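
/* Illustrative sketch (assumption): queueing an immediate event.  Unlike
 * timers and fds, an event carries a plain integer in u.val and runs on the
 * next pass through the scheduler. */
#if 0
static int on_config_change (struct thread *t) { (void) t; return 0; }

static void
event_example (struct thread_master *m)
{
  thread_add_event (m, on_config_change, NULL, 0);
}
#endif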
1000
1001 static void
1002 thread_cancel_read_or_write (struct thread *thread, short int state)
1003 {
1004 #if defined(HAVE_POLL)
1005 nfds_t i;
1006
1007 for (i=0;i<thread->master->handler.pfdcount;++i)
1008 if (thread->master->handler.pfds[i].fd == thread->u.fd)
1009 {
1010 thread->master->handler.pfds[i].events &= ~(state);
1011
1012 /* remove thread fds from pfd list */
1013 if (thread->master->handler.pfds[i].events == 0)
1014 {
1015 memmove(thread->master->handler.pfds+i,
1016 thread->master->handler.pfds+i+1,
1017 (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
1018 thread->master->handler.pfdcount--;
1019 return;
1020 }
1021 }
1022 #endif
1023
1024 fd_clear_read_write (thread);
1025 }
1026
1027 /* Cancel thread from scheduler. */
1028 void
1029 thread_cancel (struct thread *thread)
1030 {
1031 struct thread_list *list = NULL;
1032 struct pqueue *queue = NULL;
1033 struct thread **thread_array = NULL;
1034
1035 switch (thread->type)
1036 {
1037 case THREAD_READ:
1038 #if defined (HAVE_POLL)
1039 thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
1040 #else
1041 thread_cancel_read_or_write (thread, 0);
1042 #endif
1043 thread_array = thread->master->read;
1044 break;
1045 case THREAD_WRITE:
1046 #if defined (HAVE_POLL)
1047 thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
1048 #else
1049 thread_cancel_read_or_write (thread, 0);
1050 #endif
1051 thread_array = thread->master->write;
1052 break;
1053 case THREAD_TIMER:
1054 queue = thread->master->timer;
1055 break;
1056 case THREAD_EVENT:
1057 list = &thread->master->event;
1058 break;
1059 case THREAD_READY:
1060 list = &thread->master->ready;
1061 break;
1062 case THREAD_BACKGROUND:
1063 queue = thread->master->background;
1064 break;
1065 default:
1066 return;
1067 break;
1068 }
1069
1070 if (queue)
1071 {
1072 assert(thread->index >= 0);
1073 assert(thread == queue->array[thread->index]);
1074 pqueue_remove_at(thread->index, queue);
1075 }
1076 else if (list)
1077 {
1078 thread_list_delete (list, thread);
1079 }
1080 else if (thread_array)
1081 {
1082 thread_delete_fd (thread_array, thread);
1083 }
1084 else
1085 {
1086 assert(!"Thread should be either in queue or list or array!");
1087 }
1088
1089 thread_add_unuse (thread->master, thread);
1090 }
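
/* Illustrative sketch (assumption): owners usually cancel through the
 * THREAD_OFF macro from thread.h, which also NULLs the stored pointer so a
 * stale thread cannot be cancelled twice.  some_ctx is hypothetical. */
#if 0
struct some_ctx
{
  struct thread *t_read;
};

static void
ctx_stop (struct some_ctx *ctx)
{
  THREAD_OFF (ctx->t_read);
}
#endif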
1091
1092 /* Delete all events which have argument value arg. */
1093 unsigned int
1094 thread_cancel_event (struct thread_master *m, void *arg)
1095 {
1096 unsigned int ret = 0;
1097 struct thread *thread;
1098
1099 thread = m->event.head;
1100 while (thread)
1101 {
1102 struct thread *t;
1103
1104 t = thread;
1105 thread = t->next;
1106
1107 if (t->arg == arg)
1108 {
1109 ret++;
1110 thread_list_delete (&m->event, t);
1111 thread_add_unuse (m, t);
1112 }
1113 }
1114
1115 /* thread can be on the ready list too */
1116 thread = m->ready.head;
1117 while (thread)
1118 {
1119 struct thread *t;
1120
1121 t = thread;
1122 thread = t->next;
1123
1124 if (t->arg == arg)
1125 {
1126 ret++;
1127 thread_list_delete (&m->ready, t);
1128 thread_add_unuse (m, t);
1129 }
1130 }
1131 return ret;
1132 }
1133
1134 static struct timeval *
1135 thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
1136 {
1137 if (queue->size)
1138 {
1139 struct thread *next_timer = queue->array[0];
1140 *timer_val = timeval_subtract (next_timer->u.sands, relative_time);
1141 return timer_val;
1142 }
1143 return NULL;
1144 }
1145
1146 static struct thread *
1147 thread_run (struct thread_master *m, struct thread *thread,
1148 struct thread *fetch)
1149 {
1150 *fetch = *thread;
1151 thread_add_unuse (m, thread);
1152 return fetch;
1153 }
1154
1155 static int
1156 thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
1157 {
1158 struct thread **thread_array;
1159
1160 if (!thread)
1161 return 0;
1162
1163 if (thread->type == THREAD_READ)
1164 thread_array = m->read;
1165 else
1166 thread_array = m->write;
1167
1168 if (fd_is_set (thread, fdset, pos))
1169 {
1170 fd_clear_read_write (thread);
1171 thread_delete_fd (thread_array, thread);
1172 thread_list_add (&m->ready, thread);
1173 thread->type = THREAD_READY;
1174 #if defined(HAVE_POLL)
1175 thread->master->handler.pfds[pos].events &= ~(state);
1176 #endif
1177 return 1;
1178 }
1179 return 0;
1180 }
1181
1182 #if defined(HAVE_POLL)
1183
1184 #if defined(HAVE_SNMP)
1185 /* add snmp fds to poll set */
1186 static void
1187 add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
1188 {
1189 int i;
1190 m->handler.pfdcountsnmp = m->handler.pfdcount;
1191 /* cycle through fds and add necessary fds to poll set */
1192 for (i=0;i<fdsetsize;++i)
1193 {
1194 if (FD_ISSET(i, snmpfds))
1195 {
1196 assert (m->handler.pfdcountsnmp <= m->handler.pfdsize);
1197
1198 m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
1199 m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
1200 m->handler.pfdcountsnmp++;
1201 }
1202 }
1203 }
1204 #endif
1205
1206 /* check poll events */
1207 static void
1208 check_pollfds(struct thread_master *m, fd_set *readfd, int num)
1209 {
1210 nfds_t i = 0;
1211 int ready = 0;
1212 for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
1213 {
1214 /* no event for current fd? immediately continue */
1215 if(m->handler.pfds[i].revents == 0)
1216 continue;
1217
1218 ready++;
1219
1220 /* POLLIN / POLLOUT process event */
1221 if (m->handler.pfds[i].revents & POLLIN)
1222 thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
1223 if (m->handler.pfds[i].revents & POLLOUT)
1224 thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
1225
1226 /* remove fd from list on POLLNVAL or POLLHUP */
1227 if (m->handler.pfds[i].revents & POLLNVAL ||
1228 m->handler.pfds[i].revents & POLLHUP)
1229 {
1230 memmove(m->handler.pfds+i,
1231 m->handler.pfds+i+1,
1232 (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
1233 m->handler.pfdcount--;
1234 i--;
1235 }
1236 else
1237 m->handler.pfds[i].revents = 0;
1238 }
1239 }
1240 #endif
1241
1242 static void
1243 thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
1244 {
1245 #if defined (HAVE_POLL)
1246 check_pollfds (m, rset, num);
1247 #else
1248 int ready = 0, index;
1249
1250 for (index = 0; index < m->fd_limit && ready < num; ++index)
1251 {
1252 ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
1253 ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
1254 }
1255 #endif
1256 }
1257
1258 /* Add all timers that have popped to the ready list. */
1259 static unsigned int
1260 thread_timer_process (struct pqueue *queue, struct timeval *timenow)
1261 {
1262 struct thread *thread;
1263 unsigned int ready = 0;
1264
1265 while (queue->size)
1266 {
1267 thread = queue->array[0];
1268 if (timeval_cmp (*timenow, thread->u.sands) < 0)
1269 return ready;
1270 pqueue_dequeue(queue);
1271 thread->type = THREAD_READY;
1272 thread_list_add (&thread->master->ready, thread);
1273 ready++;
1274 }
1275 return ready;
1276 }
1277
1278 /* process a list en masse, e.g. for event thread lists */
1279 static unsigned int
1280 thread_process (struct thread_list *list)
1281 {
1282 struct thread *thread;
1283 struct thread *next;
1284 unsigned int ready = 0;
1285
1286 for (thread = list->head; thread; thread = next)
1287 {
1288 next = thread->next;
1289 thread_list_delete (list, thread);
1290 thread->type = THREAD_READY;
1291 thread_list_add (&thread->master->ready, thread);
1292 ready++;
1293 }
1294 return ready;
1295 }
1296
1297
1298 /* Fetch next ready thread. */
1299 struct thread *
1300 thread_fetch (struct thread_master *m, struct thread *fetch)
1301 {
1302 struct thread *thread;
1303 thread_fd_set readfd;
1304 thread_fd_set writefd;
1305 thread_fd_set exceptfd;
1306 struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
1307 struct timeval timer_val_bg;
1308 struct timeval *timer_wait = &timer_val;
1309 struct timeval *timer_wait_bg;
1310
1311 while (1)
1312 {
1313 int num = 0;
1314
1315 /* Signals pre-empt everything */
1316 quagga_sigevent_process ();
1317
1318 /* Drain the ready queue of already scheduled jobs, before scheduling
1319 * more.
1320 */
1321 if ((thread = thread_trim_head (&m->ready)) != NULL)
1322 return thread_run (m, thread, fetch);
1323
1324 /* To be fair to all kinds of threads, and avoid starvation, we
1325 * need to be careful to consider all thread types for scheduling
1326 * in each quanta. I.e. we should not return early from here on.
1327 */
1328
1329 /* Normal events are the next highest priority. */
1330 thread_process (&m->event);
1331
1332 /* Structure copy. */
1333 #if !defined(HAVE_POLL)
1334 readfd = fd_copy_fd_set(m->handler.readfd);
1335 writefd = fd_copy_fd_set(m->handler.writefd);
1336 exceptfd = fd_copy_fd_set(m->handler.exceptfd);
1337 #endif
1338
1339 /* Calculate select wait timer if nothing else to do */
1340 if (m->ready.count == 0)
1341 {
1342 quagga_get_relative (NULL);
1343 timer_wait = thread_timer_wait (m->timer, &timer_val);
1344 timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);
1345
1346 if (timer_wait_bg &&
1347 (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0)))
1348 timer_wait = timer_wait_bg;
1349 }
1350
1351 num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
1352
1353 /* Signals should get quick treatment */
1354 if (num < 0)
1355 {
1356 if (errno == EINTR)
1357 continue; /* signal received - process it */
1358 zlog_warn ("select() error: %s", safe_strerror (errno));
1359 return NULL;
1360 }
1361
1362 /* Check foreground timers. Historically, they have had higher
1363 priority than I/O threads, so let's push them onto the ready
1364 list in front of the I/O threads. */
1365 quagga_get_relative (NULL);
1366 thread_timer_process (m->timer, &relative_time);
1367
1368 /* Got IO, process it */
1369 if (num > 0)
1370 thread_process_fds (m, &readfd, &writefd, num);
1371
1372 #if 0
1373 /* If any threads were made ready above (I/O or foreground timer),
1374 perhaps we should avoid adding background timers to the ready
1375 list at this time. If this code is uncommented, then background
1376 timer threads will not run unless there is nothing else to do. */
1377 if ((thread = thread_trim_head (&m->ready)) != NULL)
1378 return thread_run (m, thread, fetch);
1379 #endif
1380
1381 /* Background timer/events, lowest priority */
1382 thread_timer_process (m->background, &relative_time);
1383
1384 if ((thread = thread_trim_head (&m->ready)) != NULL)
1385 return thread_run (m, thread, fetch);
1386 }
1387 }
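
/* Illustrative sketch: the canonical fetch/call loop that daemons run after
 * initialization; thread_fetch blocks in fd_select until something becomes
 * runnable. */
#if 0
static void
event_loop (struct thread_master *master)
{
  struct thread thread;

  while (thread_fetch (master, &thread))
    thread_call (&thread);
}
#endif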
1388
1389 unsigned long
1390 thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
1391 {
1392 /* This is 'user + sys' time. */
1393 *cputime = timeval_elapsed (now->cpu.ru_utime, start->cpu.ru_utime) +
1394 timeval_elapsed (now->cpu.ru_stime, start->cpu.ru_stime);
1395 return timeval_elapsed (now->real, start->real);
1396 }
1397
1398 /* We should aim to yield after yield milliseconds, which defaults
1399 to THREAD_YIELD_TIME_SLOT.
1400 Note: we are using real (wall clock) time for this calculation.
1401 It could be argued that CPU time may make more sense in certain
1402 contexts. The things to consider are whether the thread may have
1403 blocked (in which case wall time increases, but CPU time does not),
1404 or whether the system is heavily loaded with other processes competing
1405 for CPU time. On balance, wall clock time seems to make sense.
1406 Plus it has the added benefit that gettimeofday should be faster
1407 than calling getrusage. */
1408 int
1409 thread_should_yield (struct thread *thread)
1410 {
1411 quagga_get_relative (NULL);
1412 return (timeval_elapsed(relative_time, thread->real) >
1413 thread->yield);
1414 }
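
/* Illustrative sketch (assumption): a long-running job checking its time
 * slot and rescheduling itself instead of starving other threads.  The
 * work-queue helpers more_work_pending/do_one_unit are hypothetical. */
#if 0
static int
background_work (struct thread *t)
{
  while (more_work_pending ())  /* hypothetical predicate */
    {
      do_one_unit ();           /* hypothetical unit of work */
      if (thread_should_yield (t))
        {
          thread_add_background (t->master, background_work,
                                 THREAD_ARG (t), 0);
          break;
        }
    }
  return 0;
}
#endif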
1415
1416 void
1417 thread_set_yield_time (struct thread *thread, unsigned long yield_time)
1418 {
1419 thread->yield = yield_time;
1420 }
1421
1422 void
1423 thread_getrusage (RUSAGE_T *r)
1424 {
1425 quagga_get_relative (NULL);
1426 getrusage(RUSAGE_SELF, &(r->cpu));
1427 r->real = relative_time;
1428
1429 #ifdef HAVE_CLOCK_MONOTONIC
1430 /* quagga_get_relative() only updates recent_time if gettimeofday
1431 * based, not when using CLOCK_MONOTONIC. As we export recent_time
1432  * and guarantee to update it before threads are run, update it here too.
1433 */
1434 quagga_gettimeofday(&recent_time);
1435 #endif /* HAVE_CLOCK_MONOTONIC */
1436 }
1437
1438 struct thread *thread_current = NULL;
1439
1440 /* We check thread consumed time. If the system has getrusage, we'll
1441 use that to get in-depth stats on the performance of the thread in addition
1442 to wall clock time stats from gettimeofday. */
1443 void
1444 thread_call (struct thread *thread)
1445 {
1446 unsigned long realtime, cputime;
1447 RUSAGE_T before, after;
1448
1449 GETRUSAGE (&before);
1450 thread->real = before.real;
1451
1452 thread_current = thread;
1453 (*thread->func) (thread);
1454 thread_current = NULL;
1455
1456 GETRUSAGE (&after);
1457
1458 realtime = thread_consumed_time (&after, &before, &cputime);
1459 thread->hist->real.total += realtime;
1460 if (thread->hist->real.max < realtime)
1461 thread->hist->real.max = realtime;
1462 thread->hist->cpu.total += cputime;
1463 if (thread->hist->cpu.max < cputime)
1464 thread->hist->cpu.max = cputime;
1465
1466 ++(thread->hist->total_calls);
1467 thread->hist->types |= (1 << thread->add_type);
1468
1469 #ifdef CONSUMED_TIME_CHECK
1470 if (realtime > CONSUMED_TIME_CHECK)
1471 {
1472 /*
1473 * We have a CPU Hog on our hands.
1474 * Whinge about it now, so we're aware this is yet another task
1475 * to fix.
1476 */
1477 zlog_warn ("SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1478 thread->funcname,
1479 (unsigned long) thread->func,
1480 realtime/1000, cputime/1000);
1481 }
1482 #endif /* CONSUMED_TIME_CHECK */
1483 }
1484
1485 /* Execute thread */
1486 struct thread *
1487 funcname_thread_execute (struct thread_master *m,
1488 int (*func)(struct thread *),
1489 void *arg,
1490 int val,
1491 debugargdef)
1492 {
1493 struct cpu_thread_history tmp;
1494 struct thread dummy;
1495
1496 memset (&dummy, 0, sizeof (struct thread));
1497
1498 dummy.type = THREAD_EVENT;
1499 dummy.add_type = THREAD_EXECUTE;
1500 dummy.master = NULL;
1501 dummy.arg = arg;
1502 dummy.u.val = val;
1503
1504 tmp.func = dummy.func = func;
1505 tmp.funcname = dummy.funcname = funcname;
1506 dummy.hist = hash_get (cpu_record, &tmp,
1507 (void * (*) (void *))cpu_record_hash_alloc);
1508
1509 dummy.schedfrom = schedfrom;
1510 dummy.schedfrom_line = fromln;
1511
1512 thread_call (&dummy);
1513
1514 return NULL;
1515 }