1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "log.h"
29 #include "hash.h"
30 #include "pqueue.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36
37 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
38 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
39 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
40
41 #if defined(__APPLE__)
42 #include <mach/mach.h>
43 #include <mach/mach_time.h>
44 #endif
45
46 #define AWAKEN(m) \
47 do { \
48 static unsigned char wakebyte = 0x01; \
49 write(m->io_pipe[1], &wakebyte, 1); \
50 } while (0);
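/*
 * Implementation note: AWAKEN() is the classic self-pipe wakeup trick.
 * fd_poll() always polls io_pipe[0] for POLLIN, so writing one byte to
 * io_pipe[1] forces a thread_master blocked in poll() to return and
 * re-examine its queues; fd_poll() then drains the pipe.
 */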
51
52 /* control variable for initializer */
53 pthread_once_t init_once = PTHREAD_ONCE_INIT;
54 pthread_key_t thread_current;
55
56 pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
57 static struct list *masters;
58
59
60 /* CLI start ---------------------------------------------------------------- */
61 static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
62 {
63 int size = sizeof(a->func);
64
65 return jhash(&a->func, size, 0);
66 }
67
68 static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
69 const struct cpu_thread_history *b)
70 {
71 return a->func == b->func;
72 }
73
74 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
75 {
76 struct cpu_thread_history *new;
77 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
78 new->func = a->func;
79 new->funcname = a->funcname;
80 return new;
81 }
82
83 static void cpu_record_hash_free(void *a)
84 {
85 struct cpu_thread_history *hist = a;
86
87 XFREE(MTYPE_THREAD_STATS, hist);
88 }
89
90 static void vty_out_cpu_thread_history(struct vty *vty,
91 struct cpu_thread_history *a)
92 {
93 vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
94 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
95 a->cpu.total / a->total_calls, a->cpu.max,
96 a->real.total / a->total_calls, a->real.max);
97 vty_out(vty, " %c%c%c%c%c %s\n",
98 a->types & (1 << THREAD_READ) ? 'R' : ' ',
99 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
100 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
101 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
102 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
103 }
104
105 static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
106 {
107 struct cpu_thread_history *totals = args[0];
108 struct cpu_thread_history copy;
109 struct vty *vty = args[1];
110 uint8_t *filter = args[2];
111
112 struct cpu_thread_history *a = bucket->data;
113
114 copy.total_active =
115 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
116 copy.total_calls =
117 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
118 copy.cpu.total =
119 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
120 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
121 copy.real.total =
122 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
123 copy.real.max =
124 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
125 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
126 copy.funcname = a->funcname;
127
128 if (!(copy.types & *filter))
129 return;
130
131 vty_out_cpu_thread_history(vty, &copy);
132 totals->total_active += copy.total_active;
133 totals->total_calls += copy.total_calls;
134 totals->real.total += copy.real.total;
135 if (totals->real.max < copy.real.max)
136 totals->real.max = copy.real.max;
137 totals->cpu.total += copy.cpu.total;
138 if (totals->cpu.max < copy.cpu.max)
139 totals->cpu.max = copy.cpu.max;
140 }
141
142 static void cpu_record_print(struct vty *vty, uint8_t filter)
143 {
144 struct cpu_thread_history tmp;
145 void *args[3] = {&tmp, vty, &filter};
146 struct thread_master *m;
147 struct listnode *ln;
148
149 memset(&tmp, 0, sizeof tmp);
150 tmp.funcname = "TOTAL";
151 tmp.types = filter;
152
153 pthread_mutex_lock(&masters_mtx);
154 {
155 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
156 const char *name = m->name ? m->name : "main";
157
158 char underline[strlen(name) + 1];
159 memset(underline, '-', sizeof(underline));
160 underline[sizeof(underline) - 1] = '\0';
161
162 vty_out(vty, "\n");
163 vty_out(vty, "Showing statistics for pthread %s\n",
164 name);
165 vty_out(vty, "-------------------------------%s\n",
166 underline);
167 vty_out(vty, "%21s %18s %18s\n", "",
168 "CPU (user+system):", "Real (wall-clock):");
169 vty_out(vty,
170 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
171 vty_out(vty, " Avg uSec Max uSecs");
172 vty_out(vty, " Type Thread\n");
173
174 if (m->cpu_record->count)
175 hash_iterate(
176 m->cpu_record,
177 (void (*)(struct hash_backet *,
178 void *))cpu_record_hash_print,
179 args);
180 else
181 vty_out(vty, "No data to display yet.\n");
182
183 vty_out(vty, "\n");
184 }
185 }
186 pthread_mutex_unlock(&masters_mtx);
187
188 vty_out(vty, "\n");
189 vty_out(vty, "Total thread statistics\n");
190 vty_out(vty, "-------------------------\n");
191 vty_out(vty, "%21s %18s %18s\n", "",
192 "CPU (user+system):", "Real (wall-clock):");
193 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
194 vty_out(vty, " Avg uSec Max uSecs");
195 vty_out(vty, " Type Thread\n");
196
197 if (tmp.total_calls > 0)
198 vty_out_cpu_thread_history(vty, &tmp);
199 }
200
201 static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
202 {
203 uint8_t *filter = args[0];
204 struct hash *cpu_record = args[1];
205
206 struct cpu_thread_history *a = bucket->data;
207
208 if (!(a->types & *filter))
209 return;
210
211 hash_release(cpu_record, bucket->data);
212 }
213
214 static void cpu_record_clear(uint8_t filter)
215 {
216 uint8_t *tmp = &filter;
217 struct thread_master *m;
218 struct listnode *ln;
219
220 pthread_mutex_lock(&masters_mtx);
221 {
222 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
223 pthread_mutex_lock(&m->mtx);
224 {
225 void *args[2] = {tmp, m->cpu_record};
226 hash_iterate(
227 m->cpu_record,
228 (void (*)(struct hash_backet *,
229 void *))cpu_record_hash_clear,
230 args);
231 }
232 pthread_mutex_unlock(&m->mtx);
233 }
234 }
235 pthread_mutex_unlock(&masters_mtx);
236 }
237
238 static uint8_t parse_filter(const char *filterstr)
239 {
240 int i = 0;
241 int filter = 0;
242
243 while (filterstr[i] != '\0') {
244 switch (filterstr[i]) {
245 case 'r':
246 case 'R':
247 filter |= (1 << THREAD_READ);
248 break;
249 case 'w':
250 case 'W':
251 filter |= (1 << THREAD_WRITE);
252 break;
253 case 't':
254 case 'T':
255 filter |= (1 << THREAD_TIMER);
256 break;
257 case 'e':
258 case 'E':
259 filter |= (1 << THREAD_EVENT);
260 break;
261 case 'x':
262 case 'X':
263 filter |= (1 << THREAD_EXECUTE);
264 break;
265 default:
266 break;
267 }
268 ++i;
269 }
270 return filter;
271 }
272
273 DEFUN (show_thread_cpu,
274 show_thread_cpu_cmd,
275 "show thread cpu [FILTER]",
276 SHOW_STR
277 "Thread information\n"
278 "Thread CPU usage\n"
279 "Display filter (rwtexb)\n")
280 {
281 uint8_t filter = (uint8_t)-1U;
282 int idx = 0;
283
284 if (argv_find(argv, argc, "FILTER", &idx)) {
285 filter = parse_filter(argv[idx]->arg);
286 if (!filter) {
287 vty_out(vty,
288 "Invalid filter \"%s\" specified; must contain at least"
289 "one of 'RWTEXB'\n",
290 argv[idx]->arg);
291 return CMD_WARNING;
292 }
293 }
294
295 cpu_record_print(vty, filter);
296 return CMD_SUCCESS;
297 }
298
299 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
300 {
301 const char *name = m->name ? m->name : "main";
302 char underline[strlen(name) + 1];
303 uint32_t i;
304
305 memset(underline, '-', sizeof(underline));
306 underline[sizeof(underline) - 1] = '\0';
307
308 vty_out(vty, "\nShowing poll FD's for %s\n", name);
309 vty_out(vty, "----------------------%s\n", underline);
310 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
311 for (i = 0; i < m->handler.pfdcount; i++)
312 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
313 m->handler.pfds[i].fd,
314 m->handler.pfds[i].events,
315 m->handler.pfds[i].revents);
316 }
317
318 DEFUN (show_thread_poll,
319 show_thread_poll_cmd,
320 "show thread poll",
321 SHOW_STR
322 "Thread information\n"
323 "Show poll FD's and information\n")
324 {
325 struct listnode *node;
326 struct thread_master *m;
327
328 pthread_mutex_lock(&masters_mtx);
329 {
330 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
331 show_thread_poll_helper(vty, m);
332 }
333 }
334 pthread_mutex_unlock(&masters_mtx);
335
336 return CMD_SUCCESS;
337 }
338
339
340 DEFUN (clear_thread_cpu,
341 clear_thread_cpu_cmd,
342 "clear thread cpu [FILTER]",
343 "Clear stored data in all pthreads\n"
344 "Thread information\n"
345 "Thread CPU usage\n"
346 "Display filter (rwtexb)\n")
347 {
348 uint8_t filter = (uint8_t)-1U;
349 int idx = 0;
350
351 if (argv_find(argv, argc, "FILTER", &idx)) {
352 filter = parse_filter(argv[idx]->arg);
353 if (!filter) {
354 vty_out(vty,
355 "Invalid filter \"%s\" specified; must contain at least"
356 "one of 'RWTEXB'\n",
357 argv[idx]->arg);
358 return CMD_WARNING;
359 }
360 }
361
362 cpu_record_clear(filter);
363 return CMD_SUCCESS;
364 }
365
366 void thread_cmd_init(void)
367 {
368 install_element(VIEW_NODE, &show_thread_cpu_cmd);
369 install_element(VIEW_NODE, &show_thread_poll_cmd);
370 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
371 }
372 /* CLI end ------------------------------------------------------------------ */
373
374
375 static int thread_timer_cmp(void *a, void *b)
376 {
377 struct thread *thread_a = a;
378 struct thread *thread_b = b;
379
380 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
381 return -1;
382 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
383 return 1;
384 return 0;
385 }
386
387 static void thread_timer_update(void *node, int actual_position)
388 {
389 struct thread *thread = node;
390
391 thread->index = actual_position;
392 }
393
394 static void cancelreq_del(void *cr)
395 {
396 XFREE(MTYPE_TMP, cr);
397 }
398
399 /* initializer, only ever called once */
400 static void initializer()
401 {
402 pthread_key_create(&thread_current, NULL);
403 }
404
405 struct thread_master *thread_master_create(const char *name)
406 {
407 struct thread_master *rv;
408 struct rlimit limit;
409
410 pthread_once(&init_once, &initializer);
411
412 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
413 if (rv == NULL)
414 return NULL;
415
416 /* Initialize master mutex */
417 pthread_mutex_init(&rv->mtx, NULL);
418 pthread_cond_init(&rv->cancel_cond, NULL);
419
420 /* Set name */
421 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
422
423 /* Initialize I/O task data structures */
424 getrlimit(RLIMIT_NOFILE, &limit);
425 rv->fd_limit = (int)limit.rlim_cur;
426 rv->read =
427 XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
428 if (rv->read == NULL) {
429 XFREE(MTYPE_THREAD_MASTER, rv);
430 return NULL;
431 }
432 rv->write =
433 XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
434 if (rv->write == NULL) {
435 XFREE(MTYPE_THREAD, rv->read);
436 XFREE(MTYPE_THREAD_MASTER, rv);
437 return NULL;
438 }
439
440 rv->cpu_record = hash_create_size(
441 8, (unsigned int (*)(void *))cpu_record_hash_key,
442 (int (*)(const void *, const void *))cpu_record_hash_cmp,
443 "Thread Hash");
444
445
446 /* Initialize the timer queues */
447 rv->timer = pqueue_create();
448 rv->timer->cmp = thread_timer_cmp;
449 rv->timer->update = thread_timer_update;
450
451 /* Initialize thread_fetch() settings */
452 rv->spin = true;
453 rv->handle_signals = true;
454
455 /* Set pthread owner, should be updated by actual owner */
456 rv->owner = pthread_self();
457 rv->cancel_req = list_new();
458 rv->cancel_req->del = cancelreq_del;
459 rv->canceled = true;
460
461 /* Initialize pipe poker */
462 pipe(rv->io_pipe);
463 set_nonblocking(rv->io_pipe[0]);
464 set_nonblocking(rv->io_pipe[1]);
465
466 /* Initialize data structures for poll() */
467 rv->handler.pfdsize = rv->fd_limit;
468 rv->handler.pfdcount = 0;
469 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
470 sizeof(struct pollfd) * rv->handler.pfdsize);
471 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
472 sizeof(struct pollfd) * rv->handler.pfdsize);
473
474 /* add to list of threadmasters */
475 pthread_mutex_lock(&masters_mtx);
476 {
477 if (!masters)
478 masters = list_new();
479
480 listnode_add(masters, rv);
481 }
482 pthread_mutex_unlock(&masters_mtx);
483
484 return rv;
485 }
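/*
 * Usage sketch (illustrative, not part of the original file): a daemon
 * typically creates one master per pthread, schedules tasks against it
 * and then drives the fetch/call loop. The handler and socket names are
 * hypothetical; thread_add_read() is the convenience macro from thread.h
 * wrapping funcname_thread_add_read_write().
 *
 *   struct thread_master *master = thread_master_create("main");
 *   struct thread fetched;
 *
 *   thread_add_read(master, my_accept_handler, NULL, listen_sock, NULL);
 *   while (thread_fetch(master, &fetched))
 *           thread_call(&fetched);
 */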
486
487 void thread_master_set_name(struct thread_master *master, const char *name)
488 {
489 pthread_mutex_lock(&master->mtx);
490 {
491 if (master->name)
492 XFREE(MTYPE_THREAD_MASTER, master->name);
493 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
494 }
495 pthread_mutex_unlock(&master->mtx);
496 }
497
498 /* Add a new thread to the list. */
499 static void thread_list_add(struct thread_list *list, struct thread *thread)
500 {
501 thread->next = NULL;
502 thread->prev = list->tail;
503 if (list->tail)
504 list->tail->next = thread;
505 else
506 list->head = thread;
507 list->tail = thread;
508 list->count++;
509 }
510
511 /* Delete a thread from the list. */
512 static struct thread *thread_list_delete(struct thread_list *list,
513 struct thread *thread)
514 {
515 if (thread->next)
516 thread->next->prev = thread->prev;
517 else
518 list->tail = thread->prev;
519 if (thread->prev)
520 thread->prev->next = thread->next;
521 else
522 list->head = thread->next;
523 thread->next = thread->prev = NULL;
524 list->count--;
525 return thread;
526 }
527
528 /* Check whether the thread list is empty. */
529 static int thread_empty(struct thread_list *list)
530 {
531 return list->head ? 0 : 1;
532 }
533
534 /* Delete top of the list and return it. */
535 static struct thread *thread_trim_head(struct thread_list *list)
536 {
537 if (!thread_empty(list))
538 return thread_list_delete(list, list->head);
539 return NULL;
540 }
541
542 /* Move thread to unuse list. */
543 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
544 {
545 assert(m != NULL && thread != NULL);
546 assert(thread->next == NULL);
547 assert(thread->prev == NULL);
548 thread->ref = NULL;
549
550 thread->type = THREAD_UNUSED;
551 thread->hist->total_active--;
552 thread_list_add(&m->unuse, thread);
553 }
554
555 /* Free all unused threads. */
556 static void thread_list_free(struct thread_master *m, struct thread_list *list)
557 {
558 struct thread *t;
559 struct thread *next;
560
561 for (t = list->head; t; t = next) {
562 next = t->next;
563 XFREE(MTYPE_THREAD, t);
564 list->count--;
565 m->alloc--;
566 }
567 }
568
569 static void thread_array_free(struct thread_master *m,
570 struct thread **thread_array)
571 {
572 struct thread *t;
573 int index;
574
575 for (index = 0; index < m->fd_limit; ++index) {
576 t = thread_array[index];
577 if (t) {
578 thread_array[index] = NULL;
579 XFREE(MTYPE_THREAD, t);
580 m->alloc--;
581 }
582 }
583 XFREE(MTYPE_THREAD, thread_array);
584 }
585
586 static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
587 {
588 int i;
589
590 for (i = 0; i < queue->size; i++)
591 XFREE(MTYPE_THREAD, queue->array[i]);
592
593 m->alloc -= queue->size;
594 pqueue_delete(queue);
595 }
596
597 /*
598  * thread_master_free_unused
599  *
600  * As threads are finished with, they are put on the
601  * unuse list for later reuse.
602  * If we are shutting down, free the unused threads
603  * so we can see if we forgot to shut anything off.
604  */
605 void thread_master_free_unused(struct thread_master *m)
606 {
607 pthread_mutex_lock(&m->mtx);
608 {
609 struct thread *t;
610 while ((t = thread_trim_head(&m->unuse)) != NULL) {
611 pthread_mutex_destroy(&t->mtx);
612 XFREE(MTYPE_THREAD, t);
613 }
614 }
615 pthread_mutex_unlock(&m->mtx);
616 }
617
618 /* Stop thread scheduler. */
619 void thread_master_free(struct thread_master *m)
620 {
621 pthread_mutex_lock(&masters_mtx);
622 {
623 listnode_delete(masters, m);
624 if (masters->count == 0) {
625 list_delete_and_null(&masters);
626 }
627 }
628 pthread_mutex_unlock(&masters_mtx);
629
630 thread_array_free(m, m->read);
631 thread_array_free(m, m->write);
632 thread_queue_free(m, m->timer);
633 thread_list_free(m, &m->event);
634 thread_list_free(m, &m->ready);
635 thread_list_free(m, &m->unuse);
636 pthread_mutex_destroy(&m->mtx);
637 pthread_cond_destroy(&m->cancel_cond);
638 close(m->io_pipe[0]);
639 close(m->io_pipe[1]);
640 list_delete_and_null(&m->cancel_req);
641 m->cancel_req = NULL;
642
643 hash_clean(m->cpu_record, cpu_record_hash_free);
644 hash_free(m->cpu_record);
645 m->cpu_record = NULL;
646
647 if (m->name)
648 XFREE(MTYPE_THREAD_MASTER, m->name);
649 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
650 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
651 XFREE(MTYPE_THREAD_MASTER, m);
652 }
653
654 /* Return the remaining time in seconds. */
655 unsigned long thread_timer_remain_second(struct thread *thread)
656 {
657 int64_t remain;
658
659 pthread_mutex_lock(&thread->mtx);
660 {
661 remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
662 }
663 pthread_mutex_unlock(&thread->mtx);
664
665 return remain < 0 ? 0 : remain;
666 }
667
668 #define debugargdef const char *funcname, const char *schedfrom, int fromln
669 #define debugargpass funcname, schedfrom, fromln
670
671 struct timeval thread_timer_remain(struct thread *thread)
672 {
673 struct timeval remain;
674 pthread_mutex_lock(&thread->mtx);
675 {
676 monotime_until(&thread->u.sands, &remain);
677 }
678 pthread_mutex_unlock(&thread->mtx);
679 return remain;
680 }
681
682 /* Get new thread. */
683 static struct thread *thread_get(struct thread_master *m, uint8_t type,
684 int (*func)(struct thread *), void *arg,
685 debugargdef)
686 {
687 struct thread *thread = thread_trim_head(&m->unuse);
688 struct cpu_thread_history tmp;
689
690 if (!thread) {
691 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
692 /* mutex only needs to be initialized at struct creation. */
693 pthread_mutex_init(&thread->mtx, NULL);
694 m->alloc++;
695 }
696
697 thread->type = type;
698 thread->add_type = type;
699 thread->master = m;
700 thread->arg = arg;
701 thread->index = -1;
702 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
703 thread->ref = NULL;
704
705 	/*
706 	 * So if the passed-in funcname is not what we have
707 	 * stored, that means thread->hist needs to be
708 	 * updated. We keep the last one around in the unuse
709 	 * list under the assumption that we are probably
710 	 * going to immediately allocate the same type of
711 	 * thread again.
712 	 * This hopefully saves us some serious
713 	 * hash_get lookups.
714 	 */
715 if (thread->funcname != funcname || thread->func != func) {
716 tmp.func = func;
717 tmp.funcname = funcname;
718 thread->hist =
719 hash_get(m->cpu_record, &tmp,
720 (void *(*)(void *))cpu_record_hash_alloc);
721 }
722 thread->hist->total_active++;
723 thread->func = func;
724 thread->funcname = funcname;
725 thread->schedfrom = schedfrom;
726 thread->schedfrom_line = fromln;
727
728 return thread;
729 }
730
731 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
732 nfds_t count, const struct timeval *timer_wait)
733 {
734 	/* If timer_wait is NULL here, poll() should block indefinitely,
735 	 * unless the thread_master has overridden that by setting
736 	 * ->selectpoll_timeout.
737 	 * If selectpoll_timeout is positive, it specifies the maximum
738 	 * number of milliseconds to wait.
739 	 * If selectpoll_timeout is negative, we never wait and always
740 	 * return immediately, even if no event is detected.
741 	 * If selectpoll_timeout is zero, the default behavior applies:
742 	 * block for as long as timer_wait specifies, or forever when
743 	 * timer_wait is NULL.
744 	 */
745 int timeout = -1;
746
747 /* number of file descriptors with events */
748 int num;
749
750 if (timer_wait != NULL
751 && m->selectpoll_timeout == 0) // use the default value
752 timeout = (timer_wait->tv_sec * 1000)
753 + (timer_wait->tv_usec / 1000);
754 else if (m->selectpoll_timeout > 0) // use the user's timeout
755 timeout = m->selectpoll_timeout;
756 else if (m->selectpoll_timeout
757 < 0) // effect a poll (return immediately)
758 timeout = 0;
759
760 /* add poll pipe poker */
761 assert(count + 1 < pfdsize);
762 pfds[count].fd = m->io_pipe[0];
763 pfds[count].events = POLLIN;
764 pfds[count].revents = 0x00;
765
766 num = poll(pfds, count + 1, timeout);
767
768 unsigned char trash[64];
769 if (num > 0 && pfds[count].revents != 0 && num--)
770 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
771 ;
772
773 return num;
774 }
775
776 /* Add new read or write thread. */
777 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
778 int (*func)(struct thread *),
779 void *arg, int fd,
780 struct thread **t_ptr,
781 debugargdef)
782 {
783 struct thread *thread = NULL;
784
785 assert(fd >= 0 && fd < m->fd_limit);
786 pthread_mutex_lock(&m->mtx);
787 {
788 if (t_ptr
789 && *t_ptr) // thread is already scheduled; don't reschedule
790 {
791 pthread_mutex_unlock(&m->mtx);
792 return NULL;
793 }
794
795 /* default to a new pollfd */
796 nfds_t queuepos = m->handler.pfdcount;
797
798 /* if we already have a pollfd for our file descriptor, find and
799 * use it */
800 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
801 if (m->handler.pfds[i].fd == fd) {
802 queuepos = i;
803 break;
804 }
805
806 /* make sure we have room for this fd + pipe poker fd */
807 assert(queuepos + 1 < m->handler.pfdsize);
808
809 thread = thread_get(m, dir, func, arg, debugargpass);
810
811 m->handler.pfds[queuepos].fd = fd;
812 m->handler.pfds[queuepos].events |=
813 (dir == THREAD_READ ? POLLIN : POLLOUT);
814
815 if (queuepos == m->handler.pfdcount)
816 m->handler.pfdcount++;
817
818 if (thread) {
819 pthread_mutex_lock(&thread->mtx);
820 {
821 thread->u.fd = fd;
822 if (dir == THREAD_READ)
823 m->read[thread->u.fd] = thread;
824 else
825 m->write[thread->u.fd] = thread;
826 }
827 pthread_mutex_unlock(&thread->mtx);
828
829 if (t_ptr) {
830 *t_ptr = thread;
831 thread->ref = t_ptr;
832 }
833 }
834
835 AWAKEN(m);
836 }
837 pthread_mutex_unlock(&m->mtx);
838
839 return thread;
840 }
841
842 static struct thread *
843 funcname_thread_add_timer_timeval(struct thread_master *m,
844 int (*func)(struct thread *), int type,
845 void *arg, struct timeval *time_relative,
846 struct thread **t_ptr, debugargdef)
847 {
848 struct thread *thread;
849 struct pqueue *queue;
850
851 assert(m != NULL);
852
853 assert(type == THREAD_TIMER);
854 assert(time_relative);
855
856 pthread_mutex_lock(&m->mtx);
857 {
858 if (t_ptr
859 && *t_ptr) // thread is already scheduled; don't reschedule
860 {
861 pthread_mutex_unlock(&m->mtx);
862 return NULL;
863 }
864
865 queue = m->timer;
866 thread = thread_get(m, type, func, arg, debugargpass);
867
868 pthread_mutex_lock(&thread->mtx);
869 {
870 monotime(&thread->u.sands);
871 timeradd(&thread->u.sands, time_relative,
872 &thread->u.sands);
873 pqueue_enqueue(thread, queue);
874 if (t_ptr) {
875 *t_ptr = thread;
876 thread->ref = t_ptr;
877 }
878 }
879 pthread_mutex_unlock(&thread->mtx);
880
881 AWAKEN(m);
882 }
883 pthread_mutex_unlock(&m->mtx);
884
885 return thread;
886 }
887
888
889 /* Add timer event thread. */
890 struct thread *funcname_thread_add_timer(struct thread_master *m,
891 int (*func)(struct thread *),
892 void *arg, long timer,
893 struct thread **t_ptr, debugargdef)
894 {
895 struct timeval trel;
896
897 assert(m != NULL);
898
899 trel.tv_sec = timer;
900 trel.tv_usec = 0;
901
902 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
903 &trel, t_ptr, debugargpass);
904 }
905
906 /* Add timer event thread with "millisecond" resolution */
907 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
908 int (*func)(struct thread *),
909 void *arg, long timer,
910 struct thread **t_ptr,
911 debugargdef)
912 {
913 struct timeval trel;
914
915 assert(m != NULL);
916
917 trel.tv_sec = timer / 1000;
918 trel.tv_usec = 1000 * (timer % 1000);
919
920 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
921 &trel, t_ptr, debugargpass);
922 }
923
924 /* Add timer event thread with struct timeval (microsecond) resolution */
925 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
926 int (*func)(struct thread *),
927 void *arg, struct timeval *tv,
928 struct thread **t_ptr, debugargdef)
929 {
930 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
931 t_ptr, debugargpass);
932 }
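/*
 * Illustrative example (not part of the original source): the timer
 * wrappers differ only in how the relative delay is expressed. Assuming
 * the usual thread_add_timer*() macros from thread.h and a hypothetical
 * callback expire_cb with back-reference t_expire:
 *
 *   thread_add_timer(master, expire_cb, ctx, 5, &t_expire);
 *       -- fires in ~5 seconds
 *   thread_add_timer_msec(master, expire_cb, ctx, 250, &t_expire);
 *       -- fires in ~250 ms, i.e. trel = { 0, 250000 }
 *
 * Passing a back-reference makes re-adds a no-op while the timer is
 * still scheduled, and lets thread_cancel()/thread_cancel_async() clear
 * the caller's pointer.
 */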
933
934 /* Add simple event thread. */
935 struct thread *funcname_thread_add_event(struct thread_master *m,
936 int (*func)(struct thread *),
937 void *arg, int val,
938 struct thread **t_ptr, debugargdef)
939 {
940 struct thread *thread;
941
942 assert(m != NULL);
943
944 pthread_mutex_lock(&m->mtx);
945 {
946 if (t_ptr
947 && *t_ptr) // thread is already scheduled; don't reschedule
948 {
949 pthread_mutex_unlock(&m->mtx);
950 return NULL;
951 }
952
953 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
954 pthread_mutex_lock(&thread->mtx);
955 {
956 thread->u.val = val;
957 thread_list_add(&m->event, thread);
958 }
959 pthread_mutex_unlock(&thread->mtx);
960
961 if (t_ptr) {
962 *t_ptr = thread;
963 thread->ref = t_ptr;
964 }
965
966 AWAKEN(m);
967 }
968 pthread_mutex_unlock(&m->mtx);
969
970 return thread;
971 }
972
973 /* Thread cancellation ------------------------------------------------------ */
974
975 /**
976 * NOT's out the .events field of pollfd corresponding to the given file
977 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
978 *
979 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
980 * implementation for details.
981 *
982 * @param master
983 * @param fd
984 * @param state the event to cancel. One or more (OR'd together) of the
985 * following:
986 * - POLLIN
987 * - POLLOUT
988 */
989 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
990 {
991 bool found = false;
992
993 /* Cancel POLLHUP too just in case some bozo set it */
994 state |= POLLHUP;
995
996 /* find the index of corresponding pollfd */
997 nfds_t i;
998
999 for (i = 0; i < master->handler.pfdcount; i++)
1000 if (master->handler.pfds[i].fd == fd) {
1001 found = true;
1002 break;
1003 }
1004
1005 if (!found) {
1006 zlog_debug(
1007 "[!] Received cancellation request for nonexistent rw job");
1008 zlog_debug("[!] threadmaster: %s | fd: %d",
1009 master->name ? master->name : "", fd);
1010 return;
1011 }
1012
1013 /* NOT out event. */
1014 master->handler.pfds[i].events &= ~(state);
1015
1016 /* If all events are canceled, delete / resize the pollfd array. */
1017 if (master->handler.pfds[i].events == 0) {
1018 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1019 (master->handler.pfdcount - i - 1)
1020 * sizeof(struct pollfd));
1021 master->handler.pfdcount--;
1022 }
1023
1024 /* If we have the same pollfd in the copy, perform the same operations,
1025 * otherwise return. */
1026 if (i >= master->handler.copycount)
1027 return;
1028
1029 master->handler.copy[i].events &= ~(state);
1030
1031 if (master->handler.copy[i].events == 0) {
1032 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1033 (master->handler.copycount - i - 1)
1034 * sizeof(struct pollfd));
1035 master->handler.copycount--;
1036 }
1037 }
1038
1039 /**
1040 * Process cancellation requests.
1041 *
1042 * This may only be run from the pthread which owns the thread_master.
1043 *
1044 * @param master the thread master to process
1045 * @REQUIRE master->mtx
1046 */
1047 static void do_thread_cancel(struct thread_master *master)
1048 {
1049 struct thread_list *list = NULL;
1050 struct pqueue *queue = NULL;
1051 struct thread **thread_array = NULL;
1052 struct thread *thread;
1053
1054 struct cancel_req *cr;
1055 struct listnode *ln;
1056 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1057 		/* If this is an event object cancellation, linear
1058 		 * search through the event list, deleting any events
1059 		 * which have the specified argument.
1060 		 * We also need to check every thread in the ready
1061 		 * queue. */
1062 if (cr->eventobj) {
1063 struct thread *t;
1064 thread = master->event.head;
1065
1066 while (thread) {
1067 t = thread;
1068 thread = t->next;
1069
1070 if (t->arg == cr->eventobj) {
1071 thread_list_delete(&master->event, t);
1072 if (t->ref)
1073 *t->ref = NULL;
1074 thread_add_unuse(master, t);
1075 }
1076 }
1077
1078 thread = master->ready.head;
1079 while (thread) {
1080 t = thread;
1081 thread = t->next;
1082
1083 if (t->arg == cr->eventobj) {
1084 thread_list_delete(&master->ready, t);
1085 if (t->ref)
1086 *t->ref = NULL;
1087 thread_add_unuse(master, t);
1088 }
1089 }
1090 continue;
1091 }
1092
1093 		/* The pointer varies depending on whether the
1094 		 * cancellation request was made asynchronously or
1095 		 * not. If it was, we need to check whether the
1096 		 * thread even exists anymore before cancelling
1097 		 * it. */
1098 thread = (cr->thread) ? cr->thread : *cr->threadref;
1099
1100 if (!thread)
1101 continue;
1102
1103 /* Determine the appropriate queue to cancel the thread from */
1104 switch (thread->type) {
1105 case THREAD_READ:
1106 thread_cancel_rw(master, thread->u.fd, POLLIN);
1107 thread_array = master->read;
1108 break;
1109 case THREAD_WRITE:
1110 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1111 thread_array = master->write;
1112 break;
1113 case THREAD_TIMER:
1114 queue = master->timer;
1115 break;
1116 case THREAD_EVENT:
1117 list = &master->event;
1118 break;
1119 case THREAD_READY:
1120 list = &master->ready;
1121 break;
1122 default:
1123 continue;
1124 break;
1125 }
1126
1127 if (queue) {
1128 assert(thread->index >= 0);
1129 assert(thread == queue->array[thread->index]);
1130 pqueue_remove_at(thread->index, queue);
1131 } else if (list) {
1132 thread_list_delete(list, thread);
1133 } else if (thread_array) {
1134 thread_array[thread->u.fd] = NULL;
1135 } else {
1136 assert(!"Thread should be either in queue or list or array!");
1137 }
1138
1139 if (thread->ref)
1140 *thread->ref = NULL;
1141
1142 thread_add_unuse(thread->master, thread);
1143 }
1144
1145 /* Delete and free all cancellation requests */
1146 list_delete_all_node(master->cancel_req);
1147
1148 /* Wake up any threads which may be blocked in thread_cancel_async() */
1149 master->canceled = true;
1150 pthread_cond_broadcast(&master->cancel_cond);
1151 }
1152
1153 /**
1154 * Cancel any events which have the specified argument.
1155 *
1156 * MT-Unsafe
1157 *
1158  * @param master the thread_master to cancel from
1159 * @param arg the argument passed when creating the event
1160 */
1161 void thread_cancel_event(struct thread_master *master, void *arg)
1162 {
1163 assert(master->owner == pthread_self());
1164
1165 pthread_mutex_lock(&master->mtx);
1166 {
1167 struct cancel_req *cr =
1168 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1169 cr->eventobj = arg;
1170 listnode_add(master->cancel_req, cr);
1171 do_thread_cancel(master);
1172 }
1173 pthread_mutex_unlock(&master->mtx);
1174 }
1175
1176 /**
1177 * Cancel a specific task.
1178 *
1179 * MT-Unsafe
1180 *
1181 * @param thread task to cancel
1182 */
1183 void thread_cancel(struct thread *thread)
1184 {
1185 assert(thread->master->owner == pthread_self());
1186
1187 pthread_mutex_lock(&thread->master->mtx);
1188 {
1189 struct cancel_req *cr =
1190 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1191 cr->thread = thread;
1192 listnode_add(thread->master->cancel_req, cr);
1193 do_thread_cancel(thread->master);
1194 }
1195 pthread_mutex_unlock(&thread->master->mtx);
1196 }
1197
1198 /**
1199 * Asynchronous cancellation.
1200 *
1201 * Called with either a struct thread ** or void * to an event argument,
1202 * this function posts the correct cancellation request and blocks until it is
1203 * serviced.
1204 *
1205 * If the thread is currently running, execution blocks until it completes.
1206 *
1207 * The last two parameters are mutually exclusive, i.e. if you pass one the
1208 * other must be NULL.
1209 *
1210 * When the cancellation procedure executes on the target thread_master, the
1211 * thread * provided is checked for nullity. If it is null, the thread is
1212 * assumed to no longer exist and the cancellation request is a no-op. Thus
1213 * users of this API must pass a back-reference when scheduling the original
1214 * task.
1215 *
1216 * MT-Safe
1217 *
1218 * @param master the thread master with the relevant event / task
1219 * @param thread pointer to thread to cancel
1220 * @param eventobj the event
1221 */
1222 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1223 void *eventobj)
1224 {
1225 assert(!(thread && eventobj) && (thread || eventobj));
1226 assert(master->owner != pthread_self());
1227
1228 pthread_mutex_lock(&master->mtx);
1229 {
1230 master->canceled = false;
1231
1232 if (thread) {
1233 struct cancel_req *cr =
1234 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1235 cr->threadref = thread;
1236 listnode_add(master->cancel_req, cr);
1237 } else if (eventobj) {
1238 struct cancel_req *cr =
1239 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1240 cr->eventobj = eventobj;
1241 listnode_add(master->cancel_req, cr);
1242 }
1243 AWAKEN(master);
1244
1245 while (!master->canceled)
1246 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1247 }
1248 pthread_mutex_unlock(&master->mtx);
1249 }
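/*
 * Illustrative sketch (not part of the original file): cross-thread
 * cancellation relies on the caller having kept a back-reference when
 * the task was scheduled. Names below are hypothetical.
 *
 *   On pthread A (owner of master):
 *       thread_add_timer(master, bg_cleanup, NULL, 10, &t_bg_cleanup);
 *
 *   On pthread B (any other pthread):
 *       thread_cancel_async(master, &t_bg_cleanup, NULL);
 *
 * thread_cancel_async() blocks until pthread A services the request in
 * do_thread_cancel(), which also resets t_bg_cleanup to NULL.
 */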
1250 /* ------------------------------------------------------------------------- */
1251
1252 static struct timeval *thread_timer_wait(struct pqueue *queue,
1253 struct timeval *timer_val)
1254 {
1255 if (queue->size) {
1256 struct thread *next_timer = queue->array[0];
1257 monotime_until(&next_timer->u.sands, timer_val);
1258 return timer_val;
1259 }
1260 return NULL;
1261 }
1262
1263 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1264 struct thread *fetch)
1265 {
1266 *fetch = *thread;
1267 thread_add_unuse(m, thread);
1268 return fetch;
1269 }
1270
1271 static int thread_process_io_helper(struct thread_master *m,
1272 struct thread *thread, short state, int pos)
1273 {
1274 struct thread **thread_array;
1275
1276 if (!thread)
1277 return 0;
1278
1279 if (thread->type == THREAD_READ)
1280 thread_array = m->read;
1281 else
1282 thread_array = m->write;
1283
1284 thread_array[thread->u.fd] = NULL;
1285 thread_list_add(&m->ready, thread);
1286 thread->type = THREAD_READY;
1287 /* if another pthread scheduled this file descriptor for the event we're
1288 * responding to, no problem; we're getting to it now */
1289 thread->master->handler.pfds[pos].events &= ~(state);
1290 return 1;
1291 }
1292
1293 /**
1294 * Process I/O events.
1295 *
1296 * Walks through file descriptor array looking for those pollfds whose .revents
1297 * field has something interesting. Deletes any invalid file descriptors.
1298 *
1299 * @param m the thread master
1300 * @param num the number of active file descriptors (return value of poll())
1301 */
1302 static void thread_process_io(struct thread_master *m, unsigned int num)
1303 {
1304 unsigned int ready = 0;
1305 struct pollfd *pfds = m->handler.copy;
1306
1307 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1308 /* no event for current fd? immediately continue */
1309 if (pfds[i].revents == 0)
1310 continue;
1311
1312 ready++;
1313
1314 		/* Unless someone has called thread_cancel from
1315 		 * another pthread, the only thing that could have
1316 		 * changed in m->handler.pfds while we were asleep
1317 		 * is the .events field in a given pollfd.
1318 		 * Barring thread_cancel(), that value should be a
1319 		 * superset of the values we have in our copy, so
1320 		 * there's no need to update it.
1321 		 * Similarly, barring deletion, the fd should
1322 		 * still be a valid index into the master's
1323 		 * pfds array.
1324 		 */
1325 if (pfds[i].revents & (POLLIN | POLLHUP))
1326 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1327 i);
1328 if (pfds[i].revents & POLLOUT)
1329 thread_process_io_helper(m, m->write[pfds[i].fd],
1330 POLLOUT, i);
1331
1332 		/* If one of our file descriptors is garbage, remove
1333 		 * it from both pfds arrays and update the sizes and
1334 		 * the index. */
1335 if (pfds[i].revents & POLLNVAL) {
1336 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1337 (m->handler.pfdcount - i - 1)
1338 * sizeof(struct pollfd));
1339 m->handler.pfdcount--;
1340
1341 memmove(pfds + i, pfds + i + 1,
1342 (m->handler.copycount - i - 1)
1343 * sizeof(struct pollfd));
1344 m->handler.copycount--;
1345
1346 i--;
1347 }
1348 }
1349 }
1350
1351 /* Add all timers that have popped to the ready list. */
1352 static unsigned int thread_process_timers(struct pqueue *queue,
1353 struct timeval *timenow)
1354 {
1355 struct thread *thread;
1356 unsigned int ready = 0;
1357
1358 while (queue->size) {
1359 thread = queue->array[0];
1360 if (timercmp(timenow, &thread->u.sands, <))
1361 return ready;
1362 pqueue_dequeue(queue);
1363 thread->type = THREAD_READY;
1364 thread_list_add(&thread->master->ready, thread);
1365 ready++;
1366 }
1367 return ready;
1368 }
1369
1370 /* process a list en masse, e.g. for event thread lists */
1371 static unsigned int thread_process(struct thread_list *list)
1372 {
1373 struct thread *thread;
1374 struct thread *next;
1375 unsigned int ready = 0;
1376
1377 for (thread = list->head; thread; thread = next) {
1378 next = thread->next;
1379 thread_list_delete(list, thread);
1380 thread->type = THREAD_READY;
1381 thread_list_add(&thread->master->ready, thread);
1382 ready++;
1383 }
1384 return ready;
1385 }
1386
1387
1388 /* Fetch next ready thread. */
1389 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1390 {
1391 struct thread *thread = NULL;
1392 struct timeval now;
1393 struct timeval zerotime = {0, 0};
1394 struct timeval tv;
1395 struct timeval *tw = NULL;
1396
1397 int num = 0;
1398
1399 do {
1400 /* Handle signals if any */
1401 if (m->handle_signals)
1402 quagga_sigevent_process();
1403
1404 pthread_mutex_lock(&m->mtx);
1405
1406 /* Process any pending cancellation requests */
1407 do_thread_cancel(m);
1408
1409 /*
1410 * Attempt to flush ready queue before going into poll().
1411 * This is performance-critical. Think twice before modifying.
1412 */
1413 if ((thread = thread_trim_head(&m->ready))) {
1414 fetch = thread_run(m, thread, fetch);
1415 if (fetch->ref)
1416 *fetch->ref = NULL;
1417 pthread_mutex_unlock(&m->mtx);
1418 break;
1419 }
1420
1421 /* otherwise, tick through scheduling sequence */
1422
1423 /*
1424 * Post events to ready queue. This must come before the
1425 * following block since events should occur immediately
1426 */
1427 thread_process(&m->event);
1428
1429 /*
1430 * If there are no tasks on the ready queue, we will poll()
1431 * until a timer expires or we receive I/O, whichever comes
1432 * first. The strategy for doing this is:
1433 *
1434 * - If there are events pending, set the poll() timeout to zero
1435 		 * - If there are no events pending, but there are
1436 		 *   timers pending, set the timeout to the smallest
1437 		 *   remaining time on any timer
1438 		 * - If there are neither timers nor events pending,
1439 		 *   but there are file descriptors pending, block
1440 		 *   indefinitely in poll()
1441 * - If nothing is pending, it's time for the application to die
1442 *
1443 * In every case except the last, we need to hit poll() at least
1444 * once per loop to avoid starvation by events
1445 */
1446 if (m->ready.count == 0)
1447 tw = thread_timer_wait(m->timer, &tv);
1448
1449 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1450 tw = &zerotime;
1451
1452 if (!tw && m->handler.pfdcount == 0) { /* die */
1453 pthread_mutex_unlock(&m->mtx);
1454 fetch = NULL;
1455 break;
1456 }
1457
1458 /*
1459 * Copy pollfd array + # active pollfds in it. Not necessary to
1460 * copy the array size as this is fixed.
1461 */
1462 m->handler.copycount = m->handler.pfdcount;
1463 memcpy(m->handler.copy, m->handler.pfds,
1464 m->handler.copycount * sizeof(struct pollfd));
1465
1466 pthread_mutex_unlock(&m->mtx);
1467 {
1468 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1469 m->handler.copycount, tw);
1470 }
1471 pthread_mutex_lock(&m->mtx);
1472
1473 /* Handle any errors received in poll() */
1474 if (num < 0) {
1475 if (errno == EINTR) {
1476 pthread_mutex_unlock(&m->mtx);
1477 /* loop around to signal handler */
1478 continue;
1479 }
1480
1481 /* else die */
1482 zlog_warn("poll() error: %s", safe_strerror(errno));
1483 pthread_mutex_unlock(&m->mtx);
1484 fetch = NULL;
1485 break;
1486 }
1487
1488 /* Post timers to ready queue. */
1489 monotime(&now);
1490 thread_process_timers(m->timer, &now);
1491
1492 /* Post I/O to ready queue. */
1493 if (num > 0)
1494 thread_process_io(m, num);
1495
1496 pthread_mutex_unlock(&m->mtx);
1497
1498 } while (!thread && m->spin);
1499
1500 return fetch;
1501 }
1502
1503 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1504 {
1505 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1506 + (a.tv_usec - b.tv_usec));
1507 }
1508
1509 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1510 unsigned long *cputime)
1511 {
1512 /* This is 'user + sys' time. */
1513 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1514 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1515 return timeval_elapsed(now->real, start->real);
1516 }
1517
1518 /* We should aim to yield after yield milliseconds, which defaults
1519    to THREAD_YIELD_TIME_SLOT.
1520 Note: we are using real (wall clock) time for this calculation.
1521 It could be argued that CPU time may make more sense in certain
1522 contexts. The things to consider are whether the thread may have
1523 blocked (in which case wall time increases, but CPU time does not),
1524 or whether the system is heavily loaded with other processes competing
1525 for CPU time. On balance, wall clock time seems to make sense.
1526 Plus it has the added benefit that gettimeofday should be faster
1527 than calling getrusage. */
1528 int thread_should_yield(struct thread *thread)
1529 {
1530 int result;
1531 pthread_mutex_lock(&thread->mtx);
1532 {
1533 result = monotime_since(&thread->real, NULL)
1534 > (int64_t)thread->yield;
1535 }
1536 pthread_mutex_unlock(&thread->mtx);
1537 return result;
1538 }
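/*
 * Illustrative pattern (not part of the original source): a long-running
 * task can cooperate with the scheduler by checking thread_should_yield()
 * and rescheduling itself instead of monopolizing its pthread. The job
 * and context names are hypothetical.
 *
 *   static int long_job(struct thread *thread)
 *   {
 *           struct job_ctx *ctx = THREAD_ARG(thread);
 *
 *           while (work_remains(ctx)) {
 *                   do_one_unit(ctx);
 *                   if (thread_should_yield(thread)) {
 *                           thread_add_event(thread->master, long_job,
 *                                            ctx, 0, NULL);
 *                           return 0;
 *                   }
 *           }
 *           return 0;
 *   }
 */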
1539
1540 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1541 {
1542 pthread_mutex_lock(&thread->mtx);
1543 {
1544 thread->yield = yield_time;
1545 }
1546 pthread_mutex_unlock(&thread->mtx);
1547 }
1548
1549 void thread_getrusage(RUSAGE_T *r)
1550 {
1551 monotime(&r->real);
1552 getrusage(RUSAGE_SELF, &(r->cpu));
1553 }
1554
1555 /*
1556 * Call a thread.
1557 *
1558 * This function will atomically update the thread's usage history. At present
1559 * this is the only spot where usage history is written. Nevertheless the code
1560 * has been written such that the introduction of writers in the future should
1561 * not need to update it provided the writers atomically perform only the
1562 * operations done here, i.e. updating the total and maximum times. In
1563 * particular, the maximum real and cpu times must be monotonically increasing
1564 * or this code is not correct.
1565 */
1566 void thread_call(struct thread *thread)
1567 {
1568 _Atomic unsigned long realtime, cputime;
1569 unsigned long exp;
1570 unsigned long helper;
1571 RUSAGE_T before, after;
1572
1573 GETRUSAGE(&before);
1574 thread->real = before.real;
1575
1576 pthread_setspecific(thread_current, thread);
1577 (*thread->func)(thread);
1578 pthread_setspecific(thread_current, NULL);
1579
1580 GETRUSAGE(&after);
1581
1582 realtime = thread_consumed_time(&after, &before, &helper);
1583 cputime = helper;
1584
1585 /* update realtime */
1586 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1587 memory_order_seq_cst);
1588 exp = atomic_load_explicit(&thread->hist->real.max,
1589 memory_order_seq_cst);
1590 while (exp < realtime
1591 && !atomic_compare_exchange_weak_explicit(
1592 &thread->hist->real.max, &exp, realtime,
1593 memory_order_seq_cst, memory_order_seq_cst))
1594 ;
1595
1596 /* update cputime */
1597 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1598 memory_order_seq_cst);
1599 exp = atomic_load_explicit(&thread->hist->cpu.max,
1600 memory_order_seq_cst);
1601 while (exp < cputime
1602 && !atomic_compare_exchange_weak_explicit(
1603 &thread->hist->cpu.max, &exp, cputime,
1604 memory_order_seq_cst, memory_order_seq_cst))
1605 ;
1606
1607 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1608 memory_order_seq_cst);
1609 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1610 memory_order_seq_cst);
1611
1612 #ifdef CONSUMED_TIME_CHECK
1613 if (realtime > CONSUMED_TIME_CHECK) {
1614 /*
1615 * We have a CPU Hog on our hands.
1616 * Whinge about it now, so we're aware this is yet another task
1617 * to fix.
1618 */
1619 zlog_warn(
1620 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1621 thread->funcname, (unsigned long)thread->func,
1622 realtime / 1000, cputime / 1000);
1623 }
1624 #endif /* CONSUMED_TIME_CHECK */
1625 }
1626
1627 /* Execute thread */
1628 void funcname_thread_execute(struct thread_master *m,
1629 int (*func)(struct thread *), void *arg, int val,
1630 debugargdef)
1631 {
1632 struct cpu_thread_history tmp;
1633 struct thread dummy;
1634
1635 memset(&dummy, 0, sizeof(struct thread));
1636
1637 pthread_mutex_init(&dummy.mtx, NULL);
1638 dummy.type = THREAD_EVENT;
1639 dummy.add_type = THREAD_EXECUTE;
1640 dummy.master = NULL;
1641 dummy.arg = arg;
1642 dummy.u.val = val;
1643
1644 tmp.func = dummy.func = func;
1645 tmp.funcname = dummy.funcname = funcname;
1646 dummy.hist = hash_get(m->cpu_record, &tmp,
1647 (void *(*)(void *))cpu_record_hash_alloc);
1648
1649 dummy.schedfrom = schedfrom;
1650 dummy.schedfrom_line = fromln;
1651
1652 thread_call(&dummy);
1653 }
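/*
 * Note (illustrative, not part of the original source): callers normally
 * reach this through the thread_execute() convenience macro in thread.h,
 * which runs the handler synchronously on a stack-allocated dummy thread
 * while still recording its runtime in cpu_record, e.g.:
 *
 *   thread_execute(master, handle_packet, pkt, 0);
 */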