1 /* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /* #define DEBUG */
22
23 #include <zebra.h>
24 #include <sys/resource.h>
25
26 #include "thread.h"
27 #include "memory.h"
28 #include "log.h"
29 #include "hash.h"
30 #include "pqueue.h"
31 #include "command.h"
32 #include "sigevent.h"
33 #include "network.h"
34 #include "jhash.h"
35 #include "frratomic.h"
36 #include "lib_errors.h"
37
38 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
39 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
40 DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
41 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
42
43 #if defined(__APPLE__)
44 #include <mach/mach.h>
45 #include <mach/mach_time.h>
46 #endif
47
48 #define AWAKEN(m) \
49 do { \
50 static unsigned char wakebyte = 0x01; \
51 write(m->io_pipe[1], &wakebyte, 1); \
52 } while (0);
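/*
 * AWAKEN() writes a single byte into the master's self-pipe so that a
 * poll() sleeping in fd_poll() (which always watches io_pipe[0] as an
 * extra pollfd) returns immediately; fd_poll() then drains the pipe.
 * This is how another pthread can nudge an event loop after scheduling
 * or cancelling work on it.
 */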
53
54 /* control variable for initializer */
55 pthread_once_t init_once = PTHREAD_ONCE_INIT;
56 pthread_key_t thread_current;
57
58 pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
59 static struct list *masters;
60
61 static void thread_free(struct thread_master *master, struct thread *thread);
62
63 /* CLI start ---------------------------------------------------------------- */
64 static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
65 {
66 int size = sizeof(a->func);
67
68 return jhash(&a->func, size, 0);
69 }
70
71 static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
72 const struct cpu_thread_history *b)
73 {
74 return a->func == b->func;
75 }
76
77 static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
78 {
79 struct cpu_thread_history *new;
80 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
81 new->func = a->func;
82 new->funcname = a->funcname;
83 return new;
84 }
85
86 static void cpu_record_hash_free(void *a)
87 {
88 struct cpu_thread_history *hist = a;
89
90 XFREE(MTYPE_THREAD_STATS, hist);
91 }
92
93 static void vty_out_cpu_thread_history(struct vty *vty,
94 struct cpu_thread_history *a)
95 {
96 vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
97 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
98 a->cpu.total / a->total_calls, a->cpu.max,
99 a->real.total / a->total_calls, a->real.max);
100 vty_out(vty, " %c%c%c%c%c %s\n",
101 a->types & (1 << THREAD_READ) ? 'R' : ' ',
102 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
103 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
104 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
105 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
106 }
107
108 static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
109 {
110 struct cpu_thread_history *totals = args[0];
111 struct cpu_thread_history copy;
112 struct vty *vty = args[1];
113 uint8_t *filter = args[2];
114
115 struct cpu_thread_history *a = bucket->data;
116
117 copy.total_active =
118 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
119 copy.total_calls =
120 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
121 copy.cpu.total =
122 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
123 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
124 copy.real.total =
125 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
126 copy.real.max =
127 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
128 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
129 copy.funcname = a->funcname;
130
131 if (!(copy.types & *filter))
132 return;
133
134 vty_out_cpu_thread_history(vty, &copy);
135 totals->total_active += copy.total_active;
136 totals->total_calls += copy.total_calls;
137 totals->real.total += copy.real.total;
138 if (totals->real.max < copy.real.max)
139 totals->real.max = copy.real.max;
140 totals->cpu.total += copy.cpu.total;
141 if (totals->cpu.max < copy.cpu.max)
142 totals->cpu.max = copy.cpu.max;
143 }
144
145 static void cpu_record_print(struct vty *vty, uint8_t filter)
146 {
147 struct cpu_thread_history tmp;
148 void *args[3] = {&tmp, vty, &filter};
149 struct thread_master *m;
150 struct listnode *ln;
151
152 memset(&tmp, 0, sizeof tmp);
153 tmp.funcname = "TOTAL";
154 tmp.types = filter;
155
156 pthread_mutex_lock(&masters_mtx);
157 {
158 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
159 const char *name = m->name ? m->name : "main";
160
161 char underline[strlen(name) + 1];
162 memset(underline, '-', sizeof(underline));
163 underline[sizeof(underline) - 1] = '\0';
164
165 vty_out(vty, "\n");
166 vty_out(vty, "Showing statistics for pthread %s\n",
167 name);
168 vty_out(vty, "-------------------------------%s\n",
169 underline);
170 vty_out(vty, "%21s %18s %18s\n", "",
171 "CPU (user+system):", "Real (wall-clock):");
172 vty_out(vty,
173 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
174 vty_out(vty, " Avg uSec Max uSecs");
175 vty_out(vty, " Type Thread\n");
176
177 if (m->cpu_record->count)
178 hash_iterate(
179 m->cpu_record,
180 (void (*)(struct hash_backet *,
181 void *))cpu_record_hash_print,
182 args);
183 else
184 vty_out(vty, "No data to display yet.\n");
185
186 vty_out(vty, "\n");
187 }
188 }
189 pthread_mutex_unlock(&masters_mtx);
190
191 vty_out(vty, "\n");
192 vty_out(vty, "Total thread statistics\n");
193 vty_out(vty, "-------------------------\n");
194 vty_out(vty, "%21s %18s %18s\n", "",
195 "CPU (user+system):", "Real (wall-clock):");
196 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
197 vty_out(vty, " Avg uSec Max uSecs");
198 vty_out(vty, " Type Thread\n");
199
200 if (tmp.total_calls > 0)
201 vty_out_cpu_thread_history(vty, &tmp);
202 }
203
204 static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
205 {
206 uint8_t *filter = args[0];
207 struct hash *cpu_record = args[1];
208
209 struct cpu_thread_history *a = bucket->data;
210
211 if (!(a->types & *filter))
212 return;
213
214 hash_release(cpu_record, bucket->data);
215 }
216
217 static void cpu_record_clear(uint8_t filter)
218 {
219 uint8_t *tmp = &filter;
220 struct thread_master *m;
221 struct listnode *ln;
222
223 pthread_mutex_lock(&masters_mtx);
224 {
225 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
226 pthread_mutex_lock(&m->mtx);
227 {
228 void *args[2] = {tmp, m->cpu_record};
229 hash_iterate(
230 m->cpu_record,
231 (void (*)(struct hash_backet *,
232 void *))cpu_record_hash_clear,
233 args);
234 }
235 pthread_mutex_unlock(&m->mtx);
236 }
237 }
238 pthread_mutex_unlock(&masters_mtx);
239 }
240
241 static uint8_t parse_filter(const char *filterstr)
242 {
243 int i = 0;
244 int filter = 0;
245
246 while (filterstr[i] != '\0') {
247 switch (filterstr[i]) {
248 case 'r':
249 case 'R':
250 filter |= (1 << THREAD_READ);
251 break;
252 case 'w':
253 case 'W':
254 filter |= (1 << THREAD_WRITE);
255 break;
256 case 't':
257 case 'T':
258 filter |= (1 << THREAD_TIMER);
259 break;
260 case 'e':
261 case 'E':
262 filter |= (1 << THREAD_EVENT);
263 break;
264 case 'x':
265 case 'X':
266 filter |= (1 << THREAD_EXECUTE);
267 break;
268 default:
269 break;
270 }
271 ++i;
272 }
273 return filter;
274 }
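/*
 * For example (illustrative), parse_filter("rw") yields
 * (1 << THREAD_READ) | (1 << THREAD_WRITE), while a string containing
 * none of the recognized letters yields 0, which the CLI handlers below
 * reject as an invalid filter.
 */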
275
276 DEFUN (show_thread_cpu,
277 show_thread_cpu_cmd,
278 "show thread cpu [FILTER]",
279 SHOW_STR
280 "Thread information\n"
281 "Thread CPU usage\n"
282 "Display filter (rwtexb)\n")
283 {
284 uint8_t filter = (uint8_t)-1U;
285 int idx = 0;
286
287 if (argv_find(argv, argc, "FILTER", &idx)) {
288 filter = parse_filter(argv[idx]->arg);
289 if (!filter) {
290 			vty_out(vty,
291 				"Invalid filter \"%s\" specified; must contain at least "
292 				"one of 'RWTEX'\n",
293 				argv[idx]->arg);
294 return CMD_WARNING;
295 }
296 }
297
298 cpu_record_print(vty, filter);
299 return CMD_SUCCESS;
300 }
301
302 static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
303 {
304 const char *name = m->name ? m->name : "main";
305 char underline[strlen(name) + 1];
306 uint32_t i;
307
308 memset(underline, '-', sizeof(underline));
309 underline[sizeof(underline) - 1] = '\0';
310
311 vty_out(vty, "\nShowing poll FD's for %s\n", name);
312 vty_out(vty, "----------------------%s\n", underline);
313 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
314 for (i = 0; i < m->handler.pfdcount; i++)
315 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
316 m->handler.pfds[i].fd,
317 m->handler.pfds[i].events,
318 m->handler.pfds[i].revents);
319 }
320
321 DEFUN (show_thread_poll,
322 show_thread_poll_cmd,
323 "show thread poll",
324 SHOW_STR
325 "Thread information\n"
326 "Show poll FD's and information\n")
327 {
328 struct listnode *node;
329 struct thread_master *m;
330
331 pthread_mutex_lock(&masters_mtx);
332 {
333 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
334 show_thread_poll_helper(vty, m);
335 }
336 }
337 pthread_mutex_unlock(&masters_mtx);
338
339 return CMD_SUCCESS;
340 }
341
342
343 DEFUN (clear_thread_cpu,
344 clear_thread_cpu_cmd,
345 "clear thread cpu [FILTER]",
346 "Clear stored data in all pthreads\n"
347 "Thread information\n"
348 "Thread CPU usage\n"
349 "Display filter (rwtexb)\n")
350 {
351 uint8_t filter = (uint8_t)-1U;
352 int idx = 0;
353
354 if (argv_find(argv, argc, "FILTER", &idx)) {
355 filter = parse_filter(argv[idx]->arg);
356 if (!filter) {
357 			vty_out(vty,
358 				"Invalid filter \"%s\" specified; must contain at least "
359 				"one of 'RWTEX'\n",
360 				argv[idx]->arg);
361 return CMD_WARNING;
362 }
363 }
364
365 cpu_record_clear(filter);
366 return CMD_SUCCESS;
367 }
368
369 void thread_cmd_init(void)
370 {
371 install_element(VIEW_NODE, &show_thread_cpu_cmd);
372 install_element(VIEW_NODE, &show_thread_poll_cmd);
373 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
374 }
375 /* CLI end ------------------------------------------------------------------ */
376
377
378 static int thread_timer_cmp(void *a, void *b)
379 {
380 struct thread *thread_a = a;
381 struct thread *thread_b = b;
382
383 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
384 return -1;
385 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
386 return 1;
387 return 0;
388 }
389
390 static void thread_timer_update(void *node, int actual_position)
391 {
392 struct thread *thread = node;
393
394 thread->index = actual_position;
395 }
396
397 static void cancelreq_del(void *cr)
398 {
399 XFREE(MTYPE_TMP, cr);
400 }
401
402 /* initializer, only ever called once */
403 static void initializer()
404 {
405 pthread_key_create(&thread_current, NULL);
406 }
407
408 struct thread_master *thread_master_create(const char *name)
409 {
410 struct thread_master *rv;
411 struct rlimit limit;
412
413 pthread_once(&init_once, &initializer);
414
415 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
416 if (rv == NULL)
417 return NULL;
418
419 /* Initialize master mutex */
420 pthread_mutex_init(&rv->mtx, NULL);
421 pthread_cond_init(&rv->cancel_cond, NULL);
422
423 /* Set name */
424 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
425
426 /* Initialize I/O task data structures */
427 getrlimit(RLIMIT_NOFILE, &limit);
428 rv->fd_limit = (int)limit.rlim_cur;
429 rv->read = XCALLOC(MTYPE_THREAD_POLL,
430 sizeof(struct thread *) * rv->fd_limit);
431
432 rv->write = XCALLOC(MTYPE_THREAD_POLL,
433 sizeof(struct thread *) * rv->fd_limit);
434
435 rv->cpu_record = hash_create_size(
436 8, (unsigned int (*)(void *))cpu_record_hash_key,
437 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
438 "Thread Hash");
439
440
441 /* Initialize the timer queues */
442 rv->timer = pqueue_create();
443 rv->timer->cmp = thread_timer_cmp;
444 rv->timer->update = thread_timer_update;
445
446 /* Initialize thread_fetch() settings */
447 rv->spin = true;
448 rv->handle_signals = true;
449
450 /* Set pthread owner, should be updated by actual owner */
451 rv->owner = pthread_self();
452 rv->cancel_req = list_new();
453 rv->cancel_req->del = cancelreq_del;
454 rv->canceled = true;
455
456 /* Initialize pipe poker */
457 pipe(rv->io_pipe);
458 set_nonblocking(rv->io_pipe[0]);
459 set_nonblocking(rv->io_pipe[1]);
460
461 /* Initialize data structures for poll() */
462 rv->handler.pfdsize = rv->fd_limit;
463 rv->handler.pfdcount = 0;
464 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
465 sizeof(struct pollfd) * rv->handler.pfdsize);
466 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
467 sizeof(struct pollfd) * rv->handler.pfdsize);
468
469 /* add to list of threadmasters */
470 pthread_mutex_lock(&masters_mtx);
471 {
472 if (!masters)
473 masters = list_new();
474
475 listnode_add(masters, rv);
476 }
477 pthread_mutex_unlock(&masters_mtx);
478
479 return rv;
480 }
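/*
 * A minimal creation sketch (illustrative, not taken from this file):
 * each event-loop pthread owns one master. rv->owner defaults to the
 * creating pthread above; per the comment, the pthread that will
 * actually run the loop is expected to update it, e.g.
 *
 *     struct thread_master *master = thread_master_create("my pthread");
 *     master->owner = pthread_self();  // from the pthread running the loop
 *
 * The fetch/call loop that drives a master is sketched near
 * thread_fetch() further down; thread_master_free() releases everything.
 */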
481
482 void thread_master_set_name(struct thread_master *master, const char *name)
483 {
484 pthread_mutex_lock(&master->mtx);
485 {
486 if (master->name)
487 XFREE(MTYPE_THREAD_MASTER, master->name);
488 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
489 }
490 pthread_mutex_unlock(&master->mtx);
491 }
492
493 /* Add a new thread to the list. */
494 static void thread_list_add(struct thread_list *list, struct thread *thread)
495 {
496 thread->next = NULL;
497 thread->prev = list->tail;
498 if (list->tail)
499 list->tail->next = thread;
500 else
501 list->head = thread;
502 list->tail = thread;
503 list->count++;
504 }
505
506 /* Delete a thread from the list. */
507 static struct thread *thread_list_delete(struct thread_list *list,
508 struct thread *thread)
509 {
510 if (thread->next)
511 thread->next->prev = thread->prev;
512 else
513 list->tail = thread->prev;
514 if (thread->prev)
515 thread->prev->next = thread->next;
516 else
517 list->head = thread->next;
518 thread->next = thread->prev = NULL;
519 list->count--;
520 return thread;
521 }
522
523 /* Thread list is empty or not. */
524 static int thread_empty(struct thread_list *list)
525 {
526 return list->head ? 0 : 1;
527 }
528
529 /* Delete top of the list and return it. */
530 static struct thread *thread_trim_head(struct thread_list *list)
531 {
532 if (!thread_empty(list))
533 return thread_list_delete(list, list->head);
534 return NULL;
535 }
536
537 #define THREAD_UNUSED_DEPTH 10
538
539 /* Move thread to unuse list. */
540 static void thread_add_unuse(struct thread_master *m, struct thread *thread)
541 {
542 pthread_mutex_t mtxc = thread->mtx;
543
544 assert(m != NULL && thread != NULL);
545 assert(thread->next == NULL);
546 assert(thread->prev == NULL);
547
548 thread->hist->total_active--;
549 memset(thread, 0, sizeof(struct thread));
550 thread->type = THREAD_UNUSED;
551
552 /* Restore the thread mutex context. */
553 thread->mtx = mtxc;
554
555 if (m->unuse.count < THREAD_UNUSED_DEPTH) {
556 thread_list_add(&m->unuse, thread);
557 return;
558 }
559
560 thread_free(m, thread);
561 }
562
563 /* Free all threads on the given list. */
564 static void thread_list_free(struct thread_master *m, struct thread_list *list)
565 {
566 struct thread *t;
567 struct thread *next;
568
569 for (t = list->head; t; t = next) {
570 next = t->next;
571 thread_free(m, t);
572 list->count--;
573 }
574 }
575
576 static void thread_array_free(struct thread_master *m,
577 struct thread **thread_array)
578 {
579 struct thread *t;
580 int index;
581
582 for (index = 0; index < m->fd_limit; ++index) {
583 t = thread_array[index];
584 if (t) {
585 thread_array[index] = NULL;
586 thread_free(m, t);
587 }
588 }
589 XFREE(MTYPE_THREAD_POLL, thread_array);
590 }
591
592 static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
593 {
594 int i;
595
596 for (i = 0; i < queue->size; i++)
597 thread_free(m, queue->array[i]);
598
599 pqueue_delete(queue);
600 }
601
602 /*
603 * thread_master_free_unused
604 *
605  * As threads are finished with, they are put on the
606  * unuse list for later reuse.
607  * If we are shutting down, free up the unused threads
608  * so we can see whether we forgot to shut anything off.
609 */
610 void thread_master_free_unused(struct thread_master *m)
611 {
612 pthread_mutex_lock(&m->mtx);
613 {
614 struct thread *t;
615 while ((t = thread_trim_head(&m->unuse)) != NULL) {
616 thread_free(m, t);
617 }
618 }
619 pthread_mutex_unlock(&m->mtx);
620 }
621
622 /* Stop thread scheduler. */
623 void thread_master_free(struct thread_master *m)
624 {
625 pthread_mutex_lock(&masters_mtx);
626 {
627 listnode_delete(masters, m);
628 if (masters->count == 0) {
629 list_delete(&masters);
630 }
631 }
632 pthread_mutex_unlock(&masters_mtx);
633
634 thread_array_free(m, m->read);
635 thread_array_free(m, m->write);
636 thread_queue_free(m, m->timer);
637 thread_list_free(m, &m->event);
638 thread_list_free(m, &m->ready);
639 thread_list_free(m, &m->unuse);
640 pthread_mutex_destroy(&m->mtx);
641 pthread_cond_destroy(&m->cancel_cond);
642 close(m->io_pipe[0]);
643 close(m->io_pipe[1]);
644 list_delete(&m->cancel_req);
645 m->cancel_req = NULL;
646
647 hash_clean(m->cpu_record, cpu_record_hash_free);
648 hash_free(m->cpu_record);
649 m->cpu_record = NULL;
650
651 if (m->name)
652 XFREE(MTYPE_THREAD_MASTER, m->name);
653 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
654 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
655 XFREE(MTYPE_THREAD_MASTER, m);
656 }
657
658 /* Return the remaining time in milliseconds. */
659 unsigned long thread_timer_remain_msec(struct thread *thread)
660 {
661 int64_t remain;
662
663 pthread_mutex_lock(&thread->mtx);
664 {
665 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
666 }
667 pthread_mutex_unlock(&thread->mtx);
668
669 return remain < 0 ? 0 : remain;
670 }
671
672 /* Return the remaining time in seconds. */
673 unsigned long thread_timer_remain_second(struct thread *thread)
674 {
675 return thread_timer_remain_msec(thread) / 1000LL;
676 }
677
678 #define debugargdef const char *funcname, const char *schedfrom, int fromln
679 #define debugargpass funcname, schedfrom, fromln
680
681 struct timeval thread_timer_remain(struct thread *thread)
682 {
683 struct timeval remain;
684 pthread_mutex_lock(&thread->mtx);
685 {
686 monotime_until(&thread->u.sands, &remain);
687 }
688 pthread_mutex_unlock(&thread->mtx);
689 return remain;
690 }
691
692 /* Get new thread. */
693 static struct thread *thread_get(struct thread_master *m, uint8_t type,
694 int (*func)(struct thread *), void *arg,
695 debugargdef)
696 {
697 struct thread *thread = thread_trim_head(&m->unuse);
698 struct cpu_thread_history tmp;
699
700 if (!thread) {
701 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
702 /* mutex only needs to be initialized at struct creation. */
703 pthread_mutex_init(&thread->mtx, NULL);
704 m->alloc++;
705 }
706
707 thread->type = type;
708 thread->add_type = type;
709 thread->master = m;
710 thread->arg = arg;
711 thread->index = -1;
712 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
713 thread->ref = NULL;
714
715 /*
716 	 * If the passed-in funcname is not what we have stored,
717 	 * then thread->hist needs to be updated. We keep the last
718 	 * history around on the unused thread under the assumption
719 	 * that we are probably going to immediately allocate the
720 	 * same type of thread again.
721 	 * This hopefully saves us some serious hash_get lookups.
724 */
725 if (thread->funcname != funcname || thread->func != func) {
726 tmp.func = func;
727 tmp.funcname = funcname;
728 thread->hist =
729 hash_get(m->cpu_record, &tmp,
730 (void *(*)(void *))cpu_record_hash_alloc);
731 }
732 thread->hist->total_active++;
733 thread->func = func;
734 thread->funcname = funcname;
735 thread->schedfrom = schedfrom;
736 thread->schedfrom_line = fromln;
737
738 return thread;
739 }
740
741 static void thread_free(struct thread_master *master, struct thread *thread)
742 {
743 /* Update statistics. */
744 assert(master->alloc > 0);
745 master->alloc--;
746
747 /* Free allocated resources. */
748 pthread_mutex_destroy(&thread->mtx);
749 XFREE(MTYPE_THREAD, thread);
750 }
751
752 static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
753 nfds_t count, const struct timeval *timer_wait)
754 {
755 	/* If timer_wait is null here, that means poll() should block
756 	 * indefinitely, unless the thread_master has overridden it by
757 	 * setting ->selectpoll_timeout.
758 	 *
759 	 * If ->selectpoll_timeout is positive, it specifies the maximum
760 	 * number of milliseconds to wait.
761 	 * If it is negative (conventionally -1), we should never wait and
762 	 * always return immediately even if no event is detected.
763 	 * If it is zero, the default behavior above applies: wait for
764 	 * timer_wait, or block indefinitely if timer_wait is null. */
766 int timeout = -1;
767
768 /* number of file descriptors with events */
769 int num;
770
771 if (timer_wait != NULL
772 && m->selectpoll_timeout == 0) // use the default value
773 timeout = (timer_wait->tv_sec * 1000)
774 + (timer_wait->tv_usec / 1000);
775 else if (m->selectpoll_timeout > 0) // use the user's timeout
776 timeout = m->selectpoll_timeout;
777 else if (m->selectpoll_timeout
778 < 0) // effect a poll (return immediately)
779 timeout = 0;
780
781 /* add poll pipe poker */
782 assert(count + 1 < pfdsize);
783 pfds[count].fd = m->io_pipe[0];
784 pfds[count].events = POLLIN;
785 pfds[count].revents = 0x00;
786
787 num = poll(pfds, count + 1, timeout);
788
789 unsigned char trash[64];
790 if (num > 0 && pfds[count].revents != 0 && num--)
791 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
792 ;
793
794 return num;
795 }
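/*
 * Summary of the timeout selection above (descriptive only):
 *
 *   selectpoll_timeout == 0, timer_wait != NULL -> wait timer_wait (in ms)
 *   selectpoll_timeout == 0, timer_wait == NULL -> timeout = -1, block forever
 *   selectpoll_timeout  > 0                     -> wait selectpoll_timeout ms
 *   selectpoll_timeout  < 0                     -> timeout = 0, return at once
 */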
796
797 /* Add new read thread. */
798 struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
799 int (*func)(struct thread *),
800 void *arg, int fd,
801 struct thread **t_ptr,
802 debugargdef)
803 {
804 struct thread *thread = NULL;
805
806 assert(fd >= 0 && fd < m->fd_limit);
807 pthread_mutex_lock(&m->mtx);
808 {
809 if (t_ptr
810 && *t_ptr) // thread is already scheduled; don't reschedule
811 {
812 pthread_mutex_unlock(&m->mtx);
813 return NULL;
814 }
815
816 /* default to a new pollfd */
817 nfds_t queuepos = m->handler.pfdcount;
818
819 /* if we already have a pollfd for our file descriptor, find and
820 * use it */
821 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
822 if (m->handler.pfds[i].fd == fd) {
823 queuepos = i;
824 break;
825 }
826
827 /* make sure we have room for this fd + pipe poker fd */
828 assert(queuepos + 1 < m->handler.pfdsize);
829
830 thread = thread_get(m, dir, func, arg, debugargpass);
831
832 m->handler.pfds[queuepos].fd = fd;
833 m->handler.pfds[queuepos].events |=
834 (dir == THREAD_READ ? POLLIN : POLLOUT);
835
836 if (queuepos == m->handler.pfdcount)
837 m->handler.pfdcount++;
838
839 if (thread) {
840 pthread_mutex_lock(&thread->mtx);
841 {
842 thread->u.fd = fd;
843 if (dir == THREAD_READ)
844 m->read[thread->u.fd] = thread;
845 else
846 m->write[thread->u.fd] = thread;
847 }
848 pthread_mutex_unlock(&thread->mtx);
849
850 if (t_ptr) {
851 *t_ptr = thread;
852 thread->ref = t_ptr;
853 }
854 }
855
856 AWAKEN(m);
857 }
858 pthread_mutex_unlock(&m->mtx);
859
860 return thread;
861 }
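/*
 * Callers normally reach this through the thread_add_read() /
 * thread_add_write() wrappers in thread.h, which fill in the debugargdef
 * parameters. A rough usage sketch (names are illustrative only):
 *
 *     static int my_read(struct thread *t)
 *     {
 *             struct my_ctx *ctx = THREAD_ARG(t);
 *             int fd = THREAD_FD(t);
 *
 *             ...consume data from fd...
 *
 *             // I/O tasks fire once; re-arm if more data is expected.
 *             thread_add_read(t->master, my_read, ctx, fd, &ctx->t_read);
 *             return 0;
 *     }
 *
 *     thread_add_read(master, my_read, ctx, fd, &ctx->t_read);
 */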
862
863 static struct thread *
864 funcname_thread_add_timer_timeval(struct thread_master *m,
865 int (*func)(struct thread *), int type,
866 void *arg, struct timeval *time_relative,
867 struct thread **t_ptr, debugargdef)
868 {
869 struct thread *thread;
870 struct pqueue *queue;
871
872 assert(m != NULL);
873
874 assert(type == THREAD_TIMER);
875 assert(time_relative);
876
877 pthread_mutex_lock(&m->mtx);
878 {
879 if (t_ptr
880 && *t_ptr) // thread is already scheduled; don't reschedule
881 {
882 pthread_mutex_unlock(&m->mtx);
883 return NULL;
884 }
885
886 queue = m->timer;
887 thread = thread_get(m, type, func, arg, debugargpass);
888
889 pthread_mutex_lock(&thread->mtx);
890 {
891 monotime(&thread->u.sands);
892 timeradd(&thread->u.sands, time_relative,
893 &thread->u.sands);
894 pqueue_enqueue(thread, queue);
895 if (t_ptr) {
896 *t_ptr = thread;
897 thread->ref = t_ptr;
898 }
899 }
900 pthread_mutex_unlock(&thread->mtx);
901
902 AWAKEN(m);
903 }
904 pthread_mutex_unlock(&m->mtx);
905
906 return thread;
907 }
908
909
910 /* Add timer event thread. */
911 struct thread *funcname_thread_add_timer(struct thread_master *m,
912 int (*func)(struct thread *),
913 void *arg, long timer,
914 struct thread **t_ptr, debugargdef)
915 {
916 struct timeval trel;
917
918 assert(m != NULL);
919
920 trel.tv_sec = timer;
921 trel.tv_usec = 0;
922
923 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
924 &trel, t_ptr, debugargpass);
925 }
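/*
 * Timers are one-shot as well; a periodic task re-arms itself from its
 * own callback. A rough sketch using the thread_add_timer() wrapper from
 * thread.h (names are illustrative only):
 *
 *     static int my_tick(struct thread *t)
 *     {
 *             struct my_ctx *ctx = THREAD_ARG(t);
 *
 *             ...do periodic work...
 *
 *             thread_add_timer(t->master, my_tick, ctx, 5, &ctx->t_tick);
 *             return 0;
 *     }
 */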
926
927 /* Add timer event thread with "millisecond" resolution */
928 struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
929 int (*func)(struct thread *),
930 void *arg, long timer,
931 struct thread **t_ptr,
932 debugargdef)
933 {
934 struct timeval trel;
935
936 assert(m != NULL);
937
938 trel.tv_sec = timer / 1000;
939 trel.tv_usec = 1000 * (timer % 1000);
940
941 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
942 &trel, t_ptr, debugargpass);
943 }
944
945 /* Add timer event thread with "millisecond" resolution */
946 struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
947 int (*func)(struct thread *),
948 void *arg, struct timeval *tv,
949 struct thread **t_ptr, debugargdef)
950 {
951 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
952 t_ptr, debugargpass);
953 }
954
955 /* Add simple event thread. */
956 struct thread *funcname_thread_add_event(struct thread_master *m,
957 int (*func)(struct thread *),
958 void *arg, int val,
959 struct thread **t_ptr, debugargdef)
960 {
961 struct thread *thread;
962
963 assert(m != NULL);
964
965 pthread_mutex_lock(&m->mtx);
966 {
967 if (t_ptr
968 && *t_ptr) // thread is already scheduled; don't reschedule
969 {
970 pthread_mutex_unlock(&m->mtx);
971 return NULL;
972 }
973
974 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
975 pthread_mutex_lock(&thread->mtx);
976 {
977 thread->u.val = val;
978 thread_list_add(&m->event, thread);
979 }
980 pthread_mutex_unlock(&thread->mtx);
981
982 if (t_ptr) {
983 *t_ptr = thread;
984 thread->ref = t_ptr;
985 }
986
987 AWAKEN(m);
988 }
989 pthread_mutex_unlock(&m->mtx);
990
991 return thread;
992 }
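/*
 * Events are the "run this as soon as possible" primitive; they carry an
 * opaque arg plus an integer value and never touch poll(). A rough
 * sketch via the thread_add_event() wrapper (names are illustrative):
 *
 *     thread_add_event(master, my_handler, ctx, 0, &ctx->t_event);
 *
 * The arg pointer doubles as the handle used by thread_cancel_event()
 * below, so pass a stable object (not a stack temporary) if cancellation
 * by argument will be needed.
 */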
993
994 /* Thread cancellation ------------------------------------------------------ */
995
996 /**
997 * NOT's out the .events field of pollfd corresponding to the given file
998 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
999 *
1000 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1001 * implementation for details.
1002 *
1003 * @param master
1004 * @param fd
1005 * @param state the event to cancel. One or more (OR'd together) of the
1006 * following:
1007 * - POLLIN
1008 * - POLLOUT
1009 */
1010 static void thread_cancel_rw(struct thread_master *master, int fd, short state)
1011 {
1012 bool found = false;
1013
1014 /* Cancel POLLHUP too just in case some bozo set it */
1015 state |= POLLHUP;
1016
1017 /* find the index of corresponding pollfd */
1018 nfds_t i;
1019
1020 for (i = 0; i < master->handler.pfdcount; i++)
1021 if (master->handler.pfds[i].fd == fd) {
1022 found = true;
1023 break;
1024 }
1025
1026 if (!found) {
1027 zlog_debug(
1028 "[!] Received cancellation request for nonexistent rw job");
1029 zlog_debug("[!] threadmaster: %s | fd: %d",
1030 master->name ? master->name : "", fd);
1031 return;
1032 }
1033
1034 /* NOT out event. */
1035 master->handler.pfds[i].events &= ~(state);
1036
1037 /* If all events are canceled, delete / resize the pollfd array. */
1038 if (master->handler.pfds[i].events == 0) {
1039 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1040 (master->handler.pfdcount - i - 1)
1041 * sizeof(struct pollfd));
1042 master->handler.pfdcount--;
1043 }
1044
1045 /* If we have the same pollfd in the copy, perform the same operations,
1046 * otherwise return. */
1047 if (i >= master->handler.copycount)
1048 return;
1049
1050 master->handler.copy[i].events &= ~(state);
1051
1052 if (master->handler.copy[i].events == 0) {
1053 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1054 (master->handler.copycount - i - 1)
1055 * sizeof(struct pollfd));
1056 master->handler.copycount--;
1057 }
1058 }
1059
1060 /**
1061 * Process cancellation requests.
1062 *
1063 * This may only be run from the pthread which owns the thread_master.
1064 *
1065 * @param master the thread master to process
1066 * @REQUIRE master->mtx
1067 */
1068 static void do_thread_cancel(struct thread_master *master)
1069 {
1070 struct thread_list *list = NULL;
1071 struct pqueue *queue = NULL;
1072 struct thread **thread_array = NULL;
1073 struct thread *thread;
1074
1075 struct cancel_req *cr;
1076 struct listnode *ln;
1077 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1078 		/* If this is an event object cancellation, do a linear
1079 		 * search through the event list, deleting any events
1080 		 * which have the specified argument. We also need to
1081 		 * check every thread in the ready queue. */
1083 if (cr->eventobj) {
1084 struct thread *t;
1085 thread = master->event.head;
1086
1087 while (thread) {
1088 t = thread;
1089 thread = t->next;
1090
1091 if (t->arg == cr->eventobj) {
1092 thread_list_delete(&master->event, t);
1093 if (t->ref)
1094 *t->ref = NULL;
1095 thread_add_unuse(master, t);
1096 }
1097 }
1098
1099 thread = master->ready.head;
1100 while (thread) {
1101 t = thread;
1102 thread = t->next;
1103
1104 if (t->arg == cr->eventobj) {
1105 thread_list_delete(&master->ready, t);
1106 if (t->ref)
1107 *t->ref = NULL;
1108 thread_add_unuse(master, t);
1109 }
1110 }
1111 continue;
1112 }
1113
1114 		/* The pointer varies depending on whether the cancellation
1115 		 * request was made asynchronously or not. If it was, we
1116 		 * need to check whether the thread even exists anymore
1117 		 * before cancelling it. */
1119 thread = (cr->thread) ? cr->thread : *cr->threadref;
1120
1121 if (!thread)
1122 continue;
1123
1124 /* Determine the appropriate queue to cancel the thread from */
1125 switch (thread->type) {
1126 case THREAD_READ:
1127 thread_cancel_rw(master, thread->u.fd, POLLIN);
1128 thread_array = master->read;
1129 break;
1130 case THREAD_WRITE:
1131 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1132 thread_array = master->write;
1133 break;
1134 case THREAD_TIMER:
1135 queue = master->timer;
1136 break;
1137 case THREAD_EVENT:
1138 list = &master->event;
1139 break;
1140 case THREAD_READY:
1141 list = &master->ready;
1142 break;
1143 default:
1144 continue;
1145 break;
1146 }
1147
1148 if (queue) {
1149 assert(thread->index >= 0);
1150 assert(thread == queue->array[thread->index]);
1151 pqueue_remove_at(thread->index, queue);
1152 } else if (list) {
1153 thread_list_delete(list, thread);
1154 } else if (thread_array) {
1155 thread_array[thread->u.fd] = NULL;
1156 } else {
1157 assert(!"Thread should be either in queue or list or array!");
1158 }
1159
1160 if (thread->ref)
1161 *thread->ref = NULL;
1162
1163 thread_add_unuse(thread->master, thread);
1164 }
1165
1166 /* Delete and free all cancellation requests */
1167 list_delete_all_node(master->cancel_req);
1168
1169 /* Wake up any threads which may be blocked in thread_cancel_async() */
1170 master->canceled = true;
1171 pthread_cond_broadcast(&master->cancel_cond);
1172 }
1173
1174 /**
1175 * Cancel any events which have the specified argument.
1176 *
1177 * MT-Unsafe
1178 *
1179  * @param master the thread_master to cancel from
1180 * @param arg the argument passed when creating the event
1181 */
1182 void thread_cancel_event(struct thread_master *master, void *arg)
1183 {
1184 assert(master->owner == pthread_self());
1185
1186 pthread_mutex_lock(&master->mtx);
1187 {
1188 struct cancel_req *cr =
1189 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1190 cr->eventobj = arg;
1191 listnode_add(master->cancel_req, cr);
1192 do_thread_cancel(master);
1193 }
1194 pthread_mutex_unlock(&master->mtx);
1195 }
1196
1197 /**
1198 * Cancel a specific task.
1199 *
1200 * MT-Unsafe
1201 *
1202 * @param thread task to cancel
1203 */
1204 void thread_cancel(struct thread *thread)
1205 {
1206 struct thread_master *master = thread->master;
1207
1208 assert(master->owner == pthread_self());
1209
1210 pthread_mutex_lock(&master->mtx);
1211 {
1212 struct cancel_req *cr =
1213 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1214 cr->thread = thread;
1215 listnode_add(master->cancel_req, cr);
1216 do_thread_cancel(master);
1217 }
1218 pthread_mutex_unlock(&master->mtx);
1219 }
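/*
 * The common calling pattern (a sketch; the THREAD_OFF() macro in
 * thread.h wraps essentially this) is to cancel through the stored
 * back-reference and clear it so later checks see the task as gone:
 *
 *     if (ctx->t_read) {
 *             thread_cancel(ctx->t_read);
 *             ctx->t_read = NULL;
 *     }
 */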
1220
1221 /**
1222 * Asynchronous cancellation.
1223 *
1224 * Called with either a struct thread ** or void * to an event argument,
1225 * this function posts the correct cancellation request and blocks until it is
1226 * serviced.
1227 *
1228 * If the thread is currently running, execution blocks until it completes.
1229 *
1230 * The last two parameters are mutually exclusive, i.e. if you pass one the
1231 * other must be NULL.
1232 *
1233 * When the cancellation procedure executes on the target thread_master, the
1234 * thread * provided is checked for nullity. If it is null, the thread is
1235 * assumed to no longer exist and the cancellation request is a no-op. Thus
1236 * users of this API must pass a back-reference when scheduling the original
1237 * task.
1238 *
1239 * MT-Safe
1240 *
1241 * @param master the thread master with the relevant event / task
1242 * @param thread pointer to thread to cancel
1243 * @param eventobj the event
1244 */
1245 void thread_cancel_async(struct thread_master *master, struct thread **thread,
1246 void *eventobj)
1247 {
1248 assert(!(thread && eventobj) && (thread || eventobj));
1249 assert(master->owner != pthread_self());
1250
1251 pthread_mutex_lock(&master->mtx);
1252 {
1253 master->canceled = false;
1254
1255 if (thread) {
1256 struct cancel_req *cr =
1257 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1258 cr->threadref = thread;
1259 listnode_add(master->cancel_req, cr);
1260 } else if (eventobj) {
1261 struct cancel_req *cr =
1262 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1263 cr->eventobj = eventobj;
1264 listnode_add(master->cancel_req, cr);
1265 }
1266 AWAKEN(master);
1267
1268 while (!master->canceled)
1269 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1270 }
1271 pthread_mutex_unlock(&master->mtx);
1272 }
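/*
 * A rough cross-pthread sketch (illustrative): pthread A scheduled a
 * task on master M (owned by pthread B) and kept the back-reference;
 * pthread A can later take it down safely with:
 *
 *     thread_cancel_async(M, &ctx->t_task, NULL);
 *
 * or cancel every event scheduled with a given argument:
 *
 *     thread_cancel_async(M, NULL, ctx);
 *
 * Either call blocks until B's loop has serviced the request.
 */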
1273 /* ------------------------------------------------------------------------- */
1274
1275 static struct timeval *thread_timer_wait(struct pqueue *queue,
1276 struct timeval *timer_val)
1277 {
1278 if (queue->size) {
1279 struct thread *next_timer = queue->array[0];
1280 monotime_until(&next_timer->u.sands, timer_val);
1281 return timer_val;
1282 }
1283 return NULL;
1284 }
1285
1286 static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1287 struct thread *fetch)
1288 {
1289 *fetch = *thread;
1290 thread_add_unuse(m, thread);
1291 return fetch;
1292 }
1293
1294 static int thread_process_io_helper(struct thread_master *m,
1295 struct thread *thread, short state, int pos)
1296 {
1297 struct thread **thread_array;
1298
1299 if (!thread)
1300 return 0;
1301
1302 if (thread->type == THREAD_READ)
1303 thread_array = m->read;
1304 else
1305 thread_array = m->write;
1306
1307 thread_array[thread->u.fd] = NULL;
1308 thread_list_add(&m->ready, thread);
1309 thread->type = THREAD_READY;
1310 /* if another pthread scheduled this file descriptor for the event we're
1311 * responding to, no problem; we're getting to it now */
1312 thread->master->handler.pfds[pos].events &= ~(state);
1313 return 1;
1314 }
1315
1316 /**
1317 * Process I/O events.
1318 *
1319 * Walks through file descriptor array looking for those pollfds whose .revents
1320 * field has something interesting. Deletes any invalid file descriptors.
1321 *
1322 * @param m the thread master
1323 * @param num the number of active file descriptors (return value of poll())
1324 */
1325 static void thread_process_io(struct thread_master *m, unsigned int num)
1326 {
1327 unsigned int ready = 0;
1328 struct pollfd *pfds = m->handler.copy;
1329
1330 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1331 /* no event for current fd? immediately continue */
1332 if (pfds[i].revents == 0)
1333 continue;
1334
1335 ready++;
1336
1337 		/* Unless someone has called thread_cancel from another
1338 		 * pthread, the only thing that could have changed in
1339 		 * m->handler.pfds while we were asleep is the .events
1340 		 * field in a given pollfd.
1341 		 * Barring thread_cancel(), that value should be a
1342 		 * superset of the values we have in our copy, so there
1343 		 * is no need to update it. Similarly, barring deletion,
1344 		 * the fd should still be a valid index into the
1345 		 * master's pfds. */
1348 if (pfds[i].revents & (POLLIN | POLLHUP))
1349 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1350 i);
1351 if (pfds[i].revents & POLLOUT)
1352 thread_process_io_helper(m, m->write[pfds[i].fd],
1353 POLLOUT, i);
1354
1355 		/* if one of our file descriptors is garbage, remove it
1356 		 * from both copies of the pollfd array and update the
1357 		 * sizes and the index */
1358 if (pfds[i].revents & POLLNVAL) {
1359 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1360 (m->handler.pfdcount - i - 1)
1361 * sizeof(struct pollfd));
1362 m->handler.pfdcount--;
1363
1364 memmove(pfds + i, pfds + i + 1,
1365 (m->handler.copycount - i - 1)
1366 * sizeof(struct pollfd));
1367 m->handler.copycount--;
1368
1369 i--;
1370 }
1371 }
1372 }
1373
1374 /* Add all timers that have popped to the ready list. */
1375 static unsigned int thread_process_timers(struct pqueue *queue,
1376 struct timeval *timenow)
1377 {
1378 struct thread *thread;
1379 unsigned int ready = 0;
1380
1381 while (queue->size) {
1382 thread = queue->array[0];
1383 if (timercmp(timenow, &thread->u.sands, <))
1384 return ready;
1385 pqueue_dequeue(queue);
1386 thread->type = THREAD_READY;
1387 thread_list_add(&thread->master->ready, thread);
1388 ready++;
1389 }
1390 return ready;
1391 }
1392
1393 /* process a list en masse, e.g. for event thread lists */
1394 static unsigned int thread_process(struct thread_list *list)
1395 {
1396 struct thread *thread;
1397 struct thread *next;
1398 unsigned int ready = 0;
1399
1400 for (thread = list->head; thread; thread = next) {
1401 next = thread->next;
1402 thread_list_delete(list, thread);
1403 thread->type = THREAD_READY;
1404 thread_list_add(&thread->master->ready, thread);
1405 ready++;
1406 }
1407 return ready;
1408 }
1409
1410
1411 /* Fetch next ready thread. */
1412 struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1413 {
1414 struct thread *thread = NULL;
1415 struct timeval now;
1416 struct timeval zerotime = {0, 0};
1417 struct timeval tv;
1418 struct timeval *tw = NULL;
1419
1420 int num = 0;
1421
1422 do {
1423 /* Handle signals if any */
1424 if (m->handle_signals)
1425 quagga_sigevent_process();
1426
1427 pthread_mutex_lock(&m->mtx);
1428
1429 /* Process any pending cancellation requests */
1430 do_thread_cancel(m);
1431
1432 /*
1433 * Attempt to flush ready queue before going into poll().
1434 * This is performance-critical. Think twice before modifying.
1435 */
1436 if ((thread = thread_trim_head(&m->ready))) {
1437 fetch = thread_run(m, thread, fetch);
1438 if (fetch->ref)
1439 *fetch->ref = NULL;
1440 pthread_mutex_unlock(&m->mtx);
1441 break;
1442 }
1443
1444 /* otherwise, tick through scheduling sequence */
1445
1446 /*
1447 * Post events to ready queue. This must come before the
1448 * following block since events should occur immediately
1449 */
1450 thread_process(&m->event);
1451
1452 /*
1453 * If there are no tasks on the ready queue, we will poll()
1454 * until a timer expires or we receive I/O, whichever comes
1455 * first. The strategy for doing this is:
1456 *
1457 		 * - If there are events pending, set the poll() timeout to zero
1458 		 * - If there are no events pending, but there are timers
1459 		 *   pending, set the timeout to the smallest remaining time on
1460 		 *   any timer
1461 		 * - If there are neither timers nor events pending, but there
1462 		 *   are file descriptors pending, block indefinitely in poll()
1463 		 * - If nothing is pending, it's time for the application to die
1465 *
1466 * In every case except the last, we need to hit poll() at least
1467 * once per loop to avoid starvation by events
1468 */
1469 if (m->ready.count == 0)
1470 tw = thread_timer_wait(m->timer, &tv);
1471
1472 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1473 tw = &zerotime;
1474
1475 if (!tw && m->handler.pfdcount == 0) { /* die */
1476 pthread_mutex_unlock(&m->mtx);
1477 fetch = NULL;
1478 break;
1479 }
1480
1481 /*
1482 * Copy pollfd array + # active pollfds in it. Not necessary to
1483 * copy the array size as this is fixed.
1484 */
1485 m->handler.copycount = m->handler.pfdcount;
1486 memcpy(m->handler.copy, m->handler.pfds,
1487 m->handler.copycount * sizeof(struct pollfd));
1488
1489 pthread_mutex_unlock(&m->mtx);
1490 {
1491 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1492 m->handler.copycount, tw);
1493 }
1494 pthread_mutex_lock(&m->mtx);
1495
1496 /* Handle any errors received in poll() */
1497 if (num < 0) {
1498 if (errno == EINTR) {
1499 pthread_mutex_unlock(&m->mtx);
1500 /* loop around to signal handler */
1501 continue;
1502 }
1503
1504 /* else die */
1505 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1506 safe_strerror(errno));
1507 pthread_mutex_unlock(&m->mtx);
1508 fetch = NULL;
1509 break;
1510 }
1511
1512 /* Post timers to ready queue. */
1513 monotime(&now);
1514 thread_process_timers(m->timer, &now);
1515
1516 /* Post I/O to ready queue. */
1517 if (num > 0)
1518 thread_process_io(m, num);
1519
1520 pthread_mutex_unlock(&m->mtx);
1521
1522 } while (!thread && m->spin);
1523
1524 return fetch;
1525 }
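/*
 * thread_fetch() + thread_call() form the core of every FRR event loop;
 * frr_run() boils down to roughly the following (a sketch):
 *
 *     struct thread thread;
 *
 *     while (thread_fetch(master, &thread))
 *             thread_call(&thread);
 *
 * thread_fetch() returns NULL only when nothing at all is left to run,
 * which the comment above treats as "time for the application to die".
 */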
1526
1527 static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1528 {
1529 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1530 + (a.tv_usec - b.tv_usec));
1531 }
1532
1533 unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1534 unsigned long *cputime)
1535 {
1536 /* This is 'user + sys' time. */
1537 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1538 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1539 return timeval_elapsed(now->real, start->real);
1540 }
1541
1542 /* We should aim to yield after thread->yield milliseconds, which
1543    defaults to THREAD_YIELD_TIME_SLOT.
1544 Note: we are using real (wall clock) time for this calculation.
1545 It could be argued that CPU time may make more sense in certain
1546 contexts. The things to consider are whether the thread may have
1547 blocked (in which case wall time increases, but CPU time does not),
1548 or whether the system is heavily loaded with other processes competing
1549 for CPU time. On balance, wall clock time seems to make sense.
1550 Plus it has the added benefit that gettimeofday should be faster
1551 than calling getrusage. */
1552 int thread_should_yield(struct thread *thread)
1553 {
1554 int result;
1555 pthread_mutex_lock(&thread->mtx);
1556 {
1557 result = monotime_since(&thread->real, NULL)
1558 > (int64_t)thread->yield;
1559 }
1560 pthread_mutex_unlock(&thread->mtx);
1561 return result;
1562 }
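/*
 * A cooperative long-running task checks this periodically and, instead
 * of hogging the loop, reschedules itself to continue later. A rough
 * sketch (work_remains()/do_one_unit() and the names are illustrative
 * only):
 *
 *     static int my_worker(struct thread *t)
 *     {
 *             struct my_ctx *ctx = THREAD_ARG(t);
 *
 *             while (work_remains(ctx)) {
 *                     do_one_unit(ctx);
 *                     if (thread_should_yield(t)) {
 *                             thread_add_event(t->master, my_worker, ctx,
 *                                              0, &ctx->t_work);
 *                             return 0;
 *                     }
 *             }
 *             return 0;
 *     }
 */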
1563
1564 void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1565 {
1566 pthread_mutex_lock(&thread->mtx);
1567 {
1568 thread->yield = yield_time;
1569 }
1570 pthread_mutex_unlock(&thread->mtx);
1571 }
1572
1573 void thread_getrusage(RUSAGE_T *r)
1574 {
1575 monotime(&r->real);
1576 getrusage(RUSAGE_SELF, &(r->cpu));
1577 }
1578
1579 /*
1580 * Call a thread.
1581 *
1582 * This function will atomically update the thread's usage history. At present
1583 * this is the only spot where usage history is written. Nevertheless the code
1584 * has been written such that the introduction of writers in the future should
1585 * not need to update it provided the writers atomically perform only the
1586 * operations done here, i.e. updating the total and maximum times. In
1587 * particular, the maximum real and cpu times must be monotonically increasing
1588 * or this code is not correct.
1589 */
1590 void thread_call(struct thread *thread)
1591 {
1592 _Atomic unsigned long realtime, cputime;
1593 unsigned long exp;
1594 unsigned long helper;
1595 RUSAGE_T before, after;
1596
1597 GETRUSAGE(&before);
1598 thread->real = before.real;
1599
1600 pthread_setspecific(thread_current, thread);
1601 (*thread->func)(thread);
1602 pthread_setspecific(thread_current, NULL);
1603
1604 GETRUSAGE(&after);
1605
1606 realtime = thread_consumed_time(&after, &before, &helper);
1607 cputime = helper;
1608
1609 /* update realtime */
1610 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1611 memory_order_seq_cst);
1612 exp = atomic_load_explicit(&thread->hist->real.max,
1613 memory_order_seq_cst);
1614 while (exp < realtime
1615 && !atomic_compare_exchange_weak_explicit(
1616 &thread->hist->real.max, &exp, realtime,
1617 memory_order_seq_cst, memory_order_seq_cst))
1618 ;
1619
1620 /* update cputime */
1621 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1622 memory_order_seq_cst);
1623 exp = atomic_load_explicit(&thread->hist->cpu.max,
1624 memory_order_seq_cst);
1625 while (exp < cputime
1626 && !atomic_compare_exchange_weak_explicit(
1627 &thread->hist->cpu.max, &exp, cputime,
1628 memory_order_seq_cst, memory_order_seq_cst))
1629 ;
1630
1631 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1632 memory_order_seq_cst);
1633 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1634 memory_order_seq_cst);
1635
1636 #ifdef CONSUMED_TIME_CHECK
1637 if (realtime > CONSUMED_TIME_CHECK) {
1638 /*
1639 * We have a CPU Hog on our hands.
1640 * Whinge about it now, so we're aware this is yet another task
1641 * to fix.
1642 */
1643 flog_warn(
1644 EC_LIB_SLOW_THREAD,
1645 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1646 thread->funcname, (unsigned long)thread->func,
1647 realtime / 1000, cputime / 1000);
1648 }
1649 #endif /* CONSUMED_TIME_CHECK */
1650 }
1651
1652 /* Execute thread */
1653 void funcname_thread_execute(struct thread_master *m,
1654 int (*func)(struct thread *), void *arg, int val,
1655 debugargdef)
1656 {
1657 struct thread *thread;
1658
1659 /* Get or allocate new thread to execute. */
1660 pthread_mutex_lock(&m->mtx);
1661 {
1662 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
1663
1664 /* Set its event value. */
1665 pthread_mutex_lock(&thread->mtx);
1666 {
1667 thread->add_type = THREAD_EXECUTE;
1668 thread->u.val = val;
1669 thread->ref = &thread;
1670 }
1671 pthread_mutex_unlock(&thread->mtx);
1672 }
1673 pthread_mutex_unlock(&m->mtx);
1674
1675 /* Execute thread doing all accounting. */
1676 thread_call(thread);
1677
1678 /* Give back or free thread. */
1679 thread_add_unuse(m, thread);
1680 }
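/*
 * thread_execute() (the wrapper in thread.h for the function above) runs
 * the callback synchronously, but still funnels it through thread_call()
 * so it shows up in 'show thread cpu' like any scheduled task, e.g.
 * (illustrative):
 *
 *     thread_execute(master, my_handler, ctx, 0);
 */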