1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with GNU Zebra; see the file COPYING. If not, write to the Free
18 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
19 * 02111-1307, USA.
20 */
21
22 /* #define DEBUG */
23
24 #include <zebra.h>
25 #include <sys/resource.h>
26
27 #include "thread.h"
28 #include "memory.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "pqueue.h"
32 #include "command.h"
33 #include "sigevent.h"
34
35 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
36 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
37 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
38
39 #if defined(__APPLE__)
40 #include <mach/mach.h>
41 #include <mach/mach_time.h>
42 #endif
43
44 static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
45 static struct hash *cpu_record = NULL;
46
47 static unsigned long
48 timeval_elapsed (struct timeval a, struct timeval b)
49 {
50 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
51 + (a.tv_usec - b.tv_usec));
52 }
53
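/* The cpu_record hash collects per-handler runtime statistics; entries are
 * keyed by the thread handler's function pointer and compared by the same. */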
54 static unsigned int
55 cpu_record_hash_key (struct cpu_thread_history *a)
56 {
57 return (uintptr_t) a->func;
58 }
59
60 static int
61 cpu_record_hash_cmp (const struct cpu_thread_history *a,
62 const struct cpu_thread_history *b)
63 {
64 return a->func == b->func;
65 }
66
67 static void *
68 cpu_record_hash_alloc (struct cpu_thread_history *a)
69 {
70 struct cpu_thread_history *new;
71 new = XCALLOC (MTYPE_THREAD_STATS, sizeof (struct cpu_thread_history));
72 new->func = a->func;
73 new->funcname = a->funcname;
74 return new;
75 }
76
77 static void
78 cpu_record_hash_free (void *a)
79 {
80 struct cpu_thread_history *hist = a;
81
82 XFREE (MTYPE_THREAD_STATS, hist);
83 }
84
85 static void
86 vty_out_cpu_thread_history(struct vty* vty,
87 struct cpu_thread_history *a)
88 {
89 vty_out(vty, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld",
90 a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls,
91 a->cpu.total/a->total_calls, a->cpu.max,
92 a->real.total/a->total_calls, a->real.max);
93 vty_out(vty, " %c%c%c%c%c%c %s%s",
94 a->types & (1 << THREAD_READ) ? 'R':' ',
95 a->types & (1 << THREAD_WRITE) ? 'W':' ',
96 a->types & (1 << THREAD_TIMER) ? 'T':' ',
97 a->types & (1 << THREAD_EVENT) ? 'E':' ',
98 a->types & (1 << THREAD_EXECUTE) ? 'X':' ',
99 a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ',
100 a->funcname, VTY_NEWLINE);
101 }
102
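/* hash_iterate callback: print one history entry (if it matches the filter)
 * and fold its counters into the running totals. */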
103 static void
104 cpu_record_hash_print(struct hash_backet *bucket,
105 void *args[])
106 {
107 struct cpu_thread_history *totals = args[0];
108 struct vty *vty = args[1];
109 thread_type *filter = args[2];
110 struct cpu_thread_history *a = bucket->data;
111
112 if ( !(a->types & *filter) )
113 return;
114 vty_out_cpu_thread_history(vty,a);
115 totals->total_active += a->total_active;
116 totals->total_calls += a->total_calls;
117 totals->real.total += a->real.total;
118 if (totals->real.max < a->real.max)
119 totals->real.max = a->real.max;
120 totals->cpu.total += a->cpu.total;
121 if (totals->cpu.max < a->cpu.max)
122 totals->cpu.max = a->cpu.max;
123 }
124
125 static void
126 cpu_record_print(struct vty *vty, thread_type filter)
127 {
128 struct cpu_thread_history tmp;
129 void *args[3] = {&tmp, vty, &filter};
130
131 memset(&tmp, 0, sizeof tmp);
132 tmp.funcname = "TOTAL";
133 tmp.types = filter;
134
135 vty_out(vty, "%21s %18s %18s%s",
136 "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
137 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
138 vty_out(vty, " Avg uSec Max uSecs");
139 vty_out(vty, " Type Thread%s", VTY_NEWLINE);
140
141 pthread_mutex_lock (&cpu_record_mtx);
142 {
143 hash_iterate(cpu_record,
144 (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
145 args);
146 }
147 pthread_mutex_unlock (&cpu_record_mtx);
148
149 if (tmp.total_calls > 0)
150 vty_out_cpu_thread_history(vty, &tmp);
151 }
152
153 DEFUN (show_thread_cpu,
154 show_thread_cpu_cmd,
155 "show thread cpu [FILTER]",
156 SHOW_STR
157 "Thread information\n"
158 "Thread CPU usage\n"
159 "Display filter (rwtexb)\n")
160 {
161 int idx_filter = 3;
162 int i = 0;
163 thread_type filter = (thread_type) -1U;
164
165 if (argc > 3)
166 {
167 filter = 0;
168 while (argv[idx_filter]->arg[i] != '\0')
169 {
170 switch ( argv[idx_filter]->arg[i] )
171 {
172 case 'r':
173 case 'R':
174 filter |= (1 << THREAD_READ);
175 break;
176 case 'w':
177 case 'W':
178 filter |= (1 << THREAD_WRITE);
179 break;
180 case 't':
181 case 'T':
182 filter |= (1 << THREAD_TIMER);
183 break;
184 case 'e':
185 case 'E':
186 filter |= (1 << THREAD_EVENT);
187 break;
188 case 'x':
189 case 'X':
190 filter |= (1 << THREAD_EXECUTE);
191 break;
192 case 'b':
193 case 'B':
194 filter |= (1 << THREAD_BACKGROUND);
195 break;
196 default:
197 break;
198 }
199 ++i;
200 }
201 if (filter == 0)
202 {
203 vty_out(vty, "Invalid filter \"%s\" specified,"
204 " must contain at least one of 'RWTEXB'%s",
205 argv[idx_filter]->arg, VTY_NEWLINE);
206 return CMD_WARNING;
207 }
208 }
209
210 cpu_record_print(vty, filter);
211 return CMD_SUCCESS;
212 }
213
214 static void
215 cpu_record_hash_clear (struct hash_backet *bucket,
216 void *args)
217 {
218 thread_type *filter = args;
219 struct cpu_thread_history *a = bucket->data;
220
221 if ( !(a->types & *filter) )
222 return;
223
224 pthread_mutex_lock (&cpu_record_mtx);
225 {
226 hash_release (cpu_record, bucket->data);
227 }
228 pthread_mutex_unlock (&cpu_record_mtx);
229 }
230
231 static void
232 cpu_record_clear (thread_type filter)
233 {
234 thread_type *tmp = &filter;
235
236 pthread_mutex_lock (&cpu_record_mtx);
237 {
238 hash_iterate (cpu_record,
239 (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
240 tmp);
241 }
242 pthread_mutex_unlock (&cpu_record_mtx);
243 }
244
245 DEFUN (clear_thread_cpu,
246 clear_thread_cpu_cmd,
247 "clear thread cpu [FILTER]",
248 "Clear stored data\n"
249 "Thread information\n"
250 "Thread CPU usage\n"
251 "Display filter (rwtexb)\n")
252 {
253 int idx_filter = 3;
254 int i = 0;
255 thread_type filter = (thread_type) -1U;
256
257 if (argc > 3)
258 {
259 filter = 0;
260 while (argv[idx_filter]->arg[i] != '\0')
261 {
262 switch ( argv[idx_filter]->arg[i] )
263 {
264 case 'r':
265 case 'R':
266 filter |= (1 << THREAD_READ);
267 break;
268 case 'w':
269 case 'W':
270 filter |= (1 << THREAD_WRITE);
271 break;
272 case 't':
273 case 'T':
274 filter |= (1 << THREAD_TIMER);
275 break;
276 case 'e':
277 case 'E':
278 filter |= (1 << THREAD_EVENT);
279 break;
280 case 'x':
281 case 'X':
282 filter |= (1 << THREAD_EXECUTE);
283 break;
284 case 'b':
285 case 'B':
286 filter |= (1 << THREAD_BACKGROUND);
287 break;
288 default:
289 break;
290 }
291 ++i;
292 }
293 if (filter == 0)
294 {
295 vty_out(vty, "Invalid filter \"%s\" specified,"
296 " must contain at least one of 'RWTEXB'%s",
297 argv[idx_filter]->arg, VTY_NEWLINE);
298 return CMD_WARNING;
299 }
300 }
301
302 cpu_record_clear (filter);
303 return CMD_SUCCESS;
304 }
305
306 void
307 thread_cmd_init (void)
308 {
309 install_element (VIEW_NODE, &show_thread_cpu_cmd);
310 install_element (ENABLE_NODE, &clear_thread_cpu_cmd);
311 }
312
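/* Priority queue comparison: order timer threads by absolute expiry time. */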
313 static int
314 thread_timer_cmp(void *a, void *b)
315 {
316 struct thread *thread_a = a;
317 struct thread *thread_b = b;
318
319 if (timercmp (&thread_a->u.sands, &thread_b->u.sands, <))
320 return -1;
321 if (timercmp (&thread_a->u.sands, &thread_b->u.sands, >))
322 return 1;
323 return 0;
324 }
325
326 static void
327 thread_timer_update(void *node, int actual_position)
328 {
329 struct thread *thread = node;
330
331 thread->index = actual_position;
332 }
333
334 /* Allocate new thread master. */
335 struct thread_master *
336 thread_master_create (void)
337 {
338 struct thread_master *rv;
339 struct rlimit limit;
340
341 getrlimit(RLIMIT_NOFILE, &limit);
342
343 pthread_mutex_lock (&cpu_record_mtx);
344 {
345 if (cpu_record == NULL)
346 cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
347 (int (*) (const void *, const void *))
348 cpu_record_hash_cmp);
349 }
350 pthread_mutex_unlock (&cpu_record_mtx);
351
352 rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
353 if (rv == NULL)
354 return NULL;
355
356 pthread_mutex_init (&rv->mtx, NULL);
357
358 rv->fd_limit = (int)limit.rlim_cur;
359 rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
360 if (rv->read == NULL)
361 {
362 XFREE (MTYPE_THREAD_MASTER, rv);
363 return NULL;
364 }
365
366 rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
367 if (rv->write == NULL)
368 {
369 XFREE (MTYPE_THREAD, rv->read);
370 XFREE (MTYPE_THREAD_MASTER, rv);
371 return NULL;
372 }
373
374 /* Initialize the timer queues */
375 rv->timer = pqueue_create();
376 rv->background = pqueue_create();
377 rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
378 rv->timer->update = rv->background->update = thread_timer_update;
379 rv->spin = true;
380 rv->handle_signals = true;
381
382 #if defined(HAVE_POLL_CALL)
383 rv->handler.pfdsize = rv->fd_limit;
384 rv->handler.pfdcount = 0;
385 rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
386 sizeof (struct pollfd) * rv->handler.pfdsize);
387 #endif
388 return rv;
389 }
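/* Illustrative event-loop usage (a sketch only; real daemons do their own
 * initialization and schedule work through the thread_add_* wrappers in
 * thread.h):
 *
 *   struct thread_master *master = thread_master_create ();
 *   struct thread fetch;
 *
 *   // ... schedule initial timers, events and fd handlers here ...
 *
 *   while (thread_fetch (master, &fetch))
 *     thread_call (&fetch);
 *
 *   thread_master_free (master);
 */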
390
391 /* Add a new thread to the list. */
392 static void
393 thread_list_add (struct thread_list *list, struct thread *thread)
394 {
395 thread->next = NULL;
396 thread->prev = list->tail;
397 if (list->tail)
398 list->tail->next = thread;
399 else
400 list->head = thread;
401 list->tail = thread;
402 list->count++;
403 }
404
405 /* Delete a thread from the list. */
406 static struct thread *
407 thread_list_delete (struct thread_list *list, struct thread *thread)
408 {
409 if (thread->next)
410 thread->next->prev = thread->prev;
411 else
412 list->tail = thread->prev;
413 if (thread->prev)
414 thread->prev->next = thread->next;
415 else
416 list->head = thread->next;
417 thread->next = thread->prev = NULL;
418 list->count--;
419 return thread;
420 }
421
422 static void
423 thread_delete_fd (struct thread **thread_array, struct thread *thread)
424 {
425 thread_array[thread->u.fd] = NULL;
426 }
427
428 static void
429 thread_add_fd (struct thread **thread_array, struct thread *thread)
430 {
431 thread_array[thread->u.fd] = thread;
432 }
433
434 /* Return 1 if the thread list is empty, 0 otherwise. */
435 static int
436 thread_empty (struct thread_list *list)
437 {
438 return list->head ? 0 : 1;
439 }
440
441 /* Delete top of the list and return it. */
442 static struct thread *
443 thread_trim_head (struct thread_list *list)
444 {
445 if (!thread_empty (list))
446 return thread_list_delete (list, list->head);
447 return NULL;
448 }
449
450 /* Move thread to unuse list. */
451 static void
452 thread_add_unuse (struct thread_master *m, struct thread *thread)
453 {
454 assert (m != NULL && thread != NULL);
455 assert (thread->next == NULL);
456 assert (thread->prev == NULL);
457
458 thread->type = THREAD_UNUSED;
459 thread->hist->total_active--;
460 thread_list_add (&m->unuse, thread);
461 }
462
463 /* Free all threads on the given list. */
464 static void
465 thread_list_free (struct thread_master *m, struct thread_list *list)
466 {
467 struct thread *t;
468 struct thread *next;
469
470 for (t = list->head; t; t = next)
471 {
472 next = t->next;
473 XFREE (MTYPE_THREAD, t);
474 list->count--;
475 m->alloc--;
476 }
477 }
478
479 static void
480 thread_array_free (struct thread_master *m, struct thread **thread_array)
481 {
482 struct thread *t;
483 int index;
484
485 for (index = 0; index < m->fd_limit; ++index)
486 {
487 t = thread_array[index];
488 if (t)
489 {
490 thread_array[index] = NULL;
491 XFREE (MTYPE_THREAD, t);
492 m->alloc--;
493 }
494 }
495 XFREE (MTYPE_THREAD, thread_array);
496 }
497
498 static void
499 thread_queue_free (struct thread_master *m, struct pqueue *queue)
500 {
501 int i;
502
503 for (i = 0; i < queue->size; i++)
504 XFREE(MTYPE_THREAD, queue->array[i]);
505
506 m->alloc -= queue->size;
507 pqueue_delete(queue);
508 }
509
510 /*
511 * thread_master_free_unused
512 *
513  * As threads are finished with, they are put on the
514  * unuse list for later reuse.
515  * If we are shutting down, free up the unused threads
516  * so we can see if we forgot to shut anything off.
517 */
518 void
519 thread_master_free_unused (struct thread_master *m)
520 {
521 pthread_mutex_lock (&m->mtx);
522 {
523 struct thread *t;
524 while ((t = thread_trim_head(&m->unuse)) != NULL)
525 {
526 pthread_mutex_destroy (&t->mtx);
527 XFREE(MTYPE_THREAD, t);
528 }
529 }
530 pthread_mutex_unlock (&m->mtx);
531 }
532
533 /* Stop thread scheduler. */
534 void
535 thread_master_free (struct thread_master *m)
536 {
537 thread_array_free (m, m->read);
538 thread_array_free (m, m->write);
539 thread_queue_free (m, m->timer);
540 thread_list_free (m, &m->event);
541 thread_list_free (m, &m->ready);
542 thread_list_free (m, &m->unuse);
543 thread_queue_free (m, m->background);
544 pthread_mutex_destroy (&m->mtx);
545
546 #if defined(HAVE_POLL_CALL)
547 XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
548 #endif
549 XFREE (MTYPE_THREAD_MASTER, m);
550
551 pthread_mutex_lock (&cpu_record_mtx);
552 {
553 if (cpu_record)
554 {
555 hash_clean (cpu_record, cpu_record_hash_free);
556 hash_free (cpu_record);
557 cpu_record = NULL;
558 }
559 }
560 pthread_mutex_unlock (&cpu_record_mtx);
561 }
562
563 /* Return remaining time in seconds. */
564 unsigned long
565 thread_timer_remain_second (struct thread *thread)
566 {
567 int64_t remain;
568
569 pthread_mutex_lock (&thread->mtx);
570 {
571 remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
572 }
573 pthread_mutex_unlock (&thread->mtx);
574
575 return remain < 0 ? 0 : remain;
576 }
577
578 #define debugargdef const char *funcname, const char *schedfrom, int fromln
579 #define debugargpass funcname, schedfrom, fromln
580
581 struct timeval
582 thread_timer_remain(struct thread *thread)
583 {
584 struct timeval remain;
585 pthread_mutex_lock (&thread->mtx);
586 {
587 monotime_until(&thread->u.sands, &remain);
588 }
589 pthread_mutex_unlock (&thread->mtx);
590 return remain;
591 }
592
593 /* Get new thread. */
594 static struct thread *
595 thread_get (struct thread_master *m, u_char type,
596 int (*func) (struct thread *), void *arg, debugargdef)
597 {
598 struct thread *thread = thread_trim_head (&m->unuse);
599 struct cpu_thread_history tmp;
600
601 if (! thread)
602 {
603 thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
604 /* mutex only needs to be initialized at struct creation. */
605 pthread_mutex_init (&thread->mtx, NULL);
606 m->alloc++;
607 }
608
609 thread->type = type;
610 thread->add_type = type;
611 thread->master = m;
612 thread->arg = arg;
613 thread->index = -1;
614 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
615
616 /*
617  * If the passed-in funcname is not what we have
618  * stored, that means the thread->hist needs to be
619  * updated. We keep the last one around on the unuse list
620 * under the assumption that we are probably
621 * going to immediately allocate the same
622 * type of thread.
623 * This hopefully saves us some serious
624 * hash_get lookups.
625 */
626 if (thread->funcname != funcname ||
627 thread->func != func)
628 {
629 tmp.func = func;
630 tmp.funcname = funcname;
631 pthread_mutex_lock (&cpu_record_mtx);
632 {
633 thread->hist = hash_get (cpu_record, &tmp,
634 (void * (*) (void *))cpu_record_hash_alloc);
635 }
636 pthread_mutex_unlock (&cpu_record_mtx);
637 }
638 thread->hist->total_active++;
639 thread->func = func;
640 thread->funcname = funcname;
641 thread->schedfrom = schedfrom;
642 thread->schedfrom_line = fromln;
643
644 return thread;
645 }
646
647 #if defined (HAVE_POLL_CALL)
648
649 #define fd_copy_fd_set(X) (X)
650
651 /* generic add thread function */
652 static struct thread *
653 generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
654 void *arg, int fd, int dir, debugargdef)
655 {
656 struct thread *thread;
657
658 u_char type;
659 short int event;
660
661 if (dir == THREAD_READ)
662 {
663 event = (POLLIN | POLLHUP);
664 type = THREAD_READ;
665 }
666 else
667 {
668 event = (POLLOUT | POLLHUP);
669 type = THREAD_WRITE;
670 }
671
672 nfds_t queuepos = m->handler.pfdcount;
673 nfds_t i=0;
674 for (i=0; i<m->handler.pfdcount; i++)
675 if (m->handler.pfds[i].fd == fd)
676 {
677 queuepos = i;
678 break;
679 }
680
681 /* is there enough space for a new fd? */
682 assert (queuepos < m->handler.pfdsize);
683
684 thread = thread_get (m, type, func, arg, debugargpass);
685 m->handler.pfds[queuepos].fd = fd;
686 m->handler.pfds[queuepos].events |= event;
687 if (queuepos == m->handler.pfdcount)
688 m->handler.pfdcount++;
689
690 return thread;
691 }
692 #else
693
694 #define fd_copy_fd_set(X) (X)
695 #endif
696
697 static int
698 fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
699 {
700 int num;
701
702 /* If timer_wait is null here, that means either select() or poll() should
703  * block indefinitely, unless the thread_master has overridden it. select()
704 * and poll() differ in the timeout values they interpret as an indefinite
705 * block; select() requires a null pointer, while poll takes a millisecond
706 * value of -1.
707 *
708 * The thread_master owner has the option of overriding the default behavior
709 * by setting ->selectpoll_timeout. If the value is positive, it specifies
710 * the maximum number of milliseconds to wait. If the timeout is -1, it
711 * specifies that we should never wait and always return immediately even if
712 * no event is detected. If the value is zero, the behavior is default.
713 */
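  /* In short (a restatement of the rules above):
   *   m->selectpoll_timeout == 0: honour timer_wait; a NULL timer_wait blocks
   *                               indefinitely (select() NULL, poll() -1).
   *   m->selectpoll_timeout  > 0: wait at most that many milliseconds.
   *   m->selectpoll_timeout  < 0: never block; return immediately (timeout 0).
   */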
714
715 #if defined(HAVE_POLL_CALL)
716 int timeout = -1;
717
718 if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value
719 timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
720 else if (m->selectpoll_timeout > 0) // use the user's timeout
721 timeout = m->selectpoll_timeout;
722 else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
723 timeout = 0;
724
725 num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
726 #else
727 struct timeval timeout;
728
729 if (m->selectpoll_timeout > 0) // use the user's timeout
730 {
731 timeout.tv_sec = m->selectpoll_timeout / 1000;
732 timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000;
733 timer_wait = &timeout;
734 }
735 else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
736 {
737 timeout.tv_sec = 0;
738 timeout.tv_usec = 0;
739 timer_wait = &timeout;
740 }
741 num = select (size, read, write, except, timer_wait);
742 #endif
743
744 return num;
745 }
746
747 static int
748 fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
749 {
750 #if defined(HAVE_POLL_CALL)
751 return 1;
752 #else
753 return FD_ISSET (THREAD_FD (thread), fdset);
754 #endif
755 }
756
757 static int
758 fd_clear_read_write (struct thread *thread)
759 {
760 #if !defined(HAVE_POLL_CALL)
761 thread_fd_set *fdset = NULL;
762 int fd = THREAD_FD (thread);
763
764 if (thread->type == THREAD_READ)
765 fdset = &thread->master->handler.readfd;
766 else
767 fdset = &thread->master->handler.writefd;
768
769 if (!FD_ISSET (fd, fdset))
770 return 0;
771
772 FD_CLR (fd, fdset);
773 #endif
774 return 1;
775 }
776
777 /* Add new read or write thread. */
778 struct thread *
779 funcname_thread_add_read_write (int dir, struct thread_master *m,
780 int (*func) (struct thread *), void *arg, int fd, struct thread **t_ptr,
781 debugargdef)
782 {
783 struct thread *thread = NULL;
784
785 pthread_mutex_lock (&m->mtx);
786 {
787 if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
788 {
789 pthread_mutex_unlock (&m->mtx);
790 return NULL;
791 }
792
793 #if defined (HAVE_POLL_CALL)
794 thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
795 #else
796 if (fd >= FD_SETSIZE)
797 {
798 zlog_err ("File descriptor %d is >= FD_SETSIZE (%d). Please recompile "
799 "with --enable-poll=yes", fd, FD_SETSIZE);
800 assert (fd < FD_SETSIZE && !"fd >= FD_SETSIZE");
801 }
802 thread_fd_set *fdset = NULL;
803 if (dir == THREAD_READ)
804 fdset = &m->handler.readfd;
805 else
806 fdset = &m->handler.writefd;
807
808 if (FD_ISSET (fd, fdset))
809 {
810 zlog_warn ("There is already %s fd [%d]",
811 (dir == THREAD_READ) ? "read" : "write", fd);
812 }
813 else
814 {
815 FD_SET (fd, fdset);
816 thread = thread_get (m, dir, func, arg, debugargpass);
817 }
818 #endif
819
820 if (thread)
821 {
822 pthread_mutex_lock (&thread->mtx);
823 {
824 thread->u.fd = fd;
825 if (dir == THREAD_READ)
826 thread_add_fd (m->read, thread);
827 else
828 thread_add_fd (m->write, thread);
829 }
830 pthread_mutex_unlock (&thread->mtx);
831 }
832
833 if (t_ptr)
834 *t_ptr = thread;
835 }
836 pthread_mutex_unlock (&m->mtx);
837
838 return thread;
839 }
840
841 static struct thread *
842 funcname_thread_add_timer_timeval (struct thread_master *m,
843 int (*func) (struct thread *), int type, void *arg,
844 struct timeval *time_relative, struct thread **t_ptr, debugargdef)
845 {
846 struct thread *thread;
847 struct pqueue *queue;
848
849 assert (m != NULL);
850
851 assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
852 assert (time_relative);
853
854 pthread_mutex_lock (&m->mtx);
855 {
856 if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
857 {
858 pthread_mutex_unlock (&m->mtx);
859 return NULL;
860 }
861
862 queue = ((type == THREAD_TIMER) ? m->timer : m->background);
863 thread = thread_get (m, type, func, arg, debugargpass);
864
865 pthread_mutex_lock (&thread->mtx);
866 {
867 monotime(&thread->u.sands);
868 timeradd(&thread->u.sands, time_relative, &thread->u.sands);
869 pqueue_enqueue(thread, queue);
870 }
871 pthread_mutex_unlock (&thread->mtx);
872
873 if (t_ptr)
874 *t_ptr = thread;
875 }
876 pthread_mutex_unlock (&m->mtx);
877
878 return thread;
879 }
880
881
882 /* Add timer event thread. */
883 struct thread *
884 funcname_thread_add_timer (struct thread_master *m,
885 int (*func) (struct thread *), void *arg, long timer,
886 struct thread **t_ptr, debugargdef)
887 {
888 struct timeval trel;
889
890 assert (m != NULL);
891
892 trel.tv_sec = timer;
893 trel.tv_usec = 0;
894
895 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel,
896 t_ptr, debugargpass);
897 }
898
899 /* Add timer event thread with "millisecond" resolution */
900 struct thread *
901 funcname_thread_add_timer_msec (struct thread_master *m,
902 int (*func) (struct thread *), void *arg, long timer,
903 struct thread **t_ptr, debugargdef)
904 {
905 struct timeval trel;
906
907 assert (m != NULL);
908
909 trel.tv_sec = timer / 1000;
910 trel.tv_usec = 1000*(timer % 1000);
911
912 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel,
913 t_ptr, debugargpass);
914 }
915
916 /* Add timer event thread with a struct timeval relative expiration. */
917 struct thread *
918 funcname_thread_add_timer_tv (struct thread_master *m,
919 int (*func) (struct thread *), void *arg, struct timeval *tv,
920 struct thread **t_ptr, debugargdef)
921 {
922 return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, tv,
923 t_ptr, debugargpass);
924 }
925
926 /* Add a background thread, with an optional millisec delay */
927 struct thread *
928 funcname_thread_add_background (struct thread_master *m,
929 int (*func) (struct thread *), void *arg, long delay,
930 struct thread **t_ptr, debugargdef)
931 {
932 struct timeval trel;
933
934 assert (m != NULL);
935
936 if (delay)
937 {
938 trel.tv_sec = delay / 1000;
939 trel.tv_usec = 1000*(delay % 1000);
940 }
941 else
942 {
943 trel.tv_sec = 0;
944 trel.tv_usec = 0;
945 }
946
947 return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND, arg,
948 &trel, t_ptr, debugargpass);
949 }
950
951 /* Add simple event thread. */
952 struct thread *
953 funcname_thread_add_event (struct thread_master *m,
954 int (*func) (struct thread *), void *arg, int val,
955 struct thread **t_ptr, debugargdef)
956 {
957 struct thread *thread;
958
959 assert (m != NULL);
960
961 pthread_mutex_lock (&m->mtx);
962 {
963 if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
964 {
965 pthread_mutex_unlock (&m->mtx);
966 return NULL;
967 }
968
969 thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
970 pthread_mutex_lock (&thread->mtx);
971 {
972 thread->u.val = val;
973 thread_list_add (&m->event, thread);
974 }
975 pthread_mutex_unlock (&thread->mtx);
976
977 if (t_ptr)
978 *t_ptr = thread;
979 }
980 pthread_mutex_unlock (&m->mtx);
981
982 return thread;
983 }
984
985 static void
986 thread_cancel_read_or_write (struct thread *thread, short int state)
987 {
988 #if defined(HAVE_POLL_CALL)
989 nfds_t i;
990
991 for (i=0;i<thread->master->handler.pfdcount;++i)
992 if (thread->master->handler.pfds[i].fd == thread->u.fd)
993 {
994 thread->master->handler.pfds[i].events &= ~(state);
995
996 /* remove thread fds from pfd list */
997 if (thread->master->handler.pfds[i].events == 0)
998 {
999 memmove(thread->master->handler.pfds+i,
1000 thread->master->handler.pfds+i+1,
1001 (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
1002 thread->master->handler.pfdcount--;
1003 return;
1004 }
1005 }
1006 #endif
1007
1008 fd_clear_read_write (thread);
1009 }
1010
1011 /**
1012 * Cancel thread from scheduler.
1013 *
1014  * This function is *NOT* MT-safe. DO NOT call it from any pthread other than
1015 * the one which owns thread->master.
1016 */
1017 void
1018 thread_cancel (struct thread *thread)
1019 {
1020 struct thread_list *list = NULL;
1021 struct pqueue *queue = NULL;
1022 struct thread **thread_array = NULL;
1023
1024 pthread_mutex_lock (&thread->master->mtx);
1025 pthread_mutex_lock (&thread->mtx);
1026
1027 switch (thread->type)
1028 {
1029 case THREAD_READ:
1030 #if defined (HAVE_POLL_CALL)
1031 thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
1032 #else
1033 thread_cancel_read_or_write (thread, 0);
1034 #endif
1035 thread_array = thread->master->read;
1036 break;
1037 case THREAD_WRITE:
1038 #if defined (HAVE_POLL_CALL)
1039 thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
1040 #else
1041 thread_cancel_read_or_write (thread, 0);
1042 #endif
1043 thread_array = thread->master->write;
1044 break;
1045 case THREAD_TIMER:
1046 queue = thread->master->timer;
1047 break;
1048 case THREAD_EVENT:
1049 list = &thread->master->event;
1050 break;
1051 case THREAD_READY:
1052 list = &thread->master->ready;
1053 break;
1054 case THREAD_BACKGROUND:
1055 queue = thread->master->background;
1056 break;
1057 default:
1058 goto done;
1059 break;
1060 }
1061
1062 if (queue)
1063 {
1064 assert(thread->index >= 0);
1065 pqueue_remove (thread, queue);
1066 }
1067 else if (list)
1068 {
1069 thread_list_delete (list, thread);
1070 }
1071 else if (thread_array)
1072 {
1073 thread_delete_fd (thread_array, thread);
1074 }
1075 else
1076 {
1077 assert(!"Thread should be either in queue or list or array!");
1078 }
1079
1080 thread_add_unuse (thread->master, thread);
1081
1082 done:
1083 pthread_mutex_unlock (&thread->mtx);
1084 pthread_mutex_unlock (&thread->master->mtx);
1085 }
1086
1087 /* Delete all events which have argument value arg. */
1088 unsigned int
1089 thread_cancel_event (struct thread_master *m, void *arg)
1090 {
1091 unsigned int ret = 0;
1092 struct thread *thread;
1093 struct thread *t;
1094
1095 pthread_mutex_lock (&m->mtx);
1096 {
1097 thread = m->event.head;
1098 while (thread)
1099 {
1100 t = thread;
1101 pthread_mutex_lock (&t->mtx);
1102 {
1103 thread = t->next;
1104
1105 if (t->arg == arg)
1106 {
1107 ret++;
1108 thread_list_delete (&m->event, t);
1109 thread_add_unuse (m, t);
1110 }
1111 }
1112 pthread_mutex_unlock (&t->mtx);
1113 }
1114
1115 /* thread can be on the ready list too */
1116 thread = m->ready.head;
1117 while (thread)
1118 {
1119 t = thread;
1120 pthread_mutex_lock (&t->mtx);
1121 {
1122 thread = t->next;
1123
1124 if (t->arg == arg)
1125 {
1126 ret++;
1127 thread_list_delete (&m->ready, t);
1128 thread_add_unuse (m, t);
1129 }
1130 }
1131 pthread_mutex_unlock (&t->mtx);
1132 }
1133 }
1134 pthread_mutex_unlock (&m->mtx);
1135 return ret;
1136 }
1137
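/* Fill timer_val with the time remaining until the earliest queued timer
 * pops and return it, or return NULL if the queue is empty. */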
1138 static struct timeval *
1139 thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
1140 {
1141 if (queue->size)
1142 {
1143 struct thread *next_timer = queue->array[0];
1144 monotime_until(&next_timer->u.sands, timer_val);
1145 return timer_val;
1146 }
1147 return NULL;
1148 }
1149
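/* Copy the thread into the caller-supplied storage and recycle the original
 * onto the unuse list; returns the copy. */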
1150 static struct thread *
1151 thread_run (struct thread_master *m, struct thread *thread,
1152 struct thread *fetch)
1153 {
1154 *fetch = *thread;
1155 thread_add_unuse (m, thread);
1156 return fetch;
1157 }
1158
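/* If the given read/write thread's fd is ready, move it from its fd array
 * onto the ready list; returns 1 if it was made ready, 0 otherwise. */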
1159 static int
1160 thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
1161 {
1162 struct thread **thread_array;
1163
1164 if (!thread)
1165 return 0;
1166
1167 if (thread->type == THREAD_READ)
1168 thread_array = m->read;
1169 else
1170 thread_array = m->write;
1171
1172 if (fd_is_set (thread, fdset, pos))
1173 {
1174 fd_clear_read_write (thread);
1175 thread_delete_fd (thread_array, thread);
1176 thread_list_add (&m->ready, thread);
1177 thread->type = THREAD_READY;
1178 #if defined(HAVE_POLL_CALL)
1179 thread->master->handler.pfds[pos].events &= ~(state);
1180 #endif
1181 return 1;
1182 }
1183 return 0;
1184 }
1185
1186 #if defined(HAVE_POLL_CALL)
1187
1188 /* check poll events */
1189 static void
1190 check_pollfds(struct thread_master *m, fd_set *readfd, int num)
1191 {
1192 nfds_t i = 0;
1193 int ready = 0;
1194 for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
1195 {
1196 /* no event for current fd? immediately continue */
1197 if(m->handler.pfds[i].revents == 0)
1198 continue;
1199
1200 ready++;
1201
1202 /* POLLIN / POLLOUT process event */
1203 if (m->handler.pfds[i].revents & POLLIN)
1204 thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
1205 if (m->handler.pfds[i].revents & POLLOUT)
1206 thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
1207
1208 /* remove fd from list on POLLNVAL or POLLHUP */
1209 if (m->handler.pfds[i].revents & POLLNVAL ||
1210 m->handler.pfds[i].revents & POLLHUP)
1211 {
1212 memmove(m->handler.pfds+i,
1213 m->handler.pfds+i+1,
1214 (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
1215 m->handler.pfdcount--;
1216 i--;
1217 }
1218 else
1219 m->handler.pfds[i].revents = 0;
1220 }
1221 }
1222 #endif
1223
1224 static void
1225 thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
1226 {
1227 #if defined (HAVE_POLL_CALL)
1228 check_pollfds (m, rset, num);
1229 #else
1230 int ready = 0, index;
1231
1232 for (index = 0; index < m->fd_limit && ready < num; ++index)
1233 {
1234 ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
1235 ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
1236 }
1237 #endif
1238 }
1239
1240 /* Add all timers that have popped to the ready list. */
1241 static unsigned int
1242 thread_timer_process (struct pqueue *queue, struct timeval *timenow)
1243 {
1244 struct thread *thread;
1245 unsigned int ready = 0;
1246
1247 while (queue->size)
1248 {
1249 thread = queue->array[0];
1250 if (timercmp (timenow, &thread->u.sands, <))
1251 return ready;
1252 pqueue_dequeue(queue);
1253 thread->type = THREAD_READY;
1254 thread_list_add (&thread->master->ready, thread);
1255 ready++;
1256 }
1257 return ready;
1258 }
1259
1260 /* process a list en masse, e.g. for event thread lists */
1261 static unsigned int
1262 thread_process (struct thread_list *list)
1263 {
1264 struct thread *thread;
1265 struct thread *next;
1266 unsigned int ready = 0;
1267
1268 for (thread = list->head; thread; thread = next)
1269 {
1270 next = thread->next;
1271 thread_list_delete (list, thread);
1272 thread->type = THREAD_READY;
1273 thread_list_add (&thread->master->ready, thread);
1274 ready++;
1275 }
1276 return ready;
1277 }
1278
1279
1280 /* Fetch next ready thread. */
1281 struct thread *
1282 thread_fetch (struct thread_master *m, struct thread *fetch)
1283 {
1284 struct thread *thread;
1285 thread_fd_set readfd;
1286 thread_fd_set writefd;
1287 thread_fd_set exceptfd;
1288 struct timeval now;
1289 struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
1290 struct timeval timer_val_bg;
1291 struct timeval *timer_wait = &timer_val;
1292 struct timeval *timer_wait_bg;
1293
1294 do
1295 {
1296 int num = 0;
1297
1298 /* Signals pre-empt everything */
1299 if (m->handle_signals)
1300 quagga_sigevent_process ();
1301
1302 pthread_mutex_lock (&m->mtx);
1303 /* Drain the ready queue of already scheduled jobs, before scheduling
1304 * more.
1305 */
1306 if ((thread = thread_trim_head (&m->ready)) != NULL)
1307 {
1308 fetch = thread_run (m, thread, fetch);
1309 pthread_mutex_unlock (&m->mtx);
1310 return fetch;
1311 }
1312
1313 /* To be fair to all kinds of threads, and avoid starvation, we
1314 * need to be careful to consider all thread types for scheduling
1315  * in each quantum, i.e. we should not return early from here on.
1316 */
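      /* The resulting per-iteration priority order is roughly: already-ready
       * threads, then events, then foreground timers, then I/O, then
       * background timers. */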
1317
1318 /* Normal events are the next highest priority. */
1319 thread_process (&m->event);
1320
1321 /* Structure copy. */
1322 #if !defined(HAVE_POLL_CALL)
1323 readfd = fd_copy_fd_set(m->handler.readfd);
1324 writefd = fd_copy_fd_set(m->handler.writefd);
1325 exceptfd = fd_copy_fd_set(m->handler.exceptfd);
1326 #endif
1327
1328 /* Calculate select wait timer if nothing else to do */
1329 if (m->ready.count == 0)
1330 {
1331 timer_wait = thread_timer_wait (m->timer, &timer_val);
1332 timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);
1333
1334 if (timer_wait_bg &&
1335 (!timer_wait || (timercmp (timer_wait, timer_wait_bg, >))))
1336 timer_wait = timer_wait_bg;
1337 }
1338
1339 if (timer_wait && timer_wait->tv_sec < 0)
1340 {
1341 timerclear(&timer_val);
1342 timer_wait = &timer_val;
1343 }
1344
1345 num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
1346
1347 /* Signals should get quick treatment */
1348 if (num < 0)
1349 {
1350 if (errno == EINTR)
1351 {
1352 pthread_mutex_unlock (&m->mtx);
1353 continue; /* signal received - process it */
1354 }
1355 zlog_warn ("select() error: %s", safe_strerror (errno));
1356 pthread_mutex_unlock (&m->mtx);
1357 return NULL;
1358 }
1359
1360 /* Check foreground timers. Historically, they have had higher
1361 priority than I/O threads, so let's push them onto the ready
1362 list in front of the I/O threads. */
1363 monotime(&now);
1364 thread_timer_process (m->timer, &now);
1365
1366 /* Got IO, process it */
1367 if (num > 0)
1368 thread_process_fds (m, &readfd, &writefd, num);
1369
1370 #if 0
1371 /* If any threads were made ready above (I/O or foreground timer),
1372 perhaps we should avoid adding background timers to the ready
1373 list at this time. If this code is uncommented, then background
1374 timer threads will not run unless there is nothing else to do. */
1375 if ((thread = thread_trim_head (&m->ready)) != NULL)
1376 {
1377 fetch = thread_run (m, thread, fetch);
1378 pthread_mutex_unlock (&m->mtx);
1379 return fetch;
1380 }
1381 #endif
1382
1383 /* Background timer/events, lowest priority */
1384 thread_timer_process (m->background, &now);
1385
1386 if ((thread = thread_trim_head (&m->ready)) != NULL)
1387 {
1388 fetch = thread_run (m, thread, fetch);
1389 pthread_mutex_unlock (&m->mtx);
1390 return fetch;
1391 }
1392
1393 pthread_mutex_unlock (&m->mtx);
1394
1395 } while (m->spin);
1396
1397 return NULL;
1398 }
1399
1400 unsigned long
1401 thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
1402 {
1403 /* This is 'user + sys' time. */
1404 *cputime = timeval_elapsed (now->cpu.ru_utime, start->cpu.ru_utime) +
1405 timeval_elapsed (now->cpu.ru_stime, start->cpu.ru_stime);
1406 return timeval_elapsed (now->real, start->real);
1407 }
1408
1409 /* We should aim to yield after yield milliseconds, which defaults
1410 to THREAD_YIELD_TIME_SLOT.
1411 Note: we are using real (wall clock) time for this calculation.
1412 It could be argued that CPU time may make more sense in certain
1413 contexts. The things to consider are whether the thread may have
1414 blocked (in which case wall time increases, but CPU time does not),
1415 or whether the system is heavily loaded with other processes competing
1416 for CPU time. On balance, wall clock time seems to make sense.
1417 Plus it has the added benefit that gettimeofday should be faster
1418 than calling getrusage. */
1419 int
1420 thread_should_yield (struct thread *thread)
1421 {
1422 int result;
1423 pthread_mutex_lock (&thread->mtx);
1424 {
1425 result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
1426 }
1427 pthread_mutex_unlock (&thread->mtx);
1428 return result;
1429 }
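/* Illustrative usage (a sketch, not lifted from any particular daemon): a
 * long-running handler can cooperate by checking this from its work loop and
 * rescheduling the remainder of its work, e.g.
 *
 *   static int my_work (struct thread *t)
 *   {
 *     while (more_work_to_do ())          // hypothetical helper
 *       {
 *         do_one_unit_of_work ();         // hypothetical helper
 *         if (thread_should_yield (t))
 *           break;                        // reschedule the rest via thread.h
 *       }
 *     return 0;
 *   }
 */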
1430
1431 void
1432 thread_set_yield_time (struct thread *thread, unsigned long yield_time)
1433 {
1434 pthread_mutex_lock (&thread->mtx);
1435 {
1436 thread->yield = yield_time;
1437 }
1438 pthread_mutex_unlock (&thread->mtx);
1439 }
1440
1441 void
1442 thread_getrusage (RUSAGE_T *r)
1443 {
1444 monotime(&r->real);
1445 getrusage(RUSAGE_SELF, &(r->cpu));
1446 }
1447
1448 struct thread *thread_current = NULL;
1449
1450 /* We check thread consumed time. If the system has getrusage, we'll
1451 use that to get in-depth stats on the performance of the thread in addition
1452 to wall clock time stats from gettimeofday. */
1453 void
1454 thread_call (struct thread *thread)
1455 {
1456 unsigned long realtime, cputime;
1457 RUSAGE_T before, after;
1458
1459 GETRUSAGE (&before);
1460 thread->real = before.real;
1461
1462 thread_current = thread;
1463 (*thread->func) (thread);
1464 thread_current = NULL;
1465
1466 GETRUSAGE (&after);
1467
1468 realtime = thread_consumed_time (&after, &before, &cputime);
1469 thread->hist->real.total += realtime;
1470 if (thread->hist->real.max < realtime)
1471 thread->hist->real.max = realtime;
1472 thread->hist->cpu.total += cputime;
1473 if (thread->hist->cpu.max < cputime)
1474 thread->hist->cpu.max = cputime;
1475
1476 ++(thread->hist->total_calls);
1477 thread->hist->types |= (1 << thread->add_type);
1478
1479 #ifdef CONSUMED_TIME_CHECK
1480 if (realtime > CONSUMED_TIME_CHECK)
1481 {
1482 /*
1483 * We have a CPU Hog on our hands.
1484 * Whinge about it now, so we're aware this is yet another task
1485 * to fix.
1486 */
1487 zlog_warn ("SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1488 thread->funcname,
1489 (unsigned long) thread->func,
1490 realtime/1000, cputime/1000);
1491 }
1492 #endif /* CONSUMED_TIME_CHECK */
1493 }
1494
1495 /* Execute thread */
1496 struct thread *
1497 funcname_thread_execute (struct thread_master *m,
1498 int (*func)(struct thread *),
1499 void *arg,
1500 int val,
1501 debugargdef)
1502 {
1503 struct cpu_thread_history tmp;
1504 struct thread dummy;
1505
1506 memset (&dummy, 0, sizeof (struct thread));
1507
1508 pthread_mutex_init (&dummy.mtx, NULL);
1509 dummy.type = THREAD_EVENT;
1510 dummy.add_type = THREAD_EXECUTE;
1511 dummy.master = NULL;
1512 dummy.arg = arg;
1513 dummy.u.val = val;
1514
1515 tmp.func = dummy.func = func;
1516 tmp.funcname = dummy.funcname = funcname;
1517 pthread_mutex_lock (&cpu_record_mtx);
1518 {
1519 dummy.hist = hash_get (cpu_record, &tmp,
1520 (void * (*) (void *))cpu_record_hash_alloc);
1521 }
1522 pthread_mutex_unlock (&cpu_record_mtx);
1523
1524 dummy.schedfrom = schedfrom;
1525 dummy.schedfrom_line = fromln;
1526
1527 thread_call (&dummy);
1528
1529 return NULL;
1530 }