1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "frrcu.h"
29 #include "log.h"
30 #include "hash.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "frr_pthread.h"
37 #include "lib_errors.h"
38 #include "libfrr_trace.h"
39
40 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
42 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
43 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
44
45 DECLARE_LIST(thread_list, struct thread, threaditem)
46
47 static int thread_timer_cmp(const struct thread *a, const struct thread *b)
48 {
49 if (a->u.sands.tv_sec < b->u.sands.tv_sec)
50 return -1;
51 if (a->u.sands.tv_sec > b->u.sands.tv_sec)
52 return 1;
53 if (a->u.sands.tv_usec < b->u.sands.tv_usec)
54 return -1;
55 if (a->u.sands.tv_usec > b->u.sands.tv_usec)
56 return 1;
57 return 0;
58 }
59
60 DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
61 thread_timer_cmp)
62
63 #if defined(__APPLE__)
64 #include <mach/mach.h>
65 #include <mach/mach_time.h>
66 #endif
67
68 #define AWAKEN(m) \
69 do { \
70 const unsigned char wakebyte = 0x01; \
71 write(m->io_pipe[1], &wakebyte, 1); \
72 } while (0);
73
74 /* control variable for initializer */
75 static pthread_once_t init_once = PTHREAD_ONCE_INIT;
76 pthread_key_t thread_current;
77
78 static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
79 static struct list *masters;
80
81 static void thread_free(struct thread_master *master, struct thread *thread);
82
83 /* CLI start ---------------------------------------------------------------- */
84 static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
85 {
86 int size = sizeof(a->func);
87
88 return jhash(&a->func, size, 0);
89 }
90
91 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
92 const struct cpu_thread_history *b)
93 {
94 return a->func == b->func;
95 }
96
97 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
98 {
99 struct cpu_thread_history *new;
100 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
101 new->func = a->func;
102 new->funcname = a->funcname;
103 return new;
104 }
105
106 static void cpu_record_hash_free(void *a)
107 {
108 struct cpu_thread_history *hist = a;
109
110 XFREE(MTYPE_THREAD_STATS, hist);
111 }
112
113 #ifndef EXCLUDE_CPU_TIME
114 static void vty_out_cpu_thread_history(struct vty *vty,
115 struct cpu_thread_history *a)
116 {
117 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
118 a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
119 a->total_calls, (a->cpu.total / a->total_calls), a->cpu.max,
120 (a->real.total / a->total_calls), a->real.max);
121 vty_out(vty, " %c%c%c%c%c %s\n",
122 a->types & (1 << THREAD_READ) ? 'R' : ' ',
123 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
124 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
125 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
126 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
127 }
128
129 static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
130 {
131 struct cpu_thread_history *totals = args[0];
132 struct cpu_thread_history copy;
133 struct vty *vty = args[1];
134 uint8_t *filter = args[2];
135
136 struct cpu_thread_history *a = bucket->data;
137
138 copy.total_active =
139 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
140 copy.total_calls =
141 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
142 copy.cpu.total =
143 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
144 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
145 copy.real.total =
146 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
147 copy.real.max =
148 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
149 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
150 copy.funcname = a->funcname;
151
152 if (!(copy.types & *filter))
153 return;
154
155 vty_out_cpu_thread_history(vty, &copy);
156 totals->total_active += copy.total_active;
157 totals->total_calls += copy.total_calls;
158 totals->real.total += copy.real.total;
159 if (totals->real.max < copy.real.max)
160 totals->real.max = copy.real.max;
161 totals->cpu.total += copy.cpu.total;
162 if (totals->cpu.max < copy.cpu.max)
163 totals->cpu.max = copy.cpu.max;
164 }
165
166 static void cpu_record_print(struct vty *vty, uint8_t filter)
167 {
168 struct cpu_thread_history tmp;
169 void *args[3] = {&tmp, vty, &filter};
170 struct thread_master *m;
171 struct listnode *ln;
172
173 memset(&tmp, 0, sizeof(tmp));
174 tmp.funcname = "TOTAL";
175 tmp.types = filter;
176
177 frr_with_mutex(&masters_mtx) {
178 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
179 const char *name = m->name ? m->name : "main";
180
181 char underline[strlen(name) + 1];
182 memset(underline, '-', sizeof(underline));
183 underline[sizeof(underline) - 1] = '\0';
184
185 vty_out(vty, "\n");
186 vty_out(vty, "Showing statistics for pthread %s\n",
187 name);
188 vty_out(vty, "-------------------------------%s\n",
189 underline);
190 vty_out(vty, "%21s %18s %18s\n", "",
191 "CPU (user+system):", "Real (wall-clock):");
192 vty_out(vty,
193 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (m->cpu_record->count)
198 hash_iterate(
199 m->cpu_record,
200 (void (*)(struct hash_bucket *,
201 void *))cpu_record_hash_print,
202 args);
203 else
204 vty_out(vty, "No data to display yet.\n");
205
206 vty_out(vty, "\n");
207 }
208 }
209
210 vty_out(vty, "\n");
211 vty_out(vty, "Total thread statistics\n");
212 vty_out(vty, "-------------------------\n");
213 vty_out(vty, "%21s %18s %18s\n", "",
214 "CPU (user+system):", "Real (wall-clock):");
215 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
216 vty_out(vty, " Avg uSec Max uSecs");
217 vty_out(vty, " Type Thread\n");
218
219 if (tmp.total_calls > 0)
220 vty_out_cpu_thread_history(vty, &tmp);
221 }
222 #endif
223
224 static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
225 {
226 uint8_t *filter = args[0];
227 struct hash *cpu_record = args[1];
228
229 struct cpu_thread_history *a = bucket->data;
230
231 if (!(a->types & *filter))
232 return;
233
234 hash_release(cpu_record, bucket->data);
235 }
236
237 static void cpu_record_clear(uint8_t filter)
238 {
239 uint8_t *tmp = &filter;
240 struct thread_master *m;
241 struct listnode *ln;
242
243 frr_with_mutex(&masters_mtx) {
244 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
245 frr_with_mutex(&m->mtx) {
246 void *args[2] = {tmp, m->cpu_record};
247 hash_iterate(
248 m->cpu_record,
249 (void (*)(struct hash_bucket *,
250 void *))cpu_record_hash_clear,
251 args);
252 }
253 }
254 }
255 }
256
257 static uint8_t parse_filter(const char *filterstr)
258 {
259 int i = 0;
260 int filter = 0;
261
262 while (filterstr[i] != '\0') {
263 switch (filterstr[i]) {
264 case 'r':
265 case 'R':
266 filter |= (1 << THREAD_READ);
267 break;
268 case 'w':
269 case 'W':
270 filter |= (1 << THREAD_WRITE);
271 break;
272 case 't':
273 case 'T':
274 filter |= (1 << THREAD_TIMER);
275 break;
276 case 'e':
277 case 'E':
278 filter |= (1 << THREAD_EVENT);
279 break;
280 case 'x':
281 case 'X':
282 filter |= (1 << THREAD_EXECUTE);
283 break;
284 default:
285 break;
286 }
287 ++i;
288 }
289 return filter;
290 }
291
292 #ifndef EXCLUDE_CPU_TIME
293 DEFUN (show_thread_cpu,
294 show_thread_cpu_cmd,
295 "show thread cpu [FILTER]",
296 SHOW_STR
297 "Thread information\n"
298 "Thread CPU usage\n"
299 "Display filter (rwtex)\n")
300 {
301 uint8_t filter = (uint8_t)-1U;
302 int idx = 0;
303
304 if (argv_find(argv, argc, "FILTER", &idx)) {
305 filter = parse_filter(argv[idx]->arg);
306 if (!filter) {
307 vty_out(vty,
308                                 "Invalid filter \"%s\" specified; must contain at least one of 'RWTEX'\n",
309 argv[idx]->arg);
310 return CMD_WARNING;
311 }
312 }
313
314 cpu_record_print(vty, filter);
315 return CMD_SUCCESS;
316 }
317 #endif
318
319 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
320 {
321 const char *name = m->name ? m->name : "main";
322 char underline[strlen(name) + 1];
323 struct thread *thread;
324 uint32_t i;
325
326 memset(underline, '-', sizeof(underline));
327 underline[sizeof(underline) - 1] = '\0';
328
329 vty_out(vty, "\nShowing poll FD's for %s\n", name);
330 vty_out(vty, "----------------------%s\n", underline);
331 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
332 m->fd_limit);
333 for (i = 0; i < m->handler.pfdcount; i++) {
334 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
335 m->handler.pfds[i].fd, m->handler.pfds[i].events,
336 m->handler.pfds[i].revents);
337
338 if (m->handler.pfds[i].events & POLLIN) {
339 thread = m->read[m->handler.pfds[i].fd];
340
341 if (!thread)
342 vty_out(vty, "ERROR ");
343 else
344 vty_out(vty, "%s ", thread->funcname);
345 } else
346 vty_out(vty, " ");
347
348 if (m->handler.pfds[i].events & POLLOUT) {
349 thread = m->write[m->handler.pfds[i].fd];
350
351 if (!thread)
352 vty_out(vty, "ERROR\n");
353 else
354 vty_out(vty, "%s\n", thread->funcname);
355 } else
356 vty_out(vty, "\n");
357 }
358 }
359
360 DEFUN (show_thread_poll,
361 show_thread_poll_cmd,
362 "show thread poll",
363 SHOW_STR
364 "Thread information\n"
365 "Show poll FD's and information\n")
366 {
367 struct listnode *node;
368 struct thread_master *m;
369
370 frr_with_mutex(&masters_mtx) {
371 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
372 show_thread_poll_helper(vty, m);
373 }
374 }
375
376 return CMD_SUCCESS;
377 }
378
379
380 DEFUN (clear_thread_cpu,
381 clear_thread_cpu_cmd,
382 "clear thread cpu [FILTER]",
383 "Clear stored data in all pthreads\n"
384 "Thread information\n"
385 "Thread CPU usage\n"
386        "Display filter (rwtex)\n")
387 {
388 uint8_t filter = (uint8_t)-1U;
389 int idx = 0;
390
391 if (argv_find(argv, argc, "FILTER", &idx)) {
392 filter = parse_filter(argv[idx]->arg);
393 if (!filter) {
394 vty_out(vty,
395                                 "Invalid filter \"%s\" specified; must contain at least one of 'RWTEX'\n",
396 argv[idx]->arg);
397 return CMD_WARNING;
398 }
399 }
400
401 cpu_record_clear(filter);
402 return CMD_SUCCESS;
403 }
404
405 void thread_cmd_init(void)
406 {
407 #ifndef EXCLUDE_CPU_TIME
408 install_element(VIEW_NODE, &show_thread_cpu_cmd);
409 #endif
410 install_element(VIEW_NODE, &show_thread_poll_cmd);
411 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
412 }
413 /* CLI end ------------------------------------------------------------------ */
414
415
416 static void cancelreq_del(void *cr)
417 {
418 XFREE(MTYPE_TMP, cr);
419 }
420
421 /* initializer, only ever called once */
422 static void initializer(void)
423 {
424 pthread_key_create(&thread_current, NULL);
425 }
426
427 struct thread_master *thread_master_create(const char *name)
428 {
429 struct thread_master *rv;
430 struct rlimit limit;
431
432 pthread_once(&init_once, &initializer);
433
434 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
435
436 /* Initialize master mutex */
437 pthread_mutex_init(&rv->mtx, NULL);
438 pthread_cond_init(&rv->cancel_cond, NULL);
439
440 /* Set name */
441 name = name ? name : "default";
442 rv->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
443
444 /* Initialize I/O task data structures */
445 getrlimit(RLIMIT_NOFILE, &limit);
446 rv->fd_limit = (int)limit.rlim_cur;
447 rv->read = XCALLOC(MTYPE_THREAD_POLL,
448 sizeof(struct thread *) * rv->fd_limit);
449
450 rv->write = XCALLOC(MTYPE_THREAD_POLL,
451 sizeof(struct thread *) * rv->fd_limit);
452
453 char tmhashname[strlen(name) + 32];
454 snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash",
455 name);
456 rv->cpu_record = hash_create_size(
457 8, (unsigned int (*)(const void *))cpu_record_hash_key,
458 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
459 tmhashname);
460
461 thread_list_init(&rv->event);
462 thread_list_init(&rv->ready);
463 thread_list_init(&rv->unuse);
464 thread_timer_list_init(&rv->timer);
465
466 /* Initialize thread_fetch() settings */
467 rv->spin = true;
468 rv->handle_signals = true;
469
470 /* Set pthread owner, should be updated by actual owner */
471 rv->owner = pthread_self();
472 rv->cancel_req = list_new();
473 rv->cancel_req->del = cancelreq_del;
474 rv->canceled = true;
475
476 /* Initialize pipe poker */
477 pipe(rv->io_pipe);
478 set_nonblocking(rv->io_pipe[0]);
479 set_nonblocking(rv->io_pipe[1]);
480
481 /* Initialize data structures for poll() */
482 rv->handler.pfdsize = rv->fd_limit;
483 rv->handler.pfdcount = 0;
484 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
485 sizeof(struct pollfd) * rv->handler.pfdsize);
486 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
487 sizeof(struct pollfd) * rv->handler.pfdsize);
488
489 /* add to list of threadmasters */
490 frr_with_mutex(&masters_mtx) {
491 if (!masters)
492 masters = list_new();
493
494 listnode_add(masters, rv);
495 }
496
497 return rv;
498 }
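
/*
 * Usage sketch (hypothetical caller; the "my_" names are illustrative): a
 * daemon typically creates one thread_master per pthread, schedules its
 * initial tasks against it, and tears it down on shutdown with
 * thread_master_free().
 */
static struct thread_master *my_setup(void)
{
        struct thread_master *master = thread_master_create("my-daemon");

        /* initial tasks would be scheduled here, e.g. with thread_add_read()
         * or thread_add_timer() from thread.h */
        return master;
}

static void my_teardown(struct thread_master *master)
{
        thread_master_free(master);
}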
499
500 void thread_master_set_name(struct thread_master *master, const char *name)
501 {
502 frr_with_mutex(&master->mtx) {
503 XFREE(MTYPE_THREAD_MASTER, master->name);
504 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
505 }
506 }
507
508 #define THREAD_UNUSED_DEPTH 10
509
510 /* Move thread to unuse list. */
511 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
512 {
513 pthread_mutex_t mtxc = thread->mtx;
514
515 assert(m != NULL && thread != NULL);
516
517 thread->hist->total_active--;
518 memset(thread, 0, sizeof(struct thread));
519 thread->type = THREAD_UNUSED;
520
521 /* Restore the thread mutex context. */
522 thread->mtx = mtxc;
523
524 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
525 thread_list_add_tail(&m->unuse, thread);
526 return;
527 }
528
529 thread_free(m, thread);
530 }
531
532 /* Free all unused threads. */
533 static void thread_list_free(struct thread_master *m,
534 struct thread_list_head *list)
535 {
536 struct thread *t;
537
538 while ((t = thread_list_pop(list)))
539 thread_free(m, t);
540 }
541
542 static void thread_array_free(struct thread_master *m,
543 struct thread **thread_array)
544 {
545 struct thread *t;
546 int index;
547
548 for (index = 0; index < m->fd_limit; ++index) {
549 t = thread_array[index];
550 if (t) {
551 thread_array[index] = NULL;
552 thread_free(m, t);
553 }
554 }
555 XFREE(MTYPE_THREAD_POLL, thread_array);
556 }
557
558 /*
559 * thread_master_free_unused
560 *
561  * As threads are finished with, they are put on the
562  * unuse list for later reuse.
563  * If we are shutting down, free up the unused threads
564  * so we can see if we forgot to shut anything off.
565 */
566 void thread_master_free_unused(struct thread_master *m)
567 {
568 frr_with_mutex(&m->mtx) {
569 struct thread *t;
570 while ((t = thread_list_pop(&m->unuse)))
571 thread_free(m, t);
572 }
573 }
574
575 /* Stop thread scheduler. */
576 void thread_master_free(struct thread_master *m)
577 {
578 struct thread *t;
579
580 frr_with_mutex(&masters_mtx) {
581 listnode_delete(masters, m);
582 if (masters->count == 0) {
583 list_delete(&masters);
584 }
585 }
586
587 thread_array_free(m, m->read);
588 thread_array_free(m, m->write);
589 while ((t = thread_timer_list_pop(&m->timer)))
590 thread_free(m, t);
591 thread_list_free(m, &m->event);
592 thread_list_free(m, &m->ready);
593 thread_list_free(m, &m->unuse);
594 pthread_mutex_destroy(&m->mtx);
595 pthread_cond_destroy(&m->cancel_cond);
596 close(m->io_pipe[0]);
597 close(m->io_pipe[1]);
598 list_delete(&m->cancel_req);
599 m->cancel_req = NULL;
600
601 hash_clean(m->cpu_record, cpu_record_hash_free);
602 hash_free(m->cpu_record);
603 m->cpu_record = NULL;
604
605 XFREE(MTYPE_THREAD_MASTER, m->name);
606 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
607 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
608 XFREE(MTYPE_THREAD_MASTER, m);
609 }
610
611 /* Return remaining time in milliseconds. */
612 unsigned long thread_timer_remain_msec(struct thread *thread)
613 {
614 int64_t remain;
615
616 frr_with_mutex(&thread->mtx) {
617 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
618 }
619
620 return remain < 0 ? 0 : remain;
621 }
622
623 /* Return remaining time in seconds. */
624 unsigned long thread_timer_remain_second(struct thread *thread)
625 {
626 return thread_timer_remain_msec(thread) / 1000LL;
627 }
628
629 #define debugargdef const char *funcname, const char *schedfrom, int fromln
630 #define debugargpass funcname, schedfrom, fromln
631
632 struct timeval thread_timer_remain(struct thread *thread)
633 {
634 struct timeval remain;
635 frr_with_mutex(&thread->mtx) {
636 monotime_until(&thread->u.sands, &remain);
637 }
638 return remain;
639 }
640
641 static int time_hhmmss(char *buf, int buf_size, long sec)
642 {
643 long hh;
644 long mm;
645 int wr;
646
647 zassert(buf_size >= 8);
648
649 hh = sec / 3600;
650 sec %= 3600;
651 mm = sec / 60;
652 sec %= 60;
653
654 wr = snprintf(buf, buf_size, "%02ld:%02ld:%02ld", hh, mm, sec);
655
656 return wr != 8;
657 }
658
659 char *thread_timer_to_hhmmss(char *buf, int buf_size,
660 struct thread *t_timer)
661 {
662 if (t_timer) {
663 time_hhmmss(buf, buf_size,
664 thread_timer_remain_second(t_timer));
665 } else {
666 snprintf(buf, buf_size, "--:--:--");
667 }
668 return buf;
669 }
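
/*
 * Usage sketch (hypothetical caller; "t_my_timer" is an illustrative
 * back-reference populated by thread_add_timer()): displaying how long is
 * left on a stored timer, e.g. from a CLI handler.
 */
static void my_show_timer(struct vty *vty, struct thread *t_my_timer)
{
        char buf[32];

        vty_out(vty, "expires in %lu seconds (%s)\n",
                t_my_timer ? thread_timer_remain_second(t_my_timer) : 0,
                thread_timer_to_hhmmss(buf, sizeof(buf), t_my_timer));
}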
670
671 /* Get new thread. */
672 static struct thread *thread_get(struct thread_master *m, uint8_t type,
673 int (*func)(struct thread *), void *arg,
674 debugargdef)
675 {
676 struct thread *thread = thread_list_pop(&m->unuse);
677 struct cpu_thread_history tmp;
678
679 if (!thread) {
680 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
681 /* mutex only needs to be initialized at struct creation. */
682 pthread_mutex_init(&thread->mtx, NULL);
683 m->alloc++;
684 }
685
686 thread->type = type;
687 thread->add_type = type;
688 thread->master = m;
689 thread->arg = arg;
690 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
691 thread->ref = NULL;
692
693 /*
694  * If the passed-in funcname is not what we have
695  * stored, the thread->hist needs to be updated.
696  * We keep the last one around on the unuse list
697  * under the assumption that we are probably
698  * going to immediately allocate the same
699  * type of thread again.
700 * This hopefully saves us some serious
701 * hash_get lookups.
702 */
703 if (thread->funcname != funcname || thread->func != func) {
704 tmp.func = func;
705 tmp.funcname = funcname;
706 thread->hist =
707 hash_get(m->cpu_record, &tmp,
708 (void *(*)(void *))cpu_record_hash_alloc);
709 }
710 thread->hist->total_active++;
711 thread->func = func;
712 thread->funcname = funcname;
713 thread->schedfrom = schedfrom;
714 thread->schedfrom_line = fromln;
715
716 return thread;
717 }
718
719 static void thread_free(struct thread_master *master, struct thread *thread)
720 {
721 /* Update statistics. */
722 assert(master->alloc > 0);
723 master->alloc--;
724
725 /* Free allocated resources. */
726 pthread_mutex_destroy(&thread->mtx);
727 XFREE(MTYPE_THREAD, thread);
728 }
729
730 static int fd_poll(struct thread_master *m, const struct timeval *timer_wait,
731 bool *eintr_p)
732 {
733 sigset_t origsigs;
734 unsigned char trash[64];
735 nfds_t count = m->handler.copycount;
736
737 /*
738 * If timer_wait is null here, that means poll() should block
739 * indefinitely, unless the thread_master has overridden it by setting
740 * ->selectpoll_timeout.
741 *
742  * If ->selectpoll_timeout is positive, it specifies the maximum
743  * number of milliseconds to wait. If it is negative (e.g. -1), we
744  * should never wait and always return immediately even if no event
745  * is detected. If it is zero, the default behavior above applies.
746 */
747 int timeout = -1;
748
749 /* number of file descriptors with events */
750 int num;
751
752 if (timer_wait != NULL
753 && m->selectpoll_timeout == 0) // use the default value
754 timeout = (timer_wait->tv_sec * 1000)
755 + (timer_wait->tv_usec / 1000);
756 else if (m->selectpoll_timeout > 0) // use the user's timeout
757 timeout = m->selectpoll_timeout;
758 else if (m->selectpoll_timeout
759 < 0) // effect a poll (return immediately)
760 timeout = 0;
761
762 zlog_tls_buffer_flush();
763 rcu_read_unlock();
764 rcu_assert_read_unlocked();
765
766 /* add poll pipe poker */
767 assert(count + 1 < m->handler.pfdsize);
768 m->handler.copy[count].fd = m->io_pipe[0];
769 m->handler.copy[count].events = POLLIN;
770 m->handler.copy[count].revents = 0x00;
771
772 /* We need to deal with a signal-handling race here: we
773 * don't want to miss a crucial signal, such as SIGTERM or SIGINT,
774 * that may arrive just before we enter poll(). We will block the
775 * key signals, then check whether any have arrived - if so, we return
776 * before calling poll(). If not, we'll re-enable the signals
777 * in the ppoll() call.
778 */
779
780 sigemptyset(&origsigs);
781 if (m->handle_signals) {
782 /* Main pthread that handles the app signals */
783 if (frr_sigevent_check(&origsigs)) {
784 /* Signal to process - restore signal mask and return */
785 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
786 num = -1;
787 *eintr_p = true;
788 goto done;
789 }
790 } else {
791 /* Don't make any changes for the non-main pthreads */
792 pthread_sigmask(SIG_SETMASK, NULL, &origsigs);
793 }
794
795 #if defined(HAVE_PPOLL)
796 struct timespec ts, *tsp;
797
798 if (timeout >= 0) {
799 ts.tv_sec = timeout / 1000;
800 ts.tv_nsec = (timeout % 1000) * 1000000;
801 tsp = &ts;
802 } else
803 tsp = NULL;
804
805 num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
806 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
807 #else
808 /* Not ideal - there is a race after we restore the signal mask */
809 pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
810 num = poll(m->handler.copy, count + 1, timeout);
811 #endif
812
813 done:
814
815 if (num < 0 && errno == EINTR)
816 *eintr_p = true;
817
818 if (num > 0 && m->handler.copy[count].revents != 0 && num--)
819 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
820 ;
821
822 rcu_read_lock();
823
824 return num;
825 }
826
827 /* Add new read or write thread. */
828 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
829 int (*func)(struct thread *),
830 void *arg, int fd,
831 struct thread **t_ptr,
832 debugargdef)
833 {
834 struct thread *thread = NULL;
835 struct thread **thread_array;
836
837 if (dir == THREAD_READ)
838 frrtrace(9, frr_libfrr, schedule_read, m, funcname, schedfrom,
839 fromln, t_ptr, fd, 0, arg, 0);
840 else
841 frrtrace(9, frr_libfrr, schedule_write, m, funcname, schedfrom,
842 fromln, t_ptr, fd, 0, arg, 0);
843
844 assert(fd >= 0 && fd < m->fd_limit);
845 frr_with_mutex(&m->mtx) {
846 if (t_ptr && *t_ptr)
847 // thread is already scheduled; don't reschedule
848 break;
849
850 /* default to a new pollfd */
851 nfds_t queuepos = m->handler.pfdcount;
852
853 if (dir == THREAD_READ)
854 thread_array = m->read;
855 else
856 thread_array = m->write;
857
858 /* if we already have a pollfd for our file descriptor, find and
859 * use it */
860 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
861 if (m->handler.pfds[i].fd == fd) {
862 queuepos = i;
863
864 #ifdef DEV_BUILD
865 /*
866 * What happens if we have a thread already
867 * created for this event?
868 */
869 if (thread_array[fd])
870 assert(!"Thread already scheduled for file descriptor");
871 #endif
872 break;
873 }
874
875 /* make sure we have room for this fd + pipe poker fd */
876 assert(queuepos + 1 < m->handler.pfdsize);
877
878 thread = thread_get(m, dir, func, arg, debugargpass);
879
880 m->handler.pfds[queuepos].fd = fd;
881 m->handler.pfds[queuepos].events |=
882 (dir == THREAD_READ ? POLLIN : POLLOUT);
883
884 if (queuepos == m->handler.pfdcount)
885 m->handler.pfdcount++;
886
887 if (thread) {
888 frr_with_mutex(&thread->mtx) {
889 thread->u.fd = fd;
890 thread_array[thread->u.fd] = thread;
891 }
892
893 if (t_ptr) {
894 *t_ptr = thread;
895 thread->ref = t_ptr;
896 }
897 }
898
899 AWAKEN(m);
900 }
901
902 return thread;
903 }
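
/*
 * Usage sketch (hypothetical caller, assuming the thread_add_read() wrapper
 * macro from thread.h; "my_peer"/"my_sock_read" are illustrative names):
 * read tasks are one-shot, so the callback re-arms itself after servicing
 * the descriptor.
 */
struct my_peer {
        int fd;
        struct thread *t_read;  /* back-reference kept up to date by the lib */
};

static int my_sock_read(struct thread *thread)
{
        struct my_peer *peer = THREAD_ARG(thread);
        int fd = THREAD_FD(thread);

        /* ... read from fd and process the input ... */

        /* one-shot: re-register for the next readable event; peer->t_read
         * was already cleared by thread_fetch() before this callback ran. */
        thread_add_read(thread->master, my_sock_read, peer, fd, &peer->t_read);
        return 0;
}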
904
905 static struct thread *
906 funcname_thread_add_timer_timeval(struct thread_master *m,
907 int (*func)(struct thread *), int type,
908 void *arg, struct timeval *time_relative,
909 struct thread **t_ptr, debugargdef)
910 {
911 struct thread *thread;
912
913 assert(m != NULL);
914
915 assert(type == THREAD_TIMER);
916 assert(time_relative);
917
918 frrtrace(9, frr_libfrr, schedule_timer, m, funcname, schedfrom, fromln,
919 t_ptr, 0, 0, arg, (long)time_relative->tv_sec);
920
921 frr_with_mutex(&m->mtx) {
922 if (t_ptr && *t_ptr)
923 /* thread is already scheduled; don't reschedule */
924 return NULL;
925
926 thread = thread_get(m, type, func, arg, debugargpass);
927
928 frr_with_mutex(&thread->mtx) {
929 monotime(&thread->u.sands);
930 timeradd(&thread->u.sands, time_relative,
931 &thread->u.sands);
932 thread_timer_list_add(&m->timer, thread);
933 if (t_ptr) {
934 *t_ptr = thread;
935 thread->ref = t_ptr;
936 }
937 }
938
939 AWAKEN(m);
940 }
941
942 return thread;
943 }
944
945
946 /* Add timer event thread. */
947 struct thread *funcname_thread_add_timer(struct thread_master *m,
948 int (*func)(struct thread *),
949 void *arg, long timer,
950 struct thread **t_ptr, debugargdef)
951 {
952 struct timeval trel;
953
954 assert(m != NULL);
955
956 trel.tv_sec = timer;
957 trel.tv_usec = 0;
958
959 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
960 &trel, t_ptr, debugargpass);
961 }
962
963 /* Add timer event thread with "millisecond" resolution */
964 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
965 int (*func)(struct thread *),
966 void *arg, long timer,
967 struct thread **t_ptr,
968 debugargdef)
969 {
970 struct timeval trel;
971
972 assert(m != NULL);
973
974 trel.tv_sec = timer / 1000;
975 trel.tv_usec = 1000 * (timer % 1000);
976
977 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
978 &trel, t_ptr, debugargpass);
979 }
980
981 /* Add timer event thread with "struct timeval" resolution */
982 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
983 int (*func)(struct thread *),
984 void *arg, struct timeval *tv,
985 struct thread **t_ptr, debugargdef)
986 {
987 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
988 t_ptr, debugargpass);
989 }
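
/*
 * Usage sketch (hypothetical caller, assuming the thread_add_timer_msec()
 * wrapper macro from thread.h; "my_" names are illustrative): timers are
 * one-shot, so a periodic task re-arms itself from its own callback.
 */
static struct thread *t_my_poll;        /* illustrative back-reference */

static int my_poll_cb(struct thread *thread)
{
        void *ctx = THREAD_ARG(thread);

        /* ... periodic work ... */

        /* fire again in 250 ms */
        thread_add_timer_msec(thread->master, my_poll_cb, ctx, 250, &t_my_poll);
        return 0;
}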
990
991 /* Add simple event thread. */
992 struct thread *funcname_thread_add_event(struct thread_master *m,
993 int (*func)(struct thread *),
994 void *arg, int val,
995 struct thread **t_ptr, debugargdef)
996 {
997 struct thread *thread = NULL;
998
999 frrtrace(9, frr_libfrr, schedule_event, m, funcname, schedfrom, fromln,
1000 t_ptr, 0, val, arg, 0);
1001
1002 assert(m != NULL);
1003
1004 frr_with_mutex(&m->mtx) {
1005 if (t_ptr && *t_ptr)
1006 /* thread is already scheduled; don't reschedule */
1007 break;
1008
1009 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1010 frr_with_mutex(&thread->mtx) {
1011 thread->u.val = val;
1012 thread_list_add_tail(&m->event, thread);
1013 }
1014
1015 if (t_ptr) {
1016 *t_ptr = thread;
1017 thread->ref = t_ptr;
1018 }
1019
1020 AWAKEN(m);
1021 }
1022
1023 return thread;
1024 }
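
/*
 * Usage sketch (hypothetical caller, assuming the thread_add_event() wrapper
 * macro from thread.h; "my_" names are illustrative): events run as soon as
 * the owning pthread gets back to thread_fetch(), without waiting on a timer
 * or file descriptor. The integer "val" is stored in thread->u.val and read
 * back with THREAD_VAL().
 */
static int my_event_cb(struct thread *thread)
{
        void *ctx = THREAD_ARG(thread);
        int reason = THREAD_VAL(thread);

        /* ... handle the deferred work identified by ctx/reason ... */
        return 0;
}

static void my_defer_work(struct thread_master *master, void *ctx, int reason)
{
        thread_add_event(master, my_event_cb, ctx, reason, NULL);
}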
1025
1026 /* Thread cancellation ------------------------------------------------------ */
1027
1028 /**
1029 * NOT's out the .events field of pollfd corresponding to the given file
1030 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
1031 *
1032 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1033 * implementation for details.
1034 *
1035 * @param master
1036 * @param fd
1037 * @param state the event to cancel. One or more (OR'd together) of the
1038 * following:
1039 * - POLLIN
1040 * - POLLOUT
1041 */
1042 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
1043 {
1044 bool found = false;
1045
1046 /* Cancel POLLHUP too just in case some bozo set it */
1047 state |= POLLHUP;
1048
1049 /* find the index of corresponding pollfd */
1050 nfds_t i;
1051
1052 for (i = 0; i < master->handler.pfdcount; i++)
1053 if (master->handler.pfds[i].fd == fd) {
1054 found = true;
1055 break;
1056 }
1057
1058 if (!found) {
1059 zlog_debug(
1060 "[!] Received cancellation request for nonexistent rw job");
1061 zlog_debug("[!] threadmaster: %s | fd: %d",
1062 master->name ? master->name : "", fd);
1063 return;
1064 }
1065
1066 /* NOT out event. */
1067 master->handler.pfds[i].events &= ~(state);
1068
1069 /* If all events are canceled, delete / resize the pollfd array. */
1070 if (master->handler.pfds[i].events == 0) {
1071 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1072 (master->handler.pfdcount - i - 1)
1073 * sizeof(struct pollfd));
1074 master->handler.pfdcount--;
1075 master->handler.pfds[master->handler.pfdcount].fd = 0;
1076 master->handler.pfds[master->handler.pfdcount].events = 0;
1077 }
1078
1079 /* If we have the same pollfd in the copy, perform the same operations,
1080 * otherwise return. */
1081 if (i >= master->handler.copycount)
1082 return;
1083
1084 master->handler.copy[i].events &= ~(state);
1085
1086 if (master->handler.copy[i].events == 0) {
1087 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1088 (master->handler.copycount - i - 1)
1089 * sizeof(struct pollfd));
1090 master->handler.copycount--;
1091 master->handler.copy[master->handler.copycount].fd = 0;
1092 master->handler.copy[master->handler.copycount].events = 0;
1093 }
1094 }
1095
1096 /**
1097 * Process cancellation requests.
1098 *
1099 * This may only be run from the pthread which owns the thread_master.
1100 *
1101 * @param master the thread master to process
1102 * @REQUIRE master->mtx
1103 */
1104 static void do_thread_cancel(struct thread_master *master)
1105 {
1106 struct thread_list_head *list = NULL;
1107 struct thread **thread_array = NULL;
1108 struct thread *thread;
1109
1110 struct cancel_req *cr;
1111 struct listnode *ln;
1112 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1113 /*
1114 * If this is an event object cancellation, linear search
1115 * through event list deleting any events which have the
1116 * specified argument. We also need to check every thread
1117 * in the ready queue.
1118 */
1119 if (cr->eventobj) {
1120 struct thread *t;
1121
1122 frr_each_safe(thread_list, &master->event, t) {
1123 if (t->arg != cr->eventobj)
1124 continue;
1125 thread_list_del(&master->event, t);
1126 if (t->ref)
1127 *t->ref = NULL;
1128 thread_add_unuse(master, t);
1129 }
1130
1131 frr_each_safe(thread_list, &master->ready, t) {
1132 if (t->arg != cr->eventobj)
1133 continue;
1134 thread_list_del(&master->ready, t);
1135 if (t->ref)
1136 *t->ref = NULL;
1137 thread_add_unuse(master, t);
1138 }
1139 continue;
1140 }
1141
1142 /*
1143 * The pointer varies depending on whether the cancellation
1144 * request was made asynchronously or not. If it was, we
1145 * need to check whether the thread even exists anymore
1146 * before cancelling it.
1147 */
1148 thread = (cr->thread) ? cr->thread : *cr->threadref;
1149
1150 if (!thread)
1151 continue;
1152
1153 /* Determine the appropriate queue to cancel the thread from */
1154 switch (thread->type) {
1155 case THREAD_READ:
1156 thread_cancel_rw(master, thread->u.fd, POLLIN);
1157 thread_array = master->read;
1158 break;
1159 case THREAD_WRITE:
1160 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1161 thread_array = master->write;
1162 break;
1163 case THREAD_TIMER:
1164 thread_timer_list_del(&master->timer, thread);
1165 break;
1166 case THREAD_EVENT:
1167 list = &master->event;
1168 break;
1169 case THREAD_READY:
1170 list = &master->ready;
1171 break;
1172 default:
1173 continue;
1174 break;
1175 }
1176
1177 if (list) {
1178 thread_list_del(list, thread);
1179 } else if (thread_array) {
1180 thread_array[thread->u.fd] = NULL;
1181 }
1182
1183 if (thread->ref)
1184 *thread->ref = NULL;
1185
1186 thread_add_unuse(thread->master, thread);
1187 }
1188
1189 /* Delete and free all cancellation requests */
1190 if (master->cancel_req)
1191 list_delete_all_node(master->cancel_req);
1192
1193 /* Wake up any threads which may be blocked in thread_cancel_async() */
1194 master->canceled = true;
1195 pthread_cond_broadcast(&master->cancel_cond);
1196 }
1197
1198 /**
1199 * Cancel any events which have the specified argument.
1200 *
1201 * MT-Unsafe
1202 *
1203  * @param master the thread_master to cancel from
1204 * @param arg the argument passed when creating the event
1205 */
1206 void thread_cancel_event(struct thread_master *master, void *arg)
1207 {
1208 assert(master->owner == pthread_self());
1209
1210 frr_with_mutex(&master->mtx) {
1211 struct cancel_req *cr =
1212 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1213 cr->eventobj = arg;
1214 listnode_add(master->cancel_req, cr);
1215 do_thread_cancel(master);
1216 }
1217 }
1218
1219 /**
1220 * Cancel a specific task.
1221 *
1222 * MT-Unsafe
1223 *
1224 * @param thread task to cancel
1225 */
1226 void thread_cancel(struct thread **thread)
1227 {
1228 struct thread_master *master;
1229
1230 if (thread == NULL || *thread == NULL)
1231 return;
1232
1233 master = (*thread)->master;
1234
1235 frrtrace(9, frr_libfrr, thread_cancel, master, (*thread)->funcname,
1236 (*thread)->schedfrom, (*thread)->schedfrom_line, NULL, (*thread)->u.fd,
1237 (*thread)->u.val, (*thread)->arg, (*thread)->u.sands.tv_sec);
1238
1239 assert(master->owner == pthread_self());
1240
1241 frr_with_mutex(&master->mtx) {
1242 struct cancel_req *cr =
1243 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1244 cr->thread = *thread;
1245 listnode_add(master->cancel_req, cr);
1246 do_thread_cancel(master);
1247 }
1248
1249 *thread = NULL;
1250 }
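
/*
 * Usage sketch (hypothetical caller): cancelling a pending task through the
 * back-reference that was passed when it was scheduled. This must run on the
 * pthread that owns the master (see the assert above).
 */
static void my_stop_timer(struct thread **t_my_timer)
{
        /* safe even when *t_my_timer is already NULL */
        thread_cancel(t_my_timer);
}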
1251
1252 /**
1253 * Asynchronous cancellation.
1254 *
1255 * Called with either a struct thread ** or void * to an event argument,
1256 * this function posts the correct cancellation request and blocks until it is
1257 * serviced.
1258 *
1259 * If the thread is currently running, execution blocks until it completes.
1260 *
1261 * The last two parameters are mutually exclusive, i.e. if you pass one the
1262 * other must be NULL.
1263 *
1264 * When the cancellation procedure executes on the target thread_master, the
1265 * thread * provided is checked for nullity. If it is null, the thread is
1266 * assumed to no longer exist and the cancellation request is a no-op. Thus
1267 * users of this API must pass a back-reference when scheduling the original
1268 * task.
1269 *
1270 * MT-Safe
1271 *
1272 * @param master the thread master with the relevant event / task
1273 * @param thread pointer to thread to cancel
1274 * @param eventobj the event
1275 */
1276 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1277 void *eventobj)
1278 {
1279 assert(!(thread && eventobj) && (thread || eventobj));
1280
1281 if (thread && *thread)
1282 frrtrace(9, frr_libfrr, thread_cancel_async, master,
1283 (*thread)->funcname, (*thread)->schedfrom,
1284 (*thread)->schedfrom_line, NULL, (*thread)->u.fd,
1285 (*thread)->u.val, (*thread)->arg,
1286 (*thread)->u.sands.tv_sec);
1287 else
1288 frrtrace(9, frr_libfrr, thread_cancel_async, master, NULL, NULL,
1289 0, NULL, 0, 0, eventobj, 0);
1290
1291 assert(master->owner != pthread_self());
1292
1293 frr_with_mutex(&master->mtx) {
1294 master->canceled = false;
1295
1296 if (thread) {
1297 struct cancel_req *cr =
1298 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1299 cr->threadref = thread;
1300 listnode_add(master->cancel_req, cr);
1301 } else if (eventobj) {
1302 struct cancel_req *cr =
1303 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1304 cr->eventobj = eventobj;
1305 listnode_add(master->cancel_req, cr);
1306 }
1307 AWAKEN(master);
1308
1309 while (!master->canceled)
1310 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1311 }
1312
1313 if (thread)
1314 *thread = NULL;
1315 }
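
/*
 * Usage sketch (hypothetical caller): cancelling a task that belongs to a
 * different pthread's thread_master. The caller must not be the owner of
 * "master" (see the assert above), and must pass either a back-reference or
 * an event argument, never both.
 */
static void my_cancel_from_other_pthread(struct thread_master *master,
                                         struct thread **t_other)
{
        /* blocks until the owning pthread has serviced the request */
        thread_cancel_async(master, t_other, NULL);
}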
1316 /* ------------------------------------------------------------------------- */
1317
1318 static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
1319 struct timeval *timer_val)
1320 {
1321 if (!thread_timer_list_count(timers))
1322 return NULL;
1323
1324 struct thread *next_timer = thread_timer_list_first(timers);
1325 monotime_until(&next_timer->u.sands, timer_val);
1326 return timer_val;
1327 }
1328
1329 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1330 struct thread *fetch)
1331 {
1332 *fetch = *thread;
1333 thread_add_unuse(m, thread);
1334 return fetch;
1335 }
1336
1337 static int thread_process_io_helper(struct thread_master *m,
1338 struct thread *thread, short state,
1339 short actual_state, int pos)
1340 {
1341 struct thread **thread_array;
1342
1343 /*
1344 * poll() clears the .events field, but the pollfd array we
1345 * pass to poll() is a copy of the one used to schedule threads.
1346 * We need to synchronize state between the two here by applying
1347 * the same changes poll() made on the copy of the "real" pollfd
1348 * array.
1349 *
1350 * This cleans up a possible infinite loop where we refuse
1351 * to respond to a poll event but poll is insistent that
1352 * we should.
1353 */
1354 m->handler.pfds[pos].events &= ~(state);
1355
1356 if (!thread) {
1357 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1358 flog_err(EC_LIB_NO_THREAD,
1359 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1360 m->handler.pfds[pos].fd, actual_state);
1361 return 0;
1362 }
1363
1364 if (thread->type == THREAD_READ)
1365 thread_array = m->read;
1366 else
1367 thread_array = m->write;
1368
1369 thread_array[thread->u.fd] = NULL;
1370 thread_list_add_tail(&m->ready, thread);
1371 thread->type = THREAD_READY;
1372
1373 return 1;
1374 }
1375
1376 /**
1377 * Process I/O events.
1378 *
1379 * Walks through file descriptor array looking for those pollfds whose .revents
1380 * field has something interesting. Deletes any invalid file descriptors.
1381 *
1382 * @param m the thread master
1383 * @param num the number of active file descriptors (return value of poll())
1384 */
1385 static void thread_process_io(struct thread_master *m, unsigned int num)
1386 {
1387 unsigned int ready = 0;
1388 struct pollfd *pfds = m->handler.copy;
1389
1390 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1391 /* no event for current fd? immediately continue */
1392 if (pfds[i].revents == 0)
1393 continue;
1394
1395 ready++;
1396
1397 /*
1398 * Unless someone has called thread_cancel from another
1399 * pthread, the only thing that could have changed in
1400 * m->handler.pfds while we were asleep is the .events
1401 * field in a given pollfd. Barring thread_cancel() that
1402 * value should be a superset of the values we have in our
1403  * copy, so there's no need to update it. Similarly,
1404 * barring deletion, the fd should still be a valid index
1405 * into the master's pfds.
1406 *
1407  * We are including POLLERR here to do a READ event;
1408  * this is because the read should fail and the
1409  * read function should handle it appropriately.
1410 */
1411 if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
1412 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1413 pfds[i].revents, i);
1414 }
1415 if (pfds[i].revents & POLLOUT)
1416 thread_process_io_helper(m, m->write[pfds[i].fd],
1417 POLLOUT, pfds[i].revents, i);
1418
1419 /* if one of our file descriptors is garbage, remove it from
1420  * both pfds + update sizes and index */
1422 if (pfds[i].revents & POLLNVAL) {
1423 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1424 (m->handler.pfdcount - i - 1)
1425 * sizeof(struct pollfd));
1426 m->handler.pfdcount--;
1427 m->handler.pfds[m->handler.pfdcount].fd = 0;
1428 m->handler.pfds[m->handler.pfdcount].events = 0;
1429
1430 memmove(pfds + i, pfds + i + 1,
1431 (m->handler.copycount - i - 1)
1432 * sizeof(struct pollfd));
1433 m->handler.copycount--;
1434 m->handler.copy[m->handler.copycount].fd = 0;
1435 m->handler.copy[m->handler.copycount].events = 0;
1436
1437 i--;
1438 }
1439 }
1440 }
1441
1442 /* Add all timers that have popped to the ready list. */
1443 static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
1444 struct timeval *timenow)
1445 {
1446 struct thread *thread;
1447 unsigned int ready = 0;
1448
1449 while ((thread = thread_timer_list_first(timers))) {
1450 if (timercmp(timenow, &thread->u.sands, <))
1451 return ready;
1452 thread_timer_list_pop(timers);
1453 thread->type = THREAD_READY;
1454 thread_list_add_tail(&thread->master->ready, thread);
1455 ready++;
1456 }
1457 return ready;
1458 }
1459
1460 /* process a list en masse, e.g. for event thread lists */
1461 static unsigned int thread_process(struct thread_list_head *list)
1462 {
1463 struct thread *thread;
1464 unsigned int ready = 0;
1465
1466 while ((thread = thread_list_pop(list))) {
1467 thread->type = THREAD_READY;
1468 thread_list_add_tail(&thread->master->ready, thread);
1469 ready++;
1470 }
1471 return ready;
1472 }
1473
1474
1475 /* Fetch next ready thread. */
1476 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1477 {
1478 struct thread *thread = NULL;
1479 struct timeval now;
1480 struct timeval zerotime = {0, 0};
1481 struct timeval tv;
1482 struct timeval *tw = NULL;
1483 bool eintr_p = false;
1484 int num = 0;
1485
1486 do {
1487 /* Handle signals if any */
1488 if (m->handle_signals)
1489 quagga_sigevent_process();
1490
1491 pthread_mutex_lock(&m->mtx);
1492
1493 /* Process any pending cancellation requests */
1494 do_thread_cancel(m);
1495
1496 /*
1497 * Attempt to flush ready queue before going into poll().
1498 * This is performance-critical. Think twice before modifying.
1499 */
1500 if ((thread = thread_list_pop(&m->ready))) {
1501 fetch = thread_run(m, thread, fetch);
1502 if (fetch->ref)
1503 *fetch->ref = NULL;
1504 pthread_mutex_unlock(&m->mtx);
1505 break;
1506 }
1507
1508 /* otherwise, tick through scheduling sequence */
1509
1510 /*
1511 * Post events to ready queue. This must come before the
1512 * following block since events should occur immediately
1513 */
1514 thread_process(&m->event);
1515
1516 /*
1517 * If there are no tasks on the ready queue, we will poll()
1518 * until a timer expires or we receive I/O, whichever comes
1519 * first. The strategy for doing this is:
1520 *
1521 * - If there are events pending, set the poll() timeout to zero
1522 * - If there are no events pending, but there are timers
1523 * pending, set the timeout to the smallest remaining time on
1524 * any timer.
1525 * - If there are neither timers nor events pending, but there
1526 * are file descriptors pending, block indefinitely in poll()
1527 * - If nothing is pending, it's time for the application to die
1528 *
1529 * In every case except the last, we need to hit poll() at least
1530 * once per loop to avoid starvation by events
1531 */
1532 if (!thread_list_count(&m->ready))
1533 tw = thread_timer_wait(&m->timer, &tv);
1534
1535 if (thread_list_count(&m->ready) ||
1536 (tw && !timercmp(tw, &zerotime, >)))
1537 tw = &zerotime;
1538
1539 if (!tw && m->handler.pfdcount == 0) { /* die */
1540 pthread_mutex_unlock(&m->mtx);
1541 fetch = NULL;
1542 break;
1543 }
1544
1545 /*
1546 * Copy pollfd array + # active pollfds in it. Not necessary to
1547 * copy the array size as this is fixed.
1548 */
1549 m->handler.copycount = m->handler.pfdcount;
1550 memcpy(m->handler.copy, m->handler.pfds,
1551 m->handler.copycount * sizeof(struct pollfd));
1552
1553 pthread_mutex_unlock(&m->mtx);
1554 {
1555 eintr_p = false;
1556 num = fd_poll(m, tw, &eintr_p);
1557 }
1558 pthread_mutex_lock(&m->mtx);
1559
1560 /* Handle any errors received in poll() */
1561 if (num < 0) {
1562 if (eintr_p) {
1563 pthread_mutex_unlock(&m->mtx);
1564 /* loop around to signal handler */
1565 continue;
1566 }
1567
1568 /* else die */
1569 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1570 safe_strerror(errno));
1571 pthread_mutex_unlock(&m->mtx);
1572 fetch = NULL;
1573 break;
1574 }
1575
1576 /* Post timers to ready queue. */
1577 monotime(&now);
1578 thread_process_timers(&m->timer, &now);
1579
1580 /* Post I/O to ready queue. */
1581 if (num > 0)
1582 thread_process_io(m, num);
1583
1584 pthread_mutex_unlock(&m->mtx);
1585
1586 } while (!thread && m->spin);
1587
1588 return fetch;
1589 }
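
/*
 * Usage sketch: the canonical consumer of thread_fetch() is a fetch/call
 * loop of roughly this shape (libfrr's frr_run() does the same); it exits
 * once thread_fetch() returns NULL because nothing is left to schedule.
 */
static void my_event_loop(struct thread_master *master)
{
        struct thread fetch;

        while (thread_fetch(master, &fetch))
                thread_call(&fetch);
}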
1590
1591 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1592 {
1593 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1594 + (a.tv_usec - b.tv_usec));
1595 }
1596
1597 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1598 unsigned long *cputime)
1599 {
1600 /* This is 'user + sys' time. */
1601 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1602 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1603 return timeval_elapsed(now->real, start->real);
1604 }
1605
1606 /* We should aim to yield after yield milliseconds, which defaults
1607    to THREAD_YIELD_TIME_SLOT.
1608 Note: we are using real (wall clock) time for this calculation.
1609 It could be argued that CPU time may make more sense in certain
1610 contexts. The things to consider are whether the thread may have
1611 blocked (in which case wall time increases, but CPU time does not),
1612 or whether the system is heavily loaded with other processes competing
1613 for CPU time. On balance, wall clock time seems to make sense.
1614 Plus it has the added benefit that gettimeofday should be faster
1615 than calling getrusage. */
1616 int thread_should_yield(struct thread *thread)
1617 {
1618 int result;
1619 frr_with_mutex(&thread->mtx) {
1620 result = monotime_since(&thread->real, NULL)
1621 > (int64_t)thread->yield;
1622 }
1623 return result;
1624 }
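
/*
 * Usage sketch (hypothetical caller; the "my_work*" names are illustrative):
 * a long-running task checks thread_should_yield() periodically and, instead
 * of hogging the pthread, reschedules itself as an event and returns so
 * other tasks get a chance to run.
 */
struct my_work;                         /* hypothetical work queue */
extern bool my_work_remaining(struct my_work *work);
extern void my_work_step(struct my_work *work);

static int my_bulk_work(struct thread *thread)
{
        struct my_work *work = THREAD_ARG(thread);

        while (my_work_remaining(work)) {
                my_work_step(work);
                if (thread_should_yield(thread)) {
                        /* pick the work back up on a later loop iteration */
                        thread_add_event(thread->master, my_bulk_work, work, 0,
                                         NULL);
                        return 0;
                }
        }
        return 0;
}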
1625
1626 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1627 {
1628 frr_with_mutex(&thread->mtx) {
1629 thread->yield = yield_time;
1630 }
1631 }
1632
1633 void thread_getrusage(RUSAGE_T *r)
1634 {
1635 #if defined RUSAGE_THREAD
1636 #define FRR_RUSAGE RUSAGE_THREAD
1637 #else
1638 #define FRR_RUSAGE RUSAGE_SELF
1639 #endif
1640 monotime(&r->real);
1641 #ifndef EXCLUDE_CPU_TIME
1642 getrusage(FRR_RUSAGE, &(r->cpu));
1643 #endif
1644 }
1645
1646 /*
1647 * Call a thread.
1648 *
1649 * This function will atomically update the thread's usage history. At present
1650 * this is the only spot where usage history is written. Nevertheless the code
1651 * has been written such that the introduction of writers in the future should
1652 * not need to update it provided the writers atomically perform only the
1653 * operations done here, i.e. updating the total and maximum times. In
1654 * particular, the maximum real and cpu times must be monotonically increasing
1655 * or this code is not correct.
1656 */
1657 void thread_call(struct thread *thread)
1658 {
1659 #ifndef EXCLUDE_CPU_TIME
1660 _Atomic unsigned long realtime, cputime;
1661 unsigned long exp;
1662 unsigned long helper;
1663 #endif
1664 RUSAGE_T before, after;
1665
1666 GETRUSAGE(&before);
1667 thread->real = before.real;
1668
1669 frrtrace(9, frr_libfrr, thread_call, thread->master, thread->funcname,
1670 thread->schedfrom, thread->schedfrom_line, NULL, thread->u.fd,
1671 thread->u.val, thread->arg, thread->u.sands.tv_sec);
1672
1673 pthread_setspecific(thread_current, thread);
1674 (*thread->func)(thread);
1675 pthread_setspecific(thread_current, NULL);
1676
1677 GETRUSAGE(&after);
1678
1679 #ifndef EXCLUDE_CPU_TIME
1680 realtime = thread_consumed_time(&after, &before, &helper);
1681 cputime = helper;
1682
1683 /* update realtime */
1684 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1685 memory_order_seq_cst);
1686 exp = atomic_load_explicit(&thread->hist->real.max,
1687 memory_order_seq_cst);
1688 while (exp < realtime
1689 && !atomic_compare_exchange_weak_explicit(
1690 &thread->hist->real.max, &exp, realtime,
1691 memory_order_seq_cst, memory_order_seq_cst))
1692 ;
1693
1694 /* update cputime */
1695 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1696 memory_order_seq_cst);
1697 exp = atomic_load_explicit(&thread->hist->cpu.max,
1698 memory_order_seq_cst);
1699 while (exp < cputime
1700 && !atomic_compare_exchange_weak_explicit(
1701 &thread->hist->cpu.max, &exp, cputime,
1702 memory_order_seq_cst, memory_order_seq_cst))
1703 ;
1704
1705 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1706 memory_order_seq_cst);
1707 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1708 memory_order_seq_cst);
1709
1710 #ifdef CONSUMED_TIME_CHECK
1711 if (realtime > CONSUMED_TIME_CHECK) {
1712 /*
1713 * We have a CPU Hog on our hands.
1714 * Whinge about it now, so we're aware this is yet another task
1715 * to fix.
1716 */
1717 flog_warn(
1718 EC_LIB_SLOW_THREAD,
1719 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1720 thread->funcname, (unsigned long)thread->func,
1721 realtime / 1000, cputime / 1000);
1722 }
1723 #endif /* CONSUMED_TIME_CHECK */
1724 #endif /* Exclude CPU Time */
1725 }
1726
1727 /* Execute thread */
1728 void funcname_thread_execute(struct thread_master *m,
1729 int (*func)(struct thread *), void *arg, int val,
1730 debugargdef)
1731 {
1732 struct thread *thread;
1733
1734 /* Get or allocate new thread to execute. */
1735 frr_with_mutex(&m->mtx) {
1736 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1737
1738 /* Set its event value. */
1739 frr_with_mutex(&thread->mtx) {
1740 thread->add_type = THREAD_EXECUTE;
1741 thread->u.val = val;
1742 thread->ref = &thread;
1743 }
1744 }
1745
1746 /* Execute thread doing all accounting. */
1747 thread_call(thread);
1748
1749 /* Give back or free thread. */
1750 thread_add_unuse(m, thread);
1751 }
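
/*
 * Usage sketch (hypothetical caller, assuming the thread_execute() wrapper
 * macro from thread.h; "my_" names are illustrative): the callback runs
 * synchronously in the calling pthread, while still being recorded in the
 * CPU statistics above.
 */
static int my_oneshot(struct thread *thread)
{
        /* ... runs immediately, before thread_execute() returns ... */
        return 0;
}

static void my_run_now(struct thread_master *master)
{
        thread_execute(master, my_oneshot, NULL, 0);
}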
1752
1753 /* Debug signal mask - if 'sigs' is NULL, use current effective mask. */
1754 void debug_signals(const sigset_t *sigs)
1755 {
1756 int i, found;
1757 sigset_t tmpsigs;
1758 char buf[300];
1759
1760 /*
1761 * We're only looking at the non-realtime signals here, so we need
1762 * some limit value. Platform differences mean at some point we just
1763 * need to pick a reasonable value.
1764 */
1765 #if defined SIGRTMIN
1766 # define LAST_SIGNAL SIGRTMIN
1767 #else
1768 # define LAST_SIGNAL 32
1769 #endif
1770
1771
1772 if (sigs == NULL) {
1773 sigemptyset(&tmpsigs);
1774 pthread_sigmask(SIG_BLOCK, NULL, &tmpsigs);
1775 sigs = &tmpsigs;
1776 }
1777
1778 found = 0;
1779 buf[0] = '\0';
1780
1781 for (i = 0; i < LAST_SIGNAL; i++) {
1782 char tmp[20];
1783
1784 if (sigismember(sigs, i) > 0) {
1785 if (found > 0)
1786 strlcat(buf, ",", sizeof(buf));
1787 snprintf(tmp, sizeof(tmp), "%d", i);
1788 strlcat(buf, tmp, sizeof(buf));
1789 found++;
1790 }
1791 }
1792
1793 if (found == 0)
1794 snprintf(buf, sizeof(buf), "<none>");
1795
1796 zlog_debug("%s: %s", __func__, buf);
1797 }