1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "log.h"
29 #include "hash.h"
30 #include "pqueue.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "lib_errors.h"
37
38 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
39 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
42
43 #if defined(__APPLE__)
44 #include <mach/mach.h>
45 #include <mach/mach_time.h>
46 #endif
47
48 #define AWAKEN(m) \
49 do { \
50 static unsigned char wakebyte = 0x01; \
51 write(m->io_pipe[1], &wakebyte, 1); \
52 	} while (0)
53
54 /* control variable for initializer */
55 static pthread_once_t init_once = PTHREAD_ONCE_INIT;
56 pthread_key_t thread_current;
57
58 static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
59 static struct list *masters;
60
61 static void thread_free(struct thread_master *master, struct thread *thread);
62
63 /* CLI start ---------------------------------------------------------------- */
64 static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
65 {
66 int size = sizeof(a->func);
67
68 return jhash(&a->func, size, 0);
69 }
70
71 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
72 const struct cpu_thread_history *b)
73 {
74 return a->func == b->func;
75 }
76
77 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
78 {
79 struct cpu_thread_history *new;
80 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
81 new->func = a->func;
82 new->funcname = a->funcname;
83 return new;
84 }
85
86 static void cpu_record_hash_free(void *a)
87 {
88 struct cpu_thread_history *hist = a;
89
90 XFREE(MTYPE_THREAD_STATS, hist);
91 }
92
93 static void vty_out_cpu_thread_history(struct vty *vty,
94 struct cpu_thread_history *a)
95 {
96 vty_out(vty, "%5"PRIdFAST32" %10lu.%03lu %9"PRIuFAST32
97 " %8lu %9lu %8lu %9lu", a->total_active,
98 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
99 a->cpu.total / a->total_calls, a->cpu.max,
100 a->real.total / a->total_calls, a->real.max);
101 vty_out(vty, " %c%c%c%c%c %s\n",
102 a->types & (1 << THREAD_READ) ? 'R' : ' ',
103 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
104 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
105 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
106 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
107 }
108
109 static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
110 {
111 struct cpu_thread_history *totals = args[0];
112 struct cpu_thread_history copy;
113 struct vty *vty = args[1];
114 uint8_t *filter = args[2];
115
116 struct cpu_thread_history *a = bucket->data;
117
118 copy.total_active =
119 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
120 copy.total_calls =
121 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
122 copy.cpu.total =
123 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
124 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
125 copy.real.total =
126 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
127 copy.real.max =
128 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
129 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
130 copy.funcname = a->funcname;
131
132 if (!(copy.types & *filter))
133 return;
134
135 vty_out_cpu_thread_history(vty, &copy);
136 totals->total_active += copy.total_active;
137 totals->total_calls += copy.total_calls;
138 totals->real.total += copy.real.total;
139 if (totals->real.max < copy.real.max)
140 totals->real.max = copy.real.max;
141 totals->cpu.total += copy.cpu.total;
142 if (totals->cpu.max < copy.cpu.max)
143 totals->cpu.max = copy.cpu.max;
144 }
145
146 static void cpu_record_print(struct vty *vty, uint8_t filter)
147 {
148 struct cpu_thread_history tmp;
149 void *args[3] = {&tmp, vty, &filter};
150 struct thread_master *m;
151 struct listnode *ln;
152
153 memset(&tmp, 0, sizeof tmp);
154 tmp.funcname = "TOTAL";
155 tmp.types = filter;
156
157 pthread_mutex_lock(&masters_mtx);
158 {
159 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
160 const char *name = m->name ? m->name : "main";
161
162 char underline[strlen(name) + 1];
163 memset(underline, '-', sizeof(underline));
164 underline[sizeof(underline) - 1] = '\0';
165
166 vty_out(vty, "\n");
167 vty_out(vty, "Showing statistics for pthread %s\n",
168 name);
169 vty_out(vty, "-------------------------------%s\n",
170 underline);
171 vty_out(vty, "%21s %18s %18s\n", "",
172 "CPU (user+system):", "Real (wall-clock):");
173 vty_out(vty,
174 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
175 vty_out(vty, " Avg uSec Max uSecs");
176 vty_out(vty, " Type Thread\n");
177
178 if (m->cpu_record->count)
179 hash_iterate(
180 m->cpu_record,
181 (void (*)(struct hash_bucket *,
182 void *))cpu_record_hash_print,
183 args);
184 else
185 vty_out(vty, "No data to display yet.\n");
186
187 vty_out(vty, "\n");
188 }
189 }
190 pthread_mutex_unlock(&masters_mtx);
191
192 vty_out(vty, "\n");
193 vty_out(vty, "Total thread statistics\n");
194 vty_out(vty, "-------------------------\n");
195 vty_out(vty, "%21s %18s %18s\n", "",
196 "CPU (user+system):", "Real (wall-clock):");
197 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
198 vty_out(vty, " Avg uSec Max uSecs");
199 vty_out(vty, " Type Thread\n");
200
201 if (tmp.total_calls > 0)
202 vty_out_cpu_thread_history(vty, &tmp);
203 }
204
205 static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
206 {
207 uint8_t *filter = args[0];
208 struct hash *cpu_record = args[1];
209
210 struct cpu_thread_history *a = bucket->data;
211
212 if (!(a->types & *filter))
213 return;
214
215 hash_release(cpu_record, bucket->data);
216 }
217
218 static void cpu_record_clear(uint8_t filter)
219 {
220 uint8_t *tmp = &filter;
221 struct thread_master *m;
222 struct listnode *ln;
223
224 pthread_mutex_lock(&masters_mtx);
225 {
226 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
227 pthread_mutex_lock(&m->mtx);
228 {
229 void *args[2] = {tmp, m->cpu_record};
230 hash_iterate(
231 m->cpu_record,
232 (void (*)(struct hash_bucket *,
233 void *))cpu_record_hash_clear,
234 args);
235 }
236 pthread_mutex_unlock(&m->mtx);
237 }
238 }
239 pthread_mutex_unlock(&masters_mtx);
240 }
241
242 static uint8_t parse_filter(const char *filterstr)
243 {
244 int i = 0;
245 int filter = 0;
246
247 while (filterstr[i] != '\0') {
248 switch (filterstr[i]) {
249 case 'r':
250 case 'R':
251 filter |= (1 << THREAD_READ);
252 break;
253 case 'w':
254 case 'W':
255 filter |= (1 << THREAD_WRITE);
256 break;
257 case 't':
258 case 'T':
259 filter |= (1 << THREAD_TIMER);
260 break;
261 case 'e':
262 case 'E':
263 filter |= (1 << THREAD_EVENT);
264 break;
265 case 'x':
266 case 'X':
267 filter |= (1 << THREAD_EXECUTE);
268 break;
269 default:
270 break;
271 }
272 ++i;
273 }
274 return filter;
275 }
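/*
 * Worked example (illustrative, not part of the original source):
 * parse_filter("rw") returns (1 << THREAD_READ) | (1 << THREAD_WRITE),
 * i.e. a bitmask that can be tested against a cpu_thread_history
 * ->types field exactly the way cpu_record_hash_print() does above.
 * Unknown letters are silently ignored, so an all-unknown string
 * yields 0 and the callers below reject it.
 */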
276
277 DEFUN (show_thread_cpu,
278 show_thread_cpu_cmd,
279 "show thread cpu [FILTER]",
280 SHOW_STR
281 "Thread information\n"
282 "Thread CPU usage\n"
283 "Display filter (rwtexb)\n")
284 {
285 uint8_t filter = (uint8_t)-1U;
286 int idx = 0;
287
288 if (argv_find(argv, argc, "FILTER", &idx)) {
289 filter = parse_filter(argv[idx]->arg);
290 if (!filter) {
291 			vty_out(vty,
292 				"Invalid filter \"%s\" specified; must contain at least"
293 				" one of 'RWTEXB'\n",
294 				argv[idx]->arg);
295 return CMD_WARNING;
296 }
297 }
298
299 cpu_record_print(vty, filter);
300 return CMD_SUCCESS;
301 }
302
303 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
304 {
305 const char *name = m->name ? m->name : "main";
306 char underline[strlen(name) + 1];
307 uint32_t i;
308
309 memset(underline, '-', sizeof(underline));
310 underline[sizeof(underline) - 1] = '\0';
311
312 vty_out(vty, "\nShowing poll FD's for %s\n", name);
313 vty_out(vty, "----------------------%s\n", underline);
314 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
315 for (i = 0; i < m->handler.pfdcount; i++)
316 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
317 m->handler.pfds[i].fd,
318 m->handler.pfds[i].events,
319 m->handler.pfds[i].revents);
320 }
321
322 DEFUN (show_thread_poll,
323 show_thread_poll_cmd,
324 "show thread poll",
325 SHOW_STR
326 "Thread information\n"
327 "Show poll FD's and information\n")
328 {
329 struct listnode *node;
330 struct thread_master *m;
331
332 pthread_mutex_lock(&masters_mtx);
333 {
334 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
335 show_thread_poll_helper(vty, m);
336 }
337 }
338 pthread_mutex_unlock(&masters_mtx);
339
340 return CMD_SUCCESS;
341 }
342
343
344 DEFUN (clear_thread_cpu,
345 clear_thread_cpu_cmd,
346 "clear thread cpu [FILTER]",
347 "Clear stored data in all pthreads\n"
348 "Thread information\n"
349 "Thread CPU usage\n"
350 "Display filter (rwtexb)\n")
351 {
352 uint8_t filter = (uint8_t)-1U;
353 int idx = 0;
354
355 if (argv_find(argv, argc, "FILTER", &idx)) {
356 filter = parse_filter(argv[idx]->arg);
357 if (!filter) {
358 			vty_out(vty,
359 				"Invalid filter \"%s\" specified; must contain at least"
360 				" one of 'RWTEXB'\n",
361 				argv[idx]->arg);
362 return CMD_WARNING;
363 }
364 }
365
366 cpu_record_clear(filter);
367 return CMD_SUCCESS;
368 }
369
370 void thread_cmd_init(void)
371 {
372 install_element(VIEW_NODE, &show_thread_cpu_cmd);
373 install_element(VIEW_NODE, &show_thread_poll_cmd);
374 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
375 }
376 /* CLI end ------------------------------------------------------------------ */
377
378
379 static int thread_timer_cmp(void *a, void *b)
380 {
381 struct thread *thread_a = a;
382 struct thread *thread_b = b;
383
384 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
385 return -1;
386 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
387 return 1;
388 return 0;
389 }
390
391 static void thread_timer_update(void *node, int actual_position)
392 {
393 struct thread *thread = node;
394
395 thread->index = actual_position;
396 }
397
398 static void cancelreq_del(void *cr)
399 {
400 XFREE(MTYPE_TMP, cr);
401 }
402
403 /* initializer, only ever called once */
404 static void initializer(void)
405 {
406 pthread_key_create(&thread_current, NULL);
407 }
408
409 struct thread_master *thread_master_create(const char *name)
410 {
411 struct thread_master *rv;
412 struct rlimit limit;
413
414 pthread_once(&init_once, &initializer);
415
416 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
417
418 /* Initialize master mutex */
419 pthread_mutex_init(&rv->mtx, NULL);
420 pthread_cond_init(&rv->cancel_cond, NULL);
421
422 /* Set name */
423 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
424
425 /* Initialize I/O task data structures */
426 getrlimit(RLIMIT_NOFILE, &limit);
427 rv->fd_limit = (int)limit.rlim_cur;
428 rv->read = XCALLOC(MTYPE_THREAD_POLL,
429 sizeof(struct thread *) * rv->fd_limit);
430
431 rv->write = XCALLOC(MTYPE_THREAD_POLL,
432 sizeof(struct thread *) * rv->fd_limit);
433
434 rv->cpu_record = hash_create_size(
435 8, (unsigned int (*)(void *))cpu_record_hash_key,
436 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
437 "Thread Hash");
438
439
440 /* Initialize the timer queues */
441 rv->timer = pqueue_create();
442 rv->timer->cmp = thread_timer_cmp;
443 rv->timer->update = thread_timer_update;
444
445 /* Initialize thread_fetch() settings */
446 rv->spin = true;
447 rv->handle_signals = true;
448
449 /* Set pthread owner, should be updated by actual owner */
450 rv->owner = pthread_self();
451 rv->cancel_req = list_new();
452 rv->cancel_req->del = cancelreq_del;
453 rv->canceled = true;
454
455 /* Initialize pipe poker */
456 pipe(rv->io_pipe);
457 set_nonblocking(rv->io_pipe[0]);
458 set_nonblocking(rv->io_pipe[1]);
459
460 /* Initialize data structures for poll() */
461 rv->handler.pfdsize = rv->fd_limit;
462 rv->handler.pfdcount = 0;
463 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
464 sizeof(struct pollfd) * rv->handler.pfdsize);
465 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
466 sizeof(struct pollfd) * rv->handler.pfdsize);
467
468 /* add to list of threadmasters */
469 pthread_mutex_lock(&masters_mtx);
470 {
471 if (!masters)
472 masters = list_new();
473
474 listnode_add(masters, rv);
475 }
476 pthread_mutex_unlock(&masters_mtx);
477
478 return rv;
479 }
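/*
 * Illustrative sketch (commented out; names are placeholders, not from
 * this file): a daemon creates one master per pthread and tears it down
 * with thread_master_free().  Passing NULL as the name makes the CLI
 * report the pthread as "main".
 *
 *     struct thread_master *master = thread_master_create("my worker");
 *     ...schedule tasks and run the fetch/call loop (see thread_fetch)...
 *     thread_master_free(master);
 */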
480
481 void thread_master_set_name(struct thread_master *master, const char *name)
482 {
483 pthread_mutex_lock(&master->mtx);
484 {
485 XFREE(MTYPE_THREAD_MASTER, master->name);
486 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
487 }
488 pthread_mutex_unlock(&master->mtx);
489 }
490
491 /* Add a new thread to the list. */
492 static void thread_list_add(struct thread_list *list, struct thread *thread)
493 {
494 thread->next = NULL;
495 thread->prev = list->tail;
496 if (list->tail)
497 list->tail->next = thread;
498 else
499 list->head = thread;
500 list->tail = thread;
501 list->count++;
502 }
503
504 /* Delete a thread from the list. */
505 static struct thread *thread_list_delete(struct thread_list *list,
506 struct thread *thread)
507 {
508 if (thread->next)
509 thread->next->prev = thread->prev;
510 else
511 list->tail = thread->prev;
512 if (thread->prev)
513 thread->prev->next = thread->next;
514 else
515 list->head = thread->next;
516 thread->next = thread->prev = NULL;
517 list->count--;
518 return thread;
519 }
520
521 /* Return whether the thread list is empty. */
522 static int thread_empty(struct thread_list *list)
523 {
524 return list->head ? 0 : 1;
525 }
526
527 /* Delete top of the list and return it. */
528 static struct thread *thread_trim_head(struct thread_list *list)
529 {
530 if (!thread_empty(list))
531 return thread_list_delete(list, list->head);
532 return NULL;
533 }
534
535 #define THREAD_UNUSED_DEPTH 10
536
537 /* Move thread to unuse list. */
538 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
539 {
540 pthread_mutex_t mtxc = thread->mtx;
541
542 assert(m != NULL && thread != NULL);
543 assert(thread->next == NULL);
544 assert(thread->prev == NULL);
545
546 thread->hist->total_active--;
547 memset(thread, 0, sizeof(struct thread));
548 thread->type = THREAD_UNUSED;
549
550 /* Restore the thread mutex context. */
551 thread->mtx = mtxc;
552
553 if (m->unuse.count < THREAD_UNUSED_DEPTH) {
554 thread_list_add(&m->unuse, thread);
555 return;
556 }
557
558 thread_free(m, thread);
559 }
560
561 /* Free all unused threads. */
562 static void thread_list_free(struct thread_master *m, struct thread_list *list)
563 {
564 struct thread *t;
565 struct thread *next;
566
567 for (t = list->head; t; t = next) {
568 next = t->next;
569 thread_free(m, t);
570 list->count--;
571 }
572 }
573
574 static void thread_array_free(struct thread_master *m,
575 struct thread **thread_array)
576 {
577 struct thread *t;
578 int index;
579
580 for (index = 0; index < m->fd_limit; ++index) {
581 t = thread_array[index];
582 if (t) {
583 thread_array[index] = NULL;
584 thread_free(m, t);
585 }
586 }
587 XFREE(MTYPE_THREAD_POLL, thread_array);
588 }
589
590 static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
591 {
592 int i;
593
594 for (i = 0; i < queue->size; i++)
595 thread_free(m, queue->array[i]);
596
597 pqueue_delete(queue);
598 }
599
600 /*
601 * thread_master_free_unused
602 *
603  * As threads are finished with, they are put on the
604  * unuse list for later reuse.
605  * If we are shutting down, free up the unused threads
606  * so we can see if we forgot to shut anything off.
607 */
608 void thread_master_free_unused(struct thread_master *m)
609 {
610 pthread_mutex_lock(&m->mtx);
611 {
612 struct thread *t;
613 while ((t = thread_trim_head(&m->unuse)) != NULL) {
614 thread_free(m, t);
615 }
616 }
617 pthread_mutex_unlock(&m->mtx);
618 }
619
620 /* Stop thread scheduler. */
621 void thread_master_free(struct thread_master *m)
622 {
623 pthread_mutex_lock(&masters_mtx);
624 {
625 listnode_delete(masters, m);
626 if (masters->count == 0) {
627 list_delete(&masters);
628 }
629 }
630 pthread_mutex_unlock(&masters_mtx);
631
632 thread_array_free(m, m->read);
633 thread_array_free(m, m->write);
634 thread_queue_free(m, m->timer);
635 thread_list_free(m, &m->event);
636 thread_list_free(m, &m->ready);
637 thread_list_free(m, &m->unuse);
638 pthread_mutex_destroy(&m->mtx);
639 pthread_cond_destroy(&m->cancel_cond);
640 close(m->io_pipe[0]);
641 close(m->io_pipe[1]);
642 list_delete(&m->cancel_req);
643 m->cancel_req = NULL;
644
645 hash_clean(m->cpu_record, cpu_record_hash_free);
646 hash_free(m->cpu_record);
647 m->cpu_record = NULL;
648
649 XFREE(MTYPE_THREAD_MASTER, m->name);
650 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
651 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
652 XFREE(MTYPE_THREAD_MASTER, m);
653 }
654
655 /* Return remaining time in milliseconds. */
656 unsigned long thread_timer_remain_msec(struct thread *thread)
657 {
658 int64_t remain;
659
660 pthread_mutex_lock(&thread->mtx);
661 {
662 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
663 }
664 pthread_mutex_unlock(&thread->mtx);
665
666 return remain < 0 ? 0 : remain;
667 }
668
669 /* Return remaining time in seconds. */
670 unsigned long thread_timer_remain_second(struct thread *thread)
671 {
672 return thread_timer_remain_msec(thread) / 1000LL;
673 }
674
675 #define debugargdef const char *funcname, const char *schedfrom, int fromln
676 #define debugargpass funcname, schedfrom, fromln
677
678 struct timeval thread_timer_remain(struct thread *thread)
679 {
680 struct timeval remain;
681 pthread_mutex_lock(&thread->mtx);
682 {
683 monotime_until(&thread->u.sands, &remain);
684 }
685 pthread_mutex_unlock(&thread->mtx);
686 return remain;
687 }
688
689 /* Get new thread. */
690 static struct thread *thread_get(struct thread_master *m, uint8_t type,
691 int (*func)(struct thread *), void *arg,
692 debugargdef)
693 {
694 struct thread *thread = thread_trim_head(&m->unuse);
695 struct cpu_thread_history tmp;
696
697 if (!thread) {
698 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
699 /* mutex only needs to be initialized at struct creation. */
700 pthread_mutex_init(&thread->mtx, NULL);
701 m->alloc++;
702 }
703
704 thread->type = type;
705 thread->add_type = type;
706 thread->master = m;
707 thread->arg = arg;
708 thread->index = -1;
709 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
710 thread->ref = NULL;
711
712 /*
713 	 * So if the passed-in funcname is not what we have
714 	 * stored, that means the thread->hist needs to be
715 * updated. We keep the last one around in unused
716 * under the assumption that we are probably
717 * going to immediately allocate the same
718 * type of thread.
719 * This hopefully saves us some serious
720 * hash_get lookups.
721 */
722 if (thread->funcname != funcname || thread->func != func) {
723 tmp.func = func;
724 tmp.funcname = funcname;
725 thread->hist =
726 hash_get(m->cpu_record, &tmp,
727 (void *(*)(void *))cpu_record_hash_alloc);
728 }
729 thread->hist->total_active++;
730 thread->func = func;
731 thread->funcname = funcname;
732 thread->schedfrom = schedfrom;
733 thread->schedfrom_line = fromln;
734
735 return thread;
736 }
737
738 static void thread_free(struct thread_master *master, struct thread *thread)
739 {
740 /* Update statistics. */
741 assert(master->alloc > 0);
742 master->alloc--;
743
744 /* Free allocated resources. */
745 pthread_mutex_destroy(&thread->mtx);
746 XFREE(MTYPE_THREAD, thread);
747 }
748
749 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
750 nfds_t count, const struct timeval *timer_wait)
751 {
752 	/* If timer_wait is null here, that means poll() should block
753 	 * indefinitely, unless the thread_master has overridden it by
754 	 * setting ->selectpoll_timeout.
755 	 *
756 	 * If ->selectpoll_timeout is positive, it specifies the maximum
757 	 * number of milliseconds to wait.
758 	 *
759 	 * If it is -1, we should never wait and always return immediately
760 	 * even if no event is detected.
761 	 *
762 	 * If it is zero, the default behavior described above applies. */
763 int timeout = -1;
764
765 /* number of file descriptors with events */
766 int num;
767
768 if (timer_wait != NULL
769 && m->selectpoll_timeout == 0) // use the default value
770 timeout = (timer_wait->tv_sec * 1000)
771 + (timer_wait->tv_usec / 1000);
772 else if (m->selectpoll_timeout > 0) // use the user's timeout
773 timeout = m->selectpoll_timeout;
774 else if (m->selectpoll_timeout
775 < 0) // effect a poll (return immediately)
776 timeout = 0;
777
778 /* add poll pipe poker */
779 assert(count + 1 < pfdsize);
780 pfds[count].fd = m->io_pipe[0];
781 pfds[count].events = POLLIN;
782 pfds[count].revents = 0x00;
783
784 num = poll(pfds, count + 1, timeout);
785
786 unsigned char trash[64];
787 if (num > 0 && pfds[count].revents != 0 && num--)
788 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
789 ;
790
791 return num;
792 }
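/*
 * Worked example of the timeout selection above (illustrative only):
 * with m->selectpoll_timeout == 0 and timer_wait == { .tv_sec = 2,
 * .tv_usec = 500000 }, poll() is called with a timeout of
 * 2 * 1000 + 500000 / 1000 = 2500 milliseconds.  With
 * m->selectpoll_timeout < 0 the timeout is forced to 0, so poll()
 * returns immediately; with a NULL timer_wait and selectpoll_timeout
 * == 0 the timeout stays at -1 and poll() blocks indefinitely.
 */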
793
794 /* Add new read or write thread. */
795 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
796 int (*func)(struct thread *),
797 void *arg, int fd,
798 struct thread **t_ptr,
799 debugargdef)
800 {
801 struct thread *thread = NULL;
802
803 assert(fd >= 0 && fd < m->fd_limit);
804 pthread_mutex_lock(&m->mtx);
805 {
806 if (t_ptr
807 && *t_ptr) // thread is already scheduled; don't reschedule
808 {
809 pthread_mutex_unlock(&m->mtx);
810 return NULL;
811 }
812
813 /* default to a new pollfd */
814 nfds_t queuepos = m->handler.pfdcount;
815
816 /* if we already have a pollfd for our file descriptor, find and
817 * use it */
818 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
819 if (m->handler.pfds[i].fd == fd) {
820 queuepos = i;
821 break;
822 }
823
824 /* make sure we have room for this fd + pipe poker fd */
825 assert(queuepos + 1 < m->handler.pfdsize);
826
827 thread = thread_get(m, dir, func, arg, debugargpass);
828
829 m->handler.pfds[queuepos].fd = fd;
830 m->handler.pfds[queuepos].events |=
831 (dir == THREAD_READ ? POLLIN : POLLOUT);
832
833 if (queuepos == m->handler.pfdcount)
834 m->handler.pfdcount++;
835
836 if (thread) {
837 pthread_mutex_lock(&thread->mtx);
838 {
839 thread->u.fd = fd;
840 if (dir == THREAD_READ)
841 m->read[thread->u.fd] = thread;
842 else
843 m->write[thread->u.fd] = thread;
844 }
845 pthread_mutex_unlock(&thread->mtx);
846
847 if (t_ptr) {
848 *t_ptr = thread;
849 thread->ref = t_ptr;
850 }
851 }
852
853 AWAKEN(m);
854 }
855 pthread_mutex_unlock(&m->mtx);
856
857 return thread;
858 }
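/*
 * Usage sketch (assumption: callers normally go through the
 * thread_add_read()/thread_add_write() wrapper macros in thread.h rather
 * than calling this function directly; my_read_handler, my_ctx and
 * sock_fd are placeholders).  Passing a back-reference lets the library
 * NULL it out when the task runs or is cancelled, which is also what
 * makes the "already scheduled" check above a no-op on reschedule:
 *
 *     static struct thread *t_read;
 *     thread_add_read(master, my_read_handler, my_ctx, sock_fd, &t_read);
 *     // calling this again while t_read != NULL does nothing
 */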
859
860 static struct thread *
861 funcname_thread_add_timer_timeval(struct thread_master *m,
862 int (*func)(struct thread *), int type,
863 void *arg, struct timeval *time_relative,
864 struct thread **t_ptr, debugargdef)
865 {
866 struct thread *thread;
867 struct pqueue *queue;
868
869 assert(m != NULL);
870
871 assert(type == THREAD_TIMER);
872 assert(time_relative);
873
874 pthread_mutex_lock(&m->mtx);
875 {
876 if (t_ptr
877 && *t_ptr) // thread is already scheduled; don't reschedule
878 {
879 pthread_mutex_unlock(&m->mtx);
880 return NULL;
881 }
882
883 queue = m->timer;
884 thread = thread_get(m, type, func, arg, debugargpass);
885
886 pthread_mutex_lock(&thread->mtx);
887 {
888 monotime(&thread->u.sands);
889 timeradd(&thread->u.sands, time_relative,
890 &thread->u.sands);
891 pqueue_enqueue(thread, queue);
892 if (t_ptr) {
893 *t_ptr = thread;
894 thread->ref = t_ptr;
895 }
896 }
897 pthread_mutex_unlock(&thread->mtx);
898
899 AWAKEN(m);
900 }
901 pthread_mutex_unlock(&m->mtx);
902
903 return thread;
904 }
905
906
907 /* Add timer event thread. */
908 struct thread *funcname_thread_add_timer(struct thread_master *m,
909 int (*func)(struct thread *),
910 void *arg, long timer,
911 struct thread **t_ptr, debugargdef)
912 {
913 struct timeval trel;
914
915 assert(m != NULL);
916
917 trel.tv_sec = timer;
918 trel.tv_usec = 0;
919
920 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
921 &trel, t_ptr, debugargpass);
922 }
923
924 /* Add timer event thread with "millisecond" resolution */
925 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
926 int (*func)(struct thread *),
927 void *arg, long timer,
928 struct thread **t_ptr,
929 debugargdef)
930 {
931 struct timeval trel;
932
933 assert(m != NULL);
934
935 trel.tv_sec = timer / 1000;
936 trel.tv_usec = 1000 * (timer % 1000);
937
938 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
939 &trel, t_ptr, debugargpass);
940 }
941
942 /* Add timer event thread with a struct timeval as the relative expiry time */
943 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
944 int (*func)(struct thread *),
945 void *arg, struct timeval *tv,
946 struct thread **t_ptr, debugargdef)
947 {
948 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
949 t_ptr, debugargpass);
950 }
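/*
 * Illustrative example (expire_cb, ctx and the t_timer pointers are
 * placeholders; assumes the thread_add_timer*() wrapper macros from
 * thread.h): the second/millisecond variants above differ only in how
 * they build the relative timeval, so
 *
 *     thread_add_timer(master, expire_cb, ctx, 5, &t_timer);
 *     thread_add_timer_msec(master, expire_cb, ctx, 5000, &t_timer2);
 *
 * both arm a timer roughly five seconds from now.
 */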
951
952 /* Add simple event thread. */
953 struct thread *funcname_thread_add_event(struct thread_master *m,
954 int (*func)(struct thread *),
955 void *arg, int val,
956 struct thread **t_ptr, debugargdef)
957 {
958 struct thread *thread;
959
960 assert(m != NULL);
961
962 pthread_mutex_lock(&m->mtx);
963 {
964 if (t_ptr
965 && *t_ptr) // thread is already scheduled; don't reschedule
966 {
967 pthread_mutex_unlock(&m->mtx);
968 return NULL;
969 }
970
971 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
972 pthread_mutex_lock(&thread->mtx);
973 {
974 thread->u.val = val;
975 thread_list_add(&m->event, thread);
976 }
977 pthread_mutex_unlock(&thread->mtx);
978
979 if (t_ptr) {
980 *t_ptr = thread;
981 thread->ref = t_ptr;
982 }
983
984 AWAKEN(m);
985 }
986 pthread_mutex_unlock(&m->mtx);
987
988 return thread;
989 }
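/*
 * Sketch (placeholder names, assuming the thread_add_event() wrapper
 * macro from thread.h): events are posted to the ready queue on the
 * next pass through thread_fetch(), before any timers or I/O are
 * considered.
 *
 *     thread_add_event(master, handle_config_change, config_ctx, 0, NULL);
 *
 * The matching cancel-by-argument call is
 * thread_cancel_event(master, config_ctx), which is why passing a
 * stable 'arg' pointer matters.
 */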
990
991 /* Thread cancellation ------------------------------------------------------ */
992
993 /**
994  * Clears the requested event bits from the .events field of the pollfd
995  * corresponding to the given file descriptor; the bits to clear are passed
996  * in the 'state' parameter.
996 *
997 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
998 * implementation for details.
999 *
1000 * @param master
1001 * @param fd
1002 * @param state the event to cancel. One or more (OR'd together) of the
1003 * following:
1004 * - POLLIN
1005 * - POLLOUT
1006 */
1007 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
1008 {
1009 bool found = false;
1010
1011 /* Cancel POLLHUP too just in case some bozo set it */
1012 state |= POLLHUP;
1013
1014 /* find the index of corresponding pollfd */
1015 nfds_t i;
1016
1017 for (i = 0; i < master->handler.pfdcount; i++)
1018 if (master->handler.pfds[i].fd == fd) {
1019 found = true;
1020 break;
1021 }
1022
1023 if (!found) {
1024 zlog_debug(
1025 "[!] Received cancellation request for nonexistent rw job");
1026 zlog_debug("[!] threadmaster: %s | fd: %d",
1027 master->name ? master->name : "", fd);
1028 return;
1029 }
1030
1031 /* NOT out event. */
1032 master->handler.pfds[i].events &= ~(state);
1033
1034 /* If all events are canceled, delete / resize the pollfd array. */
1035 if (master->handler.pfds[i].events == 0) {
1036 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1037 (master->handler.pfdcount - i - 1)
1038 * sizeof(struct pollfd));
1039 master->handler.pfdcount--;
1040 }
1041
1042 /* If we have the same pollfd in the copy, perform the same operations,
1043 * otherwise return. */
1044 if (i >= master->handler.copycount)
1045 return;
1046
1047 master->handler.copy[i].events &= ~(state);
1048
1049 if (master->handler.copy[i].events == 0) {
1050 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1051 (master->handler.copycount - i - 1)
1052 * sizeof(struct pollfd));
1053 master->handler.copycount--;
1054 }
1055 }
1056
1057 /**
1058 * Process cancellation requests.
1059 *
1060 * This may only be run from the pthread which owns the thread_master.
1061 *
1062 * @param master the thread master to process
1063 * @REQUIRE master->mtx
1064 */
1065 static void do_thread_cancel(struct thread_master *master)
1066 {
1067 struct thread_list *list = NULL;
1068 struct pqueue *queue = NULL;
1069 struct thread **thread_array = NULL;
1070 struct thread *thread;
1071
1072 struct cancel_req *cr;
1073 struct listnode *ln;
1074 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1075 		/* If this is an event object cancellation, linear search
1076 		 * through the event list, deleting any events which have
1077 		 * the specified argument.  We also need to check every
1078 		 * thread in the ready queue.
1079 		 */
1080 if (cr->eventobj) {
1081 struct thread *t;
1082 thread = master->event.head;
1083
1084 while (thread) {
1085 t = thread;
1086 thread = t->next;
1087
1088 if (t->arg == cr->eventobj) {
1089 thread_list_delete(&master->event, t);
1090 if (t->ref)
1091 *t->ref = NULL;
1092 thread_add_unuse(master, t);
1093 }
1094 }
1095
1096 thread = master->ready.head;
1097 while (thread) {
1098 t = thread;
1099 thread = t->next;
1100
1101 if (t->arg == cr->eventobj) {
1102 thread_list_delete(&master->ready, t);
1103 if (t->ref)
1104 *t->ref = NULL;
1105 thread_add_unuse(master, t);
1106 }
1107 }
1108 continue;
1109 }
1110
1111 		/* The pointer varies depending on whether the cancellation
1112 		 * request was made asynchronously or not.  If it was, we
1113 		 * need to check whether the thread even exists anymore
1114 		 * before cancelling it.
1115 		 */
1116 thread = (cr->thread) ? cr->thread : *cr->threadref;
1117
1118 if (!thread)
1119 continue;
1120
1121 /* Determine the appropriate queue to cancel the thread from */
1122 switch (thread->type) {
1123 case THREAD_READ:
1124 thread_cancel_rw(master, thread->u.fd, POLLIN);
1125 thread_array = master->read;
1126 break;
1127 case THREAD_WRITE:
1128 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1129 thread_array = master->write;
1130 break;
1131 case THREAD_TIMER:
1132 queue = master->timer;
1133 break;
1134 case THREAD_EVENT:
1135 list = &master->event;
1136 break;
1137 case THREAD_READY:
1138 list = &master->ready;
1139 break;
1140 default:
1141 continue;
1142 break;
1143 }
1144
1145 if (queue) {
1146 assert(thread->index >= 0);
1147 assert(thread == queue->array[thread->index]);
1148 pqueue_remove_at(thread->index, queue);
1149 } else if (list) {
1150 thread_list_delete(list, thread);
1151 } else if (thread_array) {
1152 thread_array[thread->u.fd] = NULL;
1153 } else {
1154 assert(!"Thread should be either in queue or list or array!");
1155 }
1156
1157 if (thread->ref)
1158 *thread->ref = NULL;
1159
1160 thread_add_unuse(thread->master, thread);
1161 }
1162
1163 /* Delete and free all cancellation requests */
1164 list_delete_all_node(master->cancel_req);
1165
1166 /* Wake up any threads which may be blocked in thread_cancel_async() */
1167 master->canceled = true;
1168 pthread_cond_broadcast(&master->cancel_cond);
1169 }
1170
1171 /**
1172 * Cancel any events which have the specified argument.
1173 *
1174 * MT-Unsafe
1175 *
1176  * @param master the thread_master to cancel from
1177 * @param arg the argument passed when creating the event
1178 */
1179 void thread_cancel_event(struct thread_master *master, void *arg)
1180 {
1181 assert(master->owner == pthread_self());
1182
1183 pthread_mutex_lock(&master->mtx);
1184 {
1185 struct cancel_req *cr =
1186 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1187 cr->eventobj = arg;
1188 listnode_add(master->cancel_req, cr);
1189 do_thread_cancel(master);
1190 }
1191 pthread_mutex_unlock(&master->mtx);
1192 }
1193
1194 /**
1195 * Cancel a specific task.
1196 *
1197 * MT-Unsafe
1198 *
1199 * @param thread task to cancel
1200 */
1201 void thread_cancel(struct thread *thread)
1202 {
1203 struct thread_master *master = thread->master;
1204
1205 assert(master->owner == pthread_self());
1206
1207 pthread_mutex_lock(&master->mtx);
1208 {
1209 struct cancel_req *cr =
1210 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1211 cr->thread = thread;
1212 listnode_add(master->cancel_req, cr);
1213 do_thread_cancel(master);
1214 }
1215 pthread_mutex_unlock(&master->mtx);
1216 }
1217
1218 /**
1219 * Asynchronous cancellation.
1220 *
1221 * Called with either a struct thread ** or void * to an event argument,
1222 * this function posts the correct cancellation request and blocks until it is
1223 * serviced.
1224 *
1225 * If the thread is currently running, execution blocks until it completes.
1226 *
1227 * The last two parameters are mutually exclusive, i.e. if you pass one the
1228 * other must be NULL.
1229 *
1230 * When the cancellation procedure executes on the target thread_master, the
1231 * thread * provided is checked for nullity. If it is null, the thread is
1232 * assumed to no longer exist and the cancellation request is a no-op. Thus
1233 * users of this API must pass a back-reference when scheduling the original
1234 * task.
1235 *
1236 * MT-Safe
1237 *
1238 * @param master the thread master with the relevant event / task
1239 * @param thread pointer to thread to cancel
1240 * @param eventobj the event
1241 */
1242 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1243 void *eventobj)
1244 {
1245 assert(!(thread && eventobj) && (thread || eventobj));
1246 assert(master->owner != pthread_self());
1247
1248 pthread_mutex_lock(&master->mtx);
1249 {
1250 master->canceled = false;
1251
1252 if (thread) {
1253 struct cancel_req *cr =
1254 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1255 cr->threadref = thread;
1256 listnode_add(master->cancel_req, cr);
1257 } else if (eventobj) {
1258 struct cancel_req *cr =
1259 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1260 cr->eventobj = eventobj;
1261 listnode_add(master->cancel_req, cr);
1262 }
1263 AWAKEN(master);
1264
1265 while (!master->canceled)
1266 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1267 }
1268 pthread_mutex_unlock(&master->mtx);
1269 }
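/*
 * Sketch of the cross-pthread cancellation pattern described above
 * (placeholder names; the scheduling side must keep the back-reference
 * alive for this to work):
 *
 *     // on the owning pthread, at schedule time:
 *     thread_add_read(worker_master, handler, ctx, fd, &ctx->t_read);
 *
 *     // later, from a different pthread:
 *     thread_cancel_async(worker_master, &ctx->t_read, NULL);
 *     // blocks until the worker's thread_fetch() services the request
 */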
1270 /* ------------------------------------------------------------------------- */
1271
1272 static struct timeval *thread_timer_wait(struct pqueue *queue,
1273 struct timeval *timer_val)
1274 {
1275 if (queue->size) {
1276 struct thread *next_timer = queue->array[0];
1277 monotime_until(&next_timer->u.sands, timer_val);
1278 return timer_val;
1279 }
1280 return NULL;
1281 }
1282
1283 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1284 struct thread *fetch)
1285 {
1286 *fetch = *thread;
1287 thread_add_unuse(m, thread);
1288 return fetch;
1289 }
1290
1291 static int thread_process_io_helper(struct thread_master *m,
1292 struct thread *thread, short state, int pos)
1293 {
1294 struct thread **thread_array;
1295
1296 if (!thread)
1297 return 0;
1298
1299 if (thread->type == THREAD_READ)
1300 thread_array = m->read;
1301 else
1302 thread_array = m->write;
1303
1304 thread_array[thread->u.fd] = NULL;
1305 thread_list_add(&m->ready, thread);
1306 thread->type = THREAD_READY;
1307 /* if another pthread scheduled this file descriptor for the event we're
1308 * responding to, no problem; we're getting to it now */
1309 thread->master->handler.pfds[pos].events &= ~(state);
1310 return 1;
1311 }
1312
1313 /**
1314 * Process I/O events.
1315 *
1316 * Walks through file descriptor array looking for those pollfds whose .revents
1317 * field has something interesting. Deletes any invalid file descriptors.
1318 *
1319 * @param m the thread master
1320 * @param num the number of active file descriptors (return value of poll())
1321 */
1322 static void thread_process_io(struct thread_master *m, unsigned int num)
1323 {
1324 unsigned int ready = 0;
1325 struct pollfd *pfds = m->handler.copy;
1326
1327 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1328 /* no event for current fd? immediately continue */
1329 if (pfds[i].revents == 0)
1330 continue;
1331
1332 ready++;
1333
1334 		/* Unless someone has called thread_cancel from another
1335 		 * pthread, the only thing that could have changed in
1336 		 * m->handler.pfds while we were asleep is the .events field
1337 		 * in a given pollfd.  Barring thread_cancel(), that value
1338 		 * should be a superset of the values we have in our copy,
1339 		 * so there's no need to update it.  Similarly, barring
1340 		 * deletion, the fd should still be a valid index into the
1341 		 * master's pfds. */
1345 if (pfds[i].revents & (POLLIN | POLLHUP))
1346 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1347 i);
1348 if (pfds[i].revents & POLLOUT)
1349 thread_process_io_helper(m, m->write[pfds[i].fd],
1350 POLLOUT, i);
1351
1352 		/* if one of our file descriptors is garbage, remove the
1353 		 * same from both pfds + update sizes and index */
1355 if (pfds[i].revents & POLLNVAL) {
1356 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1357 (m->handler.pfdcount - i - 1)
1358 * sizeof(struct pollfd));
1359 m->handler.pfdcount--;
1360
1361 memmove(pfds + i, pfds + i + 1,
1362 (m->handler.copycount - i - 1)
1363 * sizeof(struct pollfd));
1364 m->handler.copycount--;
1365
1366 i--;
1367 }
1368 }
1369 }
1370
1371 /* Add all timers that have popped to the ready list. */
1372 static unsigned int thread_process_timers(struct pqueue *queue,
1373 struct timeval *timenow)
1374 {
1375 struct thread *thread;
1376 unsigned int ready = 0;
1377
1378 while (queue->size) {
1379 thread = queue->array[0];
1380 if (timercmp(timenow, &thread->u.sands, <))
1381 return ready;
1382 pqueue_dequeue(queue);
1383 thread->type = THREAD_READY;
1384 thread_list_add(&thread->master->ready, thread);
1385 ready++;
1386 }
1387 return ready;
1388 }
1389
1390 /* process a list en masse, e.g. for event thread lists */
1391 static unsigned int thread_process(struct thread_list *list)
1392 {
1393 struct thread *thread;
1394 struct thread *next;
1395 unsigned int ready = 0;
1396
1397 for (thread = list->head; thread; thread = next) {
1398 next = thread->next;
1399 thread_list_delete(list, thread);
1400 thread->type = THREAD_READY;
1401 thread_list_add(&thread->master->ready, thread);
1402 ready++;
1403 }
1404 return ready;
1405 }
1406
1407
1408 /* Fetch next ready thread. */
1409 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1410 {
1411 struct thread *thread = NULL;
1412 struct timeval now;
1413 struct timeval zerotime = {0, 0};
1414 struct timeval tv;
1415 struct timeval *tw = NULL;
1416
1417 int num = 0;
1418
1419 do {
1420 /* Handle signals if any */
1421 if (m->handle_signals)
1422 quagga_sigevent_process();
1423
1424 pthread_mutex_lock(&m->mtx);
1425
1426 /* Process any pending cancellation requests */
1427 do_thread_cancel(m);
1428
1429 /*
1430 * Attempt to flush ready queue before going into poll().
1431 * This is performance-critical. Think twice before modifying.
1432 */
1433 if ((thread = thread_trim_head(&m->ready))) {
1434 fetch = thread_run(m, thread, fetch);
1435 if (fetch->ref)
1436 *fetch->ref = NULL;
1437 pthread_mutex_unlock(&m->mtx);
1438 break;
1439 }
1440
1441 /* otherwise, tick through scheduling sequence */
1442
1443 /*
1444 * Post events to ready queue. This must come before the
1445 * following block since events should occur immediately
1446 */
1447 thread_process(&m->event);
1448
1449 /*
1450 * If there are no tasks on the ready queue, we will poll()
1451 * until a timer expires or we receive I/O, whichever comes
1452 * first. The strategy for doing this is:
1453 *
1454 * - If there are events pending, set the poll() timeout to zero
1455 		 *   - If there are no events pending, but there are timers
1456 		 *     pending, set the timeout to the smallest remaining
1457 		 *     time on any timer
1458 		 *   - If there are neither timers nor events pending, but
1459 		 *     there are file descriptors pending, block indefinitely
1460 		 *     in poll()
1461 * - If nothing is pending, it's time for the application to die
1462 *
1463 * In every case except the last, we need to hit poll() at least
1464 * once per loop to avoid starvation by events
1465 */
1466 if (m->ready.count == 0)
1467 tw = thread_timer_wait(m->timer, &tv);
1468
1469 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1470 tw = &zerotime;
1471
1472 if (!tw && m->handler.pfdcount == 0) { /* die */
1473 pthread_mutex_unlock(&m->mtx);
1474 fetch = NULL;
1475 break;
1476 }
1477
1478 /*
1479 * Copy pollfd array + # active pollfds in it. Not necessary to
1480 * copy the array size as this is fixed.
1481 */
1482 m->handler.copycount = m->handler.pfdcount;
1483 memcpy(m->handler.copy, m->handler.pfds,
1484 m->handler.copycount * sizeof(struct pollfd));
1485
1486 pthread_mutex_unlock(&m->mtx);
1487 {
1488 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1489 m->handler.copycount, tw);
1490 }
1491 pthread_mutex_lock(&m->mtx);
1492
1493 /* Handle any errors received in poll() */
1494 if (num < 0) {
1495 if (errno == EINTR) {
1496 pthread_mutex_unlock(&m->mtx);
1497 /* loop around to signal handler */
1498 continue;
1499 }
1500
1501 /* else die */
1502 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1503 safe_strerror(errno));
1504 pthread_mutex_unlock(&m->mtx);
1505 fetch = NULL;
1506 break;
1507 }
1508
1509 /* Post timers to ready queue. */
1510 monotime(&now);
1511 thread_process_timers(m->timer, &now);
1512
1513 /* Post I/O to ready queue. */
1514 if (num > 0)
1515 thread_process_io(m, num);
1516
1517 pthread_mutex_unlock(&m->mtx);
1518
1519 } while (!thread && m->spin);
1520
1521 return fetch;
1522 }
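/*
 * The canonical consumer of thread_fetch() is an event loop of the form
 * below (a sketch; the daemon main loop, e.g. frr_run(), is the usual
 * caller):
 *
 *     struct thread fetched;
 *
 *     while (thread_fetch(master, &fetched))
 *             thread_call(&fetched);
 *
 * thread_fetch() returns NULL when there is nothing left to run (no
 * timers and no file descriptors) or when poll() fails, which is what
 * ends the loop above.
 */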
1523
1524 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1525 {
1526 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1527 + (a.tv_usec - b.tv_usec));
1528 }
1529
1530 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1531 unsigned long *cputime)
1532 {
1533 /* This is 'user + sys' time. */
1534 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1535 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1536 return timeval_elapsed(now->real, start->real);
1537 }
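/*
 * Worked example (illustrative): with start->real = {5, 200000} and
 * now->real = {7, 900000}, timeval_elapsed() yields
 * (7 - 5) * TIMER_SECOND_MICRO + (900000 - 200000) = 2,700,000
 * microseconds of wall-clock time; *cputime is computed the same way
 * from the ru_utime + ru_stime pairs.
 */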
1538
1539 /* We should aim to yield after yield milliseconds, which defaults
1540    to THREAD_YIELD_TIME_SLOT.
1541 Note: we are using real (wall clock) time for this calculation.
1542 It could be argued that CPU time may make more sense in certain
1543 contexts. The things to consider are whether the thread may have
1544 blocked (in which case wall time increases, but CPU time does not),
1545 or whether the system is heavily loaded with other processes competing
1546 for CPU time. On balance, wall clock time seems to make sense.
1547 Plus it has the added benefit that gettimeofday should be faster
1548 than calling getrusage. */
1549 int thread_should_yield(struct thread *thread)
1550 {
1551 int result;
1552 pthread_mutex_lock(&thread->mtx);
1553 {
1554 result = monotime_since(&thread->real, NULL)
1555 > (int64_t)thread->yield;
1556 }
1557 pthread_mutex_unlock(&thread->mtx);
1558 return result;
1559 }
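/*
 * Sketch of the cooperative-yield pattern this supports (work_ctx,
 * more_work, do_one_unit and heavy_work are placeholders): a
 * long-running task checks thread_should_yield() periodically and
 * reschedules itself instead of hogging the pthread.
 *
 *     static int heavy_work(struct thread *t)
 *     {
 *             struct work_ctx *ctx = THREAD_ARG(t);
 *
 *             while (more_work(ctx)) {
 *                     do_one_unit(ctx);
 *                     if (thread_should_yield(t)) {
 *                             thread_add_event(t->master, heavy_work,
 *                                              ctx, 0, NULL);
 *                             return 0;
 *                     }
 *             }
 *             return 0;
 *     }
 */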
1560
1561 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1562 {
1563 pthread_mutex_lock(&thread->mtx);
1564 {
1565 thread->yield = yield_time;
1566 }
1567 pthread_mutex_unlock(&thread->mtx);
1568 }
1569
1570 void thread_getrusage(RUSAGE_T *r)
1571 {
1572 #if defined RUSAGE_THREAD
1573 #define FRR_RUSAGE RUSAGE_THREAD
1574 #else
1575 #define FRR_RUSAGE RUSAGE_SELF
1576 #endif
1577 monotime(&r->real);
1578 getrusage(FRR_RUSAGE, &(r->cpu));
1579 }
1580
1581 /*
1582 * Call a thread.
1583 *
1584 * This function will atomically update the thread's usage history. At present
1585 * this is the only spot where usage history is written. Nevertheless the code
1586 * has been written such that the introduction of writers in the future should
1587 * not need to update it provided the writers atomically perform only the
1588 * operations done here, i.e. updating the total and maximum times. In
1589 * particular, the maximum real and cpu times must be monotonically increasing
1590 * or this code is not correct.
1591 */
1592 void thread_call(struct thread *thread)
1593 {
1594 _Atomic unsigned long realtime, cputime;
1595 unsigned long exp;
1596 unsigned long helper;
1597 RUSAGE_T before, after;
1598
1599 GETRUSAGE(&before);
1600 thread->real = before.real;
1601
1602 pthread_setspecific(thread_current, thread);
1603 (*thread->func)(thread);
1604 pthread_setspecific(thread_current, NULL);
1605
1606 GETRUSAGE(&after);
1607
1608 realtime = thread_consumed_time(&after, &before, &helper);
1609 cputime = helper;
1610
1611 /* update realtime */
1612 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1613 memory_order_seq_cst);
1614 exp = atomic_load_explicit(&thread->hist->real.max,
1615 memory_order_seq_cst);
1616 while (exp < realtime
1617 && !atomic_compare_exchange_weak_explicit(
1618 &thread->hist->real.max, &exp, realtime,
1619 memory_order_seq_cst, memory_order_seq_cst))
1620 ;
1621
1622 /* update cputime */
1623 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1624 memory_order_seq_cst);
1625 exp = atomic_load_explicit(&thread->hist->cpu.max,
1626 memory_order_seq_cst);
1627 while (exp < cputime
1628 && !atomic_compare_exchange_weak_explicit(
1629 &thread->hist->cpu.max, &exp, cputime,
1630 memory_order_seq_cst, memory_order_seq_cst))
1631 ;
1632
1633 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1634 memory_order_seq_cst);
1635 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1636 memory_order_seq_cst);
1637
1638 #ifdef CONSUMED_TIME_CHECK
1639 if (realtime > CONSUMED_TIME_CHECK) {
1640 /*
1641 * We have a CPU Hog on our hands.
1642 * Whinge about it now, so we're aware this is yet another task
1643 * to fix.
1644 */
1645 flog_warn(
1646 EC_LIB_SLOW_THREAD,
1647 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1648 thread->funcname, (unsigned long)thread->func,
1649 realtime / 1000, cputime / 1000);
1650 }
1651 #endif /* CONSUMED_TIME_CHECK */
1652 }
1653
1654 /* Execute thread */
1655 void funcname_thread_execute(struct thread_master *m,
1656 int (*func)(struct thread *), void *arg, int val,
1657 debugargdef)
1658 {
1659 struct thread *thread;
1660
1661 /* Get or allocate new thread to execute. */
1662 pthread_mutex_lock(&m->mtx);
1663 {
1664 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1665
1666 /* Set its event value. */
1667 pthread_mutex_lock(&thread->mtx);
1668 {
1669 thread->add_type = THREAD_EXECUTE;
1670 thread->u.val = val;
1671 thread->ref = &thread;
1672 }
1673 pthread_mutex_unlock(&thread->mtx);
1674 }
1675 pthread_mutex_unlock(&m->mtx);
1676
1677 /* Execute thread doing all accounting. */
1678 thread_call(thread);
1679
1680 /* Give back or free thread. */
1681 thread_add_unuse(m, thread);
1682 }