/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

/* #define DEBUG */

#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"

DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

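/*
 * Wake up the pthread sitting in poll() inside thread_fetch(): writing a
 * byte to the notification pipe makes m->io_pipe[0] readable, so poll()
 * returns even when no other I/O is pending (the classic self-pipe trick;
 * fd_poll() drains the pipe afterwards).
 */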
#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0)

/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;


/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}

static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
			       const struct cpu_thread_history *b)
{
	return a->func == b->func;
}

static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}

static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}

static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}

static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}

static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, "  Type  Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, "  Type  Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}

static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}

static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}

static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}
	return filter;
}

DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}

static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}

DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}


DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}

void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */


static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}

static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}

static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}

/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}

struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
	if (rv == NULL)
		return NULL;

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read =
		XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
	if (rv->read == NULL) {
		XFREE(MTYPE_THREAD_MASTER, rv);
		return NULL;
	}
	rv->write =
		XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
	if (rv->write == NULL) {
		XFREE(MTYPE_THREAD, rv->read);
		XFREE(MTYPE_THREAD_MASTER, rv);
		return NULL;
	}

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(int (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");


	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}

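/*
 * Illustrative lifecycle sketch (daemons drive this through higher-level
 * wrappers rather than verbatim):
 *
 *     struct thread_master *master = thread_master_create("main");
 *     ...schedule I/O, timer and event tasks...
 *     ...run the fetch/call loop (see the sketch near thread_fetch())...
 *     thread_master_free(master);
 */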
void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		if (master->name)
			XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}

/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}

/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}

/* Thread list is empty or not. */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);
	thread->ref = NULL;

	thread->type = THREAD_UNUSED;
	thread->hist->total_active--;
	thread_list_add(&m->unuse, thread);
}

/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		XFREE(MTYPE_THREAD, t);
		list->count--;
		m->alloc--;
	}
}

static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			XFREE(MTYPE_THREAD, t);
			m->alloc--;
		}
	}
	XFREE(MTYPE_THREAD, thread_array);
}

static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		XFREE(MTYPE_THREAD, queue->array[i]);

	m->alloc -= queue->size;
	pqueue_delete(queue);
}

/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the unuse list for later
 * reuse. If we are shutting down, free up the unused threads so we can see
 * whether anything was forgotten to be shut off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			pthread_mutex_destroy(&t->mtx);
			XFREE(MTYPE_THREAD, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}

/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete_and_null(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete_and_null(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	if (m->name)
		XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}

/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}

#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);
	return remain;
}

/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * If the passed-in funcname is not what we have stored, then
	 * thread->hist needs to be updated. We keep the last one around
	 * on the unused thread under the assumption that we are probably
	 * going to immediately allocate the same type of thread; this
	 * hopefully saves us some serious hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}

static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout. If that value is positive, it specifies the
	 * maximum number of milliseconds to wait. If it is -1, we should never
	 * wait and always return immediately even if no event is detected. If
	 * the value is zero, the behavior is default. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}

/* Add new read/write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find
		 * and use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}

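/*
 * Illustrative use (normally via the thread_add_read()/thread_add_write()
 * macros from thread.h, which fill in the debug arguments; handle_packet,
 * ctx and sock are hypothetical):
 *
 *     struct thread *t_read = NULL;
 *     thread_add_read(master, handle_packet, ctx, sock, &t_read);
 *
 * Passing &t_read gives the scheduler a back-reference that it NULLs out
 * when the task runs or is cancelled.
 */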
static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;
		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}


/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with "struct timeval" resolution */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}

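/*
 * Illustrative use (normally via the thread_add_timer*() macros from
 * thread.h; fire_cb and ctx are hypothetical):
 *
 *     struct thread *t_timer = NULL;
 *     thread_add_timer_msec(master, fire_cb, ctx, 2500, &t_timer);
 *
 * The timer pops 2.5 seconds from now, measured on the monotonic clock.
 */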
/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}

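/*
 * Illustrative: queue an event to run on the next scheduler pass (cb and
 * ctx are hypothetical; normally invoked via the thread_add_event() macro
 * from thread.h):
 *
 *     thread_add_event(master, cb, ctx, 0, NULL);
 */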
/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}

/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list, deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
			break;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}

/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}

/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	assert(thread->master->owner == pthread_self());

	pthread_mutex_lock(&thread->master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(thread->master->cancel_req, cr);
		do_thread_cancel(thread->master);
	}
	pthread_mutex_unlock(&thread->master->mtx);
}

/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or a void * event argument, this
 * function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
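/*
 * Illustrative use from a second pthread (t_read as scheduled in the
 * read/write example above; ctx hypothetical):
 *
 *     thread_cancel_async(master, &t_read, NULL);  // by back-reference
 *     thread_cancel_async(master, NULL, ctx);      // by event argument
 *
 * Both calls block until the owning pthread has serviced the request.
 */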
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}

static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}

static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event
	 * we're responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}

/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel() that value
		 * should be a superset of the values we have in our copy, so
		 * there's no need to update it. Similarly, barring deletion,
		 * the fd should still be a valid index into the master's
		 * pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}

/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}

/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}


/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to
		 *   zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time
		 *   on any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in
		 *   poll()
		 * - If nothing is pending, it's time for the application to
		 *   die
		 *
		 * In every case except the last, we need to hit poll() at
		 * least once per loop to avoid starvation by events
		 */
		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			zlog_warn("poll() error: %s", safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}

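/*
 * The canonical consumer of thread_fetch() is an event loop of this shape
 * (sketch; in FRR the equivalent loop lives in the daemon startup code):
 *
 *     struct thread thread;
 *     while (thread_fetch(master, &thread))
 *         thread_call(&thread);
 */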
static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}

unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}

/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts. The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time. On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);
	return result;
}

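/*
 * Illustrative cooperative-yield pattern for a long-running task
 * (work_fn, ctx, work_left and do_unit are hypothetical):
 *
 *     static int work_fn(struct thread *thread)
 *     {
 *         struct ctx *ctx = THREAD_ARG(thread);
 *
 *         while (work_left(ctx)) {
 *             do_unit(ctx);
 *             if (thread_should_yield(thread)) {
 *                 // reschedule ourselves and give other tasks a turn
 *                 thread_add_event(thread->master, work_fn, ctx, 0, NULL);
 *                 break;
 *             }
 *         }
 *         return 0;
 *     }
 */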
void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}

void thread_getrusage(RUSAGE_T *r)
{
	monotime(&r->real);
	getrusage(RUSAGE_SELF, &(r->cpu));
}

/*
 * Call a thread.
 *
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		zlog_warn(
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}

/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct cpu_thread_history tmp;
	struct thread dummy;

	memset(&dummy, 0, sizeof(struct thread));

	pthread_mutex_init(&dummy.mtx, NULL);
	dummy.type = THREAD_EVENT;
	dummy.add_type = THREAD_EXECUTE;
	dummy.master = NULL;
	dummy.arg = arg;
	dummy.u.val = val;

	tmp.func = dummy.func = func;
	tmp.funcname = dummy.funcname = funcname;
	dummy.hist = hash_get(m->cpu_record, &tmp,
			      (void *(*)(void *))cpu_record_hash_alloc);

	dummy.schedfrom = schedfrom;
	dummy.schedfrom_line = fromln;

	thread_call(&dummy);
}