1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "log.h"
29 #include "hash.h"
30 #include "pqueue.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36
37 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
38 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
39 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
40
41 #if defined(__APPLE__)
42 #include <mach/mach.h>
43 #include <mach/mach_time.h>
44 #endif
45
46 #define AWAKEN(m) \
47 do { \
48 static unsigned char wakebyte = 0x01; \
49 write(m->io_pipe[1], &wakebyte, 1); \
50 } while (0);
51
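/*
 * Illustrative sketch (not part of the original source): the "self-pipe"
 * wakeup pattern that AWAKEN() relies on. fd_poll() always watches
 * io_pipe[0], so any pthread can interrupt a blocking poll() simply by
 * writing one byte to io_pipe[1]. All names below are hypothetical.
 */
static void selfpipe_wakeup_sketch(void)
{
	int io_pipe[2];
	static unsigned char wakebyte = 0x01;
	unsigned char trash[64];
	struct pollfd pfd;

	pipe(io_pipe);
	set_nonblocking(io_pipe[0]);
	set_nonblocking(io_pipe[1]);

	/* another pthread wanting to wake us would do this ... */
	write(io_pipe[1], &wakebyte, 1);

	/* ... which makes an otherwise indefinite poll() return at once */
	pfd.fd = io_pipe[0];
	pfd.events = POLLIN;
	pfd.revents = 0x00;
	poll(&pfd, 1, -1);

	/* drain the pipe so the next poll() can block again */
	while (read(io_pipe[0], &trash, sizeof(trash)) > 0)
		;

	close(io_pipe[0]);
	close(io_pipe[1]);
}
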
52 /* control variable for initializer */
53 pthread_once_t init_once = PTHREAD_ONCE_INIT;
54 pthread_key_t thread_current;
55
56 pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
57 static struct list *masters;
58
59
60 /* CLI start ---------------------------------------------------------------- */
61 static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
62 {
63 int size = sizeof(a->func);
64
65 return jhash(&a->func, size, 0);
66 }
67
68 static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
69 const struct cpu_thread_history *b)
70 {
71 return a->func == b->func;
72 }
73
74 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
75 {
76 struct cpu_thread_history *new;
77 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
78 new->func = a->func;
79 new->funcname = a->funcname;
80 return new;
81 }
82
83 static void cpu_record_hash_free(void *a)
84 {
85 struct cpu_thread_history *hist = a;
86
87 XFREE(MTYPE_THREAD_STATS, hist);
88 }
89
90 static void vty_out_cpu_thread_history(struct vty *vty,
91 struct cpu_thread_history *a)
92 {
93 vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
94 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
95 a->cpu.total / a->total_calls, a->cpu.max,
96 a->real.total / a->total_calls, a->real.max);
97 vty_out(vty, " %c%c%c%c%c %s\n",
98 a->types & (1 << THREAD_READ) ? 'R' : ' ',
99 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
100 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
101 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
102 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
103 }
104
105 static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
106 {
107 struct cpu_thread_history *totals = args[0];
108 struct cpu_thread_history copy;
109 struct vty *vty = args[1];
110 uint8_t *filter = args[2];
111
112 struct cpu_thread_history *a = bucket->data;
113
114 copy.total_active =
115 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
116 copy.total_calls =
117 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
118 copy.cpu.total =
119 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
120 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
121 copy.real.total =
122 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
123 copy.real.max =
124 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
125 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
126 copy.funcname = a->funcname;
127
128 if (!(copy.types & *filter))
129 return;
130
131 vty_out_cpu_thread_history(vty, &copy);
132 totals->total_active += copy.total_active;
133 totals->total_calls += copy.total_calls;
134 totals->real.total += copy.real.total;
135 if (totals->real.max < copy.real.max)
136 totals->real.max = copy.real.max;
137 totals->cpu.total += copy.cpu.total;
138 if (totals->cpu.max < copy.cpu.max)
139 totals->cpu.max = copy.cpu.max;
140 }
141
142 static void cpu_record_print(struct vty *vty, uint8_t filter)
143 {
144 struct cpu_thread_history tmp;
145 void *args[3] = {&tmp, vty, &filter};
146 struct thread_master *m;
147 struct listnode *ln;
148
149 memset(&tmp, 0, sizeof tmp);
150 tmp.funcname = "TOTAL";
151 tmp.types = filter;
152
153 pthread_mutex_lock(&masters_mtx);
154 {
155 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
156 const char *name = m->name ? m->name : "main";
157
158 char underline[strlen(name) + 1];
159 memset(underline, '-', sizeof(underline));
160 underline[sizeof(underline) - 1] = '\0';
161
162 vty_out(vty, "\n");
163 vty_out(vty, "Showing statistics for pthread %s\n",
164 name);
165 vty_out(vty, "-------------------------------%s\n",
166 underline);
167 vty_out(vty, "%21s %18s %18s\n", "",
168 "CPU (user+system):", "Real (wall-clock):");
169 vty_out(vty,
170 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
171 vty_out(vty, " Avg uSec Max uSecs");
172 vty_out(vty, " Type Thread\n");
173
174 if (m->cpu_record->count)
175 hash_iterate(
176 m->cpu_record,
177 (void (*)(struct hash_backet *,
178 void *))cpu_record_hash_print,
179 args);
180 else
181 vty_out(vty, "No data to display yet.\n");
182
183 vty_out(vty, "\n");
184 }
185 }
186 pthread_mutex_unlock(&masters_mtx);
187
188 vty_out(vty, "\n");
189 vty_out(vty, "Total thread statistics\n");
190 vty_out(vty, "-------------------------\n");
191 vty_out(vty, "%21s %18s %18s\n", "",
192 "CPU (user+system):", "Real (wall-clock):");
193 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (tmp.total_calls > 0)
198 vty_out_cpu_thread_history(vty, &tmp);
199 }
200
201 static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
202 {
203 uint8_t *filter = args[0];
204 struct hash *cpu_record = args[1];
205
206 struct cpu_thread_history *a = bucket->data;
207
208 if (!(a->types & *filter))
209 return;
210
211 hash_release(cpu_record, bucket->data);
212 }
213
214 static void cpu_record_clear(uint8_t filter)
215 {
216 uint8_t *tmp = &filter;
217 struct thread_master *m;
218 struct listnode *ln;
219
220 pthread_mutex_lock(&masters_mtx);
221 {
222 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
223 pthread_mutex_lock(&m->mtx);
224 {
225 void *args[2] = {tmp, m->cpu_record};
226 hash_iterate(
227 m->cpu_record,
228 (void (*)(struct hash_backet *,
229 void *))cpu_record_hash_clear,
230 args);
231 }
232 pthread_mutex_unlock(&m->mtx);
233 }
234 }
235 pthread_mutex_unlock(&masters_mtx);
236 }
237
238 static uint8_t parse_filter(const char *filterstr)
239 {
240 int i = 0;
241 int filter = 0;
242
243 while (filterstr[i] != '\0') {
244 switch (filterstr[i]) {
245 case 'r':
246 case 'R':
247 filter |= (1 << THREAD_READ);
248 break;
249 case 'w':
250 case 'W':
251 filter |= (1 << THREAD_WRITE);
252 break;
253 case 't':
254 case 'T':
255 filter |= (1 << THREAD_TIMER);
256 break;
257 case 'e':
258 case 'E':
259 filter |= (1 << THREAD_EVENT);
260 break;
261 case 'x':
262 case 'X':
263 filter |= (1 << THREAD_EXECUTE);
264 break;
265 default:
266 break;
267 }
268 ++i;
269 }
270 return filter;
271 }
272
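/*
 * Illustrative sketch (not part of the original source): what the filter
 * bitmask built by parse_filter() looks like to its callers. "rw" selects
 * read and write tasks; a string with no valid letters yields 0, which
 * the CLI handlers below report as an error.
 */
static void parse_filter_sketch(void)
{
	uint8_t filter = parse_filter("rw");

	assert(filter == ((1 << THREAD_READ) | (1 << THREAD_WRITE)));
	assert(parse_filter("q") == 0);
}
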
273 DEFUN (show_thread_cpu,
274 show_thread_cpu_cmd,
275 "show thread cpu [FILTER]",
276 SHOW_STR
277 "Thread information\n"
278 "Thread CPU usage\n"
279 "Display filter (rwtexb)\n")
280 {
281 uint8_t filter = (uint8_t)-1U;
282 int idx = 0;
283
284 if (argv_find(argv, argc, "FILTER", &idx)) {
285 filter = parse_filter(argv[idx]->arg);
286 if (!filter) {
287 vty_out(vty,
288 "Invalid filter \"%s\" specified; must contain at least"
289 "one of 'RWTEXB'\n",
290 argv[idx]->arg);
291 return CMD_WARNING;
292 }
293 }
294
295 cpu_record_print(vty, filter);
296 return CMD_SUCCESS;
297 }
298
299 DEFUN (clear_thread_cpu,
300 clear_thread_cpu_cmd,
301 "clear thread cpu [FILTER]",
302 "Clear stored data in all pthreads\n"
303 "Thread information\n"
304 "Thread CPU usage\n"
305 "Display filter (rwtexb)\n")
306 {
307 uint8_t filter = (uint8_t)-1U;
308 int idx = 0;
309
310 if (argv_find(argv, argc, "FILTER", &idx)) {
311 filter = parse_filter(argv[idx]->arg);
312 if (!filter) {
313 vty_out(vty,
314 "Invalid filter \"%s\" specified; must contain at least"
315 "one of 'RWTEXB'\n",
316 argv[idx]->arg);
317 return CMD_WARNING;
318 }
319 }
320
321 cpu_record_clear(filter);
322 return CMD_SUCCESS;
323 }
324
325 void thread_cmd_init(void)
326 {
327 install_element(VIEW_NODE, &show_thread_cpu_cmd);
328 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
329 }
330 /* CLI end ------------------------------------------------------------------ */
331
332
333 static int thread_timer_cmp(void *a, void *b)
334 {
335 struct thread *thread_a = a;
336 struct thread *thread_b = b;
337
338 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
339 return -1;
340 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
341 return 1;
342 return 0;
343 }
344
345 static void thread_timer_update(void *node, int actual_position)
346 {
347 struct thread *thread = node;
348
349 thread->index = actual_position;
350 }
351
352 static void cancelreq_del(void *cr)
353 {
354 XFREE(MTYPE_TMP, cr);
355 }
356
357 /* initializer, only ever called once */
358 static void initializer()
359 {
360 pthread_key_create(&thread_current, NULL);
361 }
362
363 struct thread_master *thread_master_create(const char *name)
364 {
365 struct thread_master *rv;
366 struct rlimit limit;
367
368 pthread_once(&init_once, &initializer);
369
370 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
371 if (rv == NULL)
372 return NULL;
373
374 /* Initialize master mutex */
375 pthread_mutex_init(&rv->mtx, NULL);
376 pthread_cond_init(&rv->cancel_cond, NULL);
377
378 /* Set name */
379 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
380
381 /* Initialize I/O task data structures */
382 getrlimit(RLIMIT_NOFILE, &limit);
383 rv->fd_limit = (int)limit.rlim_cur;
384 rv->read =
385 XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
386 if (rv->read == NULL) {
387 XFREE(MTYPE_THREAD_MASTER, rv);
388 return NULL;
389 }
390 rv->write =
391 XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
392 if (rv->write == NULL) {
393 XFREE(MTYPE_THREAD, rv->read);
394 XFREE(MTYPE_THREAD_MASTER, rv);
395 return NULL;
396 }
397
398 rv->cpu_record = hash_create_size(
399 8, (unsigned int (*)(void *))cpu_record_hash_key,
400 (int (*)(const void *, const void *))cpu_record_hash_cmp,
401 "Thread Hash");
402
403
404 /* Initialize the timer queues */
405 rv->timer = pqueue_create();
406 rv->timer->cmp = thread_timer_cmp;
407 rv->timer->update = thread_timer_update;
408
409 /* Initialize thread_fetch() settings */
410 rv->spin = true;
411 rv->handle_signals = true;
412
413 /* Set pthread owner, should be updated by actual owner */
414 rv->owner = pthread_self();
415 rv->cancel_req = list_new();
416 rv->cancel_req->del = cancelreq_del;
417 rv->canceled = true;
418
419 /* Initialize pipe poker */
420 pipe(rv->io_pipe);
421 set_nonblocking(rv->io_pipe[0]);
422 set_nonblocking(rv->io_pipe[1]);
423
424 /* Initialize data structures for poll() */
425 rv->handler.pfdsize = rv->fd_limit;
426 rv->handler.pfdcount = 0;
427 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
428 sizeof(struct pollfd) * rv->handler.pfdsize);
429 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
430 sizeof(struct pollfd) * rv->handler.pfdsize);
431
432 /* add to list of threadmasters */
433 pthread_mutex_lock(&masters_mtx);
434 {
435 if (!masters)
436 masters = list_new();
437
438 listnode_add(masters, rv);
439 }
440 pthread_mutex_unlock(&masters_mtx);
441
442 return rv;
443 }
444
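/*
 * Illustrative sketch (not part of the original source): minimal
 * lifecycle of a thread_master. The name given here is hypothetical and
 * shows up in "show thread cpu" output; a NULL name is displayed as
 * "main".
 */
static void thread_master_lifecycle_sketch(void)
{
	struct thread_master *m = thread_master_create("example");

	/* ... schedule tasks and run the fetch/call loop ... */

	thread_master_free_unused(m);
	thread_master_free(m);
}
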
445 void thread_master_set_name(struct thread_master *master, const char *name)
446 {
447 pthread_mutex_lock(&master->mtx);
448 {
449 if (master->name)
450 XFREE(MTYPE_THREAD_MASTER, master->name);
451 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
452 }
453 pthread_mutex_unlock(&master->mtx);
454 }
455
456 /* Add a new thread to the list. */
457 static void thread_list_add(struct thread_list *list, struct thread *thread)
458 {
459 thread->next = NULL;
460 thread->prev = list->tail;
461 if (list->tail)
462 list->tail->next = thread;
463 else
464 list->head = thread;
465 list->tail = thread;
466 list->count++;
467 }
468
469 /* Delete a thread from the list. */
470 static struct thread *thread_list_delete(struct thread_list *list,
471 struct thread *thread)
472 {
473 if (thread->next)
474 thread->next->prev = thread->prev;
475 else
476 list->tail = thread->prev;
477 if (thread->prev)
478 thread->prev->next = thread->next;
479 else
480 list->head = thread->next;
481 thread->next = thread->prev = NULL;
482 list->count--;
483 return thread;
484 }
485
486 /* Check whether the thread list is empty. */
487 static int thread_empty(struct thread_list *list)
488 {
489 return list->head ? 0 : 1;
490 }
491
492 /* Delete top of the list and return it. */
493 static struct thread *thread_trim_head(struct thread_list *list)
494 {
495 if (!thread_empty(list))
496 return thread_list_delete(list, list->head);
497 return NULL;
498 }
499
500 /* Move thread to unuse list. */
501 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
502 {
503 assert(m != NULL && thread != NULL);
504 assert(thread->next == NULL);
505 assert(thread->prev == NULL);
506 thread->ref = NULL;
507
508 thread->type = THREAD_UNUSED;
509 thread->hist->total_active--;
510 thread_list_add(&m->unuse, thread);
511 }
512
513 /* Free all threads on a list. */
514 static void thread_list_free(struct thread_master *m, struct thread_list *list)
515 {
516 struct thread *t;
517 struct thread *next;
518
519 for (t = list->head; t; t = next) {
520 next = t->next;
521 XFREE(MTYPE_THREAD, t);
522 list->count--;
523 m->alloc--;
524 }
525 }
526
527 static void thread_array_free(struct thread_master *m,
528 struct thread **thread_array)
529 {
530 struct thread *t;
531 int index;
532
533 for (index = 0; index < m->fd_limit; ++index) {
534 t = thread_array[index];
535 if (t) {
536 thread_array[index] = NULL;
537 XFREE(MTYPE_THREAD, t);
538 m->alloc--;
539 }
540 }
541 XFREE(MTYPE_THREAD, thread_array);
542 }
543
544 static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
545 {
546 int i;
547
548 for (i = 0; i < queue->size; i++)
549 XFREE(MTYPE_THREAD, queue->array[i]);
550
551 m->alloc -= queue->size;
552 pqueue_delete(queue);
553 }
554
555 /*
556 * thread_master_free_unused
557 *
558  * As threads finish, they are put on the unuse list for
559  * later reuse.
560  * If we are shutting down, free the unused threads so we
561  * can see whether anything was forgotten and left scheduled.
562  */
563 void thread_master_free_unused(struct thread_master *m)
564 {
565 pthread_mutex_lock(&m->mtx);
566 {
567 struct thread *t;
568 while ((t = thread_trim_head(&m->unuse)) != NULL) {
569 pthread_mutex_destroy(&t->mtx);
570 XFREE(MTYPE_THREAD, t);
571 }
572 }
573 pthread_mutex_unlock(&m->mtx);
574 }
575
576 /* Stop thread scheduler. */
577 void thread_master_free(struct thread_master *m)
578 {
579 pthread_mutex_lock(&masters_mtx);
580 {
581 listnode_delete(masters, m);
582 if (masters->count == 0) {
583 list_delete_and_null(&masters);
584 }
585 }
586 pthread_mutex_unlock(&masters_mtx);
587
588 thread_array_free(m, m->read);
589 thread_array_free(m, m->write);
590 thread_queue_free(m, m->timer);
591 thread_list_free(m, &m->event);
592 thread_list_free(m, &m->ready);
593 thread_list_free(m, &m->unuse);
594 pthread_mutex_destroy(&m->mtx);
595 pthread_cond_destroy(&m->cancel_cond);
596 close(m->io_pipe[0]);
597 close(m->io_pipe[1]);
598 list_delete_and_null(&m->cancel_req);
599 m->cancel_req = NULL;
600
601 hash_clean(m->cpu_record, cpu_record_hash_free);
602 hash_free(m->cpu_record);
603 m->cpu_record = NULL;
604
605 if (m->name)
606 XFREE(MTYPE_THREAD_MASTER, m->name);
607 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
608 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
609 XFREE(MTYPE_THREAD_MASTER, m);
610 }
611
612 /* Return remaining time in seconds. */
613 unsigned long thread_timer_remain_second(struct thread *thread)
614 {
615 int64_t remain;
616
617 pthread_mutex_lock(&thread->mtx);
618 {
619 remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
620 }
621 pthread_mutex_unlock(&thread->mtx);
622
623 return remain < 0 ? 0 : remain;
624 }
625
626 #define debugargdef const char *funcname, const char *schedfrom, int fromln
627 #define debugargpass funcname, schedfrom, fromln
628
629 struct timeval thread_timer_remain(struct thread *thread)
630 {
631 struct timeval remain;
632 pthread_mutex_lock(&thread->mtx);
633 {
634 monotime_until(&thread->u.sands, &remain);
635 }
636 pthread_mutex_unlock(&thread->mtx);
637 return remain;
638 }
639
640 /* Get new thread. */
641 static struct thread *thread_get(struct thread_master *m, uint8_t type,
642 int (*func)(struct thread *), void *arg,
643 debugargdef)
644 {
645 struct thread *thread = thread_trim_head(&m->unuse);
646 struct cpu_thread_history tmp;
647
648 if (!thread) {
649 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
650 /* mutex only needs to be initialized at struct creation. */
651 pthread_mutex_init(&thread->mtx, NULL);
652 m->alloc++;
653 }
654
655 thread->type = type;
656 thread->add_type = type;
657 thread->master = m;
658 thread->arg = arg;
659 thread->index = -1;
660 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
661 thread->ref = NULL;
662
663 /*
664 * So if the passed in funcname is not what we have
665 * stored that means the thread->hist needs to be
666 * updated. We keep the last one around in unused
667 * under the assumption that we are probably
668 * going to immediately allocate the same
669 * type of thread.
670 * This hopefully saves us some serious
671 * hash_get lookups.
672 */
673 if (thread->funcname != funcname || thread->func != func) {
674 tmp.func = func;
675 tmp.funcname = funcname;
676 thread->hist =
677 hash_get(m->cpu_record, &tmp,
678 (void *(*)(void *))cpu_record_hash_alloc);
679 }
680 thread->hist->total_active++;
681 thread->func = func;
682 thread->funcname = funcname;
683 thread->schedfrom = schedfrom;
684 thread->schedfrom_line = fromln;
685
686 return thread;
687 }
688
689 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
690 nfds_t count, const struct timeval *timer_wait)
691 {
692 /* If timer_wait is NULL here, that means poll() should block
693  * indefinitely, unless the thread_master has overridden that by
694  * setting ->selectpoll_timeout.
695  *
696  * If ->selectpoll_timeout is positive, it specifies the maximum
697  * number of milliseconds to wait.
698  * If it is negative, we never wait: poll() is asked to return
699  * immediately even if no event is pending.
700  * If it is zero, the default behavior applies and the timeout is
701  * taken from timer_wait (or poll() blocks indefinitely when
702  * timer_wait is NULL). */
703 int timeout = -1;
704
705 /* number of file descriptors with events */
706 int num;
707
708 if (timer_wait != NULL
709 && m->selectpoll_timeout == 0) // use the default value
710 timeout = (timer_wait->tv_sec * 1000)
711 + (timer_wait->tv_usec / 1000);
712 else if (m->selectpoll_timeout > 0) // use the user's timeout
713 timeout = m->selectpoll_timeout;
714 else if (m->selectpoll_timeout
715 < 0) // effect a poll (return immediately)
716 timeout = 0;
717
718 /* add poll pipe poker */
719 assert(count + 1 < pfdsize);
720 pfds[count].fd = m->io_pipe[0];
721 pfds[count].events = POLLIN;
722 pfds[count].revents = 0x00;
723
724 num = poll(pfds, count + 1, timeout);
725
726 unsigned char trash[64];
727 if (num > 0 && pfds[count].revents != 0 && num--)
728 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
729 ;
730
731 return num;
732 }
733
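/*
 * Illustrative sketch (not part of the original source): the timeout
 * selection above, condensed into a standalone helper. The precedence is
 * the same: a user-configured ->selectpoll_timeout wins over the
 * computed timer, and a negative setting forces a non-blocking poll().
 */
static int poll_timeout_sketch(long selectpoll_timeout,
			       const struct timeval *timer_wait)
{
	if (selectpoll_timeout > 0)	/* user override, in milliseconds */
		return (int)selectpoll_timeout;
	if (selectpoll_timeout < 0)	/* never block */
		return 0;
	if (timer_wait == NULL)		/* nothing to wait for but I/O */
		return -1;
	return (timer_wait->tv_sec * 1000) + (timer_wait->tv_usec / 1000);
}
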
734 /* Add new read or write thread. */
735 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
736 int (*func)(struct thread *),
737 void *arg, int fd,
738 struct thread **t_ptr,
739 debugargdef)
740 {
741 struct thread *thread = NULL;
742
743 pthread_mutex_lock(&m->mtx);
744 {
745 if (t_ptr
746 && *t_ptr) // thread is already scheduled; don't reschedule
747 {
748 pthread_mutex_unlock(&m->mtx);
749 return NULL;
750 }
751
752 /* default to a new pollfd */
753 nfds_t queuepos = m->handler.pfdcount;
754
755 /* if we already have a pollfd for our file descriptor, find and
756 * use it */
757 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
758 if (m->handler.pfds[i].fd == fd) {
759 queuepos = i;
760 break;
761 }
762
763 /* make sure we have room for this fd + pipe poker fd */
764 assert(queuepos + 1 < m->handler.pfdsize);
765
766 thread = thread_get(m, dir, func, arg, debugargpass);
767
768 m->handler.pfds[queuepos].fd = fd;
769 m->handler.pfds[queuepos].events |=
770 (dir == THREAD_READ ? POLLIN : POLLOUT);
771
772 if (queuepos == m->handler.pfdcount)
773 m->handler.pfdcount++;
774
775 if (thread) {
776 pthread_mutex_lock(&thread->mtx);
777 {
778 thread->u.fd = fd;
779 if (dir == THREAD_READ)
780 m->read[thread->u.fd] = thread;
781 else
782 m->write[thread->u.fd] = thread;
783 }
784 pthread_mutex_unlock(&thread->mtx);
785
786 if (t_ptr) {
787 *t_ptr = thread;
788 thread->ref = t_ptr;
789 }
790 }
791
792 AWAKEN(m);
793 }
794 pthread_mutex_unlock(&m->mtx);
795
796 return thread;
797 }
798
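/*
 * Illustrative sketch (not part of the original source): typical caller
 * usage of this API through the thread_add_read() convenience macro from
 * thread.h. 'struct peer_ctx' and 'peer_readable' are hypothetical. The
 * back-reference (&peer->t_read) is cleared by the library when the task
 * runs or is cancelled, so re-adding from inside the callback is safe,
 * and passing a still-set back-reference is a no-op (see the t_ptr check
 * above).
 */
struct peer_ctx {
	int fd;
	struct thread *t_read;
};

static int peer_readable(struct thread *t)
{
	struct peer_ctx *peer = THREAD_ARG(t);

	/* ... consume data from peer->fd ... */

	/* re-arm for the next read event */
	thread_add_read(t->master, peer_readable, peer, peer->fd,
			&peer->t_read);
	return 0;
}
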
799 static struct thread *
800 funcname_thread_add_timer_timeval(struct thread_master *m,
801 int (*func)(struct thread *), int type,
802 void *arg, struct timeval *time_relative,
803 struct thread **t_ptr, debugargdef)
804 {
805 struct thread *thread;
806 struct pqueue *queue;
807
808 assert(m != NULL);
809
810 assert(type == THREAD_TIMER);
811 assert(time_relative);
812
813 pthread_mutex_lock(&m->mtx);
814 {
815 if (t_ptr
816 && *t_ptr) // thread is already scheduled; don't reschedule
817 {
818 pthread_mutex_unlock(&m->mtx);
819 return NULL;
820 }
821
822 queue = m->timer;
823 thread = thread_get(m, type, func, arg, debugargpass);
824
825 pthread_mutex_lock(&thread->mtx);
826 {
827 monotime(&thread->u.sands);
828 timeradd(&thread->u.sands, time_relative,
829 &thread->u.sands);
830 pqueue_enqueue(thread, queue);
831 if (t_ptr) {
832 *t_ptr = thread;
833 thread->ref = t_ptr;
834 }
835 }
836 pthread_mutex_unlock(&thread->mtx);
837
838 AWAKEN(m);
839 }
840 pthread_mutex_unlock(&m->mtx);
841
842 return thread;
843 }
844
845
846 /* Add timer event thread. */
847 struct thread *funcname_thread_add_timer(struct thread_master *m,
848 int (*func)(struct thread *),
849 void *arg, long timer,
850 struct thread **t_ptr, debugargdef)
851 {
852 struct timeval trel;
853
854 assert(m != NULL);
855
856 trel.tv_sec = timer;
857 trel.tv_usec = 0;
858
859 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
860 &trel, t_ptr, debugargpass);
861 }
862
863 /* Add timer event thread with "millisecond" resolution */
864 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
865 int (*func)(struct thread *),
866 void *arg, long timer,
867 struct thread **t_ptr,
868 debugargdef)
869 {
870 struct timeval trel;
871
872 assert(m != NULL);
873
874 trel.tv_sec = timer / 1000;
875 trel.tv_usec = 1000 * (timer % 1000);
876
877 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
878 &trel, t_ptr, debugargpass);
879 }
880
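/*
 * Illustrative sketch (not part of the original source): a periodic task
 * built on the thread_add_timer_msec() convenience macro from thread.h.
 * 'struct poller' and 'poller_tick' are hypothetical; re-adding the timer
 * from inside the callback yields a simple recurring tick.
 */
struct poller {
	struct thread *t_tick;
	unsigned long interval_ms;
};

static int poller_tick(struct thread *t)
{
	struct poller *p = THREAD_ARG(t);

	/* ... do the periodic work ... */

	thread_add_timer_msec(t->master, poller_tick, p, p->interval_ms,
			      &p->t_tick);
	return 0;
}
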
881 /* Add timer event thread that expires after the given struct timeval */
882 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
883 int (*func)(struct thread *),
884 void *arg, struct timeval *tv,
885 struct thread **t_ptr, debugargdef)
886 {
887 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
888 t_ptr, debugargpass);
889 }
890
891 /* Add simple event thread. */
892 struct thread *funcname_thread_add_event(struct thread_master *m,
893 int (*func)(struct thread *),
894 void *arg, int val,
895 struct thread **t_ptr, debugargdef)
896 {
897 struct thread *thread;
898
899 assert(m != NULL);
900
901 pthread_mutex_lock(&m->mtx);
902 {
903 if (t_ptr
904 && *t_ptr) // thread is already scheduled; don't reschedule
905 {
906 pthread_mutex_unlock(&m->mtx);
907 return NULL;
908 }
909
910 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
911 pthread_mutex_lock(&thread->mtx);
912 {
913 thread->u.val = val;
914 thread_list_add(&m->event, thread);
915 }
916 pthread_mutex_unlock(&thread->mtx);
917
918 if (t_ptr) {
919 *t_ptr = thread;
920 thread->ref = t_ptr;
921 }
922
923 AWAKEN(m);
924 }
925 pthread_mutex_unlock(&m->mtx);
926
927 return thread;
928 }
929
930 /* Thread cancellation ------------------------------------------------------ */
931
932 /**
933  * Clears event bits in the .events field of the pollfd corresponding to
934  * the given file descriptor. The event(s) to clear are passed in 'state'.
935 *
936 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
937 * implementation for details.
938 *
939 * @param master
940 * @param fd
941 * @param state the event to cancel. One or more (OR'd together) of the
942 * following:
943 * - POLLIN
944 * - POLLOUT
945 */
946 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
947 {
948 bool found = false;
949
950 /* Cancel POLLHUP too just in case some bozo set it */
951 state |= POLLHUP;
952
953 /* find the index of corresponding pollfd */
954 nfds_t i;
955
956 for (i = 0; i < master->handler.pfdcount; i++)
957 if (master->handler.pfds[i].fd == fd) {
958 found = true;
959 break;
960 }
961
962 if (!found) {
963 zlog_debug(
964 "[!] Received cancellation request for nonexistent rw job");
965 zlog_debug("[!] threadmaster: %s | fd: %d",
966 master->name ? master->name : "", fd);
967 return;
968 }
969
970 /* NOT out event. */
971 master->handler.pfds[i].events &= ~(state);
972
973 /* If all events are canceled, delete / resize the pollfd array. */
974 if (master->handler.pfds[i].events == 0) {
975 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
976 (master->handler.pfdcount - i - 1)
977 * sizeof(struct pollfd));
978 master->handler.pfdcount--;
979 }
980
981 /* If we have the same pollfd in the copy, perform the same operations,
982 * otherwise return. */
983 if (i >= master->handler.copycount)
984 return;
985
986 master->handler.copy[i].events &= ~(state);
987
988 if (master->handler.copy[i].events == 0) {
989 memmove(master->handler.copy + i, master->handler.copy + i + 1,
990 (master->handler.copycount - i - 1)
991 * sizeof(struct pollfd));
992 master->handler.copycount--;
993 }
994 }
995
996 /**
997 * Process cancellation requests.
998 *
999 * This may only be run from the pthread which owns the thread_master.
1000 *
1001 * @param master the thread master to process
1002 * @REQUIRE master->mtx
1003 */
1004 static void do_thread_cancel(struct thread_master *master)
1005 {
1006 struct thread_list *list = NULL;
1007 struct pqueue *queue = NULL;
1008 struct thread **thread_array = NULL;
1009 struct thread *thread;
1010
1011 struct cancel_req *cr;
1012 struct listnode *ln;
1013 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1014 /* If this is an event object cancellation, linear search
1015  * through the event list, deleting any events which have
1016  * the specified argument.
1017  * We also need to check every thread in the ready queue.
1018  */
1019 if (cr->eventobj) {
1020 struct thread *t;
1021 thread = master->event.head;
1022
1023 while (thread) {
1024 t = thread;
1025 thread = t->next;
1026
1027 if (t->arg == cr->eventobj) {
1028 thread_list_delete(&master->event, t);
1029 if (t->ref)
1030 *t->ref = NULL;
1031 thread_add_unuse(master, t);
1032 }
1033 }
1034
1035 thread = master->ready.head;
1036 while (thread) {
1037 t = thread;
1038 thread = t->next;
1039
1040 if (t->arg == cr->eventobj) {
1041 thread_list_delete(&master->ready, t);
1042 if (t->ref)
1043 *t->ref = NULL;
1044 thread_add_unuse(master, t);
1045 }
1046 }
1047 continue;
1048 }
1049
1050 /* The pointer varies depending on whether the cancellation
1051  * request was made asynchronously or not.
1052  * If it was, we need to check whether the thread even
1053  * exists anymore before cancelling it.
1054  */
1055 thread = (cr->thread) ? cr->thread : *cr->threadref;
1056
1057 if (!thread)
1058 continue;
1059
1060 /* Determine the appropriate queue to cancel the thread from */
1061 switch (thread->type) {
1062 case THREAD_READ:
1063 thread_cancel_rw(master, thread->u.fd, POLLIN);
1064 thread_array = master->read;
1065 break;
1066 case THREAD_WRITE:
1067 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1068 thread_array = master->write;
1069 break;
1070 case THREAD_TIMER:
1071 queue = master->timer;
1072 break;
1073 case THREAD_EVENT:
1074 list = &master->event;
1075 break;
1076 case THREAD_READY:
1077 list = &master->ready;
1078 break;
1079 default:
1080 continue;
1081 break;
1082 }
1083
1084 if (queue) {
1085 assert(thread->index >= 0);
1086 assert(thread == queue->array[thread->index]);
1087 pqueue_remove_at(thread->index, queue);
1088 } else if (list) {
1089 thread_list_delete(list, thread);
1090 } else if (thread_array) {
1091 thread_array[thread->u.fd] = NULL;
1092 } else {
1093 assert(!"Thread should be either in queue or list or array!");
1094 }
1095
1096 if (thread->ref)
1097 *thread->ref = NULL;
1098
1099 thread_add_unuse(thread->master, thread);
1100 }
1101
1102 /* Delete and free all cancellation requests */
1103 list_delete_all_node(master->cancel_req);
1104
1105 /* Wake up any threads which may be blocked in thread_cancel_async() */
1106 master->canceled = true;
1107 pthread_cond_broadcast(&master->cancel_cond);
1108 }
1109
1110 /**
1111 * Cancel any events which have the specified argument.
1112 *
1113 * MT-Unsafe
1114 *
1115  * @param master the thread_master to cancel from
1116 * @param arg the argument passed when creating the event
1117 */
1118 void thread_cancel_event(struct thread_master *master, void *arg)
1119 {
1120 assert(master->owner == pthread_self());
1121
1122 pthread_mutex_lock(&master->mtx);
1123 {
1124 struct cancel_req *cr =
1125 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1126 cr->eventobj = arg;
1127 listnode_add(master->cancel_req, cr);
1128 do_thread_cancel(master);
1129 }
1130 pthread_mutex_unlock(&master->mtx);
1131 }
1132
1133 /**
1134 * Cancel a specific task.
1135 *
1136 * MT-Unsafe
1137 *
1138 * @param thread task to cancel
1139 */
1140 void thread_cancel(struct thread *thread)
1141 {
1142 assert(thread->master->owner == pthread_self());
1143
1144 pthread_mutex_lock(&thread->master->mtx);
1145 {
1146 struct cancel_req *cr =
1147 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1148 cr->thread = thread;
1149 listnode_add(thread->master->cancel_req, cr);
1150 do_thread_cancel(thread->master);
1151 }
1152 pthread_mutex_unlock(&thread->master->mtx);
1153 }
1154
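/*
 * Illustrative sketch (not part of the original source): synchronous
 * cancellation from the owning pthread, reusing the hypothetical
 * 'struct peer_ctx' from the earlier read-scheduling sketch. Because
 * peer->t_read was registered as the back-reference, thread_cancel()
 * also clears it; the explicit assignment is only belt and braces.
 */
static void peer_stop_reading_sketch(struct peer_ctx *peer)
{
	if (peer->t_read) {
		thread_cancel(peer->t_read);
		peer->t_read = NULL;
	}
}
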
1155 /**
1156 * Asynchronous cancellation.
1157 *
1158 * Called with either a struct thread ** or void * to an event argument,
1159 * this function posts the correct cancellation request and blocks until it is
1160 * serviced.
1161 *
1162 * If the thread is currently running, execution blocks until it completes.
1163 *
1164 * The last two parameters are mutually exclusive, i.e. if you pass one the
1165 * other must be NULL.
1166 *
1167 * When the cancellation procedure executes on the target thread_master, the
1168 * thread * provided is checked for nullity. If it is null, the thread is
1169 * assumed to no longer exist and the cancellation request is a no-op. Thus
1170 * users of this API must pass a back-reference when scheduling the original
1171 * task.
1172 *
1173 * MT-Safe
1174 *
1175 * @param master the thread master with the relevant event / task
1176 * @param thread pointer to thread to cancel
1177 * @param eventobj the event
1178 */
1179 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1180 void *eventobj)
1181 {
1182 assert(!(thread && eventobj) && (thread || eventobj));
1183 assert(master->owner != pthread_self());
1184
1185 pthread_mutex_lock(&master->mtx);
1186 {
1187 master->canceled = false;
1188
1189 if (thread) {
1190 struct cancel_req *cr =
1191 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1192 cr->threadref = thread;
1193 listnode_add(master->cancel_req, cr);
1194 } else if (eventobj) {
1195 struct cancel_req *cr =
1196 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1197 cr->eventobj = eventobj;
1198 listnode_add(master->cancel_req, cr);
1199 }
1200 AWAKEN(master);
1201
1202 while (!master->canceled)
1203 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1204 }
1205 pthread_mutex_unlock(&master->mtx);
1206 }
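
/*
 * Illustrative sketch (not part of the original source): cancelling a
 * task that belongs to another pthread. 'other_master' and 't_work' are
 * hypothetical. The caller must not be the pthread that owns
 * other_master, exactly one of the last two arguments may be non-NULL,
 * and the call blocks until the owner has serviced the request.
 */
static void cancel_remote_task_sketch(struct thread_master *other_master,
				      struct thread **t_work)
{
	thread_cancel_async(other_master, t_work, NULL);
	/* *t_work is NULL here if the task had not yet run */
}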
1207 /* ------------------------------------------------------------------------- */
1208
1209 static struct timeval *thread_timer_wait(struct pqueue *queue,
1210 struct timeval *timer_val)
1211 {
1212 if (queue->size) {
1213 struct thread *next_timer = queue->array[0];
1214 monotime_until(&next_timer->u.sands, timer_val);
1215 return timer_val;
1216 }
1217 return NULL;
1218 }
1219
1220 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1221 struct thread *fetch)
1222 {
1223 *fetch = *thread;
1224 thread_add_unuse(m, thread);
1225 return fetch;
1226 }
1227
1228 static int thread_process_io_helper(struct thread_master *m,
1229 struct thread *thread, short state, int pos)
1230 {
1231 struct thread **thread_array;
1232
1233 if (!thread)
1234 return 0;
1235
1236 if (thread->type == THREAD_READ)
1237 thread_array = m->read;
1238 else
1239 thread_array = m->write;
1240
1241 thread_array[thread->u.fd] = NULL;
1242 thread_list_add(&m->ready, thread);
1243 thread->type = THREAD_READY;
1244 /* if another pthread scheduled this file descriptor for the event we're
1245 * responding to, no problem; we're getting to it now */
1246 thread->master->handler.pfds[pos].events &= ~(state);
1247 return 1;
1248 }
1249
1250 /**
1251 * Process I/O events.
1252 *
1253 * Walks through file descriptor array looking for those pollfds whose .revents
1254 * field has something interesting. Deletes any invalid file descriptors.
1255 *
1256 * @param m the thread master
1257 * @param num the number of active file descriptors (return value of poll())
1258 */
1259 static void thread_process_io(struct thread_master *m, unsigned int num)
1260 {
1261 unsigned int ready = 0;
1262 struct pollfd *pfds = m->handler.copy;
1263
1264 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1265 /* no event for current fd? immediately continue */
1266 if (pfds[i].revents == 0)
1267 continue;
1268
1269 ready++;
1270
1271 /* Unless someone has called thread_cancel from another pthread,
1272  * the only thing that could have changed in m->handler.pfds
1273  * while we were asleep is the .events field in a given pollfd.
1274  * Barring thread_cancel(), that value should be a superset of
1275  * the values we have in our copy, so there's no need to update
1276  * it.
1277  * Similarly, barring deletion, the fd should still be a valid
1278  * index into the master's pfds.
1279  */
1282 if (pfds[i].revents & (POLLIN | POLLHUP))
1283 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1284 i);
1285 if (pfds[i].revents & POLLOUT)
1286 thread_process_io_helper(m, m->write[pfds[i].fd],
1287 POLLOUT, i);
1288
1289 /* if one of our file descriptors is garbage, remove the same
1290 * from
1291 * both pfds + update sizes and index */
1292 if (pfds[i].revents & POLLNVAL) {
1293 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1294 (m->handler.pfdcount - i - 1)
1295 * sizeof(struct pollfd));
1296 m->handler.pfdcount--;
1297
1298 memmove(pfds + i, pfds + i + 1,
1299 (m->handler.copycount - i - 1)
1300 * sizeof(struct pollfd));
1301 m->handler.copycount--;
1302
1303 i--;
1304 }
1305 }
1306 }
1307
1308 /* Add all timers that have popped to the ready list. */
1309 static unsigned int thread_process_timers(struct pqueue *queue,
1310 struct timeval *timenow)
1311 {
1312 struct thread *thread;
1313 unsigned int ready = 0;
1314
1315 while (queue->size) {
1316 thread = queue->array[0];
1317 if (timercmp(timenow, &thread->u.sands, <))
1318 return ready;
1319 pqueue_dequeue(queue);
1320 thread->type = THREAD_READY;
1321 thread_list_add(&thread->master->ready, thread);
1322 ready++;
1323 }
1324 return ready;
1325 }
1326
1327 /* process a list en masse, e.g. for event thread lists */
1328 static unsigned int thread_process(struct thread_list *list)
1329 {
1330 struct thread *thread;
1331 struct thread *next;
1332 unsigned int ready = 0;
1333
1334 for (thread = list->head; thread; thread = next) {
1335 next = thread->next;
1336 thread_list_delete(list, thread);
1337 thread->type = THREAD_READY;
1338 thread_list_add(&thread->master->ready, thread);
1339 ready++;
1340 }
1341 return ready;
1342 }
1343
1344
1345 /* Fetch next ready thread. */
1346 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1347 {
1348 struct thread *thread = NULL;
1349 struct timeval now;
1350 struct timeval zerotime = {0, 0};
1351 struct timeval tv;
1352 struct timeval *tw = NULL;
1353
1354 int num = 0;
1355
1356 do {
1357 /* Handle signals if any */
1358 if (m->handle_signals)
1359 quagga_sigevent_process();
1360
1361 pthread_mutex_lock(&m->mtx);
1362
1363 /* Process any pending cancellation requests */
1364 do_thread_cancel(m);
1365
1366 /*
1367 * Attempt to flush ready queue before going into poll().
1368 * This is performance-critical. Think twice before modifying.
1369 */
1370 if ((thread = thread_trim_head(&m->ready))) {
1371 fetch = thread_run(m, thread, fetch);
1372 if (fetch->ref)
1373 *fetch->ref = NULL;
1374 pthread_mutex_unlock(&m->mtx);
1375 break;
1376 }
1377
1378 /* otherwise, tick through scheduling sequence */
1379
1380 /*
1381 * Post events to ready queue. This must come before the
1382 * following block since events should occur immediately
1383 */
1384 thread_process(&m->event);
1385
1386 /*
1387 * If there are no tasks on the ready queue, we will poll()
1388 * until a timer expires or we receive I/O, whichever comes
1389 * first. The strategy for doing this is:
1390 *
1391  * - If there are no events pending, but there are timers
1392  *   pending, set the timeout to the smallest remaining time
1393  *   on any timer
1394  * - If there are neither timers nor events pending, but there
1395  *   are file descriptors pending, block indefinitely in
1396  *   poll()
1397 * descriptors pending, block indefinitely in poll()
1398 * - If nothing is pending, it's time for the application to die
1399 *
1400 * In every case except the last, we need to hit poll() at least
1401 * once per loop to avoid starvation by events
1402 */
1403 if (m->ready.count == 0)
1404 tw = thread_timer_wait(m->timer, &tv);
1405
1406 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1407 tw = &zerotime;
1408
1409 if (!tw && m->handler.pfdcount == 0) { /* die */
1410 pthread_mutex_unlock(&m->mtx);
1411 fetch = NULL;
1412 break;
1413 }
1414
1415 /*
1416 * Copy pollfd array + # active pollfds in it. Not necessary to
1417 * copy the array size as this is fixed.
1418 */
1419 m->handler.copycount = m->handler.pfdcount;
1420 memcpy(m->handler.copy, m->handler.pfds,
1421 m->handler.copycount * sizeof(struct pollfd));
1422
1423 pthread_mutex_unlock(&m->mtx);
1424 {
1425 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1426 m->handler.copycount, tw);
1427 }
1428 pthread_mutex_lock(&m->mtx);
1429
1430 /* Handle any errors received in poll() */
1431 if (num < 0) {
1432 if (errno == EINTR) {
1433 pthread_mutex_unlock(&m->mtx);
1434 /* loop around to signal handler */
1435 continue;
1436 }
1437
1438 /* else die */
1439 zlog_warn("poll() error: %s", safe_strerror(errno));
1440 pthread_mutex_unlock(&m->mtx);
1441 fetch = NULL;
1442 break;
1443 }
1444
1445 /* Post timers to ready queue. */
1446 monotime(&now);
1447 thread_process_timers(m->timer, &now);
1448
1449 /* Post I/O to ready queue. */
1450 if (num > 0)
1451 thread_process_io(m, num);
1452
1453 pthread_mutex_unlock(&m->mtx);
1454
1455 } while (!thread && m->spin);
1456
1457 return fetch;
1458 }
1459
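/*
 * Illustrative sketch (not part of the original source): the canonical
 * fetch/call loop a daemon's main pthread runs on top of thread_fetch().
 * The loop ends when thread_fetch() returns NULL, i.e. when nothing is
 * left to schedule or a fatal poll() error occurred.
 */
static void event_loop_sketch(struct thread_master *m)
{
	struct thread fetched;

	while (thread_fetch(m, &fetched))
		thread_call(&fetched);
}
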
1460 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1461 {
1462 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1463 + (a.tv_usec - b.tv_usec));
1464 }
1465
1466 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1467 unsigned long *cputime)
1468 {
1469 /* This is 'user + sys' time. */
1470 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1471 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1472 return timeval_elapsed(now->real, start->real);
1473 }
1474
1475 /* We should aim to yield after yield milliseconds, which defaults
1476 to THREAD_YIELD_TIME_SLOT .
1477 Note: we are using real (wall clock) time for this calculation.
1478 It could be argued that CPU time may make more sense in certain
1479 contexts. The things to consider are whether the thread may have
1480 blocked (in which case wall time increases, but CPU time does not),
1481 or whether the system is heavily loaded with other processes competing
1482 for CPU time. On balance, wall clock time seems to make sense.
1483 Plus it has the added benefit that gettimeofday should be faster
1484 than calling getrusage. */
1485 int thread_should_yield(struct thread *thread)
1486 {
1487 int result;
1488 pthread_mutex_lock(&thread->mtx);
1489 {
1490 result = monotime_since(&thread->real, NULL)
1491 > (int64_t)thread->yield;
1492 }
1493 pthread_mutex_unlock(&thread->mtx);
1494 return result;
1495 }
1496
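/*
 * Illustrative sketch (not part of the original source): the cooperative
 * yielding pattern thread_should_yield() supports. 'struct work_ctx' and
 * 'work_batch' are hypothetical: a long-running job checks its
 * wall-clock budget and, instead of hogging the pthread, reschedules
 * itself as an event to continue on the next pass through the loop.
 */
struct work_ctx {
	unsigned long remaining;	/* items left to process */
	struct thread *t_work;
};

static int work_batch(struct thread *t)
{
	struct work_ctx *ctx = THREAD_ARG(t);

	while (ctx->remaining > 0) {
		ctx->remaining--;	/* stand-in for real per-item work */
		if (thread_should_yield(t)) {
			thread_add_event(t->master, work_batch, ctx, 0,
					 &ctx->t_work);
			break;
		}
	}
	return 0;
}
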
1497 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1498 {
1499 pthread_mutex_lock(&thread->mtx);
1500 {
1501 thread->yield = yield_time;
1502 }
1503 pthread_mutex_unlock(&thread->mtx);
1504 }
1505
1506 void thread_getrusage(RUSAGE_T *r)
1507 {
1508 monotime(&r->real);
1509 getrusage(RUSAGE_SELF, &(r->cpu));
1510 }
1511
1512 /*
1513 * Call a thread.
1514 *
1515 * This function will atomically update the thread's usage history. At present
1516 * this is the only spot where usage history is written. Nevertheless the code
1517 * has been written such that the introduction of writers in the future should
1518 * not need to update it provided the writers atomically perform only the
1519 * operations done here, i.e. updating the total and maximum times. In
1520 * particular, the maximum real and cpu times must be monotonically increasing
1521 * or this code is not correct.
1522 */
1523 void thread_call(struct thread *thread)
1524 {
1525 _Atomic unsigned long realtime, cputime;
1526 unsigned long exp;
1527 unsigned long helper;
1528 RUSAGE_T before, after;
1529
1530 GETRUSAGE(&before);
1531 thread->real = before.real;
1532
1533 pthread_setspecific(thread_current, thread);
1534 (*thread->func)(thread);
1535 pthread_setspecific(thread_current, NULL);
1536
1537 GETRUSAGE(&after);
1538
1539 realtime = thread_consumed_time(&after, &before, &helper);
1540 cputime = helper;
1541
1542 /* update realtime */
1543 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1544 memory_order_seq_cst);
1545 exp = atomic_load_explicit(&thread->hist->real.max,
1546 memory_order_seq_cst);
1547 while (exp < realtime
1548 && !atomic_compare_exchange_weak_explicit(
1549 &thread->hist->real.max, &exp, realtime,
1550 memory_order_seq_cst, memory_order_seq_cst))
1551 ;
1552
1553 /* update cputime */
1554 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1555 memory_order_seq_cst);
1556 exp = atomic_load_explicit(&thread->hist->cpu.max,
1557 memory_order_seq_cst);
1558 while (exp < cputime
1559 && !atomic_compare_exchange_weak_explicit(
1560 &thread->hist->cpu.max, &exp, cputime,
1561 memory_order_seq_cst, memory_order_seq_cst))
1562 ;
1563
1564 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1565 memory_order_seq_cst);
1566 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1567 memory_order_seq_cst);
1568
1569 #ifdef CONSUMED_TIME_CHECK
1570 if (realtime > CONSUMED_TIME_CHECK) {
1571 /*
1572 * We have a CPU Hog on our hands.
1573 * Whinge about it now, so we're aware this is yet another task
1574 * to fix.
1575 */
1576 zlog_warn(
1577 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1578 thread->funcname, (unsigned long)thread->func,
1579 realtime / 1000, cputime / 1000);
1580 }
1581 #endif /* CONSUMED_TIME_CHECK */
1582 }
1583
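/*
 * Illustrative sketch (not part of the original source): the lock-free
 * "monotonic maximum" update used twice in thread_call() above, isolated
 * into a helper. The CAS loop retries only while another writer has
 * published a smaller value than ours; if the stored maximum is already
 * larger, we simply stop.
 */
static void atomic_max_sketch(_Atomic unsigned long *max, unsigned long val)
{
	unsigned long exp = atomic_load_explicit(max, memory_order_seq_cst);

	while (exp < val
	       && !atomic_compare_exchange_weak_explicit(
			  max, &exp, val, memory_order_seq_cst,
			  memory_order_seq_cst))
		;
}
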
1584 /* Execute thread */
1585 void funcname_thread_execute(struct thread_master *m,
1586 int (*func)(struct thread *), void *arg, int val,
1587 debugargdef)
1588 {
1589 struct cpu_thread_history tmp;
1590 struct thread dummy;
1591
1592 memset(&dummy, 0, sizeof(struct thread));
1593
1594 pthread_mutex_init(&dummy.mtx, NULL);
1595 dummy.type = THREAD_EVENT;
1596 dummy.add_type = THREAD_EXECUTE;
1597 dummy.master = NULL;
1598 dummy.arg = arg;
1599 dummy.u.val = val;
1600
1601 tmp.func = dummy.func = func;
1602 tmp.funcname = dummy.funcname = funcname;
1603 dummy.hist = hash_get(m->cpu_record, &tmp,
1604 (void *(*)(void *))cpu_record_hash_alloc);
1605
1606 dummy.schedfrom = schedfrom;
1607 dummy.schedfrom_line = fromln;
1608
1609 thread_call(&dummy);
1610 }