lib/thread.c
1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "log.h"
29 #include "hash.h"
30 #include "pqueue.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "lib_errors.h"
37
38 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
39 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
42
43 #if defined(__APPLE__)
44 #include <mach/mach.h>
45 #include <mach/mach_time.h>
46 #endif
47
48 #define AWAKEN(m) \
49 do { \
50 static unsigned char wakebyte = 0x01; \
51 write(m->io_pipe[1], &wakebyte, 1); \
52 	} while (0)
53
54 /* control variable for initializer */
55 pthread_once_t init_once = PTHREAD_ONCE_INIT;
56 pthread_key_t thread_current;
57
58 pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
59 static struct list *masters;
60
61 static void thread_free(struct thread_master *master, struct thread *thread);
62
63 /* CLI start ---------------------------------------------------------------- */
64 static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
65 {
66 int size = sizeof(a->func);
67
68 return jhash(&a->func, size, 0);
69 }
70
71 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
72 const struct cpu_thread_history *b)
73 {
74 return a->func == b->func;
75 }
76
77 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
78 {
79 struct cpu_thread_history *new;
80 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
81 new->func = a->func;
82 new->funcname = a->funcname;
83 return new;
84 }
85
86 static void cpu_record_hash_free(void *a)
87 {
88 struct cpu_thread_history *hist = a;
89
90 XFREE(MTYPE_THREAD_STATS, hist);
91 }
92
93 static void vty_out_cpu_thread_history(struct vty *vty,
94 struct cpu_thread_history *a)
95 {
96 vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
97 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
98 a->cpu.total / a->total_calls, a->cpu.max,
99 a->real.total / a->total_calls, a->real.max);
100 vty_out(vty, " %c%c%c%c%c %s\n",
101 a->types & (1 << THREAD_READ) ? 'R' : ' ',
102 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
103 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
104 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
105 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
106 }
107
108 static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
109 {
110 struct cpu_thread_history *totals = args[0];
111 struct cpu_thread_history copy;
112 struct vty *vty = args[1];
113 uint8_t *filter = args[2];
114
115 struct cpu_thread_history *a = bucket->data;
116
117 copy.total_active =
118 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
119 copy.total_calls =
120 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
121 copy.cpu.total =
122 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
123 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
124 copy.real.total =
125 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
126 copy.real.max =
127 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
128 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
129 copy.funcname = a->funcname;
130
131 if (!(copy.types & *filter))
132 return;
133
134 vty_out_cpu_thread_history(vty, &copy);
135 totals->total_active += copy.total_active;
136 totals->total_calls += copy.total_calls;
137 totals->real.total += copy.real.total;
138 if (totals->real.max < copy.real.max)
139 totals->real.max = copy.real.max;
140 totals->cpu.total += copy.cpu.total;
141 if (totals->cpu.max < copy.cpu.max)
142 totals->cpu.max = copy.cpu.max;
143 }
144
145 static void cpu_record_print(struct vty *vty, uint8_t filter)
146 {
147 struct cpu_thread_history tmp;
148 void *args[3] = {&tmp, vty, &filter};
149 struct thread_master *m;
150 struct listnode *ln;
151
152 memset(&tmp, 0, sizeof tmp);
153 tmp.funcname = "TOTAL";
154 tmp.types = filter;
155
156 pthread_mutex_lock(&masters_mtx);
157 {
158 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
159 const char *name = m->name ? m->name : "main";
160
161 char underline[strlen(name) + 1];
162 memset(underline, '-', sizeof(underline));
163 underline[sizeof(underline) - 1] = '\0';
164
165 vty_out(vty, "\n");
166 vty_out(vty, "Showing statistics for pthread %s\n",
167 name);
168 vty_out(vty, "-------------------------------%s\n",
169 underline);
170 vty_out(vty, "%21s %18s %18s\n", "",
171 "CPU (user+system):", "Real (wall-clock):");
172 vty_out(vty,
173 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
174 vty_out(vty, " Avg uSec Max uSecs");
175 vty_out(vty, " Type Thread\n");
176
177 if (m->cpu_record->count)
178 hash_iterate(
179 m->cpu_record,
180 (void (*)(struct hash_backet *,
181 void *))cpu_record_hash_print,
182 args);
183 else
184 vty_out(vty, "No data to display yet.\n");
185
186 vty_out(vty, "\n");
187 }
188 }
189 pthread_mutex_unlock(&masters_mtx);
190
191 vty_out(vty, "\n");
192 vty_out(vty, "Total thread statistics\n");
193 vty_out(vty, "-------------------------\n");
194 vty_out(vty, "%21s %18s %18s\n", "",
195 "CPU (user+system):", "Real (wall-clock):");
196 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
197 vty_out(vty, " Avg uSec Max uSecs");
198 vty_out(vty, " Type Thread\n");
199
200 if (tmp.total_calls > 0)
201 vty_out_cpu_thread_history(vty, &tmp);
202 }
203
204 static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
205 {
206 uint8_t *filter = args[0];
207 struct hash *cpu_record = args[1];
208
209 struct cpu_thread_history *a = bucket->data;
210
211 if (!(a->types & *filter))
212 return;
213
214 hash_release(cpu_record, bucket->data);
215 }
216
217 static void cpu_record_clear(uint8_t filter)
218 {
219 uint8_t *tmp = &filter;
220 struct thread_master *m;
221 struct listnode *ln;
222
223 pthread_mutex_lock(&masters_mtx);
224 {
225 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
226 pthread_mutex_lock(&m->mtx);
227 {
228 void *args[2] = {tmp, m->cpu_record};
229 hash_iterate(
230 m->cpu_record,
231 (void (*)(struct hash_backet *,
232 void *))cpu_record_hash_clear,
233 args);
234 }
235 pthread_mutex_unlock(&m->mtx);
236 }
237 }
238 pthread_mutex_unlock(&masters_mtx);
239 }
240
241 static uint8_t parse_filter(const char *filterstr)
242 {
243 int i = 0;
244 int filter = 0;
245
246 while (filterstr[i] != '\0') {
247 switch (filterstr[i]) {
248 case 'r':
249 case 'R':
250 filter |= (1 << THREAD_READ);
251 break;
252 case 'w':
253 case 'W':
254 filter |= (1 << THREAD_WRITE);
255 break;
256 case 't':
257 case 'T':
258 filter |= (1 << THREAD_TIMER);
259 break;
260 case 'e':
261 case 'E':
262 filter |= (1 << THREAD_EVENT);
263 break;
264 case 'x':
265 case 'X':
266 filter |= (1 << THREAD_EXECUTE);
267 break;
268 default:
269 break;
270 }
271 ++i;
272 }
273 return filter;
274 }
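/*
 * Illustration: the filter string is simply folded into a bitmask of
 * thread types (the THREAD_* constants come from thread.h), e.g.:
 *
 *     uint8_t f = parse_filter("TW");
 *     // f == (1 << THREAD_TIMER) | (1 << THREAD_WRITE)
 *
 * A string containing no recognized characters yields 0, which the CLI
 * handlers below reject as an invalid filter.
 */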
275
276 DEFUN (show_thread_cpu,
277 show_thread_cpu_cmd,
278 "show thread cpu [FILTER]",
279 SHOW_STR
280 "Thread information\n"
281 "Thread CPU usage\n"
282 "Display filter (rwtexb)\n")
283 {
284 uint8_t filter = (uint8_t)-1U;
285 int idx = 0;
286
287 if (argv_find(argv, argc, "FILTER", &idx)) {
288 filter = parse_filter(argv[idx]->arg);
289 if (!filter) {
290 vty_out(vty,
291 "Invalid filter \"%s\" specified; must contain at least"
292 "one of 'RWTEXB'\n",
293 argv[idx]->arg);
294 return CMD_WARNING;
295 }
296 }
297
298 cpu_record_print(vty, filter);
299 return CMD_SUCCESS;
300 }
301
302 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
303 {
304 const char *name = m->name ? m->name : "main";
305 char underline[strlen(name) + 1];
306 uint32_t i;
307
308 memset(underline, '-', sizeof(underline));
309 underline[sizeof(underline) - 1] = '\0';
310
311 vty_out(vty, "\nShowing poll FD's for %s\n", name);
312 vty_out(vty, "----------------------%s\n", underline);
313 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
314 for (i = 0; i < m->handler.pfdcount; i++)
315 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
316 m->handler.pfds[i].fd,
317 m->handler.pfds[i].events,
318 m->handler.pfds[i].revents);
319 }
320
321 DEFUN (show_thread_poll,
322 show_thread_poll_cmd,
323 "show thread poll",
324 SHOW_STR
325 "Thread information\n"
326 "Show poll FD's and information\n")
327 {
328 struct listnode *node;
329 struct thread_master *m;
330
331 pthread_mutex_lock(&masters_mtx);
332 {
333 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
334 show_thread_poll_helper(vty, m);
335 }
336 }
337 pthread_mutex_unlock(&masters_mtx);
338
339 return CMD_SUCCESS;
340 }
341
342
343 DEFUN (clear_thread_cpu,
344 clear_thread_cpu_cmd,
345 "clear thread cpu [FILTER]",
346 "Clear stored data in all pthreads\n"
347 "Thread information\n"
348 "Thread CPU usage\n"
349 "Display filter (rwtexb)\n")
350 {
351 uint8_t filter = (uint8_t)-1U;
352 int idx = 0;
353
354 if (argv_find(argv, argc, "FILTER", &idx)) {
355 filter = parse_filter(argv[idx]->arg);
356 if (!filter) {
357 vty_out(vty,
358 "Invalid filter \"%s\" specified; must contain at least"
359 "one of 'RWTEXB'\n",
360 argv[idx]->arg);
361 return CMD_WARNING;
362 }
363 }
364
365 cpu_record_clear(filter);
366 return CMD_SUCCESS;
367 }
368
369 void thread_cmd_init(void)
370 {
371 install_element(VIEW_NODE, &show_thread_cpu_cmd);
372 install_element(VIEW_NODE, &show_thread_poll_cmd);
373 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
374 }
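/*
 * Illustrative vtysh usage of the commands installed above, assuming a
 * daemon that calls thread_cmd_init() at startup:
 *
 *     show thread cpu        (statistics for all thread types)
 *     show thread cpu TW     (timer and write tasks only)
 *     show thread poll       (dump the per-pthread poll() fd arrays)
 *     clear thread cpu       (reset the accumulated statistics)
 */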
375 /* CLI end ------------------------------------------------------------------ */
376
377
378 static int thread_timer_cmp(void *a, void *b)
379 {
380 struct thread *thread_a = a;
381 struct thread *thread_b = b;
382
383 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
384 return -1;
385 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
386 return 1;
387 return 0;
388 }
389
390 static void thread_timer_update(void *node, int actual_position)
391 {
392 struct thread *thread = node;
393
394 thread->index = actual_position;
395 }
396
397 static void cancelreq_del(void *cr)
398 {
399 XFREE(MTYPE_TMP, cr);
400 }
401
402 /* initializer, only ever called once */
403 static void initializer(void)
404 {
405 pthread_key_create(&thread_current, NULL);
406 }
407
408 struct thread_master *thread_master_create(const char *name)
409 {
410 struct thread_master *rv;
411 struct rlimit limit;
412
413 pthread_once(&init_once, &initializer);
414
415 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
416 if (rv == NULL)
417 return NULL;
418
419 /* Initialize master mutex */
420 pthread_mutex_init(&rv->mtx, NULL);
421 pthread_cond_init(&rv->cancel_cond, NULL);
422
423 /* Set name */
424 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
425
426 /* Initialize I/O task data structures */
427 getrlimit(RLIMIT_NOFILE, &limit);
428 rv->fd_limit = (int)limit.rlim_cur;
429 rv->read = XCALLOC(MTYPE_THREAD_POLL,
430 sizeof(struct thread *) * rv->fd_limit);
431
432 rv->write = XCALLOC(MTYPE_THREAD_POLL,
433 sizeof(struct thread *) * rv->fd_limit);
434
435 rv->cpu_record = hash_create_size(
436 8, (unsigned int (*)(void *))cpu_record_hash_key,
437 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
438 "Thread Hash");
439
440
441 /* Initialize the timer queues */
442 rv->timer = pqueue_create();
443 rv->timer->cmp = thread_timer_cmp;
444 rv->timer->update = thread_timer_update;
445
446 /* Initialize thread_fetch() settings */
447 rv->spin = true;
448 rv->handle_signals = true;
449
450 /* Set pthread owner, should be updated by actual owner */
451 rv->owner = pthread_self();
452 rv->cancel_req = list_new();
453 rv->cancel_req->del = cancelreq_del;
454 rv->canceled = true;
455
456 /* Initialize pipe poker */
457 pipe(rv->io_pipe);
458 set_nonblocking(rv->io_pipe[0]);
459 set_nonblocking(rv->io_pipe[1]);
460
461 /* Initialize data structures for poll() */
462 rv->handler.pfdsize = rv->fd_limit;
463 rv->handler.pfdcount = 0;
464 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
465 sizeof(struct pollfd) * rv->handler.pfdsize);
466 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
467 sizeof(struct pollfd) * rv->handler.pfdsize);
468
469 /* add to list of threadmasters */
470 pthread_mutex_lock(&masters_mtx);
471 {
472 if (!masters)
473 masters = list_new();
474
475 listnode_add(masters, rv);
476 }
477 pthread_mutex_unlock(&masters_mtx);
478
479 return rv;
480 }
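/*
 * A minimal setup sketch, assuming the thread_add_timer() wrapper macro
 * from thread.h; 'reconnect_timer_expired' is a hypothetical callback.
 * FRR daemons normally have this done for them by the libfrr startup
 * code, so the sketch only shows the direct library calls:
 *
 *     static int reconnect_timer_expired(struct thread *t)
 *     {
 *         ... act on THREAD_ARG(t), reschedule, etc. ...
 *         return 0;
 *     }
 *
 *     struct thread_master *master = thread_master_create("mydaemon");
 *     thread_add_timer(master, reconnect_timer_expired, NULL, 5, NULL);
 */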
481
482 void thread_master_set_name(struct thread_master *master, const char *name)
483 {
484 pthread_mutex_lock(&master->mtx);
485 {
486 if (master->name)
487 XFREE(MTYPE_THREAD_MASTER, master->name);
488 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
489 }
490 pthread_mutex_unlock(&master->mtx);
491 }
492
493 /* Add a new thread to the list. */
494 static void thread_list_add(struct thread_list *list, struct thread *thread)
495 {
496 thread->next = NULL;
497 thread->prev = list->tail;
498 if (list->tail)
499 list->tail->next = thread;
500 else
501 list->head = thread;
502 list->tail = thread;
503 list->count++;
504 }
505
506 /* Delete a thread from the list. */
507 static struct thread *thread_list_delete(struct thread_list *list,
508 struct thread *thread)
509 {
510 if (thread->next)
511 thread->next->prev = thread->prev;
512 else
513 list->tail = thread->prev;
514 if (thread->prev)
515 thread->prev->next = thread->next;
516 else
517 list->head = thread->next;
518 thread->next = thread->prev = NULL;
519 list->count--;
520 return thread;
521 }
522
523 /* Thread list is empty or not. */
524 static int thread_empty(struct thread_list *list)
525 {
526 return list->head ? 0 : 1;
527 }
528
529 /* Delete top of the list and return it. */
530 static struct thread *thread_trim_head(struct thread_list *list)
531 {
532 if (!thread_empty(list))
533 return thread_list_delete(list, list->head);
534 return NULL;
535 }
536
537 #define THREAD_UNUSED_DEPTH 10
538
539 /* Move thread to unuse list. */
540 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
541 {
542 pthread_mutex_t mtxc = thread->mtx;
543
544 assert(m != NULL && thread != NULL);
545 assert(thread->next == NULL);
546 assert(thread->prev == NULL);
547
548 thread->hist->total_active--;
549 memset(thread, 0, sizeof(struct thread));
550 thread->type = THREAD_UNUSED;
551
552 /* Restore the thread mutex context. */
553 thread->mtx = mtxc;
554
555 if (m->unuse.count < THREAD_UNUSED_DEPTH) {
556 thread_list_add(&m->unuse, thread);
557 return;
558 }
559
560 thread_free(m, thread);
561 }
562
563 /* Free all threads on a list. */
564 static void thread_list_free(struct thread_master *m, struct thread_list *list)
565 {
566 struct thread *t;
567 struct thread *next;
568
569 for (t = list->head; t; t = next) {
570 next = t->next;
571 thread_free(m, t);
572 list->count--;
573 }
574 }
575
576 static void thread_array_free(struct thread_master *m,
577 struct thread **thread_array)
578 {
579 struct thread *t;
580 int index;
581
582 for (index = 0; index < m->fd_limit; ++index) {
583 t = thread_array[index];
584 if (t) {
585 thread_array[index] = NULL;
586 thread_free(m, t);
587 }
588 }
589 XFREE(MTYPE_THREAD_POLL, thread_array);
590 }
591
592 static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
593 {
594 int i;
595
596 for (i = 0; i < queue->size; i++)
597 thread_free(m, queue->array[i]);
598
599 pqueue_delete(queue);
600 }
601
602 /*
603 * thread_master_free_unused
604 *
605  * As threads are finished with, they are put on the
606  * unuse list for later reuse.
607  * If we are shutting down, free up the unused threads
608  * so we can see if we forgot to shut anything off.
609 */
610 void thread_master_free_unused(struct thread_master *m)
611 {
612 pthread_mutex_lock(&m->mtx);
613 {
614 struct thread *t;
615 while ((t = thread_trim_head(&m->unuse)) != NULL) {
616 thread_free(m, t);
617 }
618 }
619 pthread_mutex_unlock(&m->mtx);
620 }
621
622 /* Stop thread scheduler. */
623 void thread_master_free(struct thread_master *m)
624 {
625 pthread_mutex_lock(&masters_mtx);
626 {
627 listnode_delete(masters, m);
628 if (masters->count == 0) {
629 list_delete(&masters);
630 }
631 }
632 pthread_mutex_unlock(&masters_mtx);
633
634 thread_array_free(m, m->read);
635 thread_array_free(m, m->write);
636 thread_queue_free(m, m->timer);
637 thread_list_free(m, &m->event);
638 thread_list_free(m, &m->ready);
639 thread_list_free(m, &m->unuse);
640 pthread_mutex_destroy(&m->mtx);
641 pthread_cond_destroy(&m->cancel_cond);
642 close(m->io_pipe[0]);
643 close(m->io_pipe[1]);
644 list_delete(&m->cancel_req);
645 m->cancel_req = NULL;
646
647 hash_clean(m->cpu_record, cpu_record_hash_free);
648 hash_free(m->cpu_record);
649 m->cpu_record = NULL;
650
651 if (m->name)
652 XFREE(MTYPE_THREAD_MASTER, m->name);
653 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
654 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
655 XFREE(MTYPE_THREAD_MASTER, m);
656 }
657
658 /* Return remaining time in seconds. */
659 unsigned long thread_timer_remain_second(struct thread *thread)
660 {
661 int64_t remain;
662
663 pthread_mutex_lock(&thread->mtx);
664 {
665 remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
666 }
667 pthread_mutex_unlock(&thread->mtx);
668
669 return remain < 0 ? 0 : remain;
670 }
671
672 #define debugargdef const char *funcname, const char *schedfrom, int fromln
673 #define debugargpass funcname, schedfrom, fromln
674
675 struct timeval thread_timer_remain(struct thread *thread)
676 {
677 struct timeval remain;
678 pthread_mutex_lock(&thread->mtx);
679 {
680 monotime_until(&thread->u.sands, &remain);
681 }
682 pthread_mutex_unlock(&thread->mtx);
683 return remain;
684 }
685
686 /* Get new thread. */
687 static struct thread *thread_get(struct thread_master *m, uint8_t type,
688 int (*func)(struct thread *), void *arg,
689 debugargdef)
690 {
691 struct thread *thread = thread_trim_head(&m->unuse);
692 struct cpu_thread_history tmp;
693
694 if (!thread) {
695 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
696 /* mutex only needs to be initialized at struct creation. */
697 pthread_mutex_init(&thread->mtx, NULL);
698 m->alloc++;
699 }
700
701 thread->type = type;
702 thread->add_type = type;
703 thread->master = m;
704 thread->arg = arg;
705 thread->index = -1;
706 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
707 thread->ref = NULL;
708
709 /*
710 	 * If the passed-in funcname is not the one we have
711 	 * stored, then thread->hist needs to be updated.
712 	 * We keep the last history around on recycled
713 	 * (unused) threads on the assumption that we are
714 	 * probably going to immediately allocate the same
715 	 * type of thread again.
716 	 * This hopefully saves us some serious
717 	 * hash_get lookups.
718 */
719 if (thread->funcname != funcname || thread->func != func) {
720 tmp.func = func;
721 tmp.funcname = funcname;
722 thread->hist =
723 hash_get(m->cpu_record, &tmp,
724 (void *(*)(void *))cpu_record_hash_alloc);
725 }
726 thread->hist->total_active++;
727 thread->func = func;
728 thread->funcname = funcname;
729 thread->schedfrom = schedfrom;
730 thread->schedfrom_line = fromln;
731
732 return thread;
733 }
734
735 static void thread_free(struct thread_master *master, struct thread *thread)
736 {
737 /* Update statistics. */
738 assert(master->alloc > 0);
739 master->alloc--;
740
741 /* Free allocated resources. */
742 pthread_mutex_destroy(&thread->mtx);
743 XFREE(MTYPE_THREAD, thread);
744 }
745
746 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
747 nfds_t count, const struct timeval *timer_wait)
748 {
749 	/* If timer_wait is null here, that means poll() should block
750 	 * indefinitely, unless the thread_master has overridden it by
751 	 * setting ->selectpoll_timeout.
752 	 * If that value is positive, it specifies the maximum number
753 	 * of milliseconds to wait.
754 	 * If it is -1, we should never wait and should always return
755 	 * immediately even if no event is detected.
756 	 * If it is zero, the default behavior applies: block for up
757 	 * to timer_wait, or indefinitely when timer_wait is itself
758 	 * null.
759 	 */
760 int timeout = -1;
761
762 /* number of file descriptors with events */
763 int num;
764
765 if (timer_wait != NULL
766 && m->selectpoll_timeout == 0) // use the default value
767 timeout = (timer_wait->tv_sec * 1000)
768 + (timer_wait->tv_usec / 1000);
769 else if (m->selectpoll_timeout > 0) // use the user's timeout
770 timeout = m->selectpoll_timeout;
771 else if (m->selectpoll_timeout
772 < 0) // effect a poll (return immediately)
773 timeout = 0;
774
775 /* add poll pipe poker */
776 assert(count + 1 < pfdsize);
777 pfds[count].fd = m->io_pipe[0];
778 pfds[count].events = POLLIN;
779 pfds[count].revents = 0x00;
780
781 num = poll(pfds, count + 1, timeout);
782
783 unsigned char trash[64];
784 if (num > 0 && pfds[count].revents != 0 && num--)
785 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
786 ;
787
788 return num;
789 }
790
791 /* Add new read/write thread. */
792 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
793 int (*func)(struct thread *),
794 void *arg, int fd,
795 struct thread **t_ptr,
796 debugargdef)
797 {
798 struct thread *thread = NULL;
799
800 assert(fd >= 0 && fd < m->fd_limit);
801 pthread_mutex_lock(&m->mtx);
802 {
803 if (t_ptr
804 && *t_ptr) // thread is already scheduled; don't reschedule
805 {
806 pthread_mutex_unlock(&m->mtx);
807 return NULL;
808 }
809
810 /* default to a new pollfd */
811 nfds_t queuepos = m->handler.pfdcount;
812
813 /* if we already have a pollfd for our file descriptor, find and
814 * use it */
815 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
816 if (m->handler.pfds[i].fd == fd) {
817 queuepos = i;
818 break;
819 }
820
821 /* make sure we have room for this fd + pipe poker fd */
822 assert(queuepos + 1 < m->handler.pfdsize);
823
824 thread = thread_get(m, dir, func, arg, debugargpass);
825
826 m->handler.pfds[queuepos].fd = fd;
827 m->handler.pfds[queuepos].events |=
828 (dir == THREAD_READ ? POLLIN : POLLOUT);
829
830 if (queuepos == m->handler.pfdcount)
831 m->handler.pfdcount++;
832
833 if (thread) {
834 pthread_mutex_lock(&thread->mtx);
835 {
836 thread->u.fd = fd;
837 if (dir == THREAD_READ)
838 m->read[thread->u.fd] = thread;
839 else
840 m->write[thread->u.fd] = thread;
841 }
842 pthread_mutex_unlock(&thread->mtx);
843
844 if (t_ptr) {
845 *t_ptr = thread;
846 thread->ref = t_ptr;
847 }
848 }
849
850 AWAKEN(m);
851 }
852 pthread_mutex_unlock(&m->mtx);
853
854 return thread;
855 }
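/*
 * Illustrative use via the thread_add_read()/thread_add_write() wrapper
 * macros from thread.h; 'peer_readable' and 'peer' are hypothetical
 * daemon-side names.  The back-reference prevents the task from being
 * scheduled twice and is cleared for us once the task runs or is
 * cancelled:
 *
 *     static struct thread *t_peer_read;
 *
 *     thread_add_read(master, peer_readable, peer, peer->fd,
 *                     &t_peer_read);
 */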
856
857 static struct thread *
858 funcname_thread_add_timer_timeval(struct thread_master *m,
859 int (*func)(struct thread *), int type,
860 void *arg, struct timeval *time_relative,
861 struct thread **t_ptr, debugargdef)
862 {
863 struct thread *thread;
864 struct pqueue *queue;
865
866 assert(m != NULL);
867
868 assert(type == THREAD_TIMER);
869 assert(time_relative);
870
871 pthread_mutex_lock(&m->mtx);
872 {
873 if (t_ptr
874 && *t_ptr) // thread is already scheduled; don't reschedule
875 {
876 pthread_mutex_unlock(&m->mtx);
877 return NULL;
878 }
879
880 queue = m->timer;
881 thread = thread_get(m, type, func, arg, debugargpass);
882
883 pthread_mutex_lock(&thread->mtx);
884 {
885 monotime(&thread->u.sands);
886 timeradd(&thread->u.sands, time_relative,
887 &thread->u.sands);
888 pqueue_enqueue(thread, queue);
889 if (t_ptr) {
890 *t_ptr = thread;
891 thread->ref = t_ptr;
892 }
893 }
894 pthread_mutex_unlock(&thread->mtx);
895
896 AWAKEN(m);
897 }
898 pthread_mutex_unlock(&m->mtx);
899
900 return thread;
901 }
902
903
904 /* Add timer event thread. */
905 struct thread *funcname_thread_add_timer(struct thread_master *m,
906 int (*func)(struct thread *),
907 void *arg, long timer,
908 struct thread **t_ptr, debugargdef)
909 {
910 struct timeval trel;
911
912 assert(m != NULL);
913
914 trel.tv_sec = timer;
915 trel.tv_usec = 0;
916
917 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
918 &trel, t_ptr, debugargpass);
919 }
920
921 /* Add timer event thread with "millisecond" resolution */
922 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
923 int (*func)(struct thread *),
924 void *arg, long timer,
925 struct thread **t_ptr,
926 debugargdef)
927 {
928 struct timeval trel;
929
930 assert(m != NULL);
931
932 trel.tv_sec = timer / 1000;
933 trel.tv_usec = 1000 * (timer % 1000);
934
935 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
936 &trel, t_ptr, debugargpass);
937 }
938
939 /* Add timer event thread with 'struct timeval' (microsecond) resolution */
940 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
941 int (*func)(struct thread *),
942 void *arg, struct timeval *tv,
943 struct thread **t_ptr, debugargdef)
944 {
945 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
946 t_ptr, debugargpass);
947 }
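/*
 * Illustrative use of the timer variants via the wrapper macros from
 * thread.h; 'hello_send' is a hypothetical callback and 't_hello' the
 * back-reference guarding against double scheduling:
 *
 *     static struct thread *t_hello;
 *     struct timeval tv = { 0, 500000 };  // 500 ms
 *
 *     thread_add_timer(master, hello_send, peer, 10, &t_hello);       // seconds
 *     thread_add_timer_msec(master, hello_send, peer, 250, &t_hello); // milliseconds
 *     thread_add_timer_tv(master, hello_send, peer, &tv, &t_hello);   // struct timeval
 */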
948
949 /* Add simple event thread. */
950 struct thread *funcname_thread_add_event(struct thread_master *m,
951 int (*func)(struct thread *),
952 void *arg, int val,
953 struct thread **t_ptr, debugargdef)
954 {
955 struct thread *thread;
956
957 assert(m != NULL);
958
959 pthread_mutex_lock(&m->mtx);
960 {
961 if (t_ptr
962 && *t_ptr) // thread is already scheduled; don't reschedule
963 {
964 pthread_mutex_unlock(&m->mtx);
965 return NULL;
966 }
967
968 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
969 pthread_mutex_lock(&thread->mtx);
970 {
971 thread->u.val = val;
972 thread_list_add(&m->event, thread);
973 }
974 pthread_mutex_unlock(&thread->mtx);
975
976 if (t_ptr) {
977 *t_ptr = thread;
978 thread->ref = t_ptr;
979 }
980
981 AWAKEN(m);
982 }
983 pthread_mutex_unlock(&m->mtx);
984
985 return thread;
986 }
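/*
 * Illustrative use via the thread_add_event() wrapper macro from
 * thread.h.  Events are moved onto the ready queue ahead of the next
 * poll(), so they run on the next pass through thread_fetch();
 * 'process_queue', 'q' and 't_process' are hypothetical:
 *
 *     thread_add_event(master, process_queue, q, 0, &t_process);
 */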
987
988 /* Thread cancellation ------------------------------------------------------ */
989
990 /**
991  * Clears the given event bits out of the .events field of the pollfd
992  * corresponding to the given file descriptor, as passed in 'state'.
993 *
994 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
995 * implementation for details.
996 *
997 * @param master
998 * @param fd
999 * @param state the event to cancel. One or more (OR'd together) of the
1000 * following:
1001 * - POLLIN
1002 * - POLLOUT
1003 */
1004 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
1005 {
1006 bool found = false;
1007
1008 /* Cancel POLLHUP too just in case some bozo set it */
1009 state |= POLLHUP;
1010
1011 /* find the index of corresponding pollfd */
1012 nfds_t i;
1013
1014 for (i = 0; i < master->handler.pfdcount; i++)
1015 if (master->handler.pfds[i].fd == fd) {
1016 found = true;
1017 break;
1018 }
1019
1020 if (!found) {
1021 zlog_debug(
1022 "[!] Received cancellation request for nonexistent rw job");
1023 zlog_debug("[!] threadmaster: %s | fd: %d",
1024 master->name ? master->name : "", fd);
1025 return;
1026 }
1027
1028 /* NOT out event. */
1029 master->handler.pfds[i].events &= ~(state);
1030
1031 /* If all events are canceled, delete / resize the pollfd array. */
1032 if (master->handler.pfds[i].events == 0) {
1033 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1034 (master->handler.pfdcount - i - 1)
1035 * sizeof(struct pollfd));
1036 master->handler.pfdcount--;
1037 }
1038
1039 /* If we have the same pollfd in the copy, perform the same operations,
1040 * otherwise return. */
1041 if (i >= master->handler.copycount)
1042 return;
1043
1044 master->handler.copy[i].events &= ~(state);
1045
1046 if (master->handler.copy[i].events == 0) {
1047 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1048 (master->handler.copycount - i - 1)
1049 * sizeof(struct pollfd));
1050 master->handler.copycount--;
1051 }
1052 }
1053
1054 /**
1055 * Process cancellation requests.
1056 *
1057 * This may only be run from the pthread which owns the thread_master.
1058 *
1059 * @param master the thread master to process
1060 * @REQUIRE master->mtx
1061 */
1062 static void do_thread_cancel(struct thread_master *master)
1063 {
1064 struct thread_list *list = NULL;
1065 struct pqueue *queue = NULL;
1066 struct thread **thread_array = NULL;
1067 struct thread *thread;
1068
1069 struct cancel_req *cr;
1070 struct listnode *ln;
1071 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1072 		/* If this is an event object cancellation, linear search
1073 		 * through the event list deleting any events which have
1074 		 * the specified argument. We also need to check every
1075 		 * thread in the ready queue.
1076 		 */
1077 if (cr->eventobj) {
1078 struct thread *t;
1079 thread = master->event.head;
1080
1081 while (thread) {
1082 t = thread;
1083 thread = t->next;
1084
1085 if (t->arg == cr->eventobj) {
1086 thread_list_delete(&master->event, t);
1087 if (t->ref)
1088 *t->ref = NULL;
1089 thread_add_unuse(master, t);
1090 }
1091 }
1092
1093 thread = master->ready.head;
1094 while (thread) {
1095 t = thread;
1096 thread = t->next;
1097
1098 if (t->arg == cr->eventobj) {
1099 thread_list_delete(&master->ready, t);
1100 if (t->ref)
1101 *t->ref = NULL;
1102 thread_add_unuse(master, t);
1103 }
1104 }
1105 continue;
1106 }
1107
1108 		/* The pointer varies depending on whether the cancellation
1109 		 * request was made asynchronously or not. If it was, we
1110 		 * need to check whether the thread even exists anymore
1111 		 * before cancelling it.
1112 		 */
1113 thread = (cr->thread) ? cr->thread : *cr->threadref;
1114
1115 if (!thread)
1116 continue;
1117
1118 /* Determine the appropriate queue to cancel the thread from */
1119 switch (thread->type) {
1120 case THREAD_READ:
1121 thread_cancel_rw(master, thread->u.fd, POLLIN);
1122 thread_array = master->read;
1123 break;
1124 case THREAD_WRITE:
1125 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1126 thread_array = master->write;
1127 break;
1128 case THREAD_TIMER:
1129 queue = master->timer;
1130 break;
1131 case THREAD_EVENT:
1132 list = &master->event;
1133 break;
1134 case THREAD_READY:
1135 list = &master->ready;
1136 break;
1137 default:
1138 continue;
1139 break;
1140 }
1141
1142 if (queue) {
1143 assert(thread->index >= 0);
1144 assert(thread == queue->array[thread->index]);
1145 pqueue_remove_at(thread->index, queue);
1146 } else if (list) {
1147 thread_list_delete(list, thread);
1148 } else if (thread_array) {
1149 thread_array[thread->u.fd] = NULL;
1150 } else {
1151 assert(!"Thread should be either in queue or list or array!");
1152 }
1153
1154 if (thread->ref)
1155 *thread->ref = NULL;
1156
1157 thread_add_unuse(thread->master, thread);
1158 }
1159
1160 /* Delete and free all cancellation requests */
1161 list_delete_all_node(master->cancel_req);
1162
1163 /* Wake up any threads which may be blocked in thread_cancel_async() */
1164 master->canceled = true;
1165 pthread_cond_broadcast(&master->cancel_cond);
1166 }
1167
1168 /**
1169 * Cancel any events which have the specified argument.
1170 *
1171 * MT-Unsafe
1172 *
1173  * @param master the thread_master to cancel from
1174 * @param arg the argument passed when creating the event
1175 */
1176 void thread_cancel_event(struct thread_master *master, void *arg)
1177 {
1178 assert(master->owner == pthread_self());
1179
1180 pthread_mutex_lock(&master->mtx);
1181 {
1182 struct cancel_req *cr =
1183 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1184 cr->eventobj = arg;
1185 listnode_add(master->cancel_req, cr);
1186 do_thread_cancel(master);
1187 }
1188 pthread_mutex_unlock(&master->mtx);
1189 }
1190
1191 /**
1192 * Cancel a specific task.
1193 *
1194 * MT-Unsafe
1195 *
1196 * @param thread task to cancel
1197 */
1198 void thread_cancel(struct thread *thread)
1199 {
1200 struct thread_master *master = thread->master;
1201
1202 assert(master->owner == pthread_self());
1203
1204 pthread_mutex_lock(&master->mtx);
1205 {
1206 struct cancel_req *cr =
1207 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1208 cr->thread = thread;
1209 listnode_add(master->cancel_req, cr);
1210 do_thread_cancel(master);
1211 }
1212 pthread_mutex_unlock(&master->mtx);
1213 }
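/*
 * Illustrative cancellation from the owning pthread, assuming the task
 * was scheduled with a back-reference as in the earlier examples.  Most
 * callers use the THREAD_OFF() convenience macro from thread.h, which
 * also NULLs the pointer:
 *
 *     if (t_hello)
 *         thread_cancel(t_hello);
 *     t_hello = NULL;
 *
 *     // or, equivalently:
 *     THREAD_OFF(t_hello);
 */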
1214
1215 /**
1216 * Asynchronous cancellation.
1217 *
1218 * Called with either a struct thread ** or void * to an event argument,
1219 * this function posts the correct cancellation request and blocks until it is
1220 * serviced.
1221 *
1222 * If the thread is currently running, execution blocks until it completes.
1223 *
1224 * The last two parameters are mutually exclusive, i.e. if you pass one the
1225 * other must be NULL.
1226 *
1227 * When the cancellation procedure executes on the target thread_master, the
1228 * thread * provided is checked for nullity. If it is null, the thread is
1229 * assumed to no longer exist and the cancellation request is a no-op. Thus
1230 * users of this API must pass a back-reference when scheduling the original
1231 * task.
1232 *
1233 * MT-Safe
1234 *
1235 * @param master the thread master with the relevant event / task
1236 * @param thread pointer to thread to cancel
1237 * @param eventobj the event
1238 */
1239 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1240 void *eventobj)
1241 {
1242 assert(!(thread && eventobj) && (thread || eventobj));
1243 assert(master->owner != pthread_self());
1244
1245 pthread_mutex_lock(&master->mtx);
1246 {
1247 master->canceled = false;
1248
1249 if (thread) {
1250 struct cancel_req *cr =
1251 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1252 cr->threadref = thread;
1253 listnode_add(master->cancel_req, cr);
1254 } else if (eventobj) {
1255 struct cancel_req *cr =
1256 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1257 cr->eventobj = eventobj;
1258 listnode_add(master->cancel_req, cr);
1259 }
1260 AWAKEN(master);
1261
1262 while (!master->canceled)
1263 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1264 }
1265 pthread_mutex_unlock(&master->mtx);
1266 }
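/*
 * A minimal sketch of asynchronous cancellation from a pthread that
 * does not own 'master', matching the contract described above; the
 * call blocks until the owning pthread has serviced the request.
 * 't_client_read' and 'client' are hypothetical names:
 *
 *     thread_cancel_async(master, &t_client_read, NULL);
 *
 *     // ...or cancel every event scheduled with a given argument:
 *     thread_cancel_async(master, NULL, client);
 */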
1267 /* ------------------------------------------------------------------------- */
1268
1269 static struct timeval *thread_timer_wait(struct pqueue *queue,
1270 struct timeval *timer_val)
1271 {
1272 if (queue->size) {
1273 struct thread *next_timer = queue->array[0];
1274 monotime_until(&next_timer->u.sands, timer_val);
1275 return timer_val;
1276 }
1277 return NULL;
1278 }
1279
1280 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1281 struct thread *fetch)
1282 {
1283 *fetch = *thread;
1284 thread_add_unuse(m, thread);
1285 return fetch;
1286 }
1287
1288 static int thread_process_io_helper(struct thread_master *m,
1289 struct thread *thread, short state, int pos)
1290 {
1291 struct thread **thread_array;
1292
1293 if (!thread)
1294 return 0;
1295
1296 if (thread->type == THREAD_READ)
1297 thread_array = m->read;
1298 else
1299 thread_array = m->write;
1300
1301 thread_array[thread->u.fd] = NULL;
1302 thread_list_add(&m->ready, thread);
1303 thread->type = THREAD_READY;
1304 /* if another pthread scheduled this file descriptor for the event we're
1305 * responding to, no problem; we're getting to it now */
1306 thread->master->handler.pfds[pos].events &= ~(state);
1307 return 1;
1308 }
1309
1310 /**
1311 * Process I/O events.
1312 *
1313 * Walks through file descriptor array looking for those pollfds whose .revents
1314 * field has something interesting. Deletes any invalid file descriptors.
1315 *
1316 * @param m the thread master
1317 * @param num the number of active file descriptors (return value of poll())
1318 */
1319 static void thread_process_io(struct thread_master *m, unsigned int num)
1320 {
1321 unsigned int ready = 0;
1322 struct pollfd *pfds = m->handler.copy;
1323
1324 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1325 /* no event for current fd? immediately continue */
1326 if (pfds[i].revents == 0)
1327 continue;
1328
1329 ready++;
1330
1331 		/* Unless someone has called thread_cancel from another
1332 		 * pthread, the only thing that could have changed in
1333 		 * m->handler.pfds while we were asleep is the .events
1334 		 * field in a given pollfd.
1335 		 * Barring thread_cancel(), that value should be a
1336 		 * superset of the values we have in our copy, so there
1337 		 * is no need to update it.
1338 		 * Similarly, barring deletion, the fd should still be
1339 		 * a valid index into the master's pfds.
1340 		 */
1342 if (pfds[i].revents & (POLLIN | POLLHUP))
1343 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1344 i);
1345 if (pfds[i].revents & POLLOUT)
1346 thread_process_io_helper(m, m->write[pfds[i].fd],
1347 POLLOUT, i);
1348
1349 		/* if one of our file descriptors is garbage, remove the
1350 		 * same from both pfds + update sizes and index
1351 		 */
1352 if (pfds[i].revents & POLLNVAL) {
1353 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1354 (m->handler.pfdcount - i - 1)
1355 * sizeof(struct pollfd));
1356 m->handler.pfdcount--;
1357
1358 memmove(pfds + i, pfds + i + 1,
1359 (m->handler.copycount - i - 1)
1360 * sizeof(struct pollfd));
1361 m->handler.copycount--;
1362
1363 i--;
1364 }
1365 }
1366 }
1367
1368 /* Add all timers that have popped to the ready list. */
1369 static unsigned int thread_process_timers(struct pqueue *queue,
1370 struct timeval *timenow)
1371 {
1372 struct thread *thread;
1373 unsigned int ready = 0;
1374
1375 while (queue->size) {
1376 thread = queue->array[0];
1377 if (timercmp(timenow, &thread->u.sands, <))
1378 return ready;
1379 pqueue_dequeue(queue);
1380 thread->type = THREAD_READY;
1381 thread_list_add(&thread->master->ready, thread);
1382 ready++;
1383 }
1384 return ready;
1385 }
1386
1387 /* process a list en masse, e.g. for event thread lists */
1388 static unsigned int thread_process(struct thread_list *list)
1389 {
1390 struct thread *thread;
1391 struct thread *next;
1392 unsigned int ready = 0;
1393
1394 for (thread = list->head; thread; thread = next) {
1395 next = thread->next;
1396 thread_list_delete(list, thread);
1397 thread->type = THREAD_READY;
1398 thread_list_add(&thread->master->ready, thread);
1399 ready++;
1400 }
1401 return ready;
1402 }
1403
1404
1405 /* Fetch next ready thread. */
1406 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1407 {
1408 struct thread *thread = NULL;
1409 struct timeval now;
1410 struct timeval zerotime = {0, 0};
1411 struct timeval tv;
1412 struct timeval *tw = NULL;
1413
1414 int num = 0;
1415
1416 do {
1417 /* Handle signals if any */
1418 if (m->handle_signals)
1419 quagga_sigevent_process();
1420
1421 pthread_mutex_lock(&m->mtx);
1422
1423 /* Process any pending cancellation requests */
1424 do_thread_cancel(m);
1425
1426 /*
1427 * Attempt to flush ready queue before going into poll().
1428 * This is performance-critical. Think twice before modifying.
1429 */
1430 if ((thread = thread_trim_head(&m->ready))) {
1431 fetch = thread_run(m, thread, fetch);
1432 if (fetch->ref)
1433 *fetch->ref = NULL;
1434 pthread_mutex_unlock(&m->mtx);
1435 break;
1436 }
1437
1438 /* otherwise, tick through scheduling sequence */
1439
1440 /*
1441 * Post events to ready queue. This must come before the
1442 * following block since events should occur immediately
1443 */
1444 thread_process(&m->event);
1445
1446 /*
1447 * If there are no tasks on the ready queue, we will poll()
1448 * until a timer expires or we receive I/O, whichever comes
1449 * first. The strategy for doing this is:
1450 *
1451 * - If there are events pending, set the poll() timeout to zero
1452 		 * - If there are no events pending, but there are timers
1453 		 *   pending, set the timeout to the smallest remaining time
1454 		 *   on any timer
1455 		 * - If there are neither timers nor events pending, but there
1456 		 *   are file descriptors pending, block indefinitely in
1457 		 *   poll()
1458 * - If nothing is pending, it's time for the application to die
1459 *
1460 * In every case except the last, we need to hit poll() at least
1461 * once per loop to avoid starvation by events
1462 */
1463 if (m->ready.count == 0)
1464 tw = thread_timer_wait(m->timer, &tv);
1465
1466 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1467 tw = &zerotime;
1468
1469 if (!tw && m->handler.pfdcount == 0) { /* die */
1470 pthread_mutex_unlock(&m->mtx);
1471 fetch = NULL;
1472 break;
1473 }
1474
1475 /*
1476 * Copy pollfd array + # active pollfds in it. Not necessary to
1477 * copy the array size as this is fixed.
1478 */
1479 m->handler.copycount = m->handler.pfdcount;
1480 memcpy(m->handler.copy, m->handler.pfds,
1481 m->handler.copycount * sizeof(struct pollfd));
1482
1483 pthread_mutex_unlock(&m->mtx);
1484 {
1485 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1486 m->handler.copycount, tw);
1487 }
1488 pthread_mutex_lock(&m->mtx);
1489
1490 /* Handle any errors received in poll() */
1491 if (num < 0) {
1492 if (errno == EINTR) {
1493 pthread_mutex_unlock(&m->mtx);
1494 /* loop around to signal handler */
1495 continue;
1496 }
1497
1498 /* else die */
1499 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1500 safe_strerror(errno));
1501 pthread_mutex_unlock(&m->mtx);
1502 fetch = NULL;
1503 break;
1504 }
1505
1506 /* Post timers to ready queue. */
1507 monotime(&now);
1508 thread_process_timers(m->timer, &now);
1509
1510 /* Post I/O to ready queue. */
1511 if (num > 0)
1512 thread_process_io(m, num);
1513
1514 pthread_mutex_unlock(&m->mtx);
1515
1516 } while (!thread && m->spin);
1517
1518 return fetch;
1519 }
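/*
 * For reference, the canonical fetch/call loop a daemon runs over its
 * thread_master; thread_fetch() blocks until a task is ready and
 * returns NULL when nothing is left to schedule (or on a fatal poll()
 * error).  FRR daemons normally get this loop from libfrr rather than
 * writing it themselves:
 *
 *     struct thread thread;
 *
 *     while (thread_fetch(master, &thread))
 *         thread_call(&thread);
 */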
1520
1521 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1522 {
1523 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1524 + (a.tv_usec - b.tv_usec));
1525 }
1526
1527 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1528 unsigned long *cputime)
1529 {
1530 /* This is 'user + sys' time. */
1531 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1532 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1533 return timeval_elapsed(now->real, start->real);
1534 }
1535
1536 /* We should aim to yield after 'yield' milliseconds, which defaults
1537    to THREAD_YIELD_TIME_SLOT.
1538 Note: we are using real (wall clock) time for this calculation.
1539 It could be argued that CPU time may make more sense in certain
1540 contexts. The things to consider are whether the thread may have
1541 blocked (in which case wall time increases, but CPU time does not),
1542 or whether the system is heavily loaded with other processes competing
1543 for CPU time. On balance, wall clock time seems to make sense.
1544 Plus it has the added benefit that gettimeofday should be faster
1545 than calling getrusage. */
1546 int thread_should_yield(struct thread *thread)
1547 {
1548 int result;
1549 pthread_mutex_lock(&thread->mtx);
1550 {
1551 result = monotime_since(&thread->real, NULL)
1552 > (int64_t)thread->yield;
1553 }
1554 pthread_mutex_unlock(&thread->mtx);
1555 return result;
1556 }
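/*
 * Illustrative pattern for a long-running task that yields
 * cooperatively: do bounded units of work and, once the yield time is
 * exceeded, reschedule itself as an event and return.  'struct work',
 * 'work_pending' and 'do_work_unit' are hypothetical:
 *
 *     static int background_work(struct thread *t)
 *     {
 *         struct work *w = THREAD_ARG(t);
 *
 *         while (work_pending(w)) {
 *             do_work_unit(w);
 *             if (thread_should_yield(t)) {
 *                 thread_add_event(t->master, background_work, w, 0,
 *                                  NULL);
 *                 break;
 *             }
 *         }
 *         return 0;
 *     }
 */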
1557
1558 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1559 {
1560 pthread_mutex_lock(&thread->mtx);
1561 {
1562 thread->yield = yield_time;
1563 }
1564 pthread_mutex_unlock(&thread->mtx);
1565 }
1566
1567 void thread_getrusage(RUSAGE_T *r)
1568 {
1569 monotime(&r->real);
1570 getrusage(RUSAGE_SELF, &(r->cpu));
1571 }
1572
1573 /*
1574 * Call a thread.
1575 *
1576 * This function will atomically update the thread's usage history. At present
1577 * this is the only spot where usage history is written. Nevertheless the code
1578  * has been written such that introducing additional writers in the future
1579  * should not require updating it, provided the writers atomically perform only the
1580 * operations done here, i.e. updating the total and maximum times. In
1581 * particular, the maximum real and cpu times must be monotonically increasing
1582 * or this code is not correct.
1583 */
1584 void thread_call(struct thread *thread)
1585 {
1586 _Atomic unsigned long realtime, cputime;
1587 unsigned long exp;
1588 unsigned long helper;
1589 RUSAGE_T before, after;
1590
1591 GETRUSAGE(&before);
1592 thread->real = before.real;
1593
1594 pthread_setspecific(thread_current, thread);
1595 (*thread->func)(thread);
1596 pthread_setspecific(thread_current, NULL);
1597
1598 GETRUSAGE(&after);
1599
1600 realtime = thread_consumed_time(&after, &before, &helper);
1601 cputime = helper;
1602
1603 /* update realtime */
1604 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1605 memory_order_seq_cst);
1606 exp = atomic_load_explicit(&thread->hist->real.max,
1607 memory_order_seq_cst);
1608 while (exp < realtime
1609 && !atomic_compare_exchange_weak_explicit(
1610 &thread->hist->real.max, &exp, realtime,
1611 memory_order_seq_cst, memory_order_seq_cst))
1612 ;
1613
1614 /* update cputime */
1615 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1616 memory_order_seq_cst);
1617 exp = atomic_load_explicit(&thread->hist->cpu.max,
1618 memory_order_seq_cst);
1619 while (exp < cputime
1620 && !atomic_compare_exchange_weak_explicit(
1621 &thread->hist->cpu.max, &exp, cputime,
1622 memory_order_seq_cst, memory_order_seq_cst))
1623 ;
1624
1625 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1626 memory_order_seq_cst);
1627 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1628 memory_order_seq_cst);
1629
1630 #ifdef CONSUMED_TIME_CHECK
1631 if (realtime > CONSUMED_TIME_CHECK) {
1632 /*
1633 * We have a CPU Hog on our hands.
1634 * Whinge about it now, so we're aware this is yet another task
1635 * to fix.
1636 */
1637 flog_warn(
1638 EC_LIB_SLOW_THREAD,
1639 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1640 thread->funcname, (unsigned long)thread->func,
1641 realtime / 1000, cputime / 1000);
1642 }
1643 #endif /* CONSUMED_TIME_CHECK */
1644 }
1645
1646 /* Execute thread */
1647 void funcname_thread_execute(struct thread_master *m,
1648 int (*func)(struct thread *), void *arg, int val,
1649 debugargdef)
1650 {
1651 struct thread *thread;
1652
1653 /* Get or allocate new thread to execute. */
1654 pthread_mutex_lock(&m->mtx);
1655 {
1656 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1657
1658 /* Set its event value. */
1659 pthread_mutex_lock(&thread->mtx);
1660 {
1661 thread->add_type = THREAD_EXECUTE;
1662 thread->u.val = val;
1663 thread->ref = &thread;
1664 }
1665 pthread_mutex_unlock(&thread->mtx);
1666 }
1667 pthread_mutex_unlock(&m->mtx);
1668
1669 /* Execute thread doing all accounting. */
1670 thread_call(thread);
1671
1672 /* Give back or free thread. */
1673 thread_add_unuse(m, thread);
1674 }
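/*
 * Illustrative use via the thread_execute() wrapper macro from
 * thread.h: the callback runs synchronously in the caller's context,
 * but is still accounted for in 'show thread cpu' (type 'X');
 * 'flush_routes' and 'vrf' are hypothetical names:
 *
 *     thread_execute(master, flush_routes, vrf, 0);
 */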