/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Zebra; see the file COPYING.  If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

/* #define DEBUG */

#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"

DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

/* Per-function CPU usage statistics, keyed by thread function. */
static struct hash *cpu_record = NULL;

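/* Microseconds elapsed from timeval b to timeval a. */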
static unsigned long
timeval_elapsed (struct timeval a, struct timeval b)
{
  return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
          + (a.tv_usec - b.tv_usec));
}

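/* The cpu_record hash is keyed and compared by the thread's function pointer. */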
static unsigned int
cpu_record_hash_key (struct cpu_thread_history *a)
{
  return (uintptr_t) a->func;
}

static int
cpu_record_hash_cmp (const struct cpu_thread_history *a,
                     const struct cpu_thread_history *b)
{
  return a->func == b->func;
}

static void *
cpu_record_hash_alloc (struct cpu_thread_history *a)
{
  struct cpu_thread_history *new;
  new = XCALLOC (MTYPE_THREAD_STATS, sizeof (struct cpu_thread_history));
  new->func = a->func;
  new->funcname = a->funcname;
  return new;
}

static void
cpu_record_hash_free (void *a)
{
  struct cpu_thread_history *hist = a;

  XFREE (MTYPE_THREAD_STATS, hist);
}

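/* Print one row of CPU statistics for a single thread function. */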
static void
vty_out_cpu_thread_history(struct vty* vty,
                           struct cpu_thread_history *a)
{
  vty_out(vty, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld",
          a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls,
          a->cpu.total/a->total_calls, a->cpu.max,
          a->real.total/a->total_calls, a->real.max);
  vty_out(vty, " %c%c%c%c%c%c %s%s",
          a->types & (1 << THREAD_READ) ? 'R':' ',
          a->types & (1 << THREAD_WRITE) ? 'W':' ',
          a->types & (1 << THREAD_TIMER) ? 'T':' ',
          a->types & (1 << THREAD_EVENT) ? 'E':' ',
          a->types & (1 << THREAD_EXECUTE) ? 'X':' ',
          a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ',
          a->funcname, VTY_NEWLINE);
}

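/* Hash iterator callback: print one history entry and accumulate totals. */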
static void
cpu_record_hash_print(struct hash_backet *bucket,
                      void *args[])
{
  struct cpu_thread_history *totals = args[0];
  struct vty *vty = args[1];
  thread_type *filter = args[2];
  struct cpu_thread_history *a = bucket->data;

  if ( !(a->types & *filter) )
    return;
  vty_out_cpu_thread_history(vty,a);
  totals->total_active += a->total_active;
  totals->total_calls += a->total_calls;
  totals->real.total += a->real.total;
  if (totals->real.max < a->real.max)
    totals->real.max = a->real.max;
  totals->cpu.total += a->cpu.total;
  if (totals->cpu.max < a->cpu.max)
    totals->cpu.max = a->cpu.max;
}

static void
cpu_record_print(struct vty *vty, thread_type filter)
{
  struct cpu_thread_history tmp;
  void *args[3] = {&tmp, vty, &filter};

  memset(&tmp, 0, sizeof tmp);
  tmp.funcname = "TOTAL";
  tmp.types = filter;

  vty_out(vty, "%21s %18s %18s%s",
          "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
  vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
  vty_out(vty, " Avg uSec Max uSecs");
  vty_out(vty, " Type Thread%s", VTY_NEWLINE);
  hash_iterate(cpu_record,
               (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
               args);

  if (tmp.total_calls > 0)
    vty_out_cpu_thread_history(vty, &tmp);
}

DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
  int idx_filter = 3;
  int i = 0;
  thread_type filter = (thread_type) -1U;

  if (argc > 3)
    {
      filter = 0;
      while (argv[idx_filter]->arg[i] != '\0')
        {
          switch ( argv[idx_filter]->arg[i] )
            {
            case 'r':
            case 'R':
              filter |= (1 << THREAD_READ);
              break;
            case 'w':
            case 'W':
              filter |= (1 << THREAD_WRITE);
              break;
            case 't':
            case 'T':
              filter |= (1 << THREAD_TIMER);
              break;
            case 'e':
            case 'E':
              filter |= (1 << THREAD_EVENT);
              break;
            case 'x':
            case 'X':
              filter |= (1 << THREAD_EXECUTE);
              break;
            case 'b':
            case 'B':
              filter |= (1 << THREAD_BACKGROUND);
              break;
            default:
              break;
            }
          ++i;
        }
      if (filter == 0)
        {
          vty_out(vty, "Invalid filter \"%s\" specified,"
                  " must contain at least one of 'RWTEXB'%s",
                  argv[idx_filter]->arg, VTY_NEWLINE);
          return CMD_WARNING;
        }
    }

  cpu_record_print(vty, filter);
  return CMD_SUCCESS;
}

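/* Hash iterator callback: release history entries matching the filter. */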
static void
cpu_record_hash_clear (struct hash_backet *bucket,
                       void *args)
{
  thread_type *filter = args;
  struct cpu_thread_history *a = bucket->data;

  if ( !(a->types & *filter) )
    return;

  hash_release (cpu_record, bucket->data);
}

static void
cpu_record_clear (thread_type filter)
{
  thread_type *tmp = &filter;
  hash_iterate (cpu_record,
                (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
                tmp);
}

DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
  int idx_filter = 3;
  int i = 0;
  thread_type filter = (thread_type) -1U;

  if (argc > 3)
    {
      filter = 0;
      while (argv[idx_filter]->arg[i] != '\0')
        {
          switch ( argv[idx_filter]->arg[i] )
            {
            case 'r':
            case 'R':
              filter |= (1 << THREAD_READ);
              break;
            case 'w':
            case 'W':
              filter |= (1 << THREAD_WRITE);
              break;
            case 't':
            case 'T':
              filter |= (1 << THREAD_TIMER);
              break;
            case 'e':
            case 'E':
              filter |= (1 << THREAD_EVENT);
              break;
            case 'x':
            case 'X':
              filter |= (1 << THREAD_EXECUTE);
              break;
            case 'b':
            case 'B':
              filter |= (1 << THREAD_BACKGROUND);
              break;
            default:
              break;
            }
          ++i;
        }
      if (filter == 0)
        {
          vty_out(vty, "Invalid filter \"%s\" specified,"
                  " must contain at least one of 'RWTEXB'%s",
                  argv[idx_filter]->arg, VTY_NEWLINE);
          return CMD_WARNING;
        }
    }

  cpu_record_clear (filter);
  return CMD_SUCCESS;
}

void
thread_cmd_init (void)
{
  install_element (VIEW_NODE, &show_thread_cpu_cmd);
  install_element (ENABLE_NODE, &clear_thread_cpu_cmd);
}

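/* Priority-queue comparison: order timer threads by expiry time. */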
static int
thread_timer_cmp(void *a, void *b)
{
  struct thread *thread_a = a;
  struct thread *thread_b = b;

  if (timercmp (&thread_a->u.sands, &thread_b->u.sands, <))
    return -1;
  if (timercmp (&thread_a->u.sands, &thread_b->u.sands, >))
    return 1;
  return 0;
}

static void
thread_timer_update(void *node, int actual_position)
{
  struct thread *thread = node;

  thread->index = actual_position;
}

/* Allocate new thread master. */
struct thread_master *
thread_master_create (void)
{
  struct thread_master *rv;
  struct rlimit limit;

  getrlimit(RLIMIT_NOFILE, &limit);

  if (cpu_record == NULL)
    cpu_record
      = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
                     (int (*) (const void *, const void *))cpu_record_hash_cmp);

  rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
  if (rv == NULL)
    {
      return NULL;
    }

  rv->fd_limit = (int)limit.rlim_cur;
  rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
  if (rv->read == NULL)
    {
      XFREE (MTYPE_THREAD_MASTER, rv);
      return NULL;
    }

  rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
  if (rv->write == NULL)
    {
      XFREE (MTYPE_THREAD, rv->read);
      XFREE (MTYPE_THREAD_MASTER, rv);
      return NULL;
    }

  /* Initialize the timer queues */
  rv->timer = pqueue_create();
  rv->background = pqueue_create();
  rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
  rv->timer->update = rv->background->update = thread_timer_update;

#if defined(HAVE_POLL)
  rv->handler.pfdsize = rv->fd_limit;
  rv->handler.pfdcount = 0;
  /* Allocate with the same MTYPE that thread_master_free() releases;
     XCALLOC also zeroes the array, replacing malloc() + memset(). */
  rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
                              sizeof (struct pollfd) * rv->handler.pfdsize);
#endif
  return rv;
}

/* Add a new thread to the list. */
static void
thread_list_add (struct thread_list *list, struct thread *thread)
{
  thread->next = NULL;
  thread->prev = list->tail;
  if (list->tail)
    list->tail->next = thread;
  else
    list->head = thread;
  list->tail = thread;
  list->count++;
}

/* Delete a thread from the list. */
static struct thread *
thread_list_delete (struct thread_list *list, struct thread *thread)
{
  if (thread->next)
    thread->next->prev = thread->prev;
  else
    list->tail = thread->prev;
  if (thread->prev)
    thread->prev->next = thread->next;
  else
    list->head = thread->next;
  thread->next = thread->prev = NULL;
  list->count--;
  return thread;
}

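/* Helpers for the per-fd arrays that track pending read/write threads. */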
static void
thread_delete_fd (struct thread **thread_array, struct thread *thread)
{
  thread_array[thread->u.fd] = NULL;
}

static void
thread_add_fd (struct thread **thread_array, struct thread *thread)
{
  thread_array[thread->u.fd] = thread;
}

/* Thread list is empty or not. */
static int
thread_empty (struct thread_list *list)
{
  return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *
thread_trim_head (struct thread_list *list)
{
  if (!thread_empty (list))
    return thread_list_delete (list, list->head);
  return NULL;
}

/* Move thread to unuse list. */
static void
thread_add_unuse (struct thread_master *m, struct thread *thread)
{
  assert (m != NULL && thread != NULL);
  assert (thread->next == NULL);
  assert (thread->prev == NULL);

  thread->type = THREAD_UNUSED;
  thread->hist->total_active--;
  thread_list_add (&m->unuse, thread);
}

/* Free all threads on the given list. */
static void
thread_list_free (struct thread_master *m, struct thread_list *list)
{
  struct thread *t;
  struct thread *next;

  for (t = list->head; t; t = next)
    {
      next = t->next;
      XFREE (MTYPE_THREAD, t);
      list->count--;
      m->alloc--;
    }
}

static void
thread_array_free (struct thread_master *m, struct thread **thread_array)
{
  struct thread *t;
  int index;

  for (index = 0; index < m->fd_limit; ++index)
    {
      t = thread_array[index];
      if (t)
        {
          thread_array[index] = NULL;
          XFREE (MTYPE_THREAD, t);
          m->alloc--;
        }
    }
  XFREE (MTYPE_THREAD, thread_array);
}

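/* Free every thread still queued on a timer/background priority queue. */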
static void
thread_queue_free (struct thread_master *m, struct pqueue *queue)
{
  int i;

  for (i = 0; i < queue->size; i++)
    XFREE(MTYPE_THREAD, queue->array[i]);

  m->alloc -= queue->size;
  pqueue_delete(queue);
}

/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the unuse list for
 * later reuse.  When shutting down, free the unused threads so that
 * anything we forgot to shut off stands out.
 */
void
thread_master_free_unused (struct thread_master *m)
{
  struct thread *t;
  while ((t = thread_trim_head(&m->unuse)) != NULL)
    {
      XFREE(MTYPE_THREAD, t);
    }
}

/* Stop thread scheduler. */
void
thread_master_free (struct thread_master *m)
{
  thread_array_free (m, m->read);
  thread_array_free (m, m->write);
  thread_queue_free (m, m->timer);
  thread_list_free (m, &m->event);
  thread_list_free (m, &m->ready);
  thread_list_free (m, &m->unuse);
  thread_queue_free (m, m->background);

#if defined(HAVE_POLL)
  XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
#endif
  XFREE (MTYPE_THREAD_MASTER, m);

  if (cpu_record)
    {
      hash_clean (cpu_record, cpu_record_hash_free);
      hash_free (cpu_record);
      cpu_record = NULL;
    }
}

/* Return remaining time in seconds. */
unsigned long
thread_timer_remain_second (struct thread *thread)
{
  int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
  return remain < 0 ? 0 : remain;
}

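/* Debug arguments carrying the function name and scheduling location. */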
#define debugargdef  const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval
thread_timer_remain(struct thread *thread)
{
  struct timeval remain;
  monotime_until(&thread->u.sands, &remain);
  return remain;
}

/* Get new thread. */
static struct thread *
thread_get (struct thread_master *m, u_char type,
            int (*func) (struct thread *), void *arg, debugargdef)
{
  struct thread *thread = thread_trim_head (&m->unuse);
  struct cpu_thread_history tmp;

  if (! thread)
    {
      thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
      m->alloc++;
    }
  thread->type = type;
  thread->add_type = type;
  thread->master = m;
  thread->arg = arg;
  thread->index = -1;
  thread->yield = THREAD_YIELD_TIME_SLOT; /* default */

  /*
   * If the passed-in funcname is not what we have stored, then
   * thread->hist needs to be updated.  We keep the last one around
   * in the unused thread under the assumption that we are probably
   * going to immediately allocate the same type of thread, which
   * hopefully saves us some serious hash_get lookups.
   */
  if (thread->funcname != funcname ||
      thread->func != func)
    {
      tmp.func = func;
      tmp.funcname = funcname;
      thread->hist = hash_get (cpu_record, &tmp,
                               (void * (*) (void *))cpu_record_hash_alloc);
    }
  thread->hist->total_active++;
  thread->func = func;
  thread->funcname = funcname;
  thread->schedfrom = schedfrom;
  thread->schedfrom_line = fromln;

  return thread;
}

#if defined (HAVE_POLL)

#define fd_copy_fd_set(X) (X)

/* generic add thread function */
static struct thread *
generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
                   void *arg, int fd, int dir, debugargdef)
{
  struct thread *thread;

  u_char type;
  short int event;

  if (dir == THREAD_READ)
    {
      event = (POLLIN | POLLHUP);
      type = THREAD_READ;
    }
  else
    {
      event = (POLLOUT | POLLHUP);
      type = THREAD_WRITE;
    }

  nfds_t queuepos = m->handler.pfdcount;
  nfds_t i = 0;
  for (i = 0; i < m->handler.pfdcount; i++)
    if (m->handler.pfds[i].fd == fd)
      {
        queuepos = i;
        break;
      }

  /* is there enough space for a new fd? */
  assert (queuepos < m->handler.pfdsize);

  thread = thread_get (m, type, func, arg, debugargpass);
  m->handler.pfds[queuepos].fd = fd;
  m->handler.pfds[queuepos].events |= event;
  if (queuepos == m->handler.pfdcount)
    m->handler.pfdcount++;

  return thread;
}
#else

#define fd_copy_fd_set(X) (X)
#endif

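/* Wait for I/O or timeout: a thin wrapper around poll() or select(). */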
static int
fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
{
  int num;
#if defined(HAVE_POLL)
  /* Recalculate the timeout for poll().  Note that a NULL pointer means
     "no timeout" for select(), whereas for poll() no timeout is -1. */
  int timeout = -1;
  if (timer_wait != NULL)
    timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);

  num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
#else
  num = select (size, read, write, except, timer_wait);
#endif

  return num;
}

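/* Under poll() readiness was already checked via revents; under select()
   test the thread's fd against the returned fd set. */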
static int
fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
{
#if defined(HAVE_POLL)
  return 1;
#else
  return FD_ISSET (THREAD_FD (thread), fdset);
#endif
}

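/* Remove the thread's fd from the select() fd sets; a no-op under poll(). */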
static int
fd_clear_read_write (struct thread *thread)
{
#if !defined(HAVE_POLL)
  thread_fd_set *fdset = NULL;
  int fd = THREAD_FD (thread);

  if (thread->type == THREAD_READ)
    fdset = &thread->master->handler.readfd;
  else
    fdset = &thread->master->handler.writefd;

  if (!FD_ISSET (fd, fdset))
    return 0;

  FD_CLR (fd, fdset);
#endif
  return 1;
}

/* Add new read or write thread. */
struct thread *
funcname_thread_add_read_write (int dir, struct thread_master *m,
                                int (*func) (struct thread *), void *arg, int fd,
                                debugargdef)
{
  struct thread *thread = NULL;

#if !defined(HAVE_POLL)
  thread_fd_set *fdset = NULL;
  if (dir == THREAD_READ)
    fdset = &m->handler.readfd;
  else
    fdset = &m->handler.writefd;
#endif

#if defined (HAVE_POLL)
  thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);

  if (thread == NULL)
    return NULL;
#else
  if (FD_ISSET (fd, fdset))
    {
      zlog (NULL, LOG_WARNING, "There is already %s fd [%d]",
            (dir == THREAD_READ) ? "read" : "write", fd);
      return NULL;
    }

  FD_SET (fd, fdset);
  thread = thread_get (m, dir, func, arg, debugargpass);
#endif

  thread->u.fd = fd;
  if (dir == THREAD_READ)
    thread_add_fd (m->read, thread);
  else
    thread_add_fd (m->write, thread);

  return thread;
}

static struct thread *
funcname_thread_add_timer_timeval (struct thread_master *m,
                                   int (*func) (struct thread *),
                                   int type,
                                   void *arg,
                                   struct timeval *time_relative,
                                   debugargdef)
{
  struct thread *thread;
  struct pqueue *queue;

  assert (m != NULL);

  assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
  assert (time_relative);

  queue = ((type == THREAD_TIMER) ? m->timer : m->background);
  thread = thread_get (m, type, func, arg, debugargpass);

  monotime(&thread->u.sands);
  timeradd(&thread->u.sands, time_relative, &thread->u.sands);

  pqueue_enqueue(thread, queue);
  return thread;
}


/* Add timer event thread. */
struct thread *
funcname_thread_add_timer (struct thread_master *m,
                           int (*func) (struct thread *),
                           void *arg, long timer,
                           debugargdef)
{
  struct timeval trel;

  assert (m != NULL);

  trel.tv_sec = timer;
  trel.tv_usec = 0;

  return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg,
                                            &trel, debugargpass);
}

/* Add timer event thread with "millisecond" resolution */
struct thread *
funcname_thread_add_timer_msec (struct thread_master *m,
                                int (*func) (struct thread *),
                                void *arg, long timer,
                                debugargdef)
{
  struct timeval trel;

  assert (m != NULL);

  trel.tv_sec = timer / 1000;
  trel.tv_usec = 1000*(timer % 1000);

  return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
                                            arg, &trel, debugargpass);
}

/* Add timer event thread whose expiry is given as a struct timeval */
struct thread *
funcname_thread_add_timer_tv (struct thread_master *m,
                              int (*func) (struct thread *),
                              void *arg, struct timeval *tv,
                              debugargdef)
{
  return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
                                            arg, tv, debugargpass);
}

/* Add a background thread, with an optional millisec delay */
struct thread *
funcname_thread_add_background (struct thread_master *m,
                                int (*func) (struct thread *),
                                void *arg, long delay,
                                debugargdef)
{
  struct timeval trel;

  assert (m != NULL);

  if (delay)
    {
      trel.tv_sec = delay / 1000;
      trel.tv_usec = 1000*(delay % 1000);
    }
  else
    {
      trel.tv_sec = 0;
      trel.tv_usec = 0;
    }

  return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND,
                                            arg, &trel, debugargpass);
}

/* Add simple event thread. */
struct thread *
funcname_thread_add_event (struct thread_master *m,
                           int (*func) (struct thread *), void *arg, int val,
                           debugargdef)
{
  struct thread *thread;

  assert (m != NULL);

  thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
  thread->u.val = val;
  thread_list_add (&m->event, thread);

  return thread;
}

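/* Stop watching a cancelled read/write thread's fd (poll and select variants). */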
static void
thread_cancel_read_or_write (struct thread *thread, short int state)
{
#if defined(HAVE_POLL)
  nfds_t i;

  for (i = 0; i < thread->master->handler.pfdcount; ++i)
    if (thread->master->handler.pfds[i].fd == thread->u.fd)
      {
        thread->master->handler.pfds[i].events &= ~(state);

        /* remove thread fds from pfd list */
        if (thread->master->handler.pfds[i].events == 0)
          {
            memmove(thread->master->handler.pfds+i,
                    thread->master->handler.pfds+i+1,
                    (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
            thread->master->handler.pfdcount--;
            return;
          }
      }
#endif

  fd_clear_read_write (thread);
}

/* Cancel thread from scheduler. */
void
thread_cancel (struct thread *thread)
{
  struct thread_list *list = NULL;
  struct pqueue *queue = NULL;
  struct thread **thread_array = NULL;

  switch (thread->type)
    {
    case THREAD_READ:
#if defined (HAVE_POLL)
      thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
#else
      thread_cancel_read_or_write (thread, 0);
#endif
      thread_array = thread->master->read;
      break;
    case THREAD_WRITE:
#if defined (HAVE_POLL)
      thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
#else
      thread_cancel_read_or_write (thread, 0);
#endif
      thread_array = thread->master->write;
      break;
    case THREAD_TIMER:
      queue = thread->master->timer;
      break;
    case THREAD_EVENT:
      list = &thread->master->event;
      break;
    case THREAD_READY:
      list = &thread->master->ready;
      break;
    case THREAD_BACKGROUND:
      queue = thread->master->background;
      break;
    default:
      return;
      break;
    }

  if (queue)
    {
      assert(thread->index >= 0);
      assert(thread == queue->array[thread->index]);
      pqueue_remove_at(thread->index, queue);
    }
  else if (list)
    {
      thread_list_delete (list, thread);
    }
  else if (thread_array)
    {
      thread_delete_fd (thread_array, thread);
    }
  else
    {
      assert(!"Thread should be either in queue or list or array!");
    }

  thread_add_unuse (thread->master, thread);
}

/* Delete all events which have argument value arg. */
unsigned int
thread_cancel_event (struct thread_master *m, void *arg)
{
  unsigned int ret = 0;
  struct thread *thread;

  thread = m->event.head;
  while (thread)
    {
      struct thread *t;

      t = thread;
      thread = t->next;

      if (t->arg == arg)
        {
          ret++;
          thread_list_delete (&m->event, t);
          thread_add_unuse (m, t);
        }
    }

  /* thread can be on the ready list too */
  thread = m->ready.head;
  while (thread)
    {
      struct thread *t;

      t = thread;
      thread = t->next;

      if (t->arg == arg)
        {
          ret++;
          thread_list_delete (&m->ready, t);
          thread_add_unuse (m, t);
        }
    }
  return ret;
}

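/* Compute how long until the earliest timer in the queue expires;
   NULL if the queue is empty. */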
static struct timeval *
thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
{
  if (queue->size)
    {
      struct thread *next_timer = queue->array[0];
      monotime_until(&next_timer->u.sands, timer_val);
      return timer_val;
    }
  return NULL;
}

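/* Copy the thread into the caller's storage and recycle the original. */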
static struct thread *
thread_run (struct thread_master *m, struct thread *thread,
            struct thread *fetch)
{
  *fetch = *thread;
  thread_add_unuse (m, thread);
  return fetch;
}

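/* Move a read/write thread whose fd became ready onto the ready list. */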
static int
thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
{
  struct thread **thread_array;

  if (!thread)
    return 0;

  if (thread->type == THREAD_READ)
    thread_array = m->read;
  else
    thread_array = m->write;

  if (fd_is_set (thread, fdset, pos))
    {
      fd_clear_read_write (thread);
      thread_delete_fd (thread_array, thread);
      thread_list_add (&m->ready, thread);
      thread->type = THREAD_READY;
#if defined(HAVE_POLL)
      thread->master->handler.pfds[pos].events &= ~(state);
#endif
      return 1;
    }
  return 0;
}

#if defined(HAVE_POLL)

#if defined(HAVE_SNMP)
/* add snmp fds to poll set */
static void
add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
{
  int i;
  m->handler.pfdcountsnmp = m->handler.pfdcount;
  /* cycle through the fds and add the necessary ones to the poll set */
  for (i = 0; i < fdsetsize; ++i)
    {
      if (FD_ISSET(i, snmpfds))
        {
          assert (m->handler.pfdcountsnmp <= m->handler.pfdsize);

          m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
          m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
          m->handler.pfdcountsnmp++;
        }
    }
}
#endif

/* check poll events */
static void
check_pollfds(struct thread_master *m, fd_set *readfd, int num)
{
  nfds_t i = 0;
  int ready = 0;
  for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
    {
      /* no event for current fd? immediately continue */
      if (m->handler.pfds[i].revents == 0)
        continue;

      ready++;

      /* POLLIN / POLLOUT process event */
      if (m->handler.pfds[i].revents & POLLIN)
        thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
      if (m->handler.pfds[i].revents & POLLOUT)
        thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);

      /* remove fd from list on POLLNVAL */
      if (m->handler.pfds[i].revents & POLLNVAL ||
          m->handler.pfds[i].revents & POLLHUP)
        {
          memmove(m->handler.pfds+i,
                  m->handler.pfds+i+1,
                  (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
          m->handler.pfdcount--;
          i--;
        }
      else
        m->handler.pfds[i].revents = 0;
    }
}
#endif

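/* Dispatch ready file descriptors after poll()/select() returns. */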
static void
thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
{
#if defined (HAVE_POLL)
  check_pollfds (m, rset, num);
#else
  int ready = 0, index;

  for (index = 0; index < m->fd_limit && ready < num; ++index)
    {
      ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
      ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
    }
#endif
}

/* Add all timers that have popped to the ready list. */
static unsigned int
thread_timer_process (struct pqueue *queue, struct timeval *timenow)
{
  struct thread *thread;
  unsigned int ready = 0;

  while (queue->size)
    {
      thread = queue->array[0];
      if (timercmp (timenow, &thread->u.sands, <))
        return ready;
      pqueue_dequeue(queue);
      thread->type = THREAD_READY;
      thread_list_add (&thread->master->ready, thread);
      ready++;
    }
  return ready;
}

/* process a list en masse, e.g. for event thread lists */
static unsigned int
thread_process (struct thread_list *list)
{
  struct thread *thread;
  struct thread *next;
  unsigned int ready = 0;

  for (thread = list->head; thread; thread = next)
    {
      next = thread->next;
      thread_list_delete (list, thread);
      thread->type = THREAD_READY;
      thread_list_add (&thread->master->ready, thread);
      ready++;
    }
  return ready;
}


/* Fetch next ready thread. */
struct thread *
thread_fetch (struct thread_master *m, struct thread *fetch)
{
  struct thread *thread;
  thread_fd_set readfd;
  thread_fd_set writefd;
  thread_fd_set exceptfd;
  struct timeval now;
  struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
  struct timeval timer_val_bg;
  struct timeval *timer_wait = &timer_val;
  struct timeval *timer_wait_bg;

  while (1)
    {
      int num = 0;

      /* Signals pre-empt everything */
      quagga_sigevent_process ();

      /* Drain the ready queue of already scheduled jobs, before scheduling
       * more.
       */
      if ((thread = thread_trim_head (&m->ready)) != NULL)
        return thread_run (m, thread, fetch);

      /* To be fair to all kinds of threads, and avoid starvation, we
       * need to be careful to consider all thread types for scheduling
       * in each quanta. I.e. we should not return early from here on.
       */

      /* Normal events are the next highest priority. */
      thread_process (&m->event);

      /* Structure copy. */
#if !defined(HAVE_POLL)
      readfd = fd_copy_fd_set(m->handler.readfd);
      writefd = fd_copy_fd_set(m->handler.writefd);
      exceptfd = fd_copy_fd_set(m->handler.exceptfd);
#endif

      /* Calculate select wait timer if nothing else to do */
      if (m->ready.count == 0)
        {
          timer_wait = thread_timer_wait (m->timer, &timer_val);
          timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);

          if (timer_wait_bg &&
              (!timer_wait || (timercmp (timer_wait, timer_wait_bg, >))))
            timer_wait = timer_wait_bg;
        }

      /* A timer that has already expired yields a negative timeout;
       * clamp it to zero so select()/poll() does not block. */
      if (timer_wait && timer_wait->tv_sec < 0)
        {
          timerclear(&timer_val);
          timer_wait = &timer_val;
        }

      num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);

      /* Signals should get quick treatment */
      if (num < 0)
        {
          if (errno == EINTR)
            continue; /* signal received - process it */
          zlog_warn ("select() error: %s", safe_strerror (errno));
          return NULL;
        }

      /* Check foreground timers.  Historically, they have had higher
         priority than I/O threads, so let's push them onto the ready
         list in front of the I/O threads. */
      monotime(&now);
      thread_timer_process (m->timer, &now);

      /* Got IO, process it */
      if (num > 0)
        thread_process_fds (m, &readfd, &writefd, num);

#if 0
      /* If any threads were made ready above (I/O or foreground timer),
         perhaps we should avoid adding background timers to the ready
         list at this time.  If this code is uncommented, then background
         timer threads will not run unless there is nothing else to do. */
      if ((thread = thread_trim_head (&m->ready)) != NULL)
        return thread_run (m, thread, fetch);
#endif

      /* Background timer/events, lowest priority */
      thread_timer_process (m->background, &now);

      if ((thread = thread_trim_head (&m->ready)) != NULL)
        return thread_run (m, thread, fetch);
    }
}

unsigned long
thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
{
  /* This is 'user + sys' time. */
  *cputime = timeval_elapsed (now->cpu.ru_utime, start->cpu.ru_utime) +
             timeval_elapsed (now->cpu.ru_stime, start->cpu.ru_stime);
  return timeval_elapsed (now->real, start->real);
}

/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int
thread_should_yield (struct thread *thread)
{
  return monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
}

void
thread_set_yield_time (struct thread *thread, unsigned long yield_time)
{
  thread->yield = yield_time;
}

void
thread_getrusage (RUSAGE_T *r)
{
  monotime(&r->real);
  getrusage(RUSAGE_SELF, &(r->cpu));
}

struct thread *thread_current = NULL;

/* We check thread consumed time. If the system has getrusage, we'll
   use that to get in-depth stats on the performance of the thread in addition
   to wall clock time stats from gettimeofday. */
void
thread_call (struct thread *thread)
{
  unsigned long realtime, cputime;
  RUSAGE_T before, after;

  GETRUSAGE (&before);
  thread->real = before.real;

  thread_current = thread;
  (*thread->func) (thread);
  thread_current = NULL;

  GETRUSAGE (&after);

  realtime = thread_consumed_time (&after, &before, &cputime);
  thread->hist->real.total += realtime;
  if (thread->hist->real.max < realtime)
    thread->hist->real.max = realtime;
  thread->hist->cpu.total += cputime;
  if (thread->hist->cpu.max < cputime)
    thread->hist->cpu.max = cputime;

  ++(thread->hist->total_calls);
  thread->hist->types |= (1 << thread->add_type);

#ifdef CONSUMED_TIME_CHECK
  if (realtime > CONSUMED_TIME_CHECK)
    {
      /*
       * We have a CPU Hog on our hands.
       * Whinge about it now, so we're aware this is yet another task
       * to fix.
       */
      zlog_warn ("SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
                 thread->funcname,
                 (unsigned long) thread->func,
                 realtime/1000, cputime/1000);
    }
#endif /* CONSUMED_TIME_CHECK */
}

/* Execute thread */
struct thread *
funcname_thread_execute (struct thread_master *m,
                         int (*func)(struct thread *),
                         void *arg,
                         int val,
                         debugargdef)
{
  struct cpu_thread_history tmp;
  struct thread dummy;

  memset (&dummy, 0, sizeof (struct thread));

  dummy.type = THREAD_EVENT;
  dummy.add_type = THREAD_EXECUTE;
  dummy.master = NULL;
  dummy.arg = arg;
  dummy.u.val = val;

  tmp.func = dummy.func = func;
  tmp.funcname = dummy.funcname = funcname;
  dummy.hist = hash_get (cpu_record, &tmp,
                         (void * (*) (void *))cpu_record_hash_alloc);

  dummy.schedfrom = schedfrom;
  dummy.schedfrom_line = fromln;

  thread_call (&dummy);

  return NULL;
}