lib/thread.c
1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with GNU Zebra; see the file COPYING. If not, write to the Free
18 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
19 * 02111-1307, USA.
20 */
21
22 /* #define DEBUG */
23
24 #include <zebra.h>
25 #include <sys/resource.h>
26
27 #include "thread.h"
28 #include "memory.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "pqueue.h"
32 #include "command.h"
33 #include "sigevent.h"
34
35 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
36 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
37 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
38
39 #if defined(__APPLE__)
40 #include <mach/mach.h>
41 #include <mach/mach_time.h>
42 #endif
43
44 static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
45 static struct hash *cpu_record = NULL;
46
47 static unsigned long
48 timeval_elapsed (struct timeval a, struct timeval b)
49 {
50 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
51 + (a.tv_usec - b.tv_usec));
52 }
53
54 static unsigned int
55 cpu_record_hash_key (struct cpu_thread_history *a)
56 {
57 return (uintptr_t) a->func;
58 }
59
60 static int
61 cpu_record_hash_cmp (const struct cpu_thread_history *a,
62 const struct cpu_thread_history *b)
63 {
64 return a->func == b->func;
65 }
66
67 static void *
68 cpu_record_hash_alloc (struct cpu_thread_history *a)
69 {
70 struct cpu_thread_history *new;
71 new = XCALLOC (MTYPE_THREAD_STATS, sizeof (struct cpu_thread_history));
72 new->func = a->func;
73 new->funcname = a->funcname;
74 return new;
75 }
76
77 static void
78 cpu_record_hash_free (void *a)
79 {
80 struct cpu_thread_history *hist = a;
81
82 XFREE (MTYPE_THREAD_STATS, hist);
83 }
84
85 static void
86 vty_out_cpu_thread_history(struct vty* vty,
87 struct cpu_thread_history *a)
88 {
89 vty_out(vty, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld",
90 a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls,
91 a->cpu.total/a->total_calls, a->cpu.max,
92 a->real.total/a->total_calls, a->real.max);
93 vty_out(vty, " %c%c%c%c%c%c %s%s",
94 a->types & (1 << THREAD_READ) ? 'R':' ',
95 a->types & (1 << THREAD_WRITE) ? 'W':' ',
96 a->types & (1 << THREAD_TIMER) ? 'T':' ',
97 a->types & (1 << THREAD_EVENT) ? 'E':' ',
98 a->types & (1 << THREAD_EXECUTE) ? 'X':' ',
99 a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ',
100 a->funcname, VTY_NEWLINE);
101 }
102
103 static void
104 cpu_record_hash_print(struct hash_backet *bucket,
105 void *args[])
106 {
107 struct cpu_thread_history *totals = args[0];
108 struct vty *vty = args[1];
109 thread_type *filter = args[2];
110 struct cpu_thread_history *a = bucket->data;
111
112 if ( !(a->types & *filter) )
113 return;
114 vty_out_cpu_thread_history(vty,a);
115 totals->total_active += a->total_active;
116 totals->total_calls += a->total_calls;
117 totals->real.total += a->real.total;
118 if (totals->real.max < a->real.max)
119 totals->real.max = a->real.max;
120 totals->cpu.total += a->cpu.total;
121 if (totals->cpu.max < a->cpu.max)
122 totals->cpu.max = a->cpu.max;
123 }
124
125 static void
126 cpu_record_print(struct vty *vty, thread_type filter)
127 {
128 struct cpu_thread_history tmp;
129 void *args[3] = {&tmp, vty, &filter};
130
131 memset(&tmp, 0, sizeof tmp);
132 tmp.funcname = "TOTAL";
133 tmp.types = filter;
134
135 vty_out(vty, "%21s %18s %18s%s",
136 "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
137 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
138 vty_out(vty, " Avg uSec Max uSecs");
139 vty_out(vty, " Type Thread%s", VTY_NEWLINE);
140
141 pthread_mutex_lock (&cpu_record_mtx);
142 {
143 hash_iterate(cpu_record,
144 (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
145 args);
146 }
147 pthread_mutex_unlock (&cpu_record_mtx);
148
149 if (tmp.total_calls > 0)
150 vty_out_cpu_thread_history(vty, &tmp);
151 }
152
153 DEFUN (show_thread_cpu,
154 show_thread_cpu_cmd,
155 "show thread cpu [FILTER]",
156 SHOW_STR
157 "Thread information\n"
158 "Thread CPU usage\n"
159 "Display filter (rwtexb)\n")
160 {
161 int idx_filter = 3;
162 int i = 0;
163 thread_type filter = (thread_type) -1U;
164
165 if (argc > 3)
166 {
167 filter = 0;
168 while (argv[idx_filter]->arg[i] != '\0')
169 {
170 switch ( argv[idx_filter]->arg[i] )
171 {
172 case 'r':
173 case 'R':
174 filter |= (1 << THREAD_READ);
175 break;
176 case 'w':
177 case 'W':
178 filter |= (1 << THREAD_WRITE);
179 break;
180 case 't':
181 case 'T':
182 filter |= (1 << THREAD_TIMER);
183 break;
184 case 'e':
185 case 'E':
186 filter |= (1 << THREAD_EVENT);
187 break;
188 case 'x':
189 case 'X':
190 filter |= (1 << THREAD_EXECUTE);
191 break;
192 case 'b':
193 case 'B':
194 filter |= (1 << THREAD_BACKGROUND);
195 break;
196 default:
197 break;
198 }
199 ++i;
200 }
201 if (filter == 0)
202 {
203 vty_out(vty, "Invalid filter \"%s\" specified,"
204 " must contain at least one of 'RWTEXB'%s",
205 argv[idx_filter]->arg, VTY_NEWLINE);
206 return CMD_WARNING;
207 }
208 }
209
210 cpu_record_print(vty, filter);
211 return CMD_SUCCESS;
212 }
213
214 static void
215 cpu_record_hash_clear (struct hash_backet *bucket,
216 void *args)
217 {
218 thread_type *filter = args;
219 struct cpu_thread_history *a = bucket->data;
220
221 if ( !(a->types & *filter) )
222 return;
223
224 pthread_mutex_lock (&cpu_record_mtx);
225 {
226 hash_release (cpu_record, bucket->data);
227 }
228 pthread_mutex_unlock (&cpu_record_mtx);
229 }
230
231 static void
232 cpu_record_clear (thread_type filter)
233 {
234 thread_type *tmp = &filter;
235
236 pthread_mutex_lock (&cpu_record_mtx);
237 {
238 hash_iterate (cpu_record,
239 (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
240 tmp);
241 }
242 pthread_mutex_unlock (&cpu_record_mtx);
243 }
244
245 DEFUN (clear_thread_cpu,
246 clear_thread_cpu_cmd,
247 "clear thread cpu [FILTER]",
248 "Clear stored data\n"
249 "Thread information\n"
250 "Thread CPU usage\n"
251 "Display filter (rwtexb)\n")
252 {
253 int idx_filter = 3;
254 int i = 0;
255 thread_type filter = (thread_type) -1U;
256
257 if (argc > 3)
258 {
259 filter = 0;
260 while (argv[idx_filter]->arg[i] != '\0')
261 {
262 switch ( argv[idx_filter]->arg[i] )
263 {
264 case 'r':
265 case 'R':
266 filter |= (1 << THREAD_READ);
267 break;
268 case 'w':
269 case 'W':
270 filter |= (1 << THREAD_WRITE);
271 break;
272 case 't':
273 case 'T':
274 filter |= (1 << THREAD_TIMER);
275 break;
276 case 'e':
277 case 'E':
278 filter |= (1 << THREAD_EVENT);
279 break;
280 case 'x':
281 case 'X':
282 filter |= (1 << THREAD_EXECUTE);
283 break;
284 case 'b':
285 case 'B':
286 filter |= (1 << THREAD_BACKGROUND);
287 break;
288 default:
289 break;
290 }
291 ++i;
292 }
293 if (filter == 0)
294 {
295 vty_out(vty, "Invalid filter \"%s\" specified,"
296 " must contain at least one of 'RWTEXB'%s",
297 argv[idx_filter]->arg, VTY_NEWLINE);
298 return CMD_WARNING;
299 }
300 }
301
302 cpu_record_clear (filter);
303 return CMD_SUCCESS;
304 }
305
306 void
307 thread_cmd_init (void)
308 {
309 install_element (VIEW_NODE, &show_thread_cpu_cmd);
310 install_element (ENABLE_NODE, &clear_thread_cpu_cmd);
311 }
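
/* Usage sketch for the commands installed above (illustrative; assumes a
 * vtysh/telnet session to a daemon that called thread_cmd_init()):
 *
 *     show thread cpu          statistics for all thread types
 *     show thread cpu rw       restrict output to read and write threads
 *     clear thread cpu t       clear recorded statistics for timer threads
 *
 * The filter string may combine any of the letters r, w, t, e, x and b,
 * matching the flags printed by vty_out_cpu_thread_history().
 */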
312
313 static int
314 thread_timer_cmp(void *a, void *b)
315 {
316 struct thread *thread_a = a;
317 struct thread *thread_b = b;
318
319 if (timercmp (&thread_a->u.sands, &thread_b->u.sands, <))
320 return -1;
321 if (timercmp (&thread_a->u.sands, &thread_b->u.sands, >))
322 return 1;
323 return 0;
324 }
325
326 static void
327 thread_timer_update(void *node, int actual_position)
328 {
329 struct thread *thread = node;
330
331 thread->index = actual_position;
332 }
333
334 /* Allocate new thread master. */
335 struct thread_master *
336 thread_master_create (void)
337 {
338 struct thread_master *rv;
339 struct rlimit limit;
340
341 getrlimit(RLIMIT_NOFILE, &limit);
342
343 pthread_mutex_lock (&cpu_record_mtx);
344 {
345 if (cpu_record == NULL)
346 cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
347 (int (*) (const void *, const void *))
348 cpu_record_hash_cmp);
349 }
350 pthread_mutex_unlock (&cpu_record_mtx);
351
352 rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
353 if (rv == NULL)
354 return NULL;
355
356 pthread_mutex_init (&rv->mtx, NULL);
357
358 rv->fd_limit = (int)limit.rlim_cur;
359 rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
360 if (rv->read == NULL)
361 {
362 XFREE (MTYPE_THREAD_MASTER, rv);
363 return NULL;
364 }
365
366 rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
367 if (rv->write == NULL)
368 {
369 XFREE (MTYPE_THREAD, rv->read);
370 XFREE (MTYPE_THREAD_MASTER, rv);
371 return NULL;
372 }
373
374 /* Initialize the timer queues */
375 rv->timer = pqueue_create();
376 rv->background = pqueue_create();
377 rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
378 rv->timer->update = rv->background->update = thread_timer_update;
379 rv->spin = true;
380 rv->handle_signals = true;
381
382 #if defined(HAVE_POLL_CALL)
383 rv->handler.pfdsize = rv->fd_limit;
384 rv->handler.pfdcount = 0;
385 rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
386 sizeof (struct pollfd) * rv->handler.pfdsize);
387 #endif
388 return rv;
389 }
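
/* Usage sketch (illustrative; the variable name "master" is an assumption):
 * a daemon normally creates a single thread master at startup and releases
 * it again at shutdown.
 *
 *     struct thread_master *master;
 *
 *     master = thread_master_create ();
 *     if (master == NULL)
 *       exit (1);
 *
 *     ... install commands, schedule threads, run the fetch/call loop ...
 *
 *     thread_master_free_unused (master);
 *     thread_master_free (master);
 */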
390
391 /* Add a new thread to the list. */
392 static void
393 thread_list_add (struct thread_list *list, struct thread *thread)
394 {
395 thread->next = NULL;
396 thread->prev = list->tail;
397 if (list->tail)
398 list->tail->next = thread;
399 else
400 list->head = thread;
401 list->tail = thread;
402 list->count++;
403 }
404
405 /* Delete a thread from the list. */
406 static struct thread *
407 thread_list_delete (struct thread_list *list, struct thread *thread)
408 {
409 if (thread->next)
410 thread->next->prev = thread->prev;
411 else
412 list->tail = thread->prev;
413 if (thread->prev)
414 thread->prev->next = thread->next;
415 else
416 list->head = thread->next;
417 thread->next = thread->prev = NULL;
418 list->count--;
419 return thread;
420 }
421
422 static void
423 thread_delete_fd (struct thread **thread_array, struct thread *thread)
424 {
425 thread_array[thread->u.fd] = NULL;
426 }
427
428 static void
429 thread_add_fd (struct thread **thread_array, struct thread *thread)
430 {
431 thread_array[thread->u.fd] = thread;
432 }
433
434 /* Return 1 if the thread list is empty, 0 otherwise. */
435 static int
436 thread_empty (struct thread_list *list)
437 {
438 return list->head ? 0 : 1;
439 }
440
441 /* Delete top of the list and return it. */
442 static struct thread *
443 thread_trim_head (struct thread_list *list)
444 {
445 if (!thread_empty (list))
446 return thread_list_delete (list, list->head);
447 return NULL;
448 }
449
450 /* Move thread to unuse list. */
451 static void
452 thread_add_unuse (struct thread_master *m, struct thread *thread)
453 {
454 assert (m != NULL && thread != NULL);
455 assert (thread->next == NULL);
456 assert (thread->prev == NULL);
457 thread->ref = NULL;
458
459 thread->type = THREAD_UNUSED;
460 thread->hist->total_active--;
461 thread_list_add (&m->unuse, thread);
462 }
463
464 /* Free all unused threads. */
465 static void
466 thread_list_free (struct thread_master *m, struct thread_list *list)
467 {
468 struct thread *t;
469 struct thread *next;
470
471 for (t = list->head; t; t = next)
472 {
473 next = t->next;
474 XFREE (MTYPE_THREAD, t);
475 list->count--;
476 m->alloc--;
477 }
478 }
479
480 static void
481 thread_array_free (struct thread_master *m, struct thread **thread_array)
482 {
483 struct thread *t;
484 int index;
485
486 for (index = 0; index < m->fd_limit; ++index)
487 {
488 t = thread_array[index];
489 if (t)
490 {
491 thread_array[index] = NULL;
492 XFREE (MTYPE_THREAD, t);
493 m->alloc--;
494 }
495 }
496 XFREE (MTYPE_THREAD, thread_array);
497 }
498
499 static void
500 thread_queue_free (struct thread_master *m, struct pqueue *queue)
501 {
502 int i;
503
504 for (i = 0; i < queue->size; i++)
505 XFREE(MTYPE_THREAD, queue->array[i]);
506
507 m->alloc -= queue->size;
508 pqueue_delete(queue);
509 }
510
511 /*
512  * thread_master_free_unused
513  *
514  * As threads finish, they are placed on the unuse list for later
515  * reuse.
516  * When we are shutting down, free the unused threads so that anything
517  * we forgot to shut off shows up in the remaining allocations.
518  */
519 void
520 thread_master_free_unused (struct thread_master *m)
521 {
522 pthread_mutex_lock (&m->mtx);
523 {
524 struct thread *t;
525 while ((t = thread_trim_head(&m->unuse)) != NULL)
526 {
527 pthread_mutex_destroy (&t->mtx);
528 XFREE(MTYPE_THREAD, t);
529 }
530 }
531 pthread_mutex_unlock (&m->mtx);
532 }
533
534 /* Stop the thread scheduler and free the thread master. */
535 void
536 thread_master_free (struct thread_master *m)
537 {
538 thread_array_free (m, m->read);
539 thread_array_free (m, m->write);
540 thread_queue_free (m, m->timer);
541 thread_list_free (m, &m->event);
542 thread_list_free (m, &m->ready);
543 thread_list_free (m, &m->unuse);
544 thread_queue_free (m, m->background);
545 pthread_mutex_destroy (&m->mtx);
546
547 #if defined(HAVE_POLL_CALL)
548 XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
549 #endif
550 XFREE (MTYPE_THREAD_MASTER, m);
551
552 pthread_mutex_lock (&cpu_record_mtx);
553 {
554 if (cpu_record)
555 {
556 hash_clean (cpu_record, cpu_record_hash_free);
557 hash_free (cpu_record);
558 cpu_record = NULL;
559 }
560 }
561 pthread_mutex_unlock (&cpu_record_mtx);
562 }
563
564 /* Return the remaining time in seconds. */
565 unsigned long
566 thread_timer_remain_second (struct thread *thread)
567 {
568 int64_t remain;
569
570 pthread_mutex_lock (&thread->mtx);
571 {
572 remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
573 }
574 pthread_mutex_unlock (&thread->mtx);
575
576 return remain < 0 ? 0 : remain;
577 }
578
579 #define debugargdef const char *funcname, const char *schedfrom, int fromln
580 #define debugargpass funcname, schedfrom, fromln
581
582 struct timeval
583 thread_timer_remain(struct thread *thread)
584 {
585 struct timeval remain;
586 pthread_mutex_lock (&thread->mtx);
587 {
588 monotime_until(&thread->u.sands, &remain);
589 }
590 pthread_mutex_unlock (&thread->mtx);
591 return remain;
592 }
593
594 /* Get a new thread, reusing one from the unuse list when possible. */
595 static struct thread *
596 thread_get (struct thread_master *m, u_char type,
597 int (*func) (struct thread *), void *arg, debugargdef)
598 {
599 struct thread *thread = thread_trim_head (&m->unuse);
600 struct cpu_thread_history tmp;
601
602 if (! thread)
603 {
604 thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
605 /* mutex only needs to be initialized at struct creation. */
606 pthread_mutex_init (&thread->mtx, NULL);
607 m->alloc++;
608 }
609
610 thread->type = type;
611 thread->add_type = type;
612 thread->master = m;
613 thread->arg = arg;
614 thread->index = -1;
615 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
616 thread->ref = NULL;
617
618 /*
619  * If the passed-in funcname or func is not what we have
620  * stored, that means the thread->hist needs to be
621  * updated.  We keep the previous values around on the
622  * unuse list under the assumption that we are probably
623  * going to immediately allocate the same
624  * type of thread.
625  * This hopefully saves us some serious
626  * hash_get lookups.
627  */
628 if (thread->funcname != funcname ||
629 thread->func != func)
630 {
631 tmp.func = func;
632 tmp.funcname = funcname;
633 pthread_mutex_lock (&cpu_record_mtx);
634 {
635 thread->hist = hash_get (cpu_record, &tmp,
636 (void * (*) (void *))cpu_record_hash_alloc);
637 }
638 pthread_mutex_unlock (&cpu_record_mtx);
639 }
640 thread->hist->total_active++;
641 thread->func = func;
642 thread->funcname = funcname;
643 thread->schedfrom = schedfrom;
644 thread->schedfrom_line = fromln;
645
646 return thread;
647 }
648
649 #if defined (HAVE_POLL_CALL)
650
651 #define fd_copy_fd_set(X) (X)
652
653 /* generic add thread function */
654 static struct thread *
655 generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
656 void *arg, int fd, int dir, debugargdef)
657 {
658 struct thread *thread;
659
660 u_char type;
661 short int event;
662
663 if (dir == THREAD_READ)
664 {
665 event = (POLLIN | POLLHUP);
666 type = THREAD_READ;
667 }
668 else
669 {
670 event = (POLLOUT | POLLHUP);
671 type = THREAD_WRITE;
672 }
673
674 nfds_t queuepos = m->handler.pfdcount;
675 nfds_t i=0;
676 for (i=0; i<m->handler.pfdcount; i++)
677 if (m->handler.pfds[i].fd == fd)
678 {
679 queuepos = i;
680 break;
681 }
682
683 /* is there enough space for a new fd? */
684 assert (queuepos < m->handler.pfdsize);
685
686 thread = thread_get (m, type, func, arg, debugargpass);
687 m->handler.pfds[queuepos].fd = fd;
688 m->handler.pfds[queuepos].events |= event;
689 if (queuepos == m->handler.pfdcount)
690 m->handler.pfdcount++;
691
692 return thread;
693 }
694 #else
695
696 #define fd_copy_fd_set(X) (X)
697 #endif
698
699 static int
700 fd_select (struct thread_master *m, int size, thread_fd_set *read,
               thread_fd_set *write, thread_fd_set *except,
               struct timeval *timer_wait)
701 {
702 int num;
703
704   /* If timer_wait is null here, that means either select() or poll() should
705    * block indefinitely, unless the thread_master has overridden it. select()
706    * and poll() differ in the timeout values they interpret as an indefinite
707    * block; select() requires a null pointer, while poll() takes a millisecond
708    * value of -1.
709    *
710    * The thread_master owner can override the default behavior by setting
711    * ->selectpoll_timeout (see the sketch after this function). If the value
712    * is positive, it is the maximum number of milliseconds to wait. If it is
713    * -1, we never wait and always return immediately even if no event is
714    * detected. If the value is zero, the default behavior applies.
715    */
716
717 #if defined(HAVE_POLL_CALL)
718 int timeout = -1;
719
720 if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value
721 timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
722 else if (m->selectpoll_timeout > 0) // use the user's timeout
723 timeout = m->selectpoll_timeout;
724 else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
725 timeout = 0;
726
727 num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
728 #else
729 struct timeval timeout;
730
731 if (m->selectpoll_timeout > 0) // use the user's timeout
732 {
733 timeout.tv_sec = m->selectpoll_timeout / 1000;
734 timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000;
735 timer_wait = &timeout;
736 }
737 else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
738 {
739 timeout.tv_sec = 0;
740 timeout.tv_usec = 0;
741 timer_wait = &timeout;
742 }
743 num = select (size, read, write, except, timer_wait);
744 #endif
745
746 return num;
747 }
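
/* Usage sketch for the ->selectpoll_timeout override described above
 * (illustrative; "master" is assumed to be a thread_master owned by the
 * caller):
 *
 *     master->selectpoll_timeout = 100;    wait at most 100 ms per iteration
 *     master->selectpoll_timeout = -1;     never block, just poll and return
 *     master->selectpoll_timeout = 0;      default: honour timer_wait
 */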
748
749 static int
750 fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
751 {
752 #if defined(HAVE_POLL_CALL)
753 return 1;
754 #else
755 return FD_ISSET (THREAD_FD (thread), fdset);
756 #endif
757 }
758
759 static int
760 fd_clear_read_write (struct thread *thread)
761 {
762 #if !defined(HAVE_POLL_CALL)
763 thread_fd_set *fdset = NULL;
764 int fd = THREAD_FD (thread);
765
766 if (thread->type == THREAD_READ)
767 fdset = &thread->master->handler.readfd;
768 else
769 fdset = &thread->master->handler.writefd;
770
771 if (!FD_ISSET (fd, fdset))
772 return 0;
773
774 FD_CLR (fd, fdset);
775 #endif
776 return 1;
777 }
778
779 /* Add a new read or write thread. */
780 void
781 funcname_thread_add_read_write (int dir, struct thread_master *m,
782 int (*func) (struct thread *), void *arg, int fd, struct thread **t_ptr,
783 debugargdef)
784 {
785 struct thread *thread = NULL;
786
787 pthread_mutex_lock (&m->mtx);
788 {
789 if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
790 {
791 pthread_mutex_unlock (&m->mtx);
792 return;
793 }
794
795 #if defined (HAVE_POLL_CALL)
796 thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
797 #else
798 if (fd >= FD_SETSIZE)
799 {
800           zlog_err ("File descriptor %d is >= FD_SETSIZE (%d). Please recompile "
801                     "with --enable-poll=yes", fd, FD_SETSIZE);
802 assert (fd < FD_SETSIZE && !"fd >= FD_SETSIZE");
803 }
804 thread_fd_set *fdset = NULL;
805 if (dir == THREAD_READ)
806 fdset = &m->handler.readfd;
807 else
808 fdset = &m->handler.writefd;
809
810 if (FD_ISSET (fd, fdset))
811 {
812 zlog_warn ("There is already %s fd [%d]",
813 (dir == THREAD_READ) ? "read" : "write", fd);
814 }
815 else
816 {
817 FD_SET (fd, fdset);
818 thread = thread_get (m, dir, func, arg, debugargpass);
819 }
820 #endif
821
822 if (thread)
823 {
824 pthread_mutex_lock (&thread->mtx);
825 {
826 thread->u.fd = fd;
827 if (dir == THREAD_READ)
828 thread_add_fd (m->read, thread);
829 else
830 thread_add_fd (m->write, thread);
831 }
832 pthread_mutex_unlock (&thread->mtx);
833
834 if (t_ptr)
835 {
836 *t_ptr = thread;
837 thread->ref = t_ptr;
838 }
839 }
840 }
841 pthread_mutex_unlock (&m->mtx);
842 }
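
/* Usage sketch (illustrative; "master", "sock" and "my_read_handler" are
 * assumptions): scheduling a read handler on a socket.  Daemons normally go
 * through the thread_add_read() convenience wrapper from thread.h; the
 * direct call below simply spells out the debug arguments.
 *
 *     static struct thread *t_read;
 *
 *     static int
 *     my_read_handler (struct thread *thread)
 *     {
 *       int fd = THREAD_FD (thread);
 *       ... read from fd, reschedule with &t_read if more data is expected ...
 *       return 0;
 *     }
 *
 *     funcname_thread_add_read_write (THREAD_READ, master, my_read_handler,
 *                                     NULL, sock, &t_read,
 *                                     "my_read_handler", __FILE__, __LINE__);
 */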
843
844 static void
845 funcname_thread_add_timer_timeval (struct thread_master *m,
846 int (*func) (struct thread *), int type, void *arg,
847 struct timeval *time_relative, struct thread **t_ptr, debugargdef)
848 {
849 struct thread *thread;
850 struct pqueue *queue;
851
852 assert (m != NULL);
853
854 assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
855 assert (time_relative);
856
857 pthread_mutex_lock (&m->mtx);
858 {
859 if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
860 {
861 pthread_mutex_unlock (&m->mtx);
862 return;
863 }
864
865 queue = ((type == THREAD_TIMER) ? m->timer : m->background);
866 thread = thread_get (m, type, func, arg, debugargpass);
867
868 pthread_mutex_lock (&thread->mtx);
869 {
870 monotime(&thread->u.sands);
871 timeradd(&thread->u.sands, time_relative, &thread->u.sands);
872 pqueue_enqueue(thread, queue);
873 if (t_ptr)
874 {
875 *t_ptr = thread;
876 thread->ref = t_ptr;
877 }
878 }
879 pthread_mutex_unlock (&thread->mtx);
880 }
881 pthread_mutex_unlock (&m->mtx);
882 }
883
884
885 /* Add timer event thread. */
886 void
887 funcname_thread_add_timer (struct thread_master *m,
888 int (*func) (struct thread *), void *arg, long timer,
889 struct thread **t_ptr, debugargdef)
890 {
891 struct timeval trel;
892
893 assert (m != NULL);
894
895 trel.tv_sec = timer;
896 trel.tv_usec = 0;
897
898   funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel,
899                                      t_ptr, debugargpass);
900 }
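
/* Usage sketch (illustrative; "t_periodic" and the 10 second interval are
 * assumptions): a periodic job is implemented by having the timer handler
 * re-add itself.  Daemons usually call the thread_add_timer() wrapper from
 * thread.h; the underlying call is shown with its debug arguments written
 * out.
 *
 *     static struct thread *t_periodic;
 *
 *     static int
 *     periodic_job (struct thread *thread)
 *     {
 *       ... do the periodic work ...
 *       funcname_thread_add_timer (thread->master, periodic_job, NULL, 10,
 *                                  &t_periodic, "periodic_job",
 *                                  __FILE__, __LINE__);
 *       return 0;
 *     }
 */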
901
902 /* Add timer event thread with "millisecond" resolution */
903 void
904 funcname_thread_add_timer_msec (struct thread_master *m,
905 int (*func) (struct thread *), void *arg, long timer,
906 struct thread **t_ptr, debugargdef)
907 {
908 struct timeval trel;
909
910 assert (m != NULL);
911
912 trel.tv_sec = timer / 1000;
913 trel.tv_usec = 1000*(timer % 1000);
914
915 funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel,
916 t_ptr, debugargpass);
917 }
918
919 /* Add timer event thread, taking the relative expiry time as a struct timeval */
920 void
921 funcname_thread_add_timer_tv (struct thread_master *m,
922 int (*func) (struct thread *), void *arg, struct timeval *tv,
923 struct thread **t_ptr, debugargdef)
924 {
925 funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, tv, t_ptr,
926 debugargpass);
927 }
928
929 /* Add a background thread, with an optional millisecond delay */
930 void
931 funcname_thread_add_background (struct thread_master *m,
932 int (*func) (struct thread *), void *arg, long delay,
933 struct thread **t_ptr, debugargdef)
934 {
935 struct timeval trel;
936
937 assert (m != NULL);
938
939 if (delay)
940 {
941 trel.tv_sec = delay / 1000;
942 trel.tv_usec = 1000*(delay % 1000);
943 }
944 else
945 {
946 trel.tv_sec = 0;
947 trel.tv_usec = 0;
948 }
949
950 funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND, arg, &trel,
951 t_ptr, debugargpass);
952 }
953
954 /* Add simple event thread. */
955 void
956 funcname_thread_add_event (struct thread_master *m,
957 int (*func) (struct thread *), void *arg, int val,
958 struct thread **t_ptr, debugargdef)
959 {
960 struct thread *thread;
961
962 assert (m != NULL);
963
964 pthread_mutex_lock (&m->mtx);
965 {
966 if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
967 {
968 pthread_mutex_unlock (&m->mtx);
969 return;
970 }
971
972 thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
973 pthread_mutex_lock (&thread->mtx);
974 {
975 thread->u.val = val;
976 thread_list_add (&m->event, thread);
977 }
978 pthread_mutex_unlock (&thread->mtx);
979
980 if (t_ptr)
981 {
982 *t_ptr = thread;
983 thread->ref = t_ptr;
984 }
985 }
986 pthread_mutex_unlock (&m->mtx);
987 }
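
/* Usage sketch (illustrative; "master" and "handle_config_change" are
 * assumptions): queueing a one-shot event, normally done through the
 * thread_add_event() wrapper from thread.h.
 *
 *     funcname_thread_add_event (master, handle_config_change, NULL, 0,
 *                                NULL, "handle_config_change",
 *                                __FILE__, __LINE__);
 *
 * Passing NULL for t_ptr means no back-reference is kept; the event can
 * still be removed by its argument value via thread_cancel_event().
 */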
988
989 static void
990 thread_cancel_read_or_write (struct thread *thread, short int state)
991 {
992 #if defined(HAVE_POLL_CALL)
993 nfds_t i;
994
995 for (i=0;i<thread->master->handler.pfdcount;++i)
996 if (thread->master->handler.pfds[i].fd == thread->u.fd)
997 {
998 thread->master->handler.pfds[i].events &= ~(state);
999
1000 /* remove thread fds from pfd list */
1001 if (thread->master->handler.pfds[i].events == 0)
1002 {
1003 memmove(thread->master->handler.pfds+i,
1004 thread->master->handler.pfds+i+1,
1005 (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
1006 thread->master->handler.pfdcount--;
1007 return;
1008 }
1009 }
1010 #endif
1011
1012 fd_clear_read_write (thread);
1013 }
1014
1015 /**
1016 * Cancel thread from scheduler.
1017 *
1018 * This function is *NOT* MT-safe. DO NOT call it from any other pthread except
1019 * the one which owns thread->master.
1020 */
1021 void
1022 thread_cancel (struct thread *thread)
1023 {
1024 struct thread_list *list = NULL;
1025 struct pqueue *queue = NULL;
1026 struct thread **thread_array = NULL;
1027
1028 pthread_mutex_lock (&thread->master->mtx);
1029 pthread_mutex_lock (&thread->mtx);
1030
1031 switch (thread->type)
1032 {
1033 case THREAD_READ:
1034 #if defined (HAVE_POLL_CALL)
1035 thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
1036 #else
1037 thread_cancel_read_or_write (thread, 0);
1038 #endif
1039 thread_array = thread->master->read;
1040 break;
1041 case THREAD_WRITE:
1042 #if defined (HAVE_POLL_CALL)
1043 thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
1044 #else
1045 thread_cancel_read_or_write (thread, 0);
1046 #endif
1047 thread_array = thread->master->write;
1048 break;
1049 case THREAD_TIMER:
1050 queue = thread->master->timer;
1051 break;
1052 case THREAD_EVENT:
1053 list = &thread->master->event;
1054 break;
1055 case THREAD_READY:
1056 list = &thread->master->ready;
1057 break;
1058 case THREAD_BACKGROUND:
1059 queue = thread->master->background;
1060 break;
1061 default:
1062 goto done;
1063 break;
1064 }
1065
1066 if (queue)
1067 {
1068 assert(thread->index >= 0);
1069 pqueue_remove (thread, queue);
1070 }
1071 else if (list)
1072 {
1073 thread_list_delete (list, thread);
1074 }
1075 else if (thread_array)
1076 {
1077 thread_delete_fd (thread_array, thread);
1078 }
1079 else
1080 {
1081 assert(!"Thread should be either in queue or list or array!");
1082 }
1083
1084 if (thread->ref)
1085 *thread->ref = NULL;
1086
1087 thread_add_unuse (thread->master, thread);
1088
1089 done:
1090 pthread_mutex_unlock (&thread->mtx);
1091 pthread_mutex_unlock (&thread->master->mtx);
1092 }
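
/* Usage sketch (illustrative; "t_read" is assumed to have been filled in by
 * an earlier thread_add call): cancelling a scheduled thread that was stored
 * through a t_ptr back-reference.  thread_cancel() clears *thread->ref, so
 * the caller's pointer is reset automatically.
 *
 *     if (t_read)
 *       thread_cancel (t_read);    t_read is NULL again afterwards
 */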
1093
1094 /* Delete all events whose argument value equals arg. */
1095 unsigned int
1096 thread_cancel_event (struct thread_master *m, void *arg)
1097 {
1098 unsigned int ret = 0;
1099 struct thread *thread;
1100 struct thread *t;
1101
1102 pthread_mutex_lock (&m->mtx);
1103 {
1104 thread = m->event.head;
1105 while (thread)
1106 {
1107 t = thread;
1108 pthread_mutex_lock (&t->mtx);
1109 {
1110 thread = t->next;
1111
1112 if (t->arg == arg)
1113 {
1114 ret++;
1115 thread_list_delete (&m->event, t);
1116 if (t->ref)
1117 *t->ref = NULL;
1118 thread_add_unuse (m, t);
1119 }
1120 }
1121 pthread_mutex_unlock (&t->mtx);
1122 }
1123
1124 /* thread can be on the ready list too */
1125 thread = m->ready.head;
1126 while (thread)
1127 {
1128 t = thread;
1129 pthread_mutex_lock (&t->mtx);
1130 {
1131 thread = t->next;
1132
1133 if (t->arg == arg)
1134 {
1135 ret++;
1136 thread_list_delete (&m->ready, t);
1137 if (t->ref)
1138 *t->ref = NULL;
1139 thread_add_unuse (m, t);
1140 }
1141 }
1142 pthread_mutex_unlock (&t->mtx);
1143 }
1144 }
1145 pthread_mutex_unlock (&m->mtx);
1146 return ret;
1147 }
1148
1149 static struct timeval *
1150 thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
1151 {
1152 if (queue->size)
1153 {
1154 struct thread *next_timer = queue->array[0];
1155 monotime_until(&next_timer->u.sands, timer_val);
1156 return timer_val;
1157 }
1158 return NULL;
1159 }
1160
1161 static struct thread *
1162 thread_run (struct thread_master *m, struct thread *thread,
1163 struct thread *fetch)
1164 {
1165 *fetch = *thread;
1166 thread_add_unuse (m, thread);
1167 return fetch;
1168 }
1169
1170 static int
1171 thread_process_fds_helper (struct thread_master *m, struct thread *thread,
                                thread_fd_set *fdset, short int state, int pos)
1172 {
1173 struct thread **thread_array;
1174
1175 if (!thread)
1176 return 0;
1177
1178 if (thread->type == THREAD_READ)
1179 thread_array = m->read;
1180 else
1181 thread_array = m->write;
1182
1183 if (fd_is_set (thread, fdset, pos))
1184 {
1185 fd_clear_read_write (thread);
1186 thread_delete_fd (thread_array, thread);
1187 thread_list_add (&m->ready, thread);
1188 thread->type = THREAD_READY;
1189 #if defined(HAVE_POLL_CALL)
1190 thread->master->handler.pfds[pos].events &= ~(state);
1191 #endif
1192 return 1;
1193 }
1194 return 0;
1195 }
1196
1197 #if defined(HAVE_POLL_CALL)
1198
1199 /* check poll events */
1200 static void
1201 check_pollfds(struct thread_master *m, fd_set *readfd, int num)
1202 {
1203 nfds_t i = 0;
1204 int ready = 0;
1205 for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
1206 {
1207       /* no event for the current fd? immediately continue */
1208 if(m->handler.pfds[i].revents == 0)
1209 continue;
1210
1211 ready++;
1212
1213 /* POLLIN / POLLOUT process event */
1214 if (m->handler.pfds[i].revents & POLLIN)
1215 thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
1216 if (m->handler.pfds[i].revents & POLLOUT)
1217 thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
1218
1219       /* remove fd from the list on POLLNVAL or POLLHUP */
1220 if (m->handler.pfds[i].revents & POLLNVAL ||
1221 m->handler.pfds[i].revents & POLLHUP)
1222 {
1223 memmove(m->handler.pfds+i,
1224 m->handler.pfds+i+1,
1225 (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
1226 m->handler.pfdcount--;
1227 i--;
1228 }
1229 else
1230 m->handler.pfds[i].revents = 0;
1231 }
1232 }
1233 #endif
1234
1235 static void
1236 thread_process_fds (struct thread_master *m, thread_fd_set *rset,
                         thread_fd_set *wset, int num)
1237 {
1238 #if defined (HAVE_POLL_CALL)
1239 check_pollfds (m, rset, num);
1240 #else
1241 int ready = 0, index;
1242
1243 for (index = 0; index < m->fd_limit && ready < num; ++index)
1244 {
1245 ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
1246 ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
1247 }
1248 #endif
1249 }
1250
1251 /* Add all timers that have popped to the ready list. */
1252 static unsigned int
1253 thread_timer_process (struct pqueue *queue, struct timeval *timenow)
1254 {
1255 struct thread *thread;
1256 unsigned int ready = 0;
1257
1258 while (queue->size)
1259 {
1260 thread = queue->array[0];
1261 if (timercmp (timenow, &thread->u.sands, <))
1262 return ready;
1263 pqueue_dequeue(queue);
1264 thread->type = THREAD_READY;
1265 thread_list_add (&thread->master->ready, thread);
1266 ready++;
1267 }
1268 return ready;
1269 }
1270
1271 /* process a list en masse, e.g. for event thread lists */
1272 static unsigned int
1273 thread_process (struct thread_list *list)
1274 {
1275 struct thread *thread;
1276 struct thread *next;
1277 unsigned int ready = 0;
1278
1279 for (thread = list->head; thread; thread = next)
1280 {
1281 next = thread->next;
1282 thread_list_delete (list, thread);
1283 thread->type = THREAD_READY;
1284 thread_list_add (&thread->master->ready, thread);
1285 ready++;
1286 }
1287 return ready;
1288 }
1289
1290
1291 /* Fetch next ready thread. */
1292 struct thread *
1293 thread_fetch (struct thread_master *m, struct thread *fetch)
1294 {
1295 struct thread *thread;
1296 thread_fd_set readfd;
1297 thread_fd_set writefd;
1298 thread_fd_set exceptfd;
1299 struct timeval now;
1300 struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
1301 struct timeval timer_val_bg;
1302 struct timeval *timer_wait = &timer_val;
1303 struct timeval *timer_wait_bg;
1304
1305 do
1306 {
1307 int num = 0;
1308
1309 /* Signals pre-empt everything */
1310 if (m->handle_signals)
1311 quagga_sigevent_process ();
1312
1313 pthread_mutex_lock (&m->mtx);
1314 /* Drain the ready queue of already scheduled jobs, before scheduling
1315 * more.
1316 */
1317 if ((thread = thread_trim_head (&m->ready)) != NULL)
1318 {
1319 fetch = thread_run (m, thread, fetch);
1320 if (fetch->ref)
1321 *fetch->ref = NULL;
1322 pthread_mutex_unlock (&m->mtx);
1323 return fetch;
1324 }
1325
1326 /* To be fair to all kinds of threads, and avoid starvation, we
1327 * need to be careful to consider all thread types for scheduling
1328        * in each quantum, i.e. we should not return early from here on.
1329 */
1330
1331       /* Normal events are the next highest priority. */
1332 thread_process (&m->event);
1333
1334 /* Structure copy. */
1335 #if !defined(HAVE_POLL_CALL)
1336 readfd = fd_copy_fd_set(m->handler.readfd);
1337 writefd = fd_copy_fd_set(m->handler.writefd);
1338 exceptfd = fd_copy_fd_set(m->handler.exceptfd);
1339 #endif
1340
1341 /* Calculate select wait timer if nothing else to do */
1342 if (m->ready.count == 0)
1343 {
1344 timer_wait = thread_timer_wait (m->timer, &timer_val);
1345 timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);
1346
1347 if (timer_wait_bg &&
1348 (!timer_wait || (timercmp (timer_wait, timer_wait_bg, >))))
1349 timer_wait = timer_wait_bg;
1350 }
1351
1352 if (timer_wait && timer_wait->tv_sec < 0)
1353 {
1354 timerclear(&timer_val);
1355 timer_wait = &timer_val;
1356 }
1357
1358 num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
1359
1360 /* Signals should get quick treatment */
1361 if (num < 0)
1362 {
1363 if (errno == EINTR)
1364 {
1365 pthread_mutex_unlock (&m->mtx);
1366 continue; /* signal received - process it */
1367 }
1368 zlog_warn ("select() error: %s", safe_strerror (errno));
1369 pthread_mutex_unlock (&m->mtx);
1370 return NULL;
1371 }
1372
1373 /* Check foreground timers. Historically, they have had higher
1374 priority than I/O threads, so let's push them onto the ready
1375 list in front of the I/O threads. */
1376 monotime(&now);
1377 thread_timer_process (m->timer, &now);
1378
1379 /* Got IO, process it */
1380 if (num > 0)
1381 thread_process_fds (m, &readfd, &writefd, num);
1382
1383 #if 0
1384 /* If any threads were made ready above (I/O or foreground timer),
1385 perhaps we should avoid adding background timers to the ready
1386          list at this time. If this code is uncommented, then background
1387 timer threads will not run unless there is nothing else to do. */
1388 if ((thread = thread_trim_head (&m->ready)) != NULL)
1389 {
1390 fetch = thread_run (m, thread, fetch);
1391 if (fetch->ref)
1392 *fetch->ref = NULL;
1393 pthread_mutex_unlock (&m->mtx);
1394 return fetch;
1395 }
1396 #endif
1397
1398 /* Background timer/events, lowest priority */
1399 thread_timer_process (m->background, &now);
1400
1401 if ((thread = thread_trim_head (&m->ready)) != NULL)
1402 {
1403 fetch = thread_run (m, thread, fetch);
1404 if (fetch->ref)
1405 *fetch->ref = NULL;
1406 pthread_mutex_unlock (&m->mtx);
1407 return fetch;
1408 }
1409
1410 pthread_mutex_unlock (&m->mtx);
1411
1412 } while (m->spin);
1413
1414 return NULL;
1415 }
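
/* Usage sketch (illustrative; "master" is an assumption): the typical main
 * loop of a daemon built on this scheduler.  thread_fetch() blocks until a
 * thread is ready and copies it into caller-provided storage; the caller
 * then runs it with thread_call().
 *
 *     struct thread thread;
 *
 *     while (thread_fetch (master, &thread))
 *       thread_call (&thread);
 */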
1416
1417 unsigned long
1418 thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
1419 {
1420 /* This is 'user + sys' time. */
1421 *cputime = timeval_elapsed (now->cpu.ru_utime, start->cpu.ru_utime) +
1422 timeval_elapsed (now->cpu.ru_stime, start->cpu.ru_stime);
1423 return timeval_elapsed (now->real, start->real);
1424 }
1425
1426 /* We should aim to yield after thread->yield milliseconds, which defaults
1427    to THREAD_YIELD_TIME_SLOT.
1428 Note: we are using real (wall clock) time for this calculation.
1429 It could be argued that CPU time may make more sense in certain
1430 contexts. The things to consider are whether the thread may have
1431 blocked (in which case wall time increases, but CPU time does not),
1432 or whether the system is heavily loaded with other processes competing
1433 for CPU time. On balance, wall clock time seems to make sense.
1434 Plus it has the added benefit that gettimeofday should be faster
1435 than calling getrusage. */
1436 int
1437 thread_should_yield (struct thread *thread)
1438 {
1439 int result;
1440 pthread_mutex_lock (&thread->mtx);
1441 {
1442 result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
1443 }
1444 pthread_mutex_unlock (&thread->mtx);
1445 return result;
1446 }
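
/* Usage sketch (illustrative; work_queue, work_remains() and do_one_item()
 * are hypothetical helpers): a handler doing a large batch of work checks
 * thread_should_yield() and reschedules itself as an event, so that other
 * threads get a chance to run in between.
 *
 *     static int
 *     process_queue (struct thread *thread)
 *     {
 *       struct work_queue *wq = THREAD_ARG (thread);
 *
 *       while (work_remains (wq))
 *         {
 *           do_one_item (wq);
 *           if (thread_should_yield (thread))
 *             {
 *               funcname_thread_add_event (thread->master, process_queue, wq,
 *                                          0, NULL, "process_queue",
 *                                          __FILE__, __LINE__);
 *               return 0;
 *             }
 *         }
 *       return 0;
 *     }
 */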
1447
1448 void
1449 thread_set_yield_time (struct thread *thread, unsigned long yield_time)
1450 {
1451 pthread_mutex_lock (&thread->mtx);
1452 {
1453 thread->yield = yield_time;
1454 }
1455 pthread_mutex_unlock (&thread->mtx);
1456 }
1457
1458 void
1459 thread_getrusage (RUSAGE_T *r)
1460 {
1461 monotime(&r->real);
1462 getrusage(RUSAGE_SELF, &(r->cpu));
1463 }
1464
1465 struct thread *thread_current = NULL;
1466
1467 /* We check thread consumed time. If the system has getrusage, we'll
1468 use that to get in-depth stats on the performance of the thread in addition
1469 to wall clock time stats from gettimeofday. */
1470 void
1471 thread_call (struct thread *thread)
1472 {
1473 unsigned long realtime, cputime;
1474 RUSAGE_T before, after;
1475
1476 GETRUSAGE (&before);
1477 thread->real = before.real;
1478
1479 thread_current = thread;
1480 (*thread->func) (thread);
1481 thread_current = NULL;
1482
1483 GETRUSAGE (&after);
1484
1485 realtime = thread_consumed_time (&after, &before, &cputime);
1486 thread->hist->real.total += realtime;
1487 if (thread->hist->real.max < realtime)
1488 thread->hist->real.max = realtime;
1489 thread->hist->cpu.total += cputime;
1490 if (thread->hist->cpu.max < cputime)
1491 thread->hist->cpu.max = cputime;
1492
1493 ++(thread->hist->total_calls);
1494 thread->hist->types |= (1 << thread->add_type);
1495
1496 #ifdef CONSUMED_TIME_CHECK
1497 if (realtime > CONSUMED_TIME_CHECK)
1498 {
1499 /*
1500 * We have a CPU Hog on our hands.
1501 * Whinge about it now, so we're aware this is yet another task
1502 * to fix.
1503 */
1504 zlog_warn ("SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1505 thread->funcname,
1506 (unsigned long) thread->func,
1507 realtime/1000, cputime/1000);
1508 }
1509 #endif /* CONSUMED_TIME_CHECK */
1510 }
1511
1512 /* Execute thread */
1513 void
1514 funcname_thread_execute (struct thread_master *m,
1515 int (*func)(struct thread *),
1516 void *arg,
1517 int val,
1518 debugargdef)
1519 {
1520 struct cpu_thread_history tmp;
1521 struct thread dummy;
1522
1523 memset (&dummy, 0, sizeof (struct thread));
1524
1525 pthread_mutex_init (&dummy.mtx, NULL);
1526 dummy.type = THREAD_EVENT;
1527 dummy.add_type = THREAD_EXECUTE;
1528 dummy.master = NULL;
1529 dummy.arg = arg;
1530 dummy.u.val = val;
1531
1532 tmp.func = dummy.func = func;
1533 tmp.funcname = dummy.funcname = funcname;
1534 pthread_mutex_lock (&cpu_record_mtx);
1535 {
1536 dummy.hist = hash_get (cpu_record, &tmp,
1537 (void * (*) (void *))cpu_record_hash_alloc);
1538 }
1539 pthread_mutex_unlock (&cpu_record_mtx);
1540
1541 dummy.schedfrom = schedfrom;
1542 dummy.schedfrom_line = fromln;
1543
1544 thread_call (&dummy);
1545 }
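
/* Usage sketch (illustrative; "master" and "startup_job" are assumptions):
 * thread_execute-style calls run the handler synchronously while still
 * recording it in the CPU statistics.  Daemons normally use the
 * thread_execute() wrapper from thread.h.
 *
 *     funcname_thread_execute (master, startup_job, NULL, 0,
 *                              "startup_job", __FILE__, __LINE__);
 */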