1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "log.h"
29 #include "hash.h"
30 #include "pqueue.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36
37 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
38 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
39 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
41
42 #if defined(__APPLE__)
43 #include <mach/mach.h>
44 #include <mach/mach_time.h>
45 #endif
46
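/* Wake any poll() sleeping in thread_fetch() by writing a byte to the
 * master's self-pipe; fd_poll() drains the byte again after poll()
 * returns. */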
47 #define AWAKEN(m) \
48 do { \
49 static unsigned char wakebyte = 0x01; \
50 write(m->io_pipe[1], &wakebyte, 1); \
51 } while (0);
52
53 /* control variable for initializer */
54 pthread_once_t init_once = PTHREAD_ONCE_INIT;
55 pthread_key_t thread_current;
56
57 pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
58 static struct list *masters;
59
60 static void thread_free(struct thread_master *master, struct thread *thread);
61
62 /* CLI start ---------------------------------------------------------------- */
63 static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
64 {
65 int size = sizeof(a->func);
66
67 return jhash(&a->func, size, 0);
68 }
69
70 static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
71 const struct cpu_thread_history *b)
72 {
73 return a->func == b->func;
74 }
75
76 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
77 {
78 struct cpu_thread_history *new;
79 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
80 new->func = a->func;
81 new->funcname = a->funcname;
82 return new;
83 }
84
85 static void cpu_record_hash_free(void *a)
86 {
87 struct cpu_thread_history *hist = a;
88
89 XFREE(MTYPE_THREAD_STATS, hist);
90 }
91
92 static void vty_out_cpu_thread_history(struct vty *vty,
93 struct cpu_thread_history *a)
94 {
95 vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
96 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
97 a->cpu.total / a->total_calls, a->cpu.max,
98 a->real.total / a->total_calls, a->real.max);
99 vty_out(vty, " %c%c%c%c%c %s\n",
100 a->types & (1 << THREAD_READ) ? 'R' : ' ',
101 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
102 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
103 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
104 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
105 }
106
107 static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
108 {
109 struct cpu_thread_history *totals = args[0];
110 struct cpu_thread_history copy;
111 struct vty *vty = args[1];
112 uint8_t *filter = args[2];
113
114 struct cpu_thread_history *a = bucket->data;
115
116 copy.total_active =
117 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
118 copy.total_calls =
119 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
120 copy.cpu.total =
121 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
122 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
123 copy.real.total =
124 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
125 copy.real.max =
126 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
127 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
128 copy.funcname = a->funcname;
129
130 if (!(copy.types & *filter))
131 return;
132
133 vty_out_cpu_thread_history(vty, &copy);
134 totals->total_active += copy.total_active;
135 totals->total_calls += copy.total_calls;
136 totals->real.total += copy.real.total;
137 if (totals->real.max < copy.real.max)
138 totals->real.max = copy.real.max;
139 totals->cpu.total += copy.cpu.total;
140 if (totals->cpu.max < copy.cpu.max)
141 totals->cpu.max = copy.cpu.max;
142 }
143
144 static void cpu_record_print(struct vty *vty, uint8_t filter)
145 {
146 struct cpu_thread_history tmp;
147 void *args[3] = {&tmp, vty, &filter};
148 struct thread_master *m;
149 struct listnode *ln;
150
151 memset(&tmp, 0, sizeof tmp);
152 tmp.funcname = "TOTAL";
153 tmp.types = filter;
154
155 pthread_mutex_lock(&masters_mtx);
156 {
157 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
158 const char *name = m->name ? m->name : "main";
159
160 char underline[strlen(name) + 1];
161 memset(underline, '-', sizeof(underline));
162 underline[sizeof(underline) - 1] = '\0';
163
164 vty_out(vty, "\n");
165 vty_out(vty, "Showing statistics for pthread %s\n",
166 name);
167 vty_out(vty, "-------------------------------%s\n",
168 underline);
169 vty_out(vty, "%21s %18s %18s\n", "",
170 "CPU (user+system):", "Real (wall-clock):");
171 vty_out(vty,
172 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
173 vty_out(vty, " Avg uSec Max uSecs");
174 vty_out(vty, " Type Thread\n");
175
176 if (m->cpu_record->count)
177 hash_iterate(
178 m->cpu_record,
179 (void (*)(struct hash_backet *,
180 void *))cpu_record_hash_print,
181 args);
182 else
183 vty_out(vty, "No data to display yet.\n");
184
185 vty_out(vty, "\n");
186 }
187 }
188 pthread_mutex_unlock(&masters_mtx);
189
190 vty_out(vty, "\n");
191 vty_out(vty, "Total thread statistics\n");
192 vty_out(vty, "-------------------------\n");
193 vty_out(vty, "%21s %18s %18s\n", "",
194 "CPU (user+system):", "Real (wall-clock):");
195 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
196 vty_out(vty, " Avg uSec Max uSecs");
197 vty_out(vty, " Type Thread\n");
198
199 if (tmp.total_calls > 0)
200 vty_out_cpu_thread_history(vty, &tmp);
201 }
202
203 static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
204 {
205 uint8_t *filter = args[0];
206 struct hash *cpu_record = args[1];
207
208 struct cpu_thread_history *a = bucket->data;
209
210 if (!(a->types & *filter))
211 return;
212
213 hash_release(cpu_record, bucket->data);
214 }
215
216 static void cpu_record_clear(uint8_t filter)
217 {
218 uint8_t *tmp = &filter;
219 struct thread_master *m;
220 struct listnode *ln;
221
222 pthread_mutex_lock(&masters_mtx);
223 {
224 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
225 pthread_mutex_lock(&m->mtx);
226 {
227 void *args[2] = {tmp, m->cpu_record};
228 hash_iterate(
229 m->cpu_record,
230 (void (*)(struct hash_backet *,
231 void *))cpu_record_hash_clear,
232 args);
233 }
234 pthread_mutex_unlock(&m->mtx);
235 }
236 }
237 pthread_mutex_unlock(&masters_mtx);
238 }
239
240 static uint8_t parse_filter(const char *filterstr)
241 {
242 int i = 0;
243 int filter = 0;
244
245 while (filterstr[i] != '\0') {
246 switch (filterstr[i]) {
247 case 'r':
248 case 'R':
249 filter |= (1 << THREAD_READ);
250 break;
251 case 'w':
252 case 'W':
253 filter |= (1 << THREAD_WRITE);
254 break;
255 case 't':
256 case 'T':
257 filter |= (1 << THREAD_TIMER);
258 break;
259 case 'e':
260 case 'E':
261 filter |= (1 << THREAD_EVENT);
262 break;
263 case 'x':
264 case 'X':
265 filter |= (1 << THREAD_EXECUTE);
266 break;
267 default:
268 break;
269 }
270 ++i;
271 }
272 return filter;
273 }
274
275 DEFUN (show_thread_cpu,
276 show_thread_cpu_cmd,
277 "show thread cpu [FILTER]",
278 SHOW_STR
279 "Thread information\n"
280 "Thread CPU usage\n"
281        "Display filter (rwtex)\n")
282 {
283 uint8_t filter = (uint8_t)-1U;
284 int idx = 0;
285
286 if (argv_find(argv, argc, "FILTER", &idx)) {
287 filter = parse_filter(argv[idx]->arg);
288 if (!filter) {
289 vty_out(vty,
290 				"Invalid filter \"%s\" specified; must contain at least "
291 				"one of 'RWTEX'\n",
292 argv[idx]->arg);
293 return CMD_WARNING;
294 }
295 }
296
297 cpu_record_print(vty, filter);
298 return CMD_SUCCESS;
299 }
300
301 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
302 {
303 const char *name = m->name ? m->name : "main";
304 char underline[strlen(name) + 1];
305 uint32_t i;
306
307 memset(underline, '-', sizeof(underline));
308 underline[sizeof(underline) - 1] = '\0';
309
310 vty_out(vty, "\nShowing poll FD's for %s\n", name);
311 vty_out(vty, "----------------------%s\n", underline);
312 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
313 for (i = 0; i < m->handler.pfdcount; i++)
314 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
315 m->handler.pfds[i].fd,
316 m->handler.pfds[i].events,
317 m->handler.pfds[i].revents);
318 }
319
320 DEFUN (show_thread_poll,
321 show_thread_poll_cmd,
322 "show thread poll",
323 SHOW_STR
324 "Thread information\n"
325 "Show poll FD's and information\n")
326 {
327 struct listnode *node;
328 struct thread_master *m;
329
330 pthread_mutex_lock(&masters_mtx);
331 {
332 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
333 show_thread_poll_helper(vty, m);
334 }
335 }
336 pthread_mutex_unlock(&masters_mtx);
337
338 return CMD_SUCCESS;
339 }
340
341
342 DEFUN (clear_thread_cpu,
343 clear_thread_cpu_cmd,
344 "clear thread cpu [FILTER]",
345 "Clear stored data in all pthreads\n"
346 "Thread information\n"
347 "Thread CPU usage\n"
348        "Display filter (rwtex)\n")
349 {
350 uint8_t filter = (uint8_t)-1U;
351 int idx = 0;
352
353 if (argv_find(argv, argc, "FILTER", &idx)) {
354 filter = parse_filter(argv[idx]->arg);
355 if (!filter) {
356 vty_out(vty,
357 				"Invalid filter \"%s\" specified; must contain at least "
358 				"one of 'RWTEX'\n",
359 argv[idx]->arg);
360 return CMD_WARNING;
361 }
362 }
363
364 cpu_record_clear(filter);
365 return CMD_SUCCESS;
366 }
367
368 void thread_cmd_init(void)
369 {
370 install_element(VIEW_NODE, &show_thread_cpu_cmd);
371 install_element(VIEW_NODE, &show_thread_poll_cmd);
372 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
373 }
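/*
 * Usage sketch (illustrative, not called from this file): a daemon
 * registers these commands once at startup, after its command/vty
 * subsystem is up, making "show thread cpu", "show thread poll" and
 * "clear thread cpu" available in the CLI:
 *
 *   thread_cmd_init();
 */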
374 /* CLI end ------------------------------------------------------------------ */
375
376
377 static int thread_timer_cmp(void *a, void *b)
378 {
379 struct thread *thread_a = a;
380 struct thread *thread_b = b;
381
382 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
383 return -1;
384 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
385 return 1;
386 return 0;
387 }
388
389 static void thread_timer_update(void *node, int actual_position)
390 {
391 struct thread *thread = node;
392
393 thread->index = actual_position;
394 }
395
396 static void cancelreq_del(void *cr)
397 {
398 XFREE(MTYPE_TMP, cr);
399 }
400
401 /* initializer, only ever called once */
402 static void initializer(void)
403 {
404 pthread_key_create(&thread_current, NULL);
405 }
406
407 struct thread_master *thread_master_create(const char *name)
408 {
409 struct thread_master *rv;
410 struct rlimit limit;
411
412 pthread_once(&init_once, &initializer);
413
414 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
415 if (rv == NULL)
416 return NULL;
417
418 /* Initialize master mutex */
419 pthread_mutex_init(&rv->mtx, NULL);
420 pthread_cond_init(&rv->cancel_cond, NULL);
421
422 /* Set name */
423 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
424
425 /* Initialize I/O task data structures */
426 getrlimit(RLIMIT_NOFILE, &limit);
427 rv->fd_limit = (int)limit.rlim_cur;
428 rv->read = XCALLOC(MTYPE_THREAD_POLL,
429 sizeof(struct thread *) * rv->fd_limit);
430
431 rv->write = XCALLOC(MTYPE_THREAD_POLL,
432 sizeof(struct thread *) * rv->fd_limit);
433
434 rv->cpu_record = hash_create_size(
435 8, (unsigned int (*)(void *))cpu_record_hash_key,
436 (int (*)(const void *, const void *))cpu_record_hash_cmp,
437 "Thread Hash");
438
439
440 /* Initialize the timer queues */
441 rv->timer = pqueue_create();
442 rv->timer->cmp = thread_timer_cmp;
443 rv->timer->update = thread_timer_update;
444
445 /* Initialize thread_fetch() settings */
446 rv->spin = true;
447 rv->handle_signals = true;
448
449 /* Set pthread owner, should be updated by actual owner */
450 rv->owner = pthread_self();
451 rv->cancel_req = list_new();
452 rv->cancel_req->del = cancelreq_del;
453 rv->canceled = true;
454
455 /* Initialize pipe poker */
456 pipe(rv->io_pipe);
457 set_nonblocking(rv->io_pipe[0]);
458 set_nonblocking(rv->io_pipe[1]);
459
460 /* Initialize data structures for poll() */
461 rv->handler.pfdsize = rv->fd_limit;
462 rv->handler.pfdcount = 0;
463 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
464 sizeof(struct pollfd) * rv->handler.pfdsize);
465 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
466 sizeof(struct pollfd) * rv->handler.pfdsize);
467
468 /* add to list of threadmasters */
469 pthread_mutex_lock(&masters_mtx);
470 {
471 if (!masters)
472 masters = list_new();
473
474 listnode_add(masters, rv);
475 }
476 pthread_mutex_unlock(&masters_mtx);
477
478 return rv;
479 }
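/*
 * Usage sketch (illustrative; the "bgpd" name is hypothetical): each
 * pthread that wants an event loop owns one thread_master, schedules
 * work against it through the thread_add_* wrappers in thread.h, and
 * frees it at shutdown:
 *
 *   struct thread_master *master = thread_master_create("bgpd");
 *   ...schedule tasks, run the fetch/call loop (see thread_fetch)...
 *   thread_master_free(master);
 */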
480
481 void thread_master_set_name(struct thread_master *master, const char *name)
482 {
483 pthread_mutex_lock(&master->mtx);
484 {
485 if (master->name)
486 XFREE(MTYPE_THREAD_MASTER, master->name);
487 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
488 }
489 pthread_mutex_unlock(&master->mtx);
490 }
491
492 /* Add a new thread to the list. */
493 static void thread_list_add(struct thread_list *list, struct thread *thread)
494 {
495 thread->next = NULL;
496 thread->prev = list->tail;
497 if (list->tail)
498 list->tail->next = thread;
499 else
500 list->head = thread;
501 list->tail = thread;
502 list->count++;
503 }
504
505 /* Delete a thread from the list. */
506 static struct thread *thread_list_delete(struct thread_list *list,
507 struct thread *thread)
508 {
509 if (thread->next)
510 thread->next->prev = thread->prev;
511 else
512 list->tail = thread->prev;
513 if (thread->prev)
514 thread->prev->next = thread->next;
515 else
516 list->head = thread->next;
517 thread->next = thread->prev = NULL;
518 list->count--;
519 return thread;
520 }
521
522 /* Return 1 if the thread list is empty, 0 otherwise. */
523 static int thread_empty(struct thread_list *list)
524 {
525 return list->head ? 0 : 1;
526 }
527
528 /* Delete top of the list and return it. */
529 static struct thread *thread_trim_head(struct thread_list *list)
530 {
531 if (!thread_empty(list))
532 return thread_list_delete(list, list->head);
533 return NULL;
534 }
535
536 #define THREAD_UNUSED_DEPTH 10
537
538 /* Move thread to unuse list. */
539 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
540 {
541 pthread_mutex_t mtxc = thread->mtx;
542
543 assert(m != NULL && thread != NULL);
544 assert(thread->next == NULL);
545 assert(thread->prev == NULL);
546
547 thread->hist->total_active--;
548 memset(thread, 0, sizeof(struct thread));
549 thread->type = THREAD_UNUSED;
550
551 /* Restore the thread mutex context. */
552 thread->mtx = mtxc;
553
554 if (m->unuse.count < THREAD_UNUSED_DEPTH) {
555 thread_list_add(&m->unuse, thread);
556 return;
557 }
558
559 thread_free(m, thread);
560 }
561
562 /* Free all unused threads. */
563 static void thread_list_free(struct thread_master *m, struct thread_list *list)
564 {
565 struct thread *t;
566 struct thread *next;
567
568 for (t = list->head; t; t = next) {
569 next = t->next;
570 thread_free(m, t);
571 list->count--;
572 }
573 }
574
575 static void thread_array_free(struct thread_master *m,
576 struct thread **thread_array)
577 {
578 struct thread *t;
579 int index;
580
581 for (index = 0; index < m->fd_limit; ++index) {
582 t = thread_array[index];
583 if (t) {
584 thread_array[index] = NULL;
585 thread_free(m, t);
586 }
587 }
588 XFREE(MTYPE_THREAD_POLL, thread_array);
589 }
590
591 static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
592 {
593 int i;
594
595 for (i = 0; i < queue->size; i++)
596 thread_free(m, queue->array[i]);
597
598 pqueue_delete(queue);
599 }
600
601 /*
602  * thread_master_free_unused
603  *
604  * As threads are finished with, they are put on the
605  * unuse list for later reuse.
606  * If we are shutting down, free up the unused threads
607  * so we can see if we forgot to shut anything off.
608  */
609 void thread_master_free_unused(struct thread_master *m)
610 {
611 pthread_mutex_lock(&m->mtx);
612 {
613 struct thread *t;
614 while ((t = thread_trim_head(&m->unuse)) != NULL) {
615 thread_free(m, t);
616 }
617 }
618 pthread_mutex_unlock(&m->mtx);
619 }
620
621 /* Stop the thread scheduler and free the master. */
622 void thread_master_free(struct thread_master *m)
623 {
624 pthread_mutex_lock(&masters_mtx);
625 {
626 listnode_delete(masters, m);
627 if (masters->count == 0) {
628 list_delete_and_null(&masters);
629 }
630 }
631 pthread_mutex_unlock(&masters_mtx);
632
633 thread_array_free(m, m->read);
634 thread_array_free(m, m->write);
635 thread_queue_free(m, m->timer);
636 thread_list_free(m, &m->event);
637 thread_list_free(m, &m->ready);
638 thread_list_free(m, &m->unuse);
639 pthread_mutex_destroy(&m->mtx);
640 pthread_cond_destroy(&m->cancel_cond);
641 close(m->io_pipe[0]);
642 close(m->io_pipe[1]);
643 list_delete_and_null(&m->cancel_req);
644 m->cancel_req = NULL;
645
646 hash_clean(m->cpu_record, cpu_record_hash_free);
647 hash_free(m->cpu_record);
648 m->cpu_record = NULL;
649
650 if (m->name)
651 XFREE(MTYPE_THREAD_MASTER, m->name);
652 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
653 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
654 XFREE(MTYPE_THREAD_MASTER, m);
655 }
656
657 /* Return the remaining time in seconds. */
658 unsigned long thread_timer_remain_second(struct thread *thread)
659 {
660 int64_t remain;
661
662 pthread_mutex_lock(&thread->mtx);
663 {
664 remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
665 }
666 pthread_mutex_unlock(&thread->mtx);
667
668 return remain < 0 ? 0 : remain;
669 }
670
671 #define debugargdef const char *funcname, const char *schedfrom, int fromln
672 #define debugargpass funcname, schedfrom, fromln
673
674 struct timeval thread_timer_remain(struct thread *thread)
675 {
676 struct timeval remain;
677 pthread_mutex_lock(&thread->mtx);
678 {
679 monotime_until(&thread->u.sands, &remain);
680 }
681 pthread_mutex_unlock(&thread->mtx);
682 return remain;
683 }
684
685 /* Get new thread. */
686 static struct thread *thread_get(struct thread_master *m, uint8_t type,
687 int (*func)(struct thread *), void *arg,
688 debugargdef)
689 {
690 struct thread *thread = thread_trim_head(&m->unuse);
691 struct cpu_thread_history tmp;
692
693 if (!thread) {
694 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
695 /* mutex only needs to be initialized at struct creation. */
696 pthread_mutex_init(&thread->mtx, NULL);
697 m->alloc++;
698 }
699
700 thread->type = type;
701 thread->add_type = type;
702 thread->master = m;
703 thread->arg = arg;
704 thread->index = -1;
705 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
706 thread->ref = NULL;
707
708 	/*
709 	 * If the passed-in funcname is not what we have
710 	 * stored, the thread->hist needs to be updated.
711 	 * We keep the previous history pointer around on
712 	 * the unused thread under the assumption that we
713 	 * are probably going to immediately allocate the
714 	 * same type of thread again.
715 	 * This hopefully saves us some serious hash_get
716 	 * lookups.
717 	 */
718 if (thread->funcname != funcname || thread->func != func) {
719 tmp.func = func;
720 tmp.funcname = funcname;
721 thread->hist =
722 hash_get(m->cpu_record, &tmp,
723 (void *(*)(void *))cpu_record_hash_alloc);
724 }
725 thread->hist->total_active++;
726 thread->func = func;
727 thread->funcname = funcname;
728 thread->schedfrom = schedfrom;
729 thread->schedfrom_line = fromln;
730
731 return thread;
732 }
733
734 static void thread_free(struct thread_master *master, struct thread *thread)
735 {
736 /* Update statistics. */
737 assert(master->alloc > 0);
738 master->alloc--;
739
740 /* Free allocated resources. */
741 pthread_mutex_destroy(&thread->mtx);
742 XFREE(MTYPE_THREAD, thread);
743 }
744
745 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
746 nfds_t count, const struct timeval *timer_wait)
747 {
748 	/* If timer_wait is NULL here, that means poll() should block
749 	 * indefinitely, unless the thread_master has overridden it by
750 	 * setting ->selectpoll_timeout.
751 	 *
752 	 * If selectpoll_timeout is positive, it specifies the maximum
753 	 * number of milliseconds to wait.
754 	 *
755 	 * If it is negative, poll() returns immediately even when no
756 	 * event is detected.
757 	 *
758 	 * If it is zero, the default behavior described above applies. */
759 int timeout = -1;
760
761 /* number of file descriptors with events */
762 int num;
763
764 if (timer_wait != NULL
765 && m->selectpoll_timeout == 0) // use the default value
766 timeout = (timer_wait->tv_sec * 1000)
767 + (timer_wait->tv_usec / 1000);
768 else if (m->selectpoll_timeout > 0) // use the user's timeout
769 timeout = m->selectpoll_timeout;
770 else if (m->selectpoll_timeout
771 < 0) // effect a poll (return immediately)
772 timeout = 0;
773
774 /* add poll pipe poker */
775 assert(count + 1 < pfdsize);
776 pfds[count].fd = m->io_pipe[0];
777 pfds[count].events = POLLIN;
778 pfds[count].revents = 0x00;
779
780 num = poll(pfds, count + 1, timeout);
781
782 unsigned char trash[64];
783 if (num > 0 && pfds[count].revents != 0 && num--)
784 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
785 ;
786
787 return num;
788 }
789
790 /* Add new read or write thread. */
791 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
792 int (*func)(struct thread *),
793 void *arg, int fd,
794 struct thread **t_ptr,
795 debugargdef)
796 {
797 struct thread *thread = NULL;
798
799 assert(fd >= 0 && fd < m->fd_limit);
800 pthread_mutex_lock(&m->mtx);
801 {
802 if (t_ptr
803 && *t_ptr) // thread is already scheduled; don't reschedule
804 {
805 pthread_mutex_unlock(&m->mtx);
806 return NULL;
807 }
808
809 /* default to a new pollfd */
810 nfds_t queuepos = m->handler.pfdcount;
811
812 /* if we already have a pollfd for our file descriptor, find and
813 * use it */
814 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
815 if (m->handler.pfds[i].fd == fd) {
816 queuepos = i;
817 break;
818 }
819
820 /* make sure we have room for this fd + pipe poker fd */
821 assert(queuepos + 1 < m->handler.pfdsize);
822
823 thread = thread_get(m, dir, func, arg, debugargpass);
824
825 m->handler.pfds[queuepos].fd = fd;
826 m->handler.pfds[queuepos].events |=
827 (dir == THREAD_READ ? POLLIN : POLLOUT);
828
829 if (queuepos == m->handler.pfdcount)
830 m->handler.pfdcount++;
831
832 if (thread) {
833 pthread_mutex_lock(&thread->mtx);
834 {
835 thread->u.fd = fd;
836 if (dir == THREAD_READ)
837 m->read[thread->u.fd] = thread;
838 else
839 m->write[thread->u.fd] = thread;
840 }
841 pthread_mutex_unlock(&thread->mtx);
842
843 if (t_ptr) {
844 *t_ptr = thread;
845 thread->ref = t_ptr;
846 }
847 }
848
849 AWAKEN(m);
850 }
851 pthread_mutex_unlock(&m->mtx);
852
853 return thread;
854 }
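/*
 * Usage sketch (illustrative; peer_readable and the peer fields are
 * hypothetical). Callers normally use the thread_add_read() /
 * thread_add_write() wrappers from thread.h, which fill in the funcname,
 * file and line arguments. A task fires only once, so handlers that want
 * continuous I/O re-arm themselves:
 *
 *   static int peer_readable(struct thread *t)
 *   {
 *           struct peer *peer = THREAD_ARG(t);
 *
 *           ...read from THREAD_FD(t)...
 *           thread_add_read(peer->master, peer_readable, peer,
 *                           THREAD_FD(t), &peer->t_read);
 *           return 0;
 *   }
 *
 *   thread_add_read(master, peer_readable, peer, peer->fd, &peer->t_read);
 */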
855
856 static struct thread *
857 funcname_thread_add_timer_timeval(struct thread_master *m,
858 int (*func)(struct thread *), int type,
859 void *arg, struct timeval *time_relative,
860 struct thread **t_ptr, debugargdef)
861 {
862 struct thread *thread;
863 struct pqueue *queue;
864
865 assert(m != NULL);
866
867 assert(type == THREAD_TIMER);
868 assert(time_relative);
869
870 pthread_mutex_lock(&m->mtx);
871 {
872 if (t_ptr
873 && *t_ptr) // thread is already scheduled; don't reschedule
874 {
875 pthread_mutex_unlock(&m->mtx);
876 return NULL;
877 }
878
879 queue = m->timer;
880 thread = thread_get(m, type, func, arg, debugargpass);
881
882 pthread_mutex_lock(&thread->mtx);
883 {
884 monotime(&thread->u.sands);
885 timeradd(&thread->u.sands, time_relative,
886 &thread->u.sands);
887 pqueue_enqueue(thread, queue);
888 if (t_ptr) {
889 *t_ptr = thread;
890 thread->ref = t_ptr;
891 }
892 }
893 pthread_mutex_unlock(&thread->mtx);
894
895 AWAKEN(m);
896 }
897 pthread_mutex_unlock(&m->mtx);
898
899 return thread;
900 }
901
902
903 /* Add timer event thread. */
904 struct thread *funcname_thread_add_timer(struct thread_master *m,
905 int (*func)(struct thread *),
906 void *arg, long timer,
907 struct thread **t_ptr, debugargdef)
908 {
909 struct timeval trel;
910
911 assert(m != NULL);
912
913 trel.tv_sec = timer;
914 trel.tv_usec = 0;
915
916 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
917 &trel, t_ptr, debugargpass);
918 }
919
920 /* Add timer event thread with "millisecond" resolution */
921 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
922 int (*func)(struct thread *),
923 void *arg, long timer,
924 struct thread **t_ptr,
925 debugargdef)
926 {
927 struct timeval trel;
928
929 assert(m != NULL);
930
931 trel.tv_sec = timer / 1000;
932 trel.tv_usec = 1000 * (timer % 1000);
933
934 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
935 &trel, t_ptr, debugargpass);
936 }
937
938 /* Add timer event thread with a struct timeval expiration offset */
939 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
940 int (*func)(struct thread *),
941 void *arg, struct timeval *tv,
942 struct thread **t_ptr, debugargdef)
943 {
944 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
945 t_ptr, debugargpass);
946 }
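/*
 * Usage sketch (illustrative; handler and field names are hypothetical).
 * The thread.h wrappers pick the resolution: thread_add_timer() takes
 * seconds, thread_add_timer_msec() milliseconds, thread_add_timer_tv() a
 * struct timeval. Timers are one-shot; periodic timers re-add themselves
 * from their own handler:
 *
 *   thread_add_timer(master, hello_expired, peer, 30, &peer->t_hello);
 *   thread_add_timer_msec(master, retry_send, conn, 250, &conn->t_retry);
 */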
947
948 /* Add simple event thread. */
949 struct thread *funcname_thread_add_event(struct thread_master *m,
950 int (*func)(struct thread *),
951 void *arg, int val,
952 struct thread **t_ptr, debugargdef)
953 {
954 struct thread *thread;
955
956 assert(m != NULL);
957
958 pthread_mutex_lock(&m->mtx);
959 {
960 if (t_ptr
961 && *t_ptr) // thread is already scheduled; don't reschedule
962 {
963 pthread_mutex_unlock(&m->mtx);
964 return NULL;
965 }
966
967 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
968 pthread_mutex_lock(&thread->mtx);
969 {
970 thread->u.val = val;
971 thread_list_add(&m->event, thread);
972 }
973 pthread_mutex_unlock(&thread->mtx);
974
975 if (t_ptr) {
976 *t_ptr = thread;
977 thread->ref = t_ptr;
978 }
979
980 AWAKEN(m);
981 }
982 pthread_mutex_unlock(&m->mtx);
983
984 return thread;
985 }
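/*
 * Usage sketch (illustrative; process_queue and the t_process field are
 * hypothetical). Events run on the next pass through thread_fetch(),
 * ahead of timers and I/O, so they are the usual way to defer work onto
 * the pthread that owns the master:
 *
 *   thread_add_event(master, process_queue, vrf, 0, &vrf->t_process);
 */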
986
987 /* Thread cancellation ------------------------------------------------------ */
988
989 /**
990  * Clears event bits from the .events field of the pollfd corresponding
991  * to the given file descriptor. The event(s) to clear are passed in 'state'.
992 *
993 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
994 * implementation for details.
995 *
996 * @param master
997 * @param fd
998 * @param state the event to cancel. One or more (OR'd together) of the
999 * following:
1000 * - POLLIN
1001 * - POLLOUT
1002 */
1003 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
1004 {
1005 bool found = false;
1006
1007 /* Cancel POLLHUP too just in case some bozo set it */
1008 state |= POLLHUP;
1009
1010 /* find the index of corresponding pollfd */
1011 nfds_t i;
1012
1013 for (i = 0; i < master->handler.pfdcount; i++)
1014 if (master->handler.pfds[i].fd == fd) {
1015 found = true;
1016 break;
1017 }
1018
1019 if (!found) {
1020 zlog_debug(
1021 "[!] Received cancellation request for nonexistent rw job");
1022 zlog_debug("[!] threadmaster: %s | fd: %d",
1023 master->name ? master->name : "", fd);
1024 return;
1025 }
1026
1027 /* NOT out event. */
1028 master->handler.pfds[i].events &= ~(state);
1029
1030 /* If all events are canceled, delete / resize the pollfd array. */
1031 if (master->handler.pfds[i].events == 0) {
1032 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1033 (master->handler.pfdcount - i - 1)
1034 * sizeof(struct pollfd));
1035 master->handler.pfdcount--;
1036 }
1037
1038 /* If we have the same pollfd in the copy, perform the same operations,
1039 * otherwise return. */
1040 if (i >= master->handler.copycount)
1041 return;
1042
1043 master->handler.copy[i].events &= ~(state);
1044
1045 if (master->handler.copy[i].events == 0) {
1046 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1047 (master->handler.copycount - i - 1)
1048 * sizeof(struct pollfd));
1049 master->handler.copycount--;
1050 }
1051 }
1052
1053 /**
1054 * Process cancellation requests.
1055 *
1056 * This may only be run from the pthread which owns the thread_master.
1057 *
1058 * @param master the thread master to process
1059 * @REQUIRE master->mtx
1060 */
1061 static void do_thread_cancel(struct thread_master *master)
1062 {
1063 struct thread_list *list = NULL;
1064 struct pqueue *queue = NULL;
1065 struct thread **thread_array = NULL;
1066 struct thread *thread;
1067
1068 struct cancel_req *cr;
1069 struct listnode *ln;
1070 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1071 		/* If this is an event object cancellation, linearly
1072 		 * search the event list and delete any events which
1073 		 * have the specified argument. We also need to check
1074 		 * every thread in the ready queue.
1075 		 */
1076 if (cr->eventobj) {
1077 struct thread *t;
1078 thread = master->event.head;
1079
1080 while (thread) {
1081 t = thread;
1082 thread = t->next;
1083
1084 if (t->arg == cr->eventobj) {
1085 thread_list_delete(&master->event, t);
1086 if (t->ref)
1087 *t->ref = NULL;
1088 thread_add_unuse(master, t);
1089 }
1090 }
1091
1092 thread = master->ready.head;
1093 while (thread) {
1094 t = thread;
1095 thread = t->next;
1096
1097 if (t->arg == cr->eventobj) {
1098 thread_list_delete(&master->ready, t);
1099 if (t->ref)
1100 *t->ref = NULL;
1101 thread_add_unuse(master, t);
1102 }
1103 }
1104 continue;
1105 }
1106
1107 		/* The pointer varies depending on whether the
1108 		 * cancellation request was made asynchronously or not.
1109 		 * If it was, we need to check whether the thread even
1110 		 * exists anymore before cancelling it.
1111 		 */
1112 thread = (cr->thread) ? cr->thread : *cr->threadref;
1113
1114 if (!thread)
1115 continue;
1116
1117 /* Determine the appropriate queue to cancel the thread from */
1118 switch (thread->type) {
1119 case THREAD_READ:
1120 thread_cancel_rw(master, thread->u.fd, POLLIN);
1121 thread_array = master->read;
1122 break;
1123 case THREAD_WRITE:
1124 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1125 thread_array = master->write;
1126 break;
1127 case THREAD_TIMER:
1128 queue = master->timer;
1129 break;
1130 case THREAD_EVENT:
1131 list = &master->event;
1132 break;
1133 case THREAD_READY:
1134 list = &master->ready;
1135 break;
1136 default:
1137 continue;
1138 break;
1139 }
1140
1141 if (queue) {
1142 assert(thread->index >= 0);
1143 assert(thread == queue->array[thread->index]);
1144 pqueue_remove_at(thread->index, queue);
1145 } else if (list) {
1146 thread_list_delete(list, thread);
1147 } else if (thread_array) {
1148 thread_array[thread->u.fd] = NULL;
1149 } else {
1150 assert(!"Thread should be either in queue or list or array!");
1151 }
1152
1153 if (thread->ref)
1154 *thread->ref = NULL;
1155
1156 thread_add_unuse(thread->master, thread);
1157 }
1158
1159 /* Delete and free all cancellation requests */
1160 list_delete_all_node(master->cancel_req);
1161
1162 /* Wake up any threads which may be blocked in thread_cancel_async() */
1163 master->canceled = true;
1164 pthread_cond_broadcast(&master->cancel_cond);
1165 }
1166
1167 /**
1168 * Cancel any events which have the specified argument.
1169 *
1170 * MT-Unsafe
1171 *
1172  * @param master the thread_master to cancel from
1173 * @param arg the argument passed when creating the event
1174 */
1175 void thread_cancel_event(struct thread_master *master, void *arg)
1176 {
1177 assert(master->owner == pthread_self());
1178
1179 pthread_mutex_lock(&master->mtx);
1180 {
1181 struct cancel_req *cr =
1182 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1183 cr->eventobj = arg;
1184 listnode_add(master->cancel_req, cr);
1185 do_thread_cancel(master);
1186 }
1187 pthread_mutex_unlock(&master->mtx);
1188 }
1189
1190 /**
1191 * Cancel a specific task.
1192 *
1193 * MT-Unsafe
1194 *
1195 * @param thread task to cancel
1196 */
1197 void thread_cancel(struct thread *thread)
1198 {
1199 struct thread_master *master = thread->master;
1200
1201 assert(master->owner == pthread_self());
1202
1203 pthread_mutex_lock(&master->mtx);
1204 {
1205 struct cancel_req *cr =
1206 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1207 cr->thread = thread;
1208 listnode_add(master->cancel_req, cr);
1209 do_thread_cancel(master);
1210 }
1211 pthread_mutex_unlock(&master->mtx);
1212 }
1213
1214 /**
1215 * Asynchronous cancellation.
1216 *
1217 * Called with either a struct thread ** or void * to an event argument,
1218 * this function posts the correct cancellation request and blocks until it is
1219 * serviced.
1220 *
1221 * If the thread is currently running, execution blocks until it completes.
1222 *
1223 * The last two parameters are mutually exclusive, i.e. if you pass one the
1224 * other must be NULL.
1225 *
1226 * When the cancellation procedure executes on the target thread_master, the
1227 * thread * provided is checked for nullity. If it is null, the thread is
1228 * assumed to no longer exist and the cancellation request is a no-op. Thus
1229 * users of this API must pass a back-reference when scheduling the original
1230 * task.
1231 *
1232 * MT-Safe
1233 *
1234 * @param master the thread master with the relevant event / task
1235 * @param thread pointer to thread to cancel
1236 * @param eventobj the event
1237 */
1238 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1239 void *eventobj)
1240 {
1241 assert(!(thread && eventobj) && (thread || eventobj));
1242 assert(master->owner != pthread_self());
1243
1244 pthread_mutex_lock(&master->mtx);
1245 {
1246 master->canceled = false;
1247
1248 if (thread) {
1249 struct cancel_req *cr =
1250 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1251 cr->threadref = thread;
1252 listnode_add(master->cancel_req, cr);
1253 } else if (eventobj) {
1254 struct cancel_req *cr =
1255 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1256 cr->eventobj = eventobj;
1257 listnode_add(master->cancel_req, cr);
1258 }
1259 AWAKEN(master);
1260
1261 while (!master->canceled)
1262 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1263 }
1264 pthread_mutex_unlock(&master->mtx);
1265 }
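/*
 * Usage sketch (illustrative; the peer fields are hypothetical). From the
 * owning pthread, cancel synchronously and clear the back-reference; from
 * any other pthread, hand the same back-reference (or the event argument)
 * to thread_cancel_async() and let the owner service it:
 *
 *   // owning pthread
 *   if (peer->t_read) {
 *           thread_cancel(peer->t_read);
 *           peer->t_read = NULL;
 *   }
 *
 *   // any other pthread
 *   thread_cancel_async(peer->master, &peer->t_read, NULL);
 */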
1266 /* ------------------------------------------------------------------------- */
1267
1268 static struct timeval *thread_timer_wait(struct pqueue *queue,
1269 struct timeval *timer_val)
1270 {
1271 if (queue->size) {
1272 struct thread *next_timer = queue->array[0];
1273 monotime_until(&next_timer->u.sands, timer_val);
1274 return timer_val;
1275 }
1276 return NULL;
1277 }
1278
1279 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1280 struct thread *fetch)
1281 {
1282 *fetch = *thread;
1283 thread_add_unuse(m, thread);
1284 return fetch;
1285 }
1286
1287 static int thread_process_io_helper(struct thread_master *m,
1288 struct thread *thread, short state, int pos)
1289 {
1290 struct thread **thread_array;
1291
1292 if (!thread)
1293 return 0;
1294
1295 if (thread->type == THREAD_READ)
1296 thread_array = m->read;
1297 else
1298 thread_array = m->write;
1299
1300 thread_array[thread->u.fd] = NULL;
1301 thread_list_add(&m->ready, thread);
1302 thread->type = THREAD_READY;
1303 /* if another pthread scheduled this file descriptor for the event we're
1304 * responding to, no problem; we're getting to it now */
1305 thread->master->handler.pfds[pos].events &= ~(state);
1306 return 1;
1307 }
1308
1309 /**
1310 * Process I/O events.
1311 *
1312 * Walks through file descriptor array looking for those pollfds whose .revents
1313 * field has something interesting. Deletes any invalid file descriptors.
1314 *
1315 * @param m the thread master
1316 * @param num the number of active file descriptors (return value of poll())
1317 */
1318 static void thread_process_io(struct thread_master *m, unsigned int num)
1319 {
1320 unsigned int ready = 0;
1321 struct pollfd *pfds = m->handler.copy;
1322
1323 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1324 /* no event for current fd? immediately continue */
1325 if (pfds[i].revents == 0)
1326 continue;
1327
1328 ready++;
1329
1330 		/* Unless someone has called thread_cancel from
1331 		 * another pthread, the only thing that could have
1332 		 * changed in m->handler.pfds while we were asleep
1333 		 * is the .events field in a given pollfd.
1334 		 * Barring thread_cancel(), that value should be a
1335 		 * superset of the values we have in our copy, so
1336 		 * there's no need to update it.
1337 		 * Similarly, barring deletion, the fd should still
1338 		 * be a valid index into the master's pfds, so the
1339 		 * lookups into m->read[] and m->write[] below are
1340 		 * safe. */
1341 if (pfds[i].revents & (POLLIN | POLLHUP))
1342 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1343 i);
1344 if (pfds[i].revents & POLLOUT)
1345 thread_process_io_helper(m, m->write[pfds[i].fd],
1346 POLLOUT, i);
1347
1348 		/* if one of our file descriptors is garbage, remove
1349 		 * it from both pfds arrays and update the sizes and
1350 		 * index */
1351 if (pfds[i].revents & POLLNVAL) {
1352 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1353 (m->handler.pfdcount - i - 1)
1354 * sizeof(struct pollfd));
1355 m->handler.pfdcount--;
1356
1357 memmove(pfds + i, pfds + i + 1,
1358 (m->handler.copycount - i - 1)
1359 * sizeof(struct pollfd));
1360 m->handler.copycount--;
1361
1362 i--;
1363 }
1364 }
1365 }
1366
1367 /* Add all timers that have popped to the ready list. */
1368 static unsigned int thread_process_timers(struct pqueue *queue,
1369 struct timeval *timenow)
1370 {
1371 struct thread *thread;
1372 unsigned int ready = 0;
1373
1374 while (queue->size) {
1375 thread = queue->array[0];
1376 if (timercmp(timenow, &thread->u.sands, <))
1377 return ready;
1378 pqueue_dequeue(queue);
1379 thread->type = THREAD_READY;
1380 thread_list_add(&thread->master->ready, thread);
1381 ready++;
1382 }
1383 return ready;
1384 }
1385
1386 /* process a list en masse, e.g. for event thread lists */
1387 static unsigned int thread_process(struct thread_list *list)
1388 {
1389 struct thread *thread;
1390 struct thread *next;
1391 unsigned int ready = 0;
1392
1393 for (thread = list->head; thread; thread = next) {
1394 next = thread->next;
1395 thread_list_delete(list, thread);
1396 thread->type = THREAD_READY;
1397 thread_list_add(&thread->master->ready, thread);
1398 ready++;
1399 }
1400 return ready;
1401 }
1402
1403
1404 /* Fetch next ready thread. */
1405 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1406 {
1407 struct thread *thread = NULL;
1408 struct timeval now;
1409 struct timeval zerotime = {0, 0};
1410 struct timeval tv;
1411 struct timeval *tw = NULL;
1412
1413 int num = 0;
1414
1415 do {
1416 /* Handle signals if any */
1417 if (m->handle_signals)
1418 quagga_sigevent_process();
1419
1420 pthread_mutex_lock(&m->mtx);
1421
1422 /* Process any pending cancellation requests */
1423 do_thread_cancel(m);
1424
1425 /*
1426 * Attempt to flush ready queue before going into poll().
1427 * This is performance-critical. Think twice before modifying.
1428 */
1429 if ((thread = thread_trim_head(&m->ready))) {
1430 fetch = thread_run(m, thread, fetch);
1431 if (fetch->ref)
1432 *fetch->ref = NULL;
1433 pthread_mutex_unlock(&m->mtx);
1434 break;
1435 }
1436
1437 /* otherwise, tick through scheduling sequence */
1438
1439 /*
1440 * Post events to ready queue. This must come before the
1441 * following block since events should occur immediately
1442 */
1443 thread_process(&m->event);
1444
1445 /*
1446 * If there are no tasks on the ready queue, we will poll()
1447 * until a timer expires or we receive I/O, whichever comes
1448 * first. The strategy for doing this is:
1449 *
1450 * - If there are events pending, set the poll() timeout to zero
1451 		 * - If there are no events pending, but there are
1452 		 *   timers pending, set the timeout to the smallest
1453 		 *   remaining time on any timer
1454 		 * - If there are neither timers nor events pending,
1455 		 *   but there are file descriptors pending, block
1456 		 *   indefinitely in poll()
1457 * - If nothing is pending, it's time for the application to die
1458 *
1459 * In every case except the last, we need to hit poll() at least
1460 * once per loop to avoid starvation by events
1461 */
1462 if (m->ready.count == 0)
1463 tw = thread_timer_wait(m->timer, &tv);
1464
1465 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1466 tw = &zerotime;
1467
1468 if (!tw && m->handler.pfdcount == 0) { /* die */
1469 pthread_mutex_unlock(&m->mtx);
1470 fetch = NULL;
1471 break;
1472 }
1473
1474 /*
1475 * Copy pollfd array + # active pollfds in it. Not necessary to
1476 * copy the array size as this is fixed.
1477 */
1478 m->handler.copycount = m->handler.pfdcount;
1479 memcpy(m->handler.copy, m->handler.pfds,
1480 m->handler.copycount * sizeof(struct pollfd));
1481
1482 pthread_mutex_unlock(&m->mtx);
1483 {
1484 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1485 m->handler.copycount, tw);
1486 }
1487 pthread_mutex_lock(&m->mtx);
1488
1489 /* Handle any errors received in poll() */
1490 if (num < 0) {
1491 if (errno == EINTR) {
1492 pthread_mutex_unlock(&m->mtx);
1493 /* loop around to signal handler */
1494 continue;
1495 }
1496
1497 /* else die */
1498 zlog_warn("poll() error: %s", safe_strerror(errno));
1499 pthread_mutex_unlock(&m->mtx);
1500 fetch = NULL;
1501 break;
1502 }
1503
1504 /* Post timers to ready queue. */
1505 monotime(&now);
1506 thread_process_timers(m->timer, &now);
1507
1508 /* Post I/O to ready queue. */
1509 if (num > 0)
1510 thread_process_io(m, num);
1511
1512 pthread_mutex_unlock(&m->mtx);
1513
1514 } while (!thread && m->spin);
1515
1516 return fetch;
1517 }
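/*
 * Usage sketch: the standard daemon main loop pairs thread_fetch() with
 * thread_call() until there is nothing left to run:
 *
 *   struct thread thread;
 *
 *   while (thread_fetch(master, &thread))
 *           thread_call(&thread);
 */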
1518
1519 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1520 {
1521 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1522 + (a.tv_usec - b.tv_usec));
1523 }
1524
1525 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1526 unsigned long *cputime)
1527 {
1528 /* This is 'user + sys' time. */
1529 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1530 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1531 return timeval_elapsed(now->real, start->real);
1532 }
1533
1534 /* We should aim to yield after yield milliseconds, which defaults
1535 to THREAD_YIELD_TIME_SLOT.
1536 Note: we are using real (wall clock) time for this calculation.
1537 It could be argued that CPU time may make more sense in certain
1538 contexts. The things to consider are whether the thread may have
1539 blocked (in which case wall time increases, but CPU time does not),
1540 or whether the system is heavily loaded with other processes competing
1541 for CPU time. On balance, wall clock time seems to make sense.
1542 Plus it has the added benefit that gettimeofday should be faster
1543 than calling getrusage. */
1544 int thread_should_yield(struct thread *thread)
1545 {
1546 int result;
1547 pthread_mutex_lock(&thread->mtx);
1548 {
1549 result = monotime_since(&thread->real, NULL)
1550 > (int64_t)thread->yield;
1551 }
1552 pthread_mutex_unlock(&thread->mtx);
1553 return result;
1554 }
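/*
 * Usage sketch (illustrative; table_walk, struct walk_ctx and
 * process_node are hypothetical). A long-running task polls
 * thread_should_yield() and, once its slot is used up, reschedules itself
 * as an event instead of monopolizing the pthread:
 *
 *   static int table_walk(struct thread *t)
 *   {
 *           struct walk_ctx *ctx = THREAD_ARG(t);
 *
 *           while (ctx->node) {
 *                   process_node(ctx->node);
 *                   ctx->node = ctx->node->next;
 *                   if (thread_should_yield(t)) {
 *                           thread_add_event(ctx->master, table_walk,
 *                                            ctx, 0, &ctx->t_walk);
 *                           return 0;
 *                   }
 *           }
 *           return 0;
 *   }
 */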
1555
1556 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1557 {
1558 pthread_mutex_lock(&thread->mtx);
1559 {
1560 thread->yield = yield_time;
1561 }
1562 pthread_mutex_unlock(&thread->mtx);
1563 }
1564
1565 void thread_getrusage(RUSAGE_T *r)
1566 {
1567 monotime(&r->real);
1568 getrusage(RUSAGE_SELF, &(r->cpu));
1569 }
1570
1571 /*
1572 * Call a thread.
1573 *
1574 * This function will atomically update the thread's usage history. At present
1575 * this is the only spot where usage history is written. Nevertheless the code
1576 * has been written such that the introduction of writers in the future should
1577 * not need to update it provided the writers atomically perform only the
1578 * operations done here, i.e. updating the total and maximum times. In
1579 * particular, the maximum real and cpu times must be monotonically increasing
1580 * or this code is not correct.
1581 */
1582 void thread_call(struct thread *thread)
1583 {
1584 _Atomic unsigned long realtime, cputime;
1585 unsigned long exp;
1586 unsigned long helper;
1587 RUSAGE_T before, after;
1588
1589 GETRUSAGE(&before);
1590 thread->real = before.real;
1591
1592 pthread_setspecific(thread_current, thread);
1593 (*thread->func)(thread);
1594 pthread_setspecific(thread_current, NULL);
1595
1596 GETRUSAGE(&after);
1597
1598 realtime = thread_consumed_time(&after, &before, &helper);
1599 cputime = helper;
1600
1601 /* update realtime */
1602 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1603 memory_order_seq_cst);
1604 exp = atomic_load_explicit(&thread->hist->real.max,
1605 memory_order_seq_cst);
1606 while (exp < realtime
1607 && !atomic_compare_exchange_weak_explicit(
1608 &thread->hist->real.max, &exp, realtime,
1609 memory_order_seq_cst, memory_order_seq_cst))
1610 ;
1611
1612 /* update cputime */
1613 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1614 memory_order_seq_cst);
1615 exp = atomic_load_explicit(&thread->hist->cpu.max,
1616 memory_order_seq_cst);
1617 while (exp < cputime
1618 && !atomic_compare_exchange_weak_explicit(
1619 &thread->hist->cpu.max, &exp, cputime,
1620 memory_order_seq_cst, memory_order_seq_cst))
1621 ;
1622
1623 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1624 memory_order_seq_cst);
1625 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1626 memory_order_seq_cst);
1627
1628 #ifdef CONSUMED_TIME_CHECK
1629 if (realtime > CONSUMED_TIME_CHECK) {
1630 /*
1631 * We have a CPU Hog on our hands.
1632 * Whinge about it now, so we're aware this is yet another task
1633 * to fix.
1634 */
1635 zlog_warn(
1636 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1637 thread->funcname, (unsigned long)thread->func,
1638 realtime / 1000, cputime / 1000);
1639 }
1640 #endif /* CONSUMED_TIME_CHECK */
1641 }
1642
1643 /* Execute thread */
1644 void funcname_thread_execute(struct thread_master *m,
1645 int (*func)(struct thread *), void *arg, int val,
1646 debugargdef)
1647 {
1648 struct thread *thread;
1649
1650 /* Get or allocate new thread to execute. */
1651 pthread_mutex_lock(&m->mtx);
1652 {
1653 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1654
1655 /* Set its event value. */
1656 pthread_mutex_lock(&thread->mtx);
1657 {
1658 thread->add_type = THREAD_EXECUTE;
1659 thread->u.val = val;
1660 thread->ref = &thread;
1661 }
1662 pthread_mutex_unlock(&thread->mtx);
1663 }
1664 pthread_mutex_unlock(&m->mtx);
1665
1666 /* Execute thread doing all accounting. */
1667 thread_call(thread);
1668
1669 /* Give back or free thread. */
1670 thread_add_unuse(m, thread);
1671 }
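/*
 * Usage sketch (illustrative; flush_now and ctx are hypothetical). The
 * thread_execute() wrapper in thread.h runs the handler synchronously,
 * with the same CPU accounting as scheduled tasks, instead of queueing
 * it:
 *
 *   thread_execute(master, flush_now, ctx, 0);
 */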