1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "frrcu.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "lib_errors.h"
37
38 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
39 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
42
43 DECLARE_LIST(thread_list, struct thread, threaditem)
44
45 static int thread_timer_cmp(const struct thread *a, const struct thread *b)
46 {
47 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
48 return -1;
49 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
50 return 1;
51 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
52 return -1;
53 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
54 return 1;
55 return 0;
56 }
57
58 DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
59 thread_timer_cmp)
60
61 #if defined(__APPLE__)
62 #include <mach/mach.h>
63 #include <mach/mach_time.h>
64 #endif
65
66 #define AWAKEN(m) \
67 do { \
68 static unsigned char wakebyte = 0x01; \
69 write(m->io_pipe[1], &wakebyte, 1); \
70 } while (0);
71
72 /* control variable for initializer */
73 static pthread_once_t init_once = PTHREAD_ONCE_INIT;
74 pthread_key_t thread_current;
75
76 static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
77 static struct list *masters;
78
79 static void thread_free(struct thread_master *master, struct thread *thread);
80
81 /* CLI start ---------------------------------------------------------------- */
82 static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
83 {
84 int size = sizeof(a->func);
85
86 return jhash(&a->func, size, 0);
87 }
88
89 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
90 const struct cpu_thread_history *b)
91 {
92 return a->func == b->func;
93 }
94
95 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
96 {
97 struct cpu_thread_history *new;
98 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
99 new->func = a->func;
100 new->funcname = a->funcname;
101 return new;
102 }
103
104 static void cpu_record_hash_free(void *a)
105 {
106 struct cpu_thread_history *hist = a;
107
108 XFREE(MTYPE_THREAD_STATS, hist);
109 }
110
111 static void vty_out_cpu_thread_history(struct vty *vty,
112 struct cpu_thread_history *a)
113 {
114 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
115 (size_t)a->total_active,
116 a->cpu.total / 1000, a->cpu.total % 1000,
117 (size_t)a->total_calls,
118 a->cpu.total / a->total_calls, a->cpu.max,
119 a->real.total / a->total_calls, a->real.max);
120 vty_out(vty, " %c%c%c%c%c %s\n",
121 a->types & (1 << THREAD_READ) ? 'R' : ' ',
122 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
123 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
124 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
125 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
126 }
127
128 static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
129 {
130 struct cpu_thread_history *totals = args[0];
131 struct cpu_thread_history copy;
132 struct vty *vty = args[1];
133 uint8_t *filter = args[2];
134
135 struct cpu_thread_history *a = bucket->data;
136
137 copy.total_active =
138 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
139 copy.total_calls =
140 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
141 copy.cpu.total =
142 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
143 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
144 copy.real.total =
145 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
146 copy.real.max =
147 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
148 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
149 copy.funcname = a->funcname;
150
151 if (!(copy.types & *filter))
152 return;
153
154 vty_out_cpu_thread_history(vty, &copy);
155 totals->total_active += copy.total_active;
156 totals->total_calls += copy.total_calls;
157 totals->real.total += copy.real.total;
158 if (totals->real.max < copy.real.max)
159 totals->real.max = copy.real.max;
160 totals->cpu.total += copy.cpu.total;
161 if (totals->cpu.max < copy.cpu.max)
162 totals->cpu.max = copy.cpu.max;
163 }
164
165 static void cpu_record_print(struct vty *vty, uint8_t filter)
166 {
167 struct cpu_thread_history tmp;
168 void *args[3] = {&tmp, vty, &filter};
169 struct thread_master *m;
170 struct listnode *ln;
171
172 memset(&tmp, 0, sizeof tmp);
173 tmp.funcname = "TOTAL";
174 tmp.types = filter;
175
176 pthread_mutex_lock(&masters_mtx);
177 {
178 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
179 const char *name = m->name ? m->name : "main";
180
181 char underline[strlen(name) + 1];
182 memset(underline, '-', sizeof(underline));
183 underline[sizeof(underline) - 1] = '\0';
184
185 vty_out(vty, "\n");
186 vty_out(vty, "Showing statistics for pthread %s\n",
187 name);
188 vty_out(vty, "-------------------------------%s\n",
189 underline);
190 vty_out(vty, "%21s %18s %18s\n", "",
191 "CPU (user+system):", "Real (wall-clock):");
192 vty_out(vty,
193 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (m->cpu_record->count)
198 hash_iterate(
199 m->cpu_record,
200 (void (*)(struct hash_bucket *,
201 void *))cpu_record_hash_print,
202 args);
203 else
204 vty_out(vty, "No data to display yet.\n");
205
206 vty_out(vty, "\n");
207 }
208 }
209 pthread_mutex_unlock(&masters_mtx);
210
211 vty_out(vty, "\n");
212 vty_out(vty, "Total thread statistics\n");
213 vty_out(vty, "-------------------------\n");
214 vty_out(vty, "%21s %18s %18s\n", "",
215 "CPU (user+system):", "Real (wall-clock):");
216 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
217 vty_out(vty, " Avg uSec Max uSecs");
218 vty_out(vty, " Type Thread\n");
219
220 if (tmp.total_calls > 0)
221 vty_out_cpu_thread_history(vty, &tmp);
222 }
223
224 static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
225 {
226 uint8_t *filter = args[0];
227 struct hash *cpu_record = args[1];
228
229 struct cpu_thread_history *a = bucket->data;
230
231 if (!(a->types & *filter))
232 return;
233
234 hash_release(cpu_record, bucket->data);
235 }
236
237 static void cpu_record_clear(uint8_t filter)
238 {
239 uint8_t *tmp = &filter;
240 struct thread_master *m;
241 struct listnode *ln;
242
243 pthread_mutex_lock(&masters_mtx);
244 {
245 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
246 pthread_mutex_lock(&m->mtx);
247 {
248 void *args[2] = {tmp, m->cpu_record};
249 hash_iterate(
250 m->cpu_record,
251 (void (*)(struct hash_bucket *,
252 void *))cpu_record_hash_clear,
253 args);
254 }
255 pthread_mutex_unlock(&m->mtx);
256 }
257 }
258 pthread_mutex_unlock(&masters_mtx);
259 }
260
261 static uint8_t parse_filter(const char *filterstr)
262 {
263 int i = 0;
264 int filter = 0;
265
266 while (filterstr[i] != '\0') {
267 switch (filterstr[i]) {
268 case 'r':
269 case 'R':
270 filter |= (1 << THREAD_READ);
271 break;
272 case 'w':
273 case 'W':
274 filter |= (1 << THREAD_WRITE);
275 break;
276 case 't':
277 case 'T':
278 filter |= (1 << THREAD_TIMER);
279 break;
280 case 'e':
281 case 'E':
282 filter |= (1 << THREAD_EVENT);
283 break;
284 case 'x':
285 case 'X':
286 filter |= (1 << THREAD_EXECUTE);
287 break;
288 default:
289 break;
290 }
291 ++i;
292 }
293 return filter;
294 }
295
296 DEFUN (show_thread_cpu,
297 show_thread_cpu_cmd,
298 "show thread cpu [FILTER]",
299 SHOW_STR
300 "Thread information\n"
301 "Thread CPU usage\n"
302 "Display filter (rwtex)\n")
303 {
304 uint8_t filter = (uint8_t)-1U;
305 int idx = 0;
306
307 if (argv_find(argv, argc, "FILTER", &idx)) {
308 filter = parse_filter(argv[idx]->arg);
309 if (!filter) {
310 vty_out(vty,
311 "Invalid filter \"%s\" specified; must contain at least "
312 "one of 'RWTEX'\n",
313 argv[idx]->arg);
314 return CMD_WARNING;
315 }
316 }
317
318 cpu_record_print(vty, filter);
319 return CMD_SUCCESS;
320 }
321
322 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
323 {
324 const char *name = m->name ? m->name : "main";
325 char underline[strlen(name) + 1];
326 struct thread *thread;
327 uint32_t i;
328
329 memset(underline, '-', sizeof(underline));
330 underline[sizeof(underline) - 1] = '\0';
331
332 vty_out(vty, "\nShowing poll FD's for %s\n", name);
333 vty_out(vty, "----------------------%s\n", underline);
334 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
335 m->fd_limit);
336 for (i = 0; i < m->handler.pfdcount; i++) {
337 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
338 m->handler.pfds[i].fd, m->handler.pfds[i].events,
339 m->handler.pfds[i].revents);
340
341 if (m->handler.pfds[i].events & POLLIN) {
342 thread = m->read[m->handler.pfds[i].fd];
343
344 if (!thread)
345 vty_out(vty, "ERROR ");
346 else
347 vty_out(vty, "%s ", thread->funcname);
348 } else
349 vty_out(vty, " ");
350
351 if (m->handler.pfds[i].events & POLLOUT) {
352 thread = m->write[m->handler.pfds[i].fd];
353
354 if (!thread)
355 vty_out(vty, "ERROR\n");
356 else
357 vty_out(vty, "%s\n", thread->funcname);
358 } else
359 vty_out(vty, "\n");
360 }
361 }
362
363 DEFUN (show_thread_poll,
364 show_thread_poll_cmd,
365 "show thread poll",
366 SHOW_STR
367 "Thread information\n"
368 "Show poll FD's and information\n")
369 {
370 struct listnode *node;
371 struct thread_master *m;
372
373 pthread_mutex_lock(&masters_mtx);
374 {
375 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
376 show_thread_poll_helper(vty, m);
377 }
378 }
379 pthread_mutex_unlock(&masters_mtx);
380
381 return CMD_SUCCESS;
382 }
383
384
385 DEFUN (clear_thread_cpu,
386 clear_thread_cpu_cmd,
387 "clear thread cpu [FILTER]",
388 "Clear stored data in all pthreads\n"
389 "Thread information\n"
390 "Thread CPU usage\n"
391 "Display filter (rwtex)\n")
392 {
393 uint8_t filter = (uint8_t)-1U;
394 int idx = 0;
395
396 if (argv_find(argv, argc, "FILTER", &idx)) {
397 filter = parse_filter(argv[idx]->arg);
398 if (!filter) {
399 vty_out(vty,
400 "Invalid filter \"%s\" specified; must contain at least "
401 "one of 'RWTEX'\n",
402 argv[idx]->arg);
403 return CMD_WARNING;
404 }
405 }
406
407 cpu_record_clear(filter);
408 return CMD_SUCCESS;
409 }
410
411 void thread_cmd_init(void)
412 {
413 install_element(VIEW_NODE, &show_thread_cpu_cmd);
414 install_element(VIEW_NODE, &show_thread_poll_cmd);
415 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
416 }
417 /* CLI end ------------------------------------------------------------------ */
418
419
420 static void cancelreq_del(void *cr)
421 {
422 XFREE(MTYPE_TMP, cr);
423 }
424
425 /* initializer, only ever called once */
426 static void initializer(void)
427 {
428 pthread_key_create(&thread_current, NULL);
429 }
430
431 struct thread_master *thread_master_create(const char *name)
432 {
433 struct thread_master *rv;
434 struct rlimit limit;
435
436 pthread_once(&init_once, &initializer);
437
438 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
439
440 /* Initialize master mutex */
441 pthread_mutex_init(&rv->mtx, NULL);
442 pthread_cond_init(&rv->cancel_cond, NULL);
443
444 /* Set name */
445 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
446
447 /* Initialize I/O task data structures */
448 getrlimit(RLIMIT_NOFILE, &limit);
449 rv->fd_limit = (int)limit.rlim_cur;
450 rv->read = XCALLOC(MTYPE_THREAD_POLL,
451 sizeof(struct thread *) * rv->fd_limit);
452
453 rv->write = XCALLOC(MTYPE_THREAD_POLL,
454 sizeof(struct thread *) * rv->fd_limit);
455
456 rv->cpu_record = hash_create_size(
457 8, (unsigned int (*)(const void *))cpu_record_hash_key,
458 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
459 "Thread Hash");
460
461 thread_list_init(&rv->event);
462 thread_list_init(&rv->ready);
463 thread_list_init(&rv->unuse);
464 thread_timer_list_init(&rv->timer);
465
466 /* Initialize thread_fetch() settings */
467 rv->spin = true;
468 rv->handle_signals = true;
469
470 /* Set pthread owner, should be updated by actual owner */
471 rv->owner = pthread_self();
472 rv->cancel_req = list_new();
473 rv->cancel_req->del = cancelreq_del;
474 rv->canceled = true;
475
476 /* Initialize pipe poker */
477 pipe(rv->io_pipe);
478 set_nonblocking(rv->io_pipe[0]);
479 set_nonblocking(rv->io_pipe[1]);
480
481 /* Initialize data structures for poll() */
482 rv->handler.pfdsize = rv->fd_limit;
483 rv->handler.pfdcount = 0;
484 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
485 sizeof(struct pollfd) * rv->handler.pfdsize);
486 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
487 sizeof(struct pollfd) * rv->handler.pfdsize);
488
489 /* add to list of threadmasters */
490 pthread_mutex_lock(&masters_mtx);
491 {
492 if (!masters)
493 masters = list_new();
494
495 listnode_add(masters, rv);
496 }
497 pthread_mutex_unlock(&masters_mtx);
498
499 return rv;
500 }
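
/* Illustrative usage sketch (not part of this file): the typical lifecycle
 * of a thread_master -- usually one per event-loop pthread, created at
 * startup and torn down with thread_master_free() at shutdown (libfrr
 * normally handles this for the main pthread). */
void example_master_lifecycle(void)
{
	struct thread_master *master = thread_master_create("example");

	/* ... schedule tasks with the thread_add_*() wrappers and drive
	 * the thread_fetch()/thread_call() loop (see below) ... */

	/* On shutdown: free cached task structs, then the master itself. */
	thread_master_free_unused(master);
	thread_master_free(master);
}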
501
502 void thread_master_set_name(struct thread_master *master, const char *name)
503 {
504 pthread_mutex_lock(&master->mtx);
505 {
506 XFREE(MTYPE_THREAD_MASTER, master->name);
507 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
508 }
509 pthread_mutex_unlock(&master->mtx);
510 }
511
512 #define THREAD_UNUSED_DEPTH 10
513
514 /* Move thread to unuse list. */
515 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
516 {
517 pthread_mutex_t mtxc = thread->mtx;
518
519 assert(m != NULL && thread != NULL);
520
521 thread->hist->total_active--;
522 memset(thread, 0, sizeof(struct thread));
523 thread->type = THREAD_UNUSED;
524
525 /* Restore the thread mutex context. */
526 thread->mtx = mtxc;
527
528 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
529 thread_list_add_tail(&m->unuse, thread);
530 return;
531 }
532
533 thread_free(m, thread);
534 }
535
536 /* Free all unused threads. */
537 static void thread_list_free(struct thread_master *m,
538 struct thread_list_head *list)
539 {
540 struct thread *t;
541
542 while ((t = thread_list_pop(list)))
543 thread_free(m, t);
544 }
545
546 static void thread_array_free(struct thread_master *m,
547 struct thread **thread_array)
548 {
549 struct thread *t;
550 int index;
551
552 for (index = 0; index < m->fd_limit; ++index) {
553 t = thread_array[index];
554 if (t) {
555 thread_array[index] = NULL;
556 thread_free(m, t);
557 }
558 }
559 XFREE(MTYPE_THREAD_POLL, thread_array);
560 }
561
562 /*
563 * thread_master_free_unused
564 *
565 * As threads are finished with, they are put on the
566 * unuse list for later reuse.
567 * If we are shutting down, free up the unused threads
568 * so we can see whether we forgot to shut anything off.
569 */
570 void thread_master_free_unused(struct thread_master *m)
571 {
572 pthread_mutex_lock(&m->mtx);
573 {
574 struct thread *t;
575 while ((t = thread_list_pop(&m->unuse)))
576 thread_free(m, t);
577 }
578 pthread_mutex_unlock(&m->mtx);
579 }
580
581 /* Stop thread scheduler. */
582 void thread_master_free(struct thread_master *m)
583 {
584 struct thread *t;
585
586 pthread_mutex_lock(&masters_mtx);
587 {
588 listnode_delete(masters, m);
589 if (masters->count == 0) {
590 list_delete(&masters);
591 }
592 }
593 pthread_mutex_unlock(&masters_mtx);
594
595 thread_array_free(m, m->read);
596 thread_array_free(m, m->write);
597 while ((t = thread_timer_list_pop(&m->timer)))
598 thread_free(m, t);
599 thread_list_free(m, &m->event);
600 thread_list_free(m, &m->ready);
601 thread_list_free(m, &m->unuse);
602 pthread_mutex_destroy(&m->mtx);
603 pthread_cond_destroy(&m->cancel_cond);
604 close(m->io_pipe[0]);
605 close(m->io_pipe[1]);
606 list_delete(&m->cancel_req);
607 m->cancel_req = NULL;
608
609 hash_clean(m->cpu_record, cpu_record_hash_free);
610 hash_free(m->cpu_record);
611 m->cpu_record = NULL;
612
613 XFREE(MTYPE_THREAD_MASTER, m->name);
614 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
615 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
616 XFREE(MTYPE_THREAD_MASTER, m);
617 }
618
619 /* Return remaining time in milliseconds. */
620 unsigned long thread_timer_remain_msec(struct thread *thread)
621 {
622 int64_t remain;
623
624 pthread_mutex_lock(&thread->mtx);
625 {
626 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
627 }
628 pthread_mutex_unlock(&thread->mtx);
629
630 return remain < 0 ? 0 : remain;
631 }
632
633 /* Return remaining time in seconds. */
634 unsigned long thread_timer_remain_second(struct thread *thread)
635 {
636 return thread_timer_remain_msec(thread) / 1000LL;
637 }
638
639 #define debugargdef const char *funcname, const char *schedfrom, int fromln
640 #define debugargpass funcname, schedfrom, fromln
641
642 struct timeval thread_timer_remain(struct thread *thread)
643 {
644 struct timeval remain;
645 pthread_mutex_lock(&thread->mtx);
646 {
647 monotime_until(&thread->u.sands, &remain);
648 }
649 pthread_mutex_unlock(&thread->mtx);
650 return remain;
651 }
652
653 /* Get new thread. */
654 static struct thread *thread_get(struct thread_master *m, uint8_t type,
655 int (*func)(struct thread *), void *arg,
656 debugargdef)
657 {
658 struct thread *thread = thread_list_pop(&m->unuse);
659 struct cpu_thread_history tmp;
660
661 if (!thread) {
662 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
663 /* mutex only needs to be initialized at struct creation. */
664 pthread_mutex_init(&thread->mtx, NULL);
665 m->alloc++;
666 }
667
668 thread->type = type;
669 thread->add_type = type;
670 thread->master = m;
671 thread->arg = arg;
672 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
673 thread->ref = NULL;
674
675 /*
676 * If the passed-in funcname is not what we have
677 * stored, that means the thread->hist needs to be
678 * updated. We keep the last one around on the unuse
679 * list under the assumption that we are probably
680 * going to immediately allocate the same type of
681 * thread again; this hopefully saves us some serious
682 * hash_get lookups.
683 */
685 if (thread->funcname != funcname || thread->func != func) {
686 tmp.func = func;
687 tmp.funcname = funcname;
688 thread->hist =
689 hash_get(m->cpu_record, &tmp,
690 (void *(*)(void *))cpu_record_hash_alloc);
691 }
692 thread->hist->total_active++;
693 thread->func = func;
694 thread->funcname = funcname;
695 thread->schedfrom = schedfrom;
696 thread->schedfrom_line = fromln;
697
698 return thread;
699 }
700
701 static void thread_free(struct thread_master *master, struct thread *thread)
702 {
703 /* Update statistics. */
704 assert(master->alloc > 0);
705 master->alloc--;
706
707 /* Free allocated resources. */
708 pthread_mutex_destroy(&thread->mtx);
709 XFREE(MTYPE_THREAD, thread);
710 }
711
712 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
713 nfds_t count, const struct timeval *timer_wait)
714 {
715 /* If timer_wait is null here, that means poll() should block
716 * indefinitely,
717 * unless the thread_master has overridden it by setting
718 * ->selectpoll_timeout.
719 * If the value is positive, it specifies the maximum number of
720 * milliseconds
721 * to wait. If the timeout is -1, it specifies that we should never wait
722 * and
723 * always return immediately even if no event is detected. If the value
724 * is
725 * zero, the behavior is default. */
726 int timeout = -1;
727
728 /* number of file descriptors with events */
729 int num;
730
731 if (timer_wait != NULL
732 && m->selectpoll_timeout == 0) // use the default value
733 timeout = (timer_wait->tv_sec * 1000)
734 + (timer_wait->tv_usec / 1000);
735 else if (m->selectpoll_timeout > 0) // use the user's timeout
736 timeout = m->selectpoll_timeout;
737 else if (m->selectpoll_timeout
738 < 0) // effect a poll (return immediately)
739 timeout = 0;
740
741 rcu_read_unlock();
742 rcu_assert_read_unlocked();
743
744 /* add poll pipe poker */
745 assert(count + 1 < pfdsize);
746 pfds[count].fd = m->io_pipe[0];
747 pfds[count].events = POLLIN;
748 pfds[count].revents = 0x00;
749
750 num = poll(pfds, count + 1, timeout);
751
752 unsigned char trash[64];
753 if (num > 0 && pfds[count].revents != 0 && num--)
754 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
755 ;
756
757 rcu_read_lock();
758
759 return num;
760 }
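
/* Illustrative sketch (not part of this file): the timeout selection made
 * above, restated as a standalone helper.  The ->selectpoll_timeout
 * semantics: 0 means use the timer-derived wait, > 0 overrides it in
 * milliseconds, < 0 forces a non-blocking poll. */
static inline int example_poll_timeout(const struct timeval *timer_wait,
				       long selectpoll_timeout)
{
	if (selectpoll_timeout > 0)
		return selectpoll_timeout;
	if (selectpoll_timeout < 0)
		return 0;		/* poll() returns immediately */
	if (timer_wait)
		return (timer_wait->tv_sec * 1000)
		       + (timer_wait->tv_usec / 1000);
	return -1;			/* block indefinitely */
}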
761
762 /* Add new read or write thread. */
763 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
764 int (*func)(struct thread *),
765 void *arg, int fd,
766 struct thread **t_ptr,
767 debugargdef)
768 {
769 struct thread *thread = NULL;
770 struct thread **thread_array;
771
772 assert(fd >= 0 && fd < m->fd_limit);
773 pthread_mutex_lock(&m->mtx);
774 {
775 if (t_ptr
776 && *t_ptr) // thread is already scheduled; don't reschedule
777 {
778 pthread_mutex_unlock(&m->mtx);
779 return NULL;
780 }
781
782 /* default to a new pollfd */
783 nfds_t queuepos = m->handler.pfdcount;
784
785 if (dir == THREAD_READ)
786 thread_array = m->read;
787 else
788 thread_array = m->write;
789
790 /* if we already have a pollfd for our file descriptor, find and
791 * use it */
792 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
793 if (m->handler.pfds[i].fd == fd) {
794 queuepos = i;
795
796 #ifdef DEV_BUILD
797 /*
798 * What happens if we have a thread already
799 * created for this event?
800 */
801 if (thread_array[fd])
802 assert(!"Thread already scheduled for file descriptor");
803 #endif
804 break;
805 }
806
807 /* make sure we have room for this fd + pipe poker fd */
808 assert(queuepos + 1 < m->handler.pfdsize);
809
810 thread = thread_get(m, dir, func, arg, debugargpass);
811
812 m->handler.pfds[queuepos].fd = fd;
813 m->handler.pfds[queuepos].events |=
814 (dir == THREAD_READ ? POLLIN : POLLOUT);
815
816 if (queuepos == m->handler.pfdcount)
817 m->handler.pfdcount++;
818
819 if (thread) {
820 pthread_mutex_lock(&thread->mtx);
821 {
822 thread->u.fd = fd;
823 thread_array[thread->u.fd] = thread;
824 }
825 pthread_mutex_unlock(&thread->mtx);
826
827 if (t_ptr) {
828 *t_ptr = thread;
829 thread->ref = t_ptr;
830 }
831 }
832
833 AWAKEN(m);
834 }
835 pthread_mutex_unlock(&m->mtx);
836
837 return thread;
838 }
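
/* Usage sketch (illustrative, not part of this file): scheduling a read
 * task, assuming the thread_add_read() wrapper macro declared in thread.h
 * (it expands to funcname_thread_add_read_write() with THREAD_READ plus
 * the caller's function/file/line as debug arguments).  Read tasks fire
 * once, so the callback re-arms itself; the hypothetical example_ctx holds
 * the back-reference that this library NULLs when the task runs or is
 * cancelled. */
struct example_ctx {
	int fd;
	struct thread *t_read;
};

static int example_on_readable(struct thread *t)
{
	struct example_ctx *ctx = THREAD_ARG(t);

	/* ... read from ctx->fd ... */

	/* re-arm for the next readable event */
	thread_add_read(t->master, example_on_readable, ctx, ctx->fd,
			&ctx->t_read);
	return 0;
}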
839
840 static struct thread *
841 funcname_thread_add_timer_timeval(struct thread_master *m,
842 int (*func)(struct thread *), int type,
843 void *arg, struct timeval *time_relative,
844 struct thread **t_ptr, debugargdef)
845 {
846 struct thread *thread;
847
848 assert(m != NULL);
849
850 assert(type == THREAD_TIMER);
851 assert(time_relative);
852
853 pthread_mutex_lock(&m->mtx);
854 {
855 if (t_ptr
856 && *t_ptr) // thread is already scheduled; don't reschedule
857 {
858 pthread_mutex_unlock(&m->mtx);
859 return NULL;
860 }
861
862 thread = thread_get(m, type, func, arg, debugargpass);
863
864 pthread_mutex_lock(&thread->mtx);
865 {
866 monotime(&thread->u.sands);
867 timeradd(&thread->u.sands, time_relative,
868 &thread->u.sands);
869 thread_timer_list_add(&m->timer, thread);
870 if (t_ptr) {
871 *t_ptr = thread;
872 thread->ref = t_ptr;
873 }
874 }
875 pthread_mutex_unlock(&thread->mtx);
876
877 AWAKEN(m);
878 }
879 pthread_mutex_unlock(&m->mtx);
880
881 return thread;
882 }
883
884
885 /* Add timer event thread. */
886 struct thread *funcname_thread_add_timer(struct thread_master *m,
887 int (*func)(struct thread *),
888 void *arg, long timer,
889 struct thread **t_ptr, debugargdef)
890 {
891 struct timeval trel;
892
893 assert(m != NULL);
894
895 trel.tv_sec = timer;
896 trel.tv_usec = 0;
897
898 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
899 &trel, t_ptr, debugargpass);
900 }
901
902 /* Add timer event thread with "millisecond" resolution */
903 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
904 int (*func)(struct thread *),
905 void *arg, long timer,
906 struct thread **t_ptr,
907 debugargdef)
908 {
909 struct timeval trel;
910
911 assert(m != NULL);
912
913 trel.tv_sec = timer / 1000;
914 trel.tv_usec = 1000 * (timer % 1000);
915
916 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
917 &trel, t_ptr, debugargpass);
918 }
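
/* Usage sketch (illustrative, not part of this file): a periodic task
 * built from the one-shot millisecond timer above, assuming the
 * thread_add_timer_msec() wrapper macro declared in thread.h.  The
 * hypothetical example_poller keeps the back-reference and interval. */
struct example_poller {
	long interval_msec;
	struct thread *t_poll;
};

static int example_poll_tick(struct thread *t)
{
	struct example_poller *p = THREAD_ARG(t);

	/* ... do the periodic work ... */

	/* Timers fire exactly once; schedule the next tick explicitly. */
	thread_add_timer_msec(t->master, example_poll_tick, p,
			      p->interval_msec, &p->t_poll);
	return 0;
}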
919
920 /* Add timer event thread with a struct timeval relative expiration time */
921 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
922 int (*func)(struct thread *),
923 void *arg, struct timeval *tv,
924 struct thread **t_ptr, debugargdef)
925 {
926 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
927 t_ptr, debugargpass);
928 }
929
930 /* Add simple event thread. */
931 struct thread *funcname_thread_add_event(struct thread_master *m,
932 int (*func)(struct thread *),
933 void *arg, int val,
934 struct thread **t_ptr, debugargdef)
935 {
936 struct thread *thread;
937
938 assert(m != NULL);
939
940 pthread_mutex_lock(&m->mtx);
941 {
942 if (t_ptr
943 && *t_ptr) // thread is already scheduled; don't reschedule
944 {
945 pthread_mutex_unlock(&m->mtx);
946 return NULL;
947 }
948
949 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
950 pthread_mutex_lock(&thread->mtx);
951 {
952 thread->u.val = val;
953 thread_list_add_tail(&m->event, thread);
954 }
955 pthread_mutex_unlock(&thread->mtx);
956
957 if (t_ptr) {
958 *t_ptr = thread;
959 thread->ref = t_ptr;
960 }
961
962 AWAKEN(m);
963 }
964 pthread_mutex_unlock(&m->mtx);
965
966 return thread;
967 }
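
/* Usage sketch (illustrative, not part of this file): events have no
 * trigger condition -- they go straight onto the event queue and run on
 * the next pass through thread_fetch().  Assumes the thread_add_event()
 * wrapper macro declared in thread.h. */
static int example_process_backlog(struct thread *t)
{
	void *session = THREAD_ARG(t);	/* the arg passed at schedule time */
	int how_much = THREAD_VAL(t);	/* the 'val' passed at schedule time */

	/* ... drain up to how_much items belonging to session ... */
	(void)session;
	(void)how_much;
	return 0;
}

static void example_queue_backlog(struct thread_master *master, void *session)
{
	/* Passing NULL for t_ptr means no back-reference is kept. */
	thread_add_event(master, example_process_backlog, session, 50, NULL);
}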
968
969 /* Thread cancellation ------------------------------------------------------ */
970
971 /**
972 * NOT's out the .events field of pollfd corresponding to the given file
973 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
974 *
975 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
976 * implementation for details.
977 *
978 * @param master
979 * @param fd
980 * @param state the event to cancel. One or more (OR'd together) of the
981 * following:
982 * - POLLIN
983 * - POLLOUT
984 */
985 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
986 {
987 bool found = false;
988
989 /* Cancel POLLHUP too just in case some bozo set it */
990 state |= POLLHUP;
991
992 /* find the index of corresponding pollfd */
993 nfds_t i;
994
995 for (i = 0; i < master->handler.pfdcount; i++)
996 if (master->handler.pfds[i].fd == fd) {
997 found = true;
998 break;
999 }
1000
1001 if (!found) {
1002 zlog_debug(
1003 "[!] Received cancellation request for nonexistent rw job");
1004 zlog_debug("[!] threadmaster: %s | fd: %d",
1005 master->name ? master->name : "", fd);
1006 return;
1007 }
1008
1009 /* NOT out event. */
1010 master->handler.pfds[i].events &= ~(state);
1011
1012 /* If all events are canceled, delete / resize the pollfd array. */
1013 if (master->handler.pfds[i].events == 0) {
1014 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1015 (master->handler.pfdcount - i - 1)
1016 * sizeof(struct pollfd));
1017 master->handler.pfdcount--;
1018 }
1019
1020 /* If we have the same pollfd in the copy, perform the same operations,
1021 * otherwise return. */
1022 if (i >= master->handler.copycount)
1023 return;
1024
1025 master->handler.copy[i].events &= ~(state);
1026
1027 if (master->handler.copy[i].events == 0) {
1028 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1029 (master->handler.copycount - i - 1)
1030 * sizeof(struct pollfd));
1031 master->handler.copycount--;
1032 }
1033 }
1034
1035 /**
1036 * Process cancellation requests.
1037 *
1038 * This may only be run from the pthread which owns the thread_master.
1039 *
1040 * @param master the thread master to process
1041 * @REQUIRE master->mtx
1042 */
1043 static void do_thread_cancel(struct thread_master *master)
1044 {
1045 struct thread_list_head *list = NULL;
1046 struct thread **thread_array = NULL;
1047 struct thread *thread;
1048
1049 struct cancel_req *cr;
1050 struct listnode *ln;
1051 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1052 /* If this is an event object cancellation, linear-search the
1053 * event list and delete any events which have the specified
1054 * argument. We also need to check every thread in the ready
1055 * queue. */
1057 if (cr->eventobj) {
1058 struct thread *t;
1059
1060 frr_each_safe(thread_list, &master->event, t) {
1061 if (t->arg != cr->eventobj)
1062 continue;
1063 thread_list_del(&master->event, t);
1064 if (t->ref)
1065 *t->ref = NULL;
1066 thread_add_unuse(master, t);
1067 }
1068
1069 frr_each_safe(thread_list, &master->ready, t) {
1070 if (t->arg != cr->eventobj)
1071 continue;
1072 thread_list_del(&master->ready, t);
1073 if (t->ref)
1074 *t->ref = NULL;
1075 thread_add_unuse(master, t);
1076 }
1077 continue;
1078 }
1079
1080 /* The pointer varies depending on whether the cancellation
1081 * request was made asynchronously or not. If it was, we need
1082 * to check whether the thread even exists anymore before
1083 * cancelling it. */
1085 thread = (cr->thread) ? cr->thread : *cr->threadref;
1086
1087 if (!thread)
1088 continue;
1089
1090 /* Determine the appropriate queue to cancel the thread from */
1091 switch (thread->type) {
1092 case THREAD_READ:
1093 thread_cancel_rw(master, thread->u.fd, POLLIN);
1094 thread_array = master->read;
1095 break;
1096 case THREAD_WRITE:
1097 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1098 thread_array = master->write;
1099 break;
1100 case THREAD_TIMER:
1101 thread_timer_list_del(&master->timer, thread);
1102 break;
1103 case THREAD_EVENT:
1104 list = &master->event;
1105 break;
1106 case THREAD_READY:
1107 list = &master->ready;
1108 break;
1109 default:
1110 continue;
1111 break;
1112 }
1113
1114 if (list) {
1115 thread_list_del(list, thread);
1116 } else if (thread_array) {
1117 thread_array[thread->u.fd] = NULL;
1118 }
1119
1120 if (thread->ref)
1121 *thread->ref = NULL;
1122
1123 thread_add_unuse(thread->master, thread);
1124 }
1125
1126 /* Delete and free all cancellation requests */
1127 list_delete_all_node(master->cancel_req);
1128
1129 /* Wake up any threads which may be blocked in thread_cancel_async() */
1130 master->canceled = true;
1131 pthread_cond_broadcast(&master->cancel_cond);
1132 }
1133
1134 /**
1135 * Cancel any events which have the specified argument.
1136 *
1137 * MT-Unsafe
1138 *
1139 * @param master the thread_master to cancel from
1140 * @param arg the argument passed when creating the event
1141 */
1142 void thread_cancel_event(struct thread_master *master, void *arg)
1143 {
1144 assert(master->owner == pthread_self());
1145
1146 pthread_mutex_lock(&master->mtx);
1147 {
1148 struct cancel_req *cr =
1149 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1150 cr->eventobj = arg;
1151 listnode_add(master->cancel_req, cr);
1152 do_thread_cancel(master);
1153 }
1154 pthread_mutex_unlock(&master->mtx);
1155 }
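
/* Usage sketch (illustrative, not part of this file): dropping every
 * queued event whose argument is a given object, e.g. before freeing a
 * session structure.  Must run on the pthread that owns the master. */
static void example_session_teardown(struct thread_master *master,
				     void *session)
{
	/* removes matching tasks from both the event and ready queues */
	thread_cancel_event(master, session);
	/* ... now it is safe to free 'session' ... */
}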
1156
1157 /**
1158 * Cancel a specific task.
1159 *
1160 * MT-Unsafe
1161 *
1162 * @param thread task to cancel
1163 */
1164 void thread_cancel(struct thread *thread)
1165 {
1166 struct thread_master *master = thread->master;
1167
1168 assert(master->owner == pthread_self());
1169
1170 pthread_mutex_lock(&master->mtx);
1171 {
1172 struct cancel_req *cr =
1173 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1174 cr->thread = thread;
1175 listnode_add(master->cancel_req, cr);
1176 do_thread_cancel(master);
1177 }
1178 pthread_mutex_unlock(&master->mtx);
1179 }
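
/* Usage sketch (illustrative, not part of this file): the common
 * "cancel if scheduled" pattern.  The back-reference registered at
 * schedule time is NULLed by this library when the task runs or is
 * cancelled, so it doubles as an "is anything pending?" flag.  thread.h's
 * THREAD_OFF() macro wraps the same pattern. */
static void example_stop_task(struct thread **ref)
{
	if (*ref) {
		thread_cancel(*ref);
		*ref = NULL;
	}
}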
1180
1181 /**
1182 * Asynchronous cancellation.
1183 *
1184 * Called with either a struct thread ** or void * to an event argument,
1185 * this function posts the correct cancellation request and blocks until it is
1186 * serviced.
1187 *
1188 * If the thread is currently running, execution blocks until it completes.
1189 *
1190 * The last two parameters are mutually exclusive, i.e. if you pass one the
1191 * other must be NULL.
1192 *
1193 * When the cancellation procedure executes on the target thread_master, the
1194 * thread * provided is checked for nullity. If it is null, the thread is
1195 * assumed to no longer exist and the cancellation request is a no-op. Thus
1196 * users of this API must pass a back-reference when scheduling the original
1197 * task.
1198 *
1199 * MT-Safe
1200 *
1201 * @param master the thread master with the relevant event / task
1202 * @param thread pointer to thread to cancel
1203 * @param eventobj the event
1204 */
1205 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1206 void *eventobj)
1207 {
1208 assert(!(thread && eventobj) && (thread || eventobj));
1209 assert(master->owner != pthread_self());
1210
1211 pthread_mutex_lock(&master->mtx);
1212 {
1213 master->canceled = false;
1214
1215 if (thread) {
1216 struct cancel_req *cr =
1217 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1218 cr->threadref = thread;
1219 listnode_add(master->cancel_req, cr);
1220 } else if (eventobj) {
1221 struct cancel_req *cr =
1222 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1223 cr->eventobj = eventobj;
1224 listnode_add(master->cancel_req, cr);
1225 }
1226 AWAKEN(master);
1227
1228 while (!master->canceled)
1229 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1230 }
1231 pthread_mutex_unlock(&master->mtx);
1232 }
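
/* Usage sketch (illustrative, not part of this file): cancelling a task
 * that lives on a thread_master owned by a different pthread.  As the
 * comment above requires, the caller kept a back-reference at schedule
 * time; thread_cancel_async() blocks until the owning pthread services
 * the request. */
struct example_worker {
	struct thread_master *master;	/* owned by the worker pthread */
	struct thread *t_work;		/* back-reference for async cancel */
};

static void example_worker_stop_task(struct example_worker *w)
{
	/* must be called from a pthread other than the master's owner */
	thread_cancel_async(w->master, &w->t_work, NULL);
}
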
1233 /* ------------------------------------------------------------------------- */
1234
1235 static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
1236 struct timeval *timer_val)
1237 {
1238 if (!thread_timer_list_count(timers))
1239 return NULL;
1240
1241 struct thread *next_timer = thread_timer_list_first(timers);
1242 monotime_until(&next_timer->u.sands, timer_val);
1243 return timer_val;
1244 }
1245
1246 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1247 struct thread *fetch)
1248 {
1249 *fetch = *thread;
1250 thread_add_unuse(m, thread);
1251 return fetch;
1252 }
1253
1254 static int thread_process_io_helper(struct thread_master *m,
1255 struct thread *thread, short state,
1256 short actual_state, int pos)
1257 {
1258 struct thread **thread_array;
1259
1260 /*
1261 * poll() clears the .events field, but the pollfd array we
1262 * pass to poll() is a copy of the one used to schedule threads.
1263 * We need to synchronize state between the two here by applying
1264 * the same changes poll() made on the copy of the "real" pollfd
1265 * array.
1266 *
1267 * This cleans up a possible infinite loop where we refuse
1268 * to respond to a poll event but poll is insistent that
1269 * we should.
1270 */
1271 m->handler.pfds[pos].events &= ~(state);
1272
1273 if (!thread) {
1274 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1275 flog_err(EC_LIB_NO_THREAD,
1276 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1277 m->handler.pfds[pos].fd, actual_state);
1278 return 0;
1279 }
1280
1281 if (thread->type == THREAD_READ)
1282 thread_array = m->read;
1283 else
1284 thread_array = m->write;
1285
1286 thread_array[thread->u.fd] = NULL;
1287 thread_list_add_tail(&m->ready, thread);
1288 thread->type = THREAD_READY;
1289
1290 return 1;
1291 }
1292
1293 /**
1294 * Process I/O events.
1295 *
1296 * Walks through file descriptor array looking for those pollfds whose .revents
1297 * field has something interesting. Deletes any invalid file descriptors.
1298 *
1299 * @param m the thread master
1300 * @param num the number of active file descriptors (return value of poll())
1301 */
1302 static void thread_process_io(struct thread_master *m, unsigned int num)
1303 {
1304 unsigned int ready = 0;
1305 struct pollfd *pfds = m->handler.copy;
1306
1307 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1308 /* no event for current fd? immediately continue */
1309 if (pfds[i].revents == 0)
1310 continue;
1311
1312 ready++;
1313
1314 /* Unless someone has called thread_cancel from another
1315 * pthread, the only thing that could have changed in
1316 * m->handler.pfds while we were asleep is the .events
1317 * field in a given pollfd. Barring thread_cancel(), that
1318 * value should be a superset of the values we have in our
1319 * copy, so there's no need to update it. Similarly,
1320 * barring deletion, the fd should still be a valid index
1321 * into the master's pfds. */
1325 if (pfds[i].revents & (POLLIN | POLLHUP)) {
1326 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1327 pfds[i].revents, i);
1328 }
1329 if (pfds[i].revents & POLLOUT)
1330 thread_process_io_helper(m, m->write[pfds[i].fd],
1331 POLLOUT, pfds[i].revents, i);
1332
1333 /* if one of our file descriptors is garbage, remove the same
1334 * from both pfds + update sizes and index */
1336 if (pfds[i].revents & POLLNVAL) {
1337 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1338 (m->handler.pfdcount - i - 1)
1339 * sizeof(struct pollfd));
1340 m->handler.pfdcount--;
1341
1342 memmove(pfds + i, pfds + i + 1,
1343 (m->handler.copycount - i - 1)
1344 * sizeof(struct pollfd));
1345 m->handler.copycount--;
1346
1347 i--;
1348 }
1349 }
1350 }
1351
1352 /* Add all timers that have popped to the ready list. */
1353 static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
1354 struct timeval *timenow)
1355 {
1356 struct thread *thread;
1357 unsigned int ready = 0;
1358
1359 while ((thread = thread_timer_list_first(timers))) {
1360 if (timercmp(timenow, &thread->u.sands, <))
1361 return ready;
1362 thread_timer_list_pop(timers);
1363 thread->type = THREAD_READY;
1364 thread_list_add_tail(&thread->master->ready, thread);
1365 ready++;
1366 }
1367 return ready;
1368 }
1369
1370 /* process a list en masse, e.g. for event thread lists */
1371 static unsigned int thread_process(struct thread_list_head *list)
1372 {
1373 struct thread *thread;
1374 unsigned int ready = 0;
1375
1376 while ((thread = thread_list_pop(list))) {
1377 thread->type = THREAD_READY;
1378 thread_list_add_tail(&thread->master->ready, thread);
1379 ready++;
1380 }
1381 return ready;
1382 }
1383
1384
1385 /* Fetch next ready thread. */
1386 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1387 {
1388 struct thread *thread = NULL;
1389 struct timeval now;
1390 struct timeval zerotime = {0, 0};
1391 struct timeval tv;
1392 struct timeval *tw = NULL;
1393
1394 int num = 0;
1395
1396 do {
1397 /* Handle signals if any */
1398 if (m->handle_signals)
1399 quagga_sigevent_process();
1400
1401 pthread_mutex_lock(&m->mtx);
1402
1403 /* Process any pending cancellation requests */
1404 do_thread_cancel(m);
1405
1406 /*
1407 * Attempt to flush ready queue before going into poll().
1408 * This is performance-critical. Think twice before modifying.
1409 */
1410 if ((thread = thread_list_pop(&m->ready))) {
1411 fetch = thread_run(m, thread, fetch);
1412 if (fetch->ref)
1413 *fetch->ref = NULL;
1414 pthread_mutex_unlock(&m->mtx);
1415 break;
1416 }
1417
1418 /* otherwise, tick through scheduling sequence */
1419
1420 /*
1421 * Post events to ready queue. This must come before the
1422 * following block since events should occur immediately
1423 */
1424 thread_process(&m->event);
1425
1426 /*
1427 * If there are no tasks on the ready queue, we will poll()
1428 * until a timer expires or we receive I/O, whichever comes
1429 * first. The strategy for doing this is:
1430 *
1431 * - If there are events pending, set the poll() timeout to zero
1432 * - If there are no events pending, but there are timers pending,
1433 *   set the timeout to the smallest remaining time on any timer
1434 * - If there are neither timers nor events pending, but there are
1435 *   file descriptors pending, block indefinitely in poll()
1438 * - If nothing is pending, it's time for the application to die
1439 *
1440 * In every case except the last, we need to hit poll() at least
1441 * once per loop to avoid starvation by events
1442 */
1443 if (!thread_list_count(&m->ready))
1444 tw = thread_timer_wait(&m->timer, &tv);
1445
1446 if (thread_list_count(&m->ready) ||
1447 (tw && !timercmp(tw, &zerotime, >)))
1448 tw = &zerotime;
1449
1450 if (!tw && m->handler.pfdcount == 0) { /* die */
1451 pthread_mutex_unlock(&m->mtx);
1452 fetch = NULL;
1453 break;
1454 }
1455
1456 /*
1457 * Copy pollfd array + # active pollfds in it. Not necessary to
1458 * copy the array size as this is fixed.
1459 */
1460 m->handler.copycount = m->handler.pfdcount;
1461 memcpy(m->handler.copy, m->handler.pfds,
1462 m->handler.copycount * sizeof(struct pollfd));
1463
1464 pthread_mutex_unlock(&m->mtx);
1465 {
1466 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1467 m->handler.copycount, tw);
1468 }
1469 pthread_mutex_lock(&m->mtx);
1470
1471 /* Handle any errors received in poll() */
1472 if (num < 0) {
1473 if (errno == EINTR) {
1474 pthread_mutex_unlock(&m->mtx);
1475 /* loop around to signal handler */
1476 continue;
1477 }
1478
1479 /* else die */
1480 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1481 safe_strerror(errno));
1482 pthread_mutex_unlock(&m->mtx);
1483 fetch = NULL;
1484 break;
1485 }
1486
1487 /* Post timers to ready queue. */
1488 monotime(&now);
1489 thread_process_timers(&m->timer, &now);
1490
1491 /* Post I/O to ready queue. */
1492 if (num > 0)
1493 thread_process_io(m, num);
1494
1495 pthread_mutex_unlock(&m->mtx);
1496
1497 } while (!thread && m->spin);
1498
1499 return fetch;
1500 }
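
/* Usage sketch (illustrative, not part of this file): the fetch/call loop
 * that daemons drive (libfrr's run helper does essentially this).  A NULL
 * return from thread_fetch() means nothing is left to run and the loop
 * exits. */
void example_event_loop(struct thread_master *master)
{
	struct thread thread;

	while (thread_fetch(master, &thread))
		thread_call(&thread);
}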
1501
1502 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1503 {
1504 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1505 + (a.tv_usec - b.tv_usec));
1506 }
1507
1508 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1509 unsigned long *cputime)
1510 {
1511 /* This is 'user + sys' time. */
1512 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1513 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1514 return timeval_elapsed(now->real, start->real);
1515 }
1516
1517 /* We should aim to yield after `yield' microseconds, which defaults
1518 to THREAD_YIELD_TIME_SLOT.
1519 Note: we are using real (wall clock) time for this calculation.
1520 It could be argued that CPU time may make more sense in certain
1521 contexts. The things to consider are whether the thread may have
1522 blocked (in which case wall time increases, but CPU time does not),
1523 or whether the system is heavily loaded with other processes competing
1524 for CPU time. On balance, wall clock time seems to make sense.
1525 Plus it has the added benefit that gettimeofday should be faster
1526 than calling getrusage. */
1527 int thread_should_yield(struct thread *thread)
1528 {
1529 int result;
1530 pthread_mutex_lock(&thread->mtx);
1531 {
1532 result = monotime_since(&thread->real, NULL)
1533 > (int64_t)thread->yield;
1534 }
1535 pthread_mutex_unlock(&thread->mtx);
1536 return result;
1537 }
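
/* Usage sketch (illustrative, not part of this file): a long-running task
 * that yields cooperatively.  It works until thread_should_yield() reports
 * that its slot (thread->yield, default THREAD_YIELD_TIME_SLOT) has
 * elapsed, then reschedules itself as an event so other tasks get a turn.
 * example_workqueue and example_do_one_unit() are hypothetical; assumes
 * the thread_add_event() wrapper macro declared in thread.h. */
struct example_workqueue;				/* hypothetical */
bool example_do_one_unit(struct example_workqueue *wq);	/* hypothetical */

static int example_chunked_work(struct thread *t)
{
	struct example_workqueue *wq = THREAD_ARG(t);

	while (example_do_one_unit(wq)) {
		if (thread_should_yield(t)) {
			/* pick the remaining work up on the next pass */
			thread_add_event(t->master, example_chunked_work,
					 wq, 0, NULL);
			break;
		}
	}
	return 0;
}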
1538
1539 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1540 {
1541 pthread_mutex_lock(&thread->mtx);
1542 {
1543 thread->yield = yield_time;
1544 }
1545 pthread_mutex_unlock(&thread->mtx);
1546 }
1547
1548 void thread_getrusage(RUSAGE_T *r)
1549 {
1550 #if defined RUSAGE_THREAD
1551 #define FRR_RUSAGE RUSAGE_THREAD
1552 #else
1553 #define FRR_RUSAGE RUSAGE_SELF
1554 #endif
1555 monotime(&r->real);
1556 getrusage(FRR_RUSAGE, &(r->cpu));
1557 }
1558
1559 /*
1560 * Call a thread.
1561 *
1562 * This function will atomically update the thread's usage history. At present
1563 * this is the only spot where usage history is written. Nevertheless the code
1564 * has been written such that the introduction of writers in the future should
1565 * not need to update it provided the writers atomically perform only the
1566 * operations done here, i.e. updating the total and maximum times. In
1567 * particular, the maximum real and cpu times must be monotonically increasing
1568 * or this code is not correct.
1569 */
1570 void thread_call(struct thread *thread)
1571 {
1572 _Atomic unsigned long realtime, cputime;
1573 unsigned long exp;
1574 unsigned long helper;
1575 RUSAGE_T before, after;
1576
1577 GETRUSAGE(&before);
1578 thread->real = before.real;
1579
1580 pthread_setspecific(thread_current, thread);
1581 (*thread->func)(thread);
1582 pthread_setspecific(thread_current, NULL);
1583
1584 GETRUSAGE(&after);
1585
1586 realtime = thread_consumed_time(&after, &before, &helper);
1587 cputime = helper;
1588
1589 /* update realtime */
1590 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1591 memory_order_seq_cst);
1592 exp = atomic_load_explicit(&thread->hist->real.max,
1593 memory_order_seq_cst);
1594 while (exp < realtime
1595 && !atomic_compare_exchange_weak_explicit(
1596 &thread->hist->real.max, &exp, realtime,
1597 memory_order_seq_cst, memory_order_seq_cst))
1598 ;
1599
1600 /* update cputime */
1601 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1602 memory_order_seq_cst);
1603 exp = atomic_load_explicit(&thread->hist->cpu.max,
1604 memory_order_seq_cst);
1605 while (exp < cputime
1606 && !atomic_compare_exchange_weak_explicit(
1607 &thread->hist->cpu.max, &exp, cputime,
1608 memory_order_seq_cst, memory_order_seq_cst))
1609 ;
1610
1611 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1612 memory_order_seq_cst);
1613 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1614 memory_order_seq_cst);
1615
1616 #ifdef CONSUMED_TIME_CHECK
1617 if (realtime > CONSUMED_TIME_CHECK) {
1618 /*
1619 * We have a CPU Hog on our hands.
1620 * Whinge about it now, so we're aware this is yet another task
1621 * to fix.
1622 */
1623 flog_warn(
1624 EC_LIB_SLOW_THREAD,
1625 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1626 thread->funcname, (unsigned long)thread->func,
1627 realtime / 1000, cputime / 1000);
1628 }
1629 #endif /* CONSUMED_TIME_CHECK */
1630 }
1631
1632 /* Execute thread */
1633 void funcname_thread_execute(struct thread_master *m,
1634 int (*func)(struct thread *), void *arg, int val,
1635 debugargdef)
1636 {
1637 struct thread *thread;
1638
1639 /* Get or allocate new thread to execute. */
1640 pthread_mutex_lock(&m->mtx);
1641 {
1642 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1643
1644 /* Set its event value. */
1645 pthread_mutex_lock(&thread->mtx);
1646 {
1647 thread->add_type = THREAD_EXECUTE;
1648 thread->u.val = val;
1649 thread->ref = &thread;
1650 }
1651 pthread_mutex_unlock(&thread->mtx);
1652 }
1653 pthread_mutex_unlock(&m->mtx);
1654
1655 /* Execute thread doing all accounting. */
1656 thread_call(thread);
1657
1658 /* Give back or free thread. */
1659 thread_add_unuse(m, thread);
1660 }
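
/* Usage sketch (illustrative, not part of this file): thread_execute()
 * (the thread.h wrapper around the function above) runs a callback
 * synchronously, with the usual CPU-time accounting, instead of queueing
 * it for a later thread_fetch() pass. */
static int example_flush_now(struct thread *t)
{
	void *ctx = THREAD_ARG(t);

	/* ... flush ctx immediately ... */
	(void)ctx;
	return 0;
}

static void example_force_flush(struct thread_master *master, void *ctx)
{
	/* example_flush_now() runs before this call returns */
	thread_execute(master, example_flush_now, ctx, 0);
}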