[mirror_frr.git] / lib / thread.c
718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
896014f4 DL 16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
28#include "log.h"
e04ab74d 29#include "hash.h"
4becea72 30#include "pqueue.h"
e04ab74d 31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
9ef9495e 36#include "lib_errors.h"
d6be5fb9 37
d62a17ae 38DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 39DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 40DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 42
3b96b781 HT 43#if defined(__APPLE__)
44#include <mach/mach.h>
45#include <mach/mach_time.h>
46#endif
47
d62a17ae 48#define AWAKEN(m) \
49 do { \
50 static unsigned char wakebyte = 0x01; \
51 write(m->io_pipe[1], &wakebyte, 1); \
52 } while (0);
3bf2673b 53
62f44022 54/* control variable for initializer */
e0bebc7c 55pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 56pthread_key_t thread_current;
6b0655a2 57
62f44022 QY 58pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
59static struct list *masters;
60
6655966d 61static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 62
62f44022 63/* CLI start ---------------------------------------------------------------- */
d62a17ae 64static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
e04ab74d 65{
883cc51d 66 int size = sizeof(a->func);
bd74dc61 DS 67
68 return jhash(&a->func, size, 0);
e04ab74d 69}
70
74df8d6d 71static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 72 const struct cpu_thread_history *b)
e04ab74d 73{
d62a17ae 74 return a->func == b->func;
e04ab74d 75}
76
d62a17ae 77static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 78{
d62a17ae 79 struct cpu_thread_history *new;
80 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
81 new->func = a->func;
82 new->funcname = a->funcname;
83 return new;
e04ab74d 84}
85
d62a17ae 86static void cpu_record_hash_free(void *a)
228da428 87{
d62a17ae 88 struct cpu_thread_history *hist = a;
89
90 XFREE(MTYPE_THREAD_STATS, hist);
228da428 CC 91}
92
d62a17ae 93static void vty_out_cpu_thread_history(struct vty *vty,
94 struct cpu_thread_history *a)
e04ab74d 95{
c8a65463 DL 96 vty_out(vty, "%5"PRIdFAST32" %10lu.%03lu %9"PRIuFAST32
97 " %8lu %9lu %8lu %9lu", a->total_active,
d62a17ae 98 a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
99 a->cpu.total / a->total_calls, a->cpu.max,
100 a->real.total / a->total_calls, a->real.max);
101 vty_out(vty, " %c%c%c%c%c %s\n",
102 a->types & (1 << THREAD_READ) ? 'R' : ' ',
103 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
104 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
105 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
106 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 107}
108
e3b78da8 109static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 110{
d62a17ae 111 struct cpu_thread_history *totals = args[0];
fbcac826 112 struct cpu_thread_history copy;
d62a17ae 113 struct vty *vty = args[1];
fbcac826 114 uint8_t *filter = args[2];
d62a17ae 115
116 struct cpu_thread_history *a = bucket->data;
117
fbcac826 QY 118 copy.total_active =
119 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
120 copy.total_calls =
121 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
122 copy.cpu.total =
123 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
124 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
125 copy.real.total =
126 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
127 copy.real.max =
128 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
129 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
130 copy.funcname = a->funcname;
131
132 if (!(copy.types & *filter))
d62a17ae 133 return;
fbcac826 QY 134
135 vty_out_cpu_thread_history(vty, &copy);
136 totals->total_active += copy.total_active;
137 totals->total_calls += copy.total_calls;
138 totals->real.total += copy.real.total;
139 if (totals->real.max < copy.real.max)
140 totals->real.max = copy.real.max;
141 totals->cpu.total += copy.cpu.total;
142 if (totals->cpu.max < copy.cpu.max)
143 totals->cpu.max = copy.cpu.max;
e04ab74d 144}
145
fbcac826 146static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 147{
d62a17ae 148 struct cpu_thread_history tmp;
149 void *args[3] = {&tmp, vty, &filter};
150 struct thread_master *m;
151 struct listnode *ln;
152
153 memset(&tmp, 0, sizeof tmp);
154 tmp.funcname = "TOTAL";
155 tmp.types = filter;
156
157 pthread_mutex_lock(&masters_mtx);
158 {
159 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
160 const char *name = m->name ? m->name : "main";
161
162 char underline[strlen(name) + 1];
163 memset(underline, '-', sizeof(underline));
4f113d60 164 underline[sizeof(underline) - 1] = '\0';
d62a17ae 165
166 vty_out(vty, "\n");
167 vty_out(vty, "Showing statistics for pthread %s\n",
168 name);
169 vty_out(vty, "-------------------------------%s\n",
170 underline);
171 vty_out(vty, "%21s %18s %18s\n", "",
172 "CPU (user+system):", "Real (wall-clock):");
173 vty_out(vty,
174 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
175 vty_out(vty, " Avg uSec Max uSecs");
176 vty_out(vty, " Type Thread\n");
177
178 if (m->cpu_record->count)
179 hash_iterate(
180 m->cpu_record,
e3b78da8 181 (void (*)(struct hash_bucket *,
d62a17ae 182 void *))cpu_record_hash_print,
183 args);
184 else
185 vty_out(vty, "No data to display yet.\n");
186
187 vty_out(vty, "\n");
188 }
189 }
190 pthread_mutex_unlock(&masters_mtx);
191
192 vty_out(vty, "\n");
193 vty_out(vty, "Total thread statistics\n");
194 vty_out(vty, "-------------------------\n");
195 vty_out(vty, "%21s %18s %18s\n", "",
196 "CPU (user+system):", "Real (wall-clock):");
197 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
198 vty_out(vty, " Avg uSec Max uSecs");
199 vty_out(vty, " Type Thread\n");
200
201 if (tmp.total_calls > 0)
202 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 203}
204
e3b78da8 205static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 206{
fbcac826 207 uint8_t *filter = args[0];
d62a17ae 208 struct hash *cpu_record = args[1];
209
210 struct cpu_thread_history *a = bucket->data;
62f44022 211
d62a17ae 212 if (!(a->types & *filter))
213 return;
f48f65d2 214
d62a17ae 215 hash_release(cpu_record, bucket->data);
e276eb82 PJ 216}
217
fbcac826 218static void cpu_record_clear(uint8_t filter)
e276eb82 219{
fbcac826 220 uint8_t *tmp = &filter;
d62a17ae 221 struct thread_master *m;
222 struct listnode *ln;
223
224 pthread_mutex_lock(&masters_mtx);
225 {
226 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
227 pthread_mutex_lock(&m->mtx);
228 {
229 void *args[2] = {tmp, m->cpu_record};
230 hash_iterate(
231 m->cpu_record,
e3b78da8 232 (void (*)(struct hash_bucket *,
d62a17ae 233 void *))cpu_record_hash_clear,
234 args);
235 }
236 pthread_mutex_unlock(&m->mtx);
237 }
238 }
239 pthread_mutex_unlock(&masters_mtx);
62f44022 QY 240}
241
fbcac826 242static uint8_t parse_filter(const char *filterstr)
62f44022 243{
d62a17ae 244 int i = 0;
245 int filter = 0;
246
247 while (filterstr[i] != '\0') {
248 switch (filterstr[i]) {
249 case 'r':
250 case 'R':
251 filter |= (1 << THREAD_READ);
252 break;
253 case 'w':
254 case 'W':
255 filter |= (1 << THREAD_WRITE);
256 break;
257 case 't':
258 case 'T':
259 filter |= (1 << THREAD_TIMER);
260 break;
261 case 'e':
262 case 'E':
263 filter |= (1 << THREAD_EVENT);
264 break;
265 case 'x':
266 case 'X':
267 filter |= (1 << THREAD_EXECUTE);
268 break;
269 default:
270 break;
271 }
272 ++i;
273 }
274 return filter;
62f44022 QY 275}
276
277DEFUN (show_thread_cpu,
278 show_thread_cpu_cmd,
279 "show thread cpu [FILTER]",
280 SHOW_STR
281 "Thread information\n"
282 "Thread CPU usage\n"
283 "Display filter (rwtexb)\n")
284{
fbcac826 285 uint8_t filter = (uint8_t)-1U;
d62a17ae 286 int idx = 0;
287
288 if (argv_find(argv, argc, "FILTER", &idx)) {
289 filter = parse_filter(argv[idx]->arg);
290 if (!filter) {
291 vty_out(vty,
292 "Invalid filter \"%s\" specified; must contain at least"
293 "one of 'RWTEXB'\n",
294 argv[idx]->arg);
295 return CMD_WARNING;
296 }
297 }
298
299 cpu_record_print(vty, filter);
300 return CMD_SUCCESS;
e276eb82 PJ 301}
302
8872626b DS 303static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
304{
305 const char *name = m->name ? m->name : "main";
306 char underline[strlen(name) + 1];
307 uint32_t i;
308
309 memset(underline, '-', sizeof(underline));
310 underline[sizeof(underline) - 1] = '\0';
311
312 vty_out(vty, "\nShowing poll FD's for %s\n", name);
313 vty_out(vty, "----------------------%s\n", underline);
314 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
315 for (i = 0; i < m->handler.pfdcount; i++)
316 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
317 m->handler.pfds[i].fd,
318 m->handler.pfds[i].events,
319 m->handler.pfds[i].revents);
320}
321
322DEFUN (show_thread_poll,
323 show_thread_poll_cmd,
324 "show thread poll",
325 SHOW_STR
326 "Thread information\n"
327 "Show poll FD's and information\n")
328{
329 struct listnode *node;
330 struct thread_master *m;
331
332 pthread_mutex_lock(&masters_mtx);
333 {
334 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
335 show_thread_poll_helper(vty, m);
336 }
337 }
338 pthread_mutex_unlock(&masters_mtx);
339
340 return CMD_SUCCESS;
341}
342
343
49d41a26 DS 344DEFUN (clear_thread_cpu,
345 clear_thread_cpu_cmd,
346 "clear thread cpu [FILTER]",
62f44022 347 "Clear stored data in all pthreads\n"
49d41a26 DS 348 "Thread information\n"
349 "Thread CPU usage\n"
350 "Display filter (rwtexb)\n")
e276eb82 351{
fbcac826 352 uint8_t filter = (uint8_t)-1U;
d62a17ae 353 int idx = 0;
354
355 if (argv_find(argv, argc, "FILTER", &idx)) {
356 filter = parse_filter(argv[idx]->arg);
357 if (!filter) {
358 vty_out(vty,
359 "Invalid filter \"%s\" specified; must contain at least"
360 "one of 'RWTEXB'\n",
361 argv[idx]->arg);
362 return CMD_WARNING;
363 }
364 }
365
366 cpu_record_clear(filter);
367 return CMD_SUCCESS;
e276eb82 368}
6b0655a2 369
d62a17ae 370void thread_cmd_init(void)
0b84f294 371{
d62a17ae 372 install_element(VIEW_NODE, &show_thread_cpu_cmd);
8872626b 373 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 374 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 375}
62f44022 QY 376/* CLI end ------------------------------------------------------------------ */
377
0b84f294 378
d62a17ae 379static int thread_timer_cmp(void *a, void *b)
4becea72 380{
d62a17ae 381 struct thread *thread_a = a;
382 struct thread *thread_b = b;
383
384 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
385 return -1;
386 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
387 return 1;
388 return 0;
4becea72 CF 389}
390
d62a17ae 391static void thread_timer_update(void *node, int actual_position)
4becea72 392{
d62a17ae 393 struct thread *thread = node;
4becea72 394
d62a17ae 395 thread->index = actual_position;
4becea72 CF 396}
397
d62a17ae 398static void cancelreq_del(void *cr)
63ccb9cb 399{
d62a17ae 400 XFREE(MTYPE_TMP, cr);
63ccb9cb QY 401}
402
e0bebc7c 403/* initializer, only ever called once */
4d762f26 404static void initializer(void)
e0bebc7c 405{
d62a17ae 406 pthread_key_create(&thread_current, NULL);
e0bebc7c QY 407}
408
d62a17ae 409struct thread_master *thread_master_create(const char *name)
718e3744 410{
d62a17ae 411 struct thread_master *rv;
412 struct rlimit limit;
413
414 pthread_once(&init_once, &initializer);
415
416 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
417 if (rv == NULL)
418 return NULL;
419
420 /* Initialize master mutex */
421 pthread_mutex_init(&rv->mtx, NULL);
422 pthread_cond_init(&rv->cancel_cond, NULL);
423
424 /* Set name */
425 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
426
427 /* Initialize I/O task data structures */
428 getrlimit(RLIMIT_NOFILE, &limit);
429 rv->fd_limit = (int)limit.rlim_cur;
a6f235f3 DS 430 rv->read = XCALLOC(MTYPE_THREAD_POLL,
431 sizeof(struct thread *) * rv->fd_limit);
432
433 rv->write = XCALLOC(MTYPE_THREAD_POLL,
434 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 435
bd74dc61 436 rv->cpu_record = hash_create_size(
996c9314 437 8, (unsigned int (*)(void *))cpu_record_hash_key,
74df8d6d 438 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
bd74dc61 439 "Thread Hash");
d62a17ae 440
441
442 /* Initialize the timer queues */
443 rv->timer = pqueue_create();
444 rv->timer->cmp = thread_timer_cmp;
445 rv->timer->update = thread_timer_update;
446
447 /* Initialize thread_fetch() settings */
448 rv->spin = true;
449 rv->handle_signals = true;
450
451 /* Set pthread owner, should be updated by actual owner */
452 rv->owner = pthread_self();
453 rv->cancel_req = list_new();
454 rv->cancel_req->del = cancelreq_del;
455 rv->canceled = true;
456
457 /* Initialize pipe poker */
458 pipe(rv->io_pipe);
459 set_nonblocking(rv->io_pipe[0]);
460 set_nonblocking(rv->io_pipe[1]);
461
462 /* Initialize data structures for poll() */
463 rv->handler.pfdsize = rv->fd_limit;
464 rv->handler.pfdcount = 0;
465 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
466 sizeof(struct pollfd) * rv->handler.pfdsize);
467 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
468 sizeof(struct pollfd) * rv->handler.pfdsize);
469
eff09c66 470 /* add to list of threadmasters */
d62a17ae 471 pthread_mutex_lock(&masters_mtx);
472 {
eff09c66 QY 473 if (!masters)
474 masters = list_new();
475
d62a17ae 476 listnode_add(masters, rv);
477 }
478 pthread_mutex_unlock(&masters_mtx);
479
480 return rv;
718e3744 481}
482
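/*
 * Usage sketch (illustration, not part of thread.c): typical lifecycle of a
 * thread_master in a daemon. This assumes the thread_add_timer() convenience
 * macro declared in thread.h; the callback and function names below are
 * hypothetical.
 */
static int hello_timer(struct thread *t)
{
	struct thread_master *m = THREAD_ARG(t);

	/* ... periodic work ... */

	/* tasks are one-shot: re-arm the timer for another 5 seconds */
	thread_add_timer(m, hello_timer, m, 5, NULL);
	return 0;
}

static void example_startup(void)
{
	struct thread_master *master = thread_master_create("example");

	/* schedule the first task */
	thread_add_timer(master, hello_timer, master, 5, NULL);

	/* ... run the thread_fetch()/thread_call() loop shown further below,
	 * then call thread_master_free(master) on shutdown ... */
}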
d8a8a8de QY 483void thread_master_set_name(struct thread_master *master, const char *name)
484{
485 pthread_mutex_lock(&master->mtx);
486 {
487 if (master->name)
488 XFREE(MTYPE_THREAD_MASTER, master->name);
489 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
490 }
491 pthread_mutex_unlock(&master->mtx);
492}
493
718e3744 494/* Add a new thread to the list. */
d62a17ae 495static void thread_list_add(struct thread_list *list, struct thread *thread)
718e3744 496{
d62a17ae 497 thread->next = NULL;
498 thread->prev = list->tail;
499 if (list->tail)
500 list->tail->next = thread;
501 else
502 list->head = thread;
503 list->tail = thread;
504 list->count++;
718e3744 505}
506
718e3744 507/* Delete a thread from the list. */
d62a17ae 508static struct thread *thread_list_delete(struct thread_list *list,
509 struct thread *thread)
718e3744 510{
d62a17ae 511 if (thread->next)
512 thread->next->prev = thread->prev;
513 else
514 list->tail = thread->prev;
515 if (thread->prev)
516 thread->prev->next = thread->next;
517 else
518 list->head = thread->next;
519 thread->next = thread->prev = NULL;
520 list->count--;
521 return thread;
718e3744 522}
523
495f0b13 524/* Thread list is empty or not. */
d62a17ae 525static int thread_empty(struct thread_list *list)
495f0b13 526{
d62a17ae 527 return list->head ? 0 : 1;
495f0b13 DS 528}
529
530/* Delete top of the list and return it. */
d62a17ae 531static struct thread *thread_trim_head(struct thread_list *list)
495f0b13 532{
d62a17ae 533 if (!thread_empty(list))
534 return thread_list_delete(list, list->head);
535 return NULL;
495f0b13 DS 536}
537
6ed04aa2 DS 538#define THREAD_UNUSED_DEPTH 10
539
718e3744 540/* Move thread to unuse list. */
d62a17ae 541static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 542{
6655966d RZ 543 pthread_mutex_t mtxc = thread->mtx;
544
d62a17ae 545 assert(m != NULL && thread != NULL);
546 assert(thread->next == NULL);
547 assert(thread->prev == NULL);
d62a17ae 548
d62a17ae 549 thread->hist->total_active--;
6ed04aa2 DS 550 memset(thread, 0, sizeof(struct thread));
551 thread->type = THREAD_UNUSED;
552
6655966d RZ 553 /* Restore the thread mutex context. */
554 thread->mtx = mtxc;
555
556 if (m->unuse.count < THREAD_UNUSED_DEPTH) {
6ed04aa2 557 thread_list_add(&m->unuse, thread);
6655966d RZ 558 return;
559 }
560
561 thread_free(m, thread);
718e3744 562}
563
564/* Free all unused threads. */
d62a17ae 565static void thread_list_free(struct thread_master *m, struct thread_list *list)
718e3744 566{
d62a17ae 567 struct thread *t;
568 struct thread *next;
569
570 for (t = list->head; t; t = next) {
571 next = t->next;
6655966d 572 thread_free(m, t);
d62a17ae 573 list->count--;
d62a17ae 574 }
718e3744 575}
576
d62a17ae 577static void thread_array_free(struct thread_master *m,
578 struct thread **thread_array)
308d14ae 579{
d62a17ae 580 struct thread *t;
581 int index;
582
583 for (index = 0; index < m->fd_limit; ++index) {
584 t = thread_array[index];
585 if (t) {
586 thread_array[index] = NULL;
6655966d 587 thread_free(m, t);
d62a17ae 588 }
589 }
a6f235f3 590 XFREE(MTYPE_THREAD_POLL, thread_array);
308d14ae DV 591}
592
d62a17ae 593static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
4becea72 594{
d62a17ae 595 int i;
4becea72 596
d62a17ae 597 for (i = 0; i < queue->size; i++)
6655966d 598 thread_free(m, queue->array[i]);
4becea72 599
d62a17ae 600 pqueue_delete(queue);
4becea72 CF 601}
602
495f0b13 DS 603/*
604 * thread_master_free_unused
605 *
606 * As threads are finished with, they are put on the
607 * unuse list for later reuse.
608 * If we are shutting down, free up the unused threads
609 * so we can see if we forgot to shut anything off.
610 */
d62a17ae 611void thread_master_free_unused(struct thread_master *m)
495f0b13 612{
d62a17ae 613 pthread_mutex_lock(&m->mtx);
614 {
615 struct thread *t;
616 while ((t = thread_trim_head(&m->unuse)) != NULL) {
6655966d 617 thread_free(m, t);
d62a17ae 618 }
619 }
620 pthread_mutex_unlock(&m->mtx);
495f0b13 DS 621}
622
718e3744 623/* Stop thread scheduler. */
d62a17ae 624void thread_master_free(struct thread_master *m)
718e3744 625{
d62a17ae 626 pthread_mutex_lock(&masters_mtx);
627 {
628 listnode_delete(masters, m);
eff09c66 629 if (masters->count == 0) {
6a154c88 630 list_delete(&masters);
eff09c66 631 }
d62a17ae 632 }
633 pthread_mutex_unlock(&masters_mtx);
634
635 thread_array_free(m, m->read);
636 thread_array_free(m, m->write);
637 thread_queue_free(m, m->timer);
638 thread_list_free(m, &m->event);
639 thread_list_free(m, &m->ready);
640 thread_list_free(m, &m->unuse);
641 pthread_mutex_destroy(&m->mtx);
33844bbe 642 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 643 close(m->io_pipe[0]);
644 close(m->io_pipe[1]);
6a154c88 645 list_delete(&m->cancel_req);
1a0a92ea 646 m->cancel_req = NULL;
d62a17ae 647
648 hash_clean(m->cpu_record, cpu_record_hash_free);
649 hash_free(m->cpu_record);
650 m->cpu_record = NULL;
651
4e1000a1 QY 652 if (m->name)
653 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 654 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
655 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
656 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 657}
658
78ca0342 CF 659/* Return remaining time in milliseconds. */
660unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 661{
d62a17ae 662 int64_t remain;
1189d95f 663
d62a17ae 664 pthread_mutex_lock(&thread->mtx);
665 {
78ca0342 666 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 667 }
668 pthread_mutex_unlock(&thread->mtx);
1189d95f 669
d62a17ae 670 return remain < 0 ? 0 : remain;
718e3744 671}
672
78ca0342 CF 673/* Return remaining time in seconds. */
674unsigned long thread_timer_remain_second(struct thread *thread)
675{
676 return thread_timer_remain_msec(thread) / 1000LL;
677}
678
9c7753e4 DL 679#define debugargdef const char *funcname, const char *schedfrom, int fromln
680#define debugargpass funcname, schedfrom, fromln
e04ab74d 681
d62a17ae 682struct timeval thread_timer_remain(struct thread *thread)
6ac44687 683{
d62a17ae 684 struct timeval remain;
685 pthread_mutex_lock(&thread->mtx);
686 {
687 monotime_until(&thread->u.sands, &remain);
688 }
689 pthread_mutex_unlock(&thread->mtx);
690 return remain;
6ac44687 CF 691}
692
718e3744 693/* Get new thread. */
d7c0a89a 694static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 695 int (*func)(struct thread *), void *arg,
696 debugargdef)
718e3744 697{
d62a17ae 698 struct thread *thread = thread_trim_head(&m->unuse);
699 struct cpu_thread_history tmp;
700
701 if (!thread) {
702 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
703 /* mutex only needs to be initialized at struct creation. */
704 pthread_mutex_init(&thread->mtx, NULL);
705 m->alloc++;
706 }
707
708 thread->type = type;
709 thread->add_type = type;
710 thread->master = m;
711 thread->arg = arg;
712 thread->index = -1;
713 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
714 thread->ref = NULL;
715
716 /*
717 * So if the passed in funcname is not what we have
718 * stored that means the thread->hist needs to be
719 * updated. We keep the last one around in unused
720 * under the assumption that we are probably
721 * going to immediately allocate the same
722 * type of thread.
723 * This hopefully saves us some serious
724 * hash_get lookups.
725 */
726 if (thread->funcname != funcname || thread->func != func) {
727 tmp.func = func;
728 tmp.funcname = funcname;
729 thread->hist =
730 hash_get(m->cpu_record, &tmp,
731 (void *(*)(void *))cpu_record_hash_alloc);
732 }
733 thread->hist->total_active++;
734 thread->func = func;
735 thread->funcname = funcname;
736 thread->schedfrom = schedfrom;
737 thread->schedfrom_line = fromln;
738
739 return thread;
718e3744 740}
741
6655966d RZ 742static void thread_free(struct thread_master *master, struct thread *thread)
743{
744 /* Update statistics. */
745 assert(master->alloc > 0);
746 master->alloc--;
747
748 /* Free allocated resources. */
749 pthread_mutex_destroy(&thread->mtx);
750 XFREE(MTYPE_THREAD, thread);
751}
752
d62a17ae 753static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
754 nfds_t count, const struct timeval *timer_wait)
209a72a6 755{
d62a17ae 756 /* If timer_wait is null here, that means poll() should block
757 * indefinitely,
758 * unless the thread_master has overridden it by setting
759 * ->selectpoll_timeout.
760 * If the value is positive, it specifies the maximum number of
761 * milliseconds
762 * to wait. If the timeout is -1, it specifies that we should never wait
763 * and
764 * always return immediately even if no event is detected. If the value
765 * is
766 * zero, the behavior is default. */
767 int timeout = -1;
768
769 /* number of file descriptors with events */
770 int num;
771
772 if (timer_wait != NULL
773 && m->selectpoll_timeout == 0) // use the default value
774 timeout = (timer_wait->tv_sec * 1000)
775 + (timer_wait->tv_usec / 1000);
776 else if (m->selectpoll_timeout > 0) // use the user's timeout
777 timeout = m->selectpoll_timeout;
778 else if (m->selectpoll_timeout
779 < 0) // effect a poll (return immediately)
780 timeout = 0;
781
782 /* add poll pipe poker */
783 assert(count + 1 < pfdsize);
784 pfds[count].fd = m->io_pipe[0];
785 pfds[count].events = POLLIN;
786 pfds[count].revents = 0x00;
787
788 num = poll(pfds, count + 1, timeout);
789
790 unsigned char trash[64];
791 if (num > 0 && pfds[count].revents != 0 && num--)
792 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
793 ;
794
795 return num;
209a72a6 DS 796}
797
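/*
 * Background sketch (illustration, not part of thread.c): AWAKEN() and
 * fd_poll() together implement the classic "self-pipe" wakeup. fd_poll()
 * always polls m->io_pipe[0]; any pthread can write a byte to m->io_pipe[1]
 * to force poll() to return early, and the read loop above drains the pipe.
 * A minimal standalone version of the same idea, assuming a POSIX system:
 */
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

static int self_pipe[2];	/* [0] is polled, [1] is written to wake */

static void waker_init(void)
{
	pipe(self_pipe);
	/* the read end must be non-blocking so the drain loop terminates */
	fcntl(self_pipe[0], F_SETFL,
	      fcntl(self_pipe[0], F_GETFL) | O_NONBLOCK);
}

static void waker_wake(void)	/* safe to call from another pthread */
{
	static const unsigned char byte = 0x01;
	write(self_pipe[1], &byte, 1);
}

static void waiter_wait(void)
{
	struct pollfd pfd = {.fd = self_pipe[0], .events = POLLIN};
	unsigned char trash[64];

	if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN))
		while (read(self_pipe[0], trash, sizeof(trash)) > 0)
			;	/* drain all pending wake bytes */
}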
718e3744 798/* Add new read thread. */
d62a17ae 799struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
800 int (*func)(struct thread *),
801 void *arg, int fd,
802 struct thread **t_ptr,
803 debugargdef)
718e3744 804{
d62a17ae 805 struct thread *thread = NULL;
806
9b864cd3 807 assert(fd >= 0 && fd < m->fd_limit);
d62a17ae 808 pthread_mutex_lock(&m->mtx);
809 {
810 if (t_ptr
811 && *t_ptr) // thread is already scheduled; don't reschedule
812 {
813 pthread_mutex_unlock(&m->mtx);
814 return NULL;
815 }
816
817 /* default to a new pollfd */
818 nfds_t queuepos = m->handler.pfdcount;
819
820 /* if we already have a pollfd for our file descriptor, find and
821 * use it */
822 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
823 if (m->handler.pfds[i].fd == fd) {
824 queuepos = i;
825 break;
826 }
827
828 /* make sure we have room for this fd + pipe poker fd */
829 assert(queuepos + 1 < m->handler.pfdsize);
830
831 thread = thread_get(m, dir, func, arg, debugargpass);
832
833 m->handler.pfds[queuepos].fd = fd;
834 m->handler.pfds[queuepos].events |=
835 (dir == THREAD_READ ? POLLIN : POLLOUT);
836
837 if (queuepos == m->handler.pfdcount)
838 m->handler.pfdcount++;
839
840 if (thread) {
841 pthread_mutex_lock(&thread->mtx);
842 {
843 thread->u.fd = fd;
844 if (dir == THREAD_READ)
845 m->read[thread->u.fd] = thread;
846 else
847 m->write[thread->u.fd] = thread;
848 }
849 pthread_mutex_unlock(&thread->mtx);
850
851 if (t_ptr) {
852 *t_ptr = thread;
853 thread->ref = t_ptr;
854 }
855 }
856
857 AWAKEN(m);
858 }
859 pthread_mutex_unlock(&m->mtx);
860
861 return thread;
718e3744 862}
863
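/*
 * Usage sketch (illustration, not part of thread.c): I/O tasks are normally
 * scheduled through the thread_add_read()/thread_add_write() macros from
 * thread.h, which expand to the funcname_ variant below. Only one read task
 * and one write task can be pending per file descriptor (m->read/m->write
 * are indexed by fd), and tasks are one-shot, so a persistent reader
 * re-registers itself. Names below are hypothetical.
 */
static struct thread *t_sock_read;

static int sock_readable(struct thread *t)
{
	int fd = THREAD_FD(t);

	/* ... read from fd, process the data ... */

	/* one-shot: re-register to keep listening on this fd */
	thread_add_read(t->master, sock_readable, NULL, fd, &t_sock_read);
	return 0;
}

/* initial registration, e.g. right after socket()/accept():
 *	thread_add_read(master, sock_readable, NULL, fd, &t_sock_read);
 */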
56a94b36 864static struct thread *
d62a17ae 865funcname_thread_add_timer_timeval(struct thread_master *m,
866 int (*func)(struct thread *), int type,
867 void *arg, struct timeval *time_relative,
868 struct thread **t_ptr, debugargdef)
718e3744 869{
d62a17ae 870 struct thread *thread;
871 struct pqueue *queue;
872
873 assert(m != NULL);
874
875 assert(type == THREAD_TIMER);
876 assert(time_relative);
877
878 pthread_mutex_lock(&m->mtx);
879 {
880 if (t_ptr
881 && *t_ptr) // thread is already scheduled; don't reschedule
882 {
883 pthread_mutex_unlock(&m->mtx);
884 return NULL;
885 }
886
887 queue = m->timer;
888 thread = thread_get(m, type, func, arg, debugargpass);
889
890 pthread_mutex_lock(&thread->mtx);
891 {
892 monotime(&thread->u.sands);
893 timeradd(&thread->u.sands, time_relative,
894 &thread->u.sands);
895 pqueue_enqueue(thread, queue);
896 if (t_ptr) {
897 *t_ptr = thread;
898 thread->ref = t_ptr;
899 }
900 }
901 pthread_mutex_unlock(&thread->mtx);
902
903 AWAKEN(m);
904 }
905 pthread_mutex_unlock(&m->mtx);
906
907 return thread;
9e867fe6 908}
909
98c91ac6 910
911/* Add timer event thread. */
d62a17ae 912struct thread *funcname_thread_add_timer(struct thread_master *m,
913 int (*func)(struct thread *),
914 void *arg, long timer,
915 struct thread **t_ptr, debugargdef)
9e867fe6 916{
d62a17ae 917 struct timeval trel;
9e867fe6 918
d62a17ae 919 assert(m != NULL);
9e867fe6 920
d62a17ae 921 trel.tv_sec = timer;
922 trel.tv_usec = 0;
9e867fe6 923
d62a17ae 924 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
925 &trel, t_ptr, debugargpass);
98c91ac6 926}
9e867fe6 927
98c91ac6 928/* Add timer event thread with "millisecond" resolution */
d62a17ae 929struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
930 int (*func)(struct thread *),
931 void *arg, long timer,
932 struct thread **t_ptr,
933 debugargdef)
98c91ac6 934{
d62a17ae 935 struct timeval trel;
9e867fe6 936
d62a17ae 937 assert(m != NULL);
718e3744 938
d62a17ae 939 trel.tv_sec = timer / 1000;
940 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 941
d62a17ae 942 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
943 &trel, t_ptr, debugargpass);
a48b4e6d 944}
945
d03c4cbd 946/* Add timer event thread with a struct timeval timeout */
d62a17ae 947struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
948 int (*func)(struct thread *),
949 void *arg, struct timeval *tv,
950 struct thread **t_ptr, debugargdef)
d03c4cbd 951{
d62a17ae 952 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
953 t_ptr, debugargpass);
d03c4cbd DL 954}
955
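/*
 * Usage sketch (illustration, not part of thread.c): the timer wrappers
 * above are normally reached through thread_add_timer(),
 * thread_add_timer_msec() and thread_add_timer_tv() from thread.h. Like I/O
 * tasks, timers fire once; a periodic job re-arms itself. Names below are
 * hypothetical.
 */
static struct thread *t_hello;

static int send_hello(struct thread *t)
{
	void *ctx = THREAD_ARG(t);

	/* ... send the periodic message ... */

	/* re-arm for another 10 seconds */
	thread_add_timer(t->master, send_hello, ctx, 10, &t_hello);
	return 0;
}

/* first arm, with millisecond resolution if needed:
 *	thread_add_timer_msec(master, send_hello, ctx, 500, &t_hello);
 */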
718e3744 956/* Add simple event thread. */
d62a17ae 957struct thread *funcname_thread_add_event(struct thread_master *m,
958 int (*func)(struct thread *),
959 void *arg, int val,
960 struct thread **t_ptr, debugargdef)
718e3744 961{
d62a17ae 962 struct thread *thread;
963
964 assert(m != NULL);
965
966 pthread_mutex_lock(&m->mtx);
967 {
968 if (t_ptr
969 && *t_ptr) // thread is already scheduled; don't reschedule
970 {
971 pthread_mutex_unlock(&m->mtx);
972 return NULL;
973 }
974
975 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
976 pthread_mutex_lock(&thread->mtx);
977 {
978 thread->u.val = val;
979 thread_list_add(&m->event, thread);
980 }
981 pthread_mutex_unlock(&thread->mtx);
982
983 if (t_ptr) {
984 *t_ptr = thread;
985 thread->ref = t_ptr;
986 }
987
988 AWAKEN(m);
989 }
990 pthread_mutex_unlock(&m->mtx);
991
992 return thread;
718e3744 993}
994
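/*
 * Usage sketch (illustration, not part of thread.c): an event is an
 * immediately-ready, one-shot task, typically used to defer work to the
 * owning pthread's next loop iteration. Passing the owning object as 'arg'
 * lets thread_cancel_event() (further below) flush every pending event for
 * that object when it is deleted. Names below are hypothetical.
 */
struct peer;			/* hypothetical protocol object */

static int peer_deferred_work(struct thread *t)
{
	struct peer *peer = THREAD_ARG(t);

	/* ... handle work deferred for this peer ... */
	(void)peer;
	return 0;
}

/* defer work:	   thread_add_event(master, peer_deferred_work, peer, 0, NULL);
 * on peer delete: thread_cancel_event(master, peer);
 */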
63ccb9cb QY 995/* Thread cancellation ------------------------------------------------------ */
996
8797240e QY 997/**
998 * NOT's out the .events field of pollfd corresponding to the given file
999 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
1000 *
1001 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1002 * implementation for details.
1003 *
1004 * @param master
1005 * @param fd
1006 * @param state the event to cancel. One or more (OR'd together) of the
1007 * following:
1008 * - POLLIN
1009 * - POLLOUT
1010 */
d62a17ae 1011static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 1012{
42d74538 QY 1013 bool found = false;
1014
d62a17ae 1015 /* Cancel POLLHUP too just in case some bozo set it */
1016 state |= POLLHUP;
1017
1018 /* find the index of corresponding pollfd */
1019 nfds_t i;
1020
1021 for (i = 0; i < master->handler.pfdcount; i++)
42d74538 QY 1022 if (master->handler.pfds[i].fd == fd) {
1023 found = true;
d62a17ae 1024 break;
42d74538 QY 1025 }
1026
1027 if (!found) {
1028 zlog_debug(
1029 "[!] Received cancellation request for nonexistent rw job");
1030 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 1031 master->name ? master->name : "", fd);
42d74538 QY 1032 return;
1033 }
d62a17ae 1034
1035 /* NOT out event. */
1036 master->handler.pfds[i].events &= ~(state);
1037
1038 /* If all events are canceled, delete / resize the pollfd array. */
1039 if (master->handler.pfds[i].events == 0) {
1040 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1041 (master->handler.pfdcount - i - 1)
1042 * sizeof(struct pollfd));
1043 master->handler.pfdcount--;
1044 }
1045
1046 /* If we have the same pollfd in the copy, perform the same operations,
1047 * otherwise return. */
1048 if (i >= master->handler.copycount)
1049 return;
1050
1051 master->handler.copy[i].events &= ~(state);
1052
1053 if (master->handler.copy[i].events == 0) {
1054 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1055 (master->handler.copycount - i - 1)
1056 * sizeof(struct pollfd));
1057 master->handler.copycount--;
1058 }
0a95a0d0 DS 1059}
1060
1189d95f 1061/**
63ccb9cb 1062 * Process cancellation requests.
1189d95f 1063 *
63ccb9cb QY 1064 * This may only be run from the pthread which owns the thread_master.
1065 *
1066 * @param master the thread master to process
1067 * @REQUIRE master->mtx
1189d95f 1068 */
d62a17ae 1069static void do_thread_cancel(struct thread_master *master)
718e3744 1070{
d62a17ae 1071 struct thread_list *list = NULL;
1072 struct pqueue *queue = NULL;
1073 struct thread **thread_array = NULL;
1074 struct thread *thread;
1075
1076 struct cancel_req *cr;
1077 struct listnode *ln;
1078 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1079 /* If this is an event object cancellation, linear search
1080 * through event
1081 * list deleting any events which have the specified argument.
1082 * We also
1083 * need to check every thread in the ready queue. */
1084 if (cr->eventobj) {
1085 struct thread *t;
1086 thread = master->event.head;
1087
1088 while (thread) {
1089 t = thread;
1090 thread = t->next;
1091
1092 if (t->arg == cr->eventobj) {
1093 thread_list_delete(&master->event, t);
1094 if (t->ref)
1095 *t->ref = NULL;
1096 thread_add_unuse(master, t);
1097 }
1098 }
1099
1100 thread = master->ready.head;
1101 while (thread) {
1102 t = thread;
1103 thread = t->next;
1104
1105 if (t->arg == cr->eventobj) {
1106 thread_list_delete(&master->ready, t);
1107 if (t->ref)
1108 *t->ref = NULL;
1109 thread_add_unuse(master, t);
1110 }
1111 }
1112 continue;
1113 }
1114
1115 /* The pointer varies depending on whether the cancellation
1116 * request was
1117 * made asynchronously or not. If it was, we need to check
1118 * whether the
1119 * thread even exists anymore before cancelling it. */
1120 thread = (cr->thread) ? cr->thread : *cr->threadref;
1121
1122 if (!thread)
1123 continue;
1124
1125 /* Determine the appropriate queue to cancel the thread from */
1126 switch (thread->type) {
1127 case THREAD_READ:
1128 thread_cancel_rw(master, thread->u.fd, POLLIN);
1129 thread_array = master->read;
1130 break;
1131 case THREAD_WRITE:
1132 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1133 thread_array = master->write;
1134 break;
1135 case THREAD_TIMER:
1136 queue = master->timer;
1137 break;
1138 case THREAD_EVENT:
1139 list = &master->event;
1140 break;
1141 case THREAD_READY:
1142 list = &master->ready;
1143 break;
1144 default:
1145 continue;
1146 break;
1147 }
1148
1149 if (queue) {
1150 assert(thread->index >= 0);
522f7f99 DS 1151 assert(thread == queue->array[thread->index]);
1152 pqueue_remove_at(thread->index, queue);
d62a17ae 1153 } else if (list) {
1154 thread_list_delete(list, thread);
1155 } else if (thread_array) {
1156 thread_array[thread->u.fd] = NULL;
1157 } else {
1158 assert(!"Thread should be either in queue or list or array!");
1159 }
1160
1161 if (thread->ref)
1162 *thread->ref = NULL;
1163
1164 thread_add_unuse(thread->master, thread);
1165 }
1166
1167 /* Delete and free all cancellation requests */
1168 list_delete_all_node(master->cancel_req);
1169
1170 /* Wake up any threads which may be blocked in thread_cancel_async() */
1171 master->canceled = true;
1172 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1173}
1174
63ccb9cb QY 1175/**
1176 * Cancel any events which have the specified argument.
1177 *
1178 * MT-Unsafe
1179 *
1180 * @param master the thread_master to cancel from
1181 * @param arg the argument passed when creating the event
1182 */
d62a17ae 1183void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1184{
d62a17ae 1185 assert(master->owner == pthread_self());
1186
1187 pthread_mutex_lock(&master->mtx);
1188 {
1189 struct cancel_req *cr =
1190 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1191 cr->eventobj = arg;
1192 listnode_add(master->cancel_req, cr);
1193 do_thread_cancel(master);
1194 }
1195 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1196}
1189d95f 1197
63ccb9cb QY 1198/**
1199 * Cancel a specific task.
1200 *
1201 * MT-Unsafe
1202 *
1203 * @param thread task to cancel
1204 */
d62a17ae 1205void thread_cancel(struct thread *thread)
63ccb9cb 1206{
6ed04aa2 1207 struct thread_master *master = thread->master;
d62a17ae 1208
6ed04aa2 DS 1209 assert(master->owner == pthread_self());
1210
1211 pthread_mutex_lock(&master->mtx);
d62a17ae 1212 {
1213 struct cancel_req *cr =
1214 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1215 cr->thread = thread;
6ed04aa2 DS 1216 listnode_add(master->cancel_req, cr);
1217 do_thread_cancel(master);
d62a17ae 1218 }
6ed04aa2 1219 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1220}
1189d95f 1221
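/*
 * Usage sketch (illustration, not part of thread.c): same-pthread
 * cancellation with a back-reference. FRR code usually goes through the
 * THREAD_OFF()-style macros in thread.h; the long-hand form is shown here.
 * Names are hypothetical.
 */
static struct thread *t_reconnect;

/* scheduling side:
 *	thread_add_timer(master, reconnect, ctx, 30, &t_reconnect);
 */

static void stop_reconnect_timer(void)
{
	if (t_reconnect) {
		thread_cancel(t_reconnect);
		/* do_thread_cancel() already cleared *t->ref, i.e.
		 * t_reconnect, via the back-reference; assigning NULL again
		 * is harmless and mirrors THREAD_OFF(). */
		t_reconnect = NULL;
	}
}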
63ccb9cb QY 1222/**
1223 * Asynchronous cancellation.
1224 *
8797240e QY 1225 * Called with either a struct thread ** or void * to an event argument,
1226 * this function posts the correct cancellation request and blocks until it is
1227 * serviced.
63ccb9cb QY 1228 *
1229 * If the thread is currently running, execution blocks until it completes.
1230 *
8797240e QY 1231 * The last two parameters are mutually exclusive, i.e. if you pass one the
1232 * other must be NULL.
1233 *
1234 * When the cancellation procedure executes on the target thread_master, the
1235 * thread * provided is checked for nullity. If it is null, the thread is
1236 * assumed to no longer exist and the cancellation request is a no-op. Thus
1237 * users of this API must pass a back-reference when scheduling the original
1238 * task.
1239 *
63ccb9cb QY 1240 * MT-Safe
1241 *
8797240e QY 1242 * @param master the thread master with the relevant event / task
1243 * @param thread pointer to thread to cancel
1244 * @param eventobj the event
63ccb9cb 1245 */
d62a17ae 1246void thread_cancel_async(struct thread_master *master, struct thread **thread,
1247 void *eventobj)
63ccb9cb 1248{
d62a17ae 1249 assert(!(thread && eventobj) && (thread || eventobj));
1250 assert(master->owner != pthread_self());
1251
1252 pthread_mutex_lock(&master->mtx);
1253 {
1254 master->canceled = false;
1255
1256 if (thread) {
1257 struct cancel_req *cr =
1258 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1259 cr->threadref = thread;
1260 listnode_add(master->cancel_req, cr);
1261 } else if (eventobj) {
1262 struct cancel_req *cr =
1263 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1264 cr->eventobj = eventobj;
1265 listnode_add(master->cancel_req, cr);
1266 }
1267 AWAKEN(master);
1268
1269 while (!master->canceled)
1270 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1271 }
1272 pthread_mutex_unlock(&master->mtx);
718e3744 1273}
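/*
 * Usage sketch (illustration, not part of thread.c): cancelling work owned
 * by a different pthread. The scheduling side must keep a back-reference
 * (here t_job) so the asynchronous request can tell whether the task still
 * exists. Names are hypothetical.
 */
static struct thread *t_job;	/* written by the owning pthread's scheduler */

/* on the pthread that owns 'other_master':
 *	thread_add_timer(other_master, job_func, NULL, 30, &t_job);
 */

/* from any other pthread; blocks until the request has been serviced */
static void stop_job(struct thread_master *other_master)
{
	thread_cancel_async(other_master, &t_job, NULL);
}

/* the event-object form cancels by argument instead:
 *	thread_cancel_async(other_master, NULL, some_arg);
 */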
63ccb9cb 1274/* ------------------------------------------------------------------------- */
718e3744 1275
d62a17ae 1276static struct timeval *thread_timer_wait(struct pqueue *queue,
1277 struct timeval *timer_val)
718e3744 1278{
d62a17ae 1279 if (queue->size) {
1280 struct thread *next_timer = queue->array[0];
1281 monotime_until(&next_timer->u.sands, timer_val);
1282 return timer_val;
1283 }
1284 return NULL;
718e3744 1285}
718e3744 1286
d62a17ae 1287static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1288 struct thread *fetch)
718e3744 1289{
d62a17ae 1290 *fetch = *thread;
1291 thread_add_unuse(m, thread);
1292 return fetch;
718e3744 1293}
1294
d62a17ae 1295static int thread_process_io_helper(struct thread_master *m,
1296 struct thread *thread, short state, int pos)
5d4ccd4e 1297{
d62a17ae 1298 struct thread **thread_array;
1299
1300 if (!thread)
1301 return 0;
1302
1303 if (thread->type == THREAD_READ)
1304 thread_array = m->read;
1305 else
1306 thread_array = m->write;
1307
1308 thread_array[thread->u.fd] = NULL;
1309 thread_list_add(&m->ready, thread);
1310 thread->type = THREAD_READY;
1311 /* if another pthread scheduled this file descriptor for the event we're
1312 * responding to, no problem; we're getting to it now */
1313 thread->master->handler.pfds[pos].events &= ~(state);
1314 return 1;
5d4ccd4e DS 1315}
1316
8797240e QY 1317/**
1318 * Process I/O events.
1319 *
1320 * Walks through file descriptor array looking for those pollfds whose .revents
1321 * field has something interesting. Deletes any invalid file descriptors.
1322 *
1323 * @param m the thread master
1324 * @param num the number of active file descriptors (return value of poll())
1325 */
d62a17ae 1326static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1327{
d62a17ae 1328 unsigned int ready = 0;
1329 struct pollfd *pfds = m->handler.copy;
1330
1331 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1332 /* no event for current fd? immediately continue */
1333 if (pfds[i].revents == 0)
1334 continue;
1335
1336 ready++;
1337
1338 /* Unless someone has called thread_cancel from another pthread,
1339 * the only
1340 * thing that could have changed in m->handler.pfds while we
1341 * were
1342 * asleep is the .events field in a given pollfd. Barring
1343 * thread_cancel()
1344 * that value should be a superset of the values we have in our
1345 * copy, so
1346 * there's no need to update it. Similarly, barring deletion,
1347 * the fd
1348 * should still be a valid index into the master's pfds. */
1349 if (pfds[i].revents & (POLLIN | POLLHUP))
1350 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1351 i);
1352 if (pfds[i].revents & POLLOUT)
1353 thread_process_io_helper(m, m->write[pfds[i].fd],
1354 POLLOUT, i);
1355
1356 /* if one of our file descriptors is garbage, remove the same
1357 * from
1358 * both pfds + update sizes and index */
1359 if (pfds[i].revents & POLLNVAL) {
1360 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1361 (m->handler.pfdcount - i - 1)
1362 * sizeof(struct pollfd));
1363 m->handler.pfdcount--;
1364
1365 memmove(pfds + i, pfds + i + 1,
1366 (m->handler.copycount - i - 1)
1367 * sizeof(struct pollfd));
1368 m->handler.copycount--;
1369
1370 i--;
1371 }
1372 }
718e3744 1373}
1374
8b70d0b0 1375/* Add all timers that have popped to the ready list. */
d62a17ae 1376static unsigned int thread_process_timers(struct pqueue *queue,
1377 struct timeval *timenow)
a48b4e6d 1378{
d62a17ae 1379 struct thread *thread;
1380 unsigned int ready = 0;
1381
1382 while (queue->size) {
1383 thread = queue->array[0];
1384 if (timercmp(timenow, &thread->u.sands, <))
1385 return ready;
1386 pqueue_dequeue(queue);
1387 thread->type = THREAD_READY;
1388 thread_list_add(&thread->master->ready, thread);
1389 ready++;
1390 }
1391 return ready;
a48b4e6d 1392}
1393
2613abe6 1394/* process a list en masse, e.g. for event thread lists */
d62a17ae 1395static unsigned int thread_process(struct thread_list *list)
2613abe6 1396{
d62a17ae 1397 struct thread *thread;
1398 struct thread *next;
1399 unsigned int ready = 0;
1400
1401 for (thread = list->head; thread; thread = next) {
1402 next = thread->next;
1403 thread_list_delete(list, thread);
1404 thread->type = THREAD_READY;
1405 thread_list_add(&thread->master->ready, thread);
1406 ready++;
1407 }
1408 return ready;
2613abe6 PJ 1409}
1410
1411
718e3744 1412/* Fetch next ready thread. */
d62a17ae 1413struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1414{
d62a17ae 1415 struct thread *thread = NULL;
1416 struct timeval now;
1417 struct timeval zerotime = {0, 0};
1418 struct timeval tv;
1419 struct timeval *tw = NULL;
1420
1421 int num = 0;
1422
1423 do {
1424 /* Handle signals if any */
1425 if (m->handle_signals)
1426 quagga_sigevent_process();
1427
1428 pthread_mutex_lock(&m->mtx);
1429
1430 /* Process any pending cancellation requests */
1431 do_thread_cancel(m);
1432
e3c9529e QY 1433 /*
1434 * Attempt to flush ready queue before going into poll().
1435 * This is performance-critical. Think twice before modifying.
1436 */
1437 if ((thread = thread_trim_head(&m->ready))) {
1438 fetch = thread_run(m, thread, fetch);
1439 if (fetch->ref)
1440 *fetch->ref = NULL;
1441 pthread_mutex_unlock(&m->mtx);
1442 break;
1443 }
1444
1445 /* otherwise, tick through scheduling sequence */
1446
bca37d17 QY 1447 /*
1448 * Post events to ready queue. This must come before the
1449 * following block since events should occur immediately
1450 */
d62a17ae 1451 thread_process(&m->event);
1452
bca37d17 QY 1453 /*
1454 * If there are no tasks on the ready queue, we will poll()
1455 * until a timer expires or we receive I/O, whichever comes
1456 * first. The strategy for doing this is:
d62a17ae 1457 *
1458 * - If there are events pending, set the poll() timeout to zero
1459 * - If there are no events pending, but there are timers
1460 * pending, set the
1461 * timeout to the smallest remaining time on any timer
1462 * - If there are neither timers nor events pending, but there
1463 * are file
1464 * descriptors pending, block indefinitely in poll()
1465 * - If nothing is pending, it's time for the application to die
1466 *
1467 * In every case except the last, we need to hit poll() at least
bca37d17 QY 1468 * once per loop to avoid starvation by events
1469 */
d62a17ae 1470 if (m->ready.count == 0)
1471 tw = thread_timer_wait(m->timer, &tv);
1472
1473 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1474 tw = &zerotime;
1475
1476 if (!tw && m->handler.pfdcount == 0) { /* die */
1477 pthread_mutex_unlock(&m->mtx);
1478 fetch = NULL;
1479 break;
1480 }
1481
bca37d17 QY 1482 /*
1483 * Copy pollfd array + # active pollfds in it. Not necessary to
1484 * copy the array size as this is fixed.
1485 */
d62a17ae 1486 m->handler.copycount = m->handler.pfdcount;
1487 memcpy(m->handler.copy, m->handler.pfds,
1488 m->handler.copycount * sizeof(struct pollfd));
1489
e3c9529e QY 1490 pthread_mutex_unlock(&m->mtx);
1491 {
1492 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1493 m->handler.copycount, tw);
1494 }
1495 pthread_mutex_lock(&m->mtx);
d764d2cc 1496
e3c9529e QY 1497 /* Handle any errors received in poll() */
1498 if (num < 0) {
1499 if (errno == EINTR) {
d62a17ae 1500 pthread_mutex_unlock(&m->mtx);
e3c9529e QY 1501 /* loop around to signal handler */
1502 continue;
d62a17ae 1503 }
1504
e3c9529e 1505 /* else die */
450971aa 1506 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1507 safe_strerror(errno));
e3c9529e QY 1508 pthread_mutex_unlock(&m->mtx);
1509 fetch = NULL;
1510 break;
bca37d17 1511 }
d62a17ae 1512
1513 /* Post timers to ready queue. */
1514 monotime(&now);
1515 thread_process_timers(m->timer, &now);
1516
1517 /* Post I/O to ready queue. */
1518 if (num > 0)
1519 thread_process_io(m, num);
1520
d62a17ae 1521 pthread_mutex_unlock(&m->mtx);
1522
1523 } while (!thread && m->spin);
1524
1525 return fetch;
718e3744 1526}
1527
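/*
 * Usage sketch (illustration, not part of thread.c): the canonical
 * fetch/call loop that every daemon runs on its main pthread (libfrr's
 * frr_run() wraps essentially this). thread_fetch() returns NULL only when
 * there are no timers, events or file descriptors left to wait on.
 */
static void event_loop(struct thread_master *master)
{
	struct thread thread;

	while (thread_fetch(master, &thread))
		thread_call(&thread);
}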
d62a17ae 1528static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1529{
d62a17ae 1530 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1531 + (a.tv_usec - b.tv_usec));
62f44022 QY 1532}
1533
d62a17ae 1534unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1535 unsigned long *cputime)
718e3744 1536{
d62a17ae 1537 /* This is 'user + sys' time. */
1538 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1539 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1540 return timeval_elapsed(now->real, start->real);
8b70d0b0 1541}
1542
50596be0 DS 1543/* We should aim to yield after thread->yield milliseconds, which defaults
1544 to THREAD_YIELD_TIME_SLOT.
8b70d0b0 1545 Note: we are using real (wall clock) time for this calculation.
1546 It could be argued that CPU time may make more sense in certain
1547 contexts. The things to consider are whether the thread may have
1548 blocked (in which case wall time increases, but CPU time does not),
1549 or whether the system is heavily loaded with other processes competing
d62a17ae 1550 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1551 Plus it has the added benefit that gettimeofday should be faster
1552 than calling getrusage. */
d62a17ae 1553int thread_should_yield(struct thread *thread)
718e3744 1554{
d62a17ae 1555 int result;
1556 pthread_mutex_lock(&thread->mtx);
1557 {
1558 result = monotime_since(&thread->real, NULL)
1559 > (int64_t)thread->yield;
1560 }
1561 pthread_mutex_unlock(&thread->mtx);
1562 return result;
50596be0 DS 1563}
1564
d62a17ae 1565void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1566{
d62a17ae 1567 pthread_mutex_lock(&thread->mtx);
1568 {
1569 thread->yield = yield_time;
1570 }
1571 pthread_mutex_unlock(&thread->mtx);
718e3744 1572}
1573
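/*
 * Usage sketch (illustration, not part of thread.c): a long-running job
 * yielding cooperatively. Once its wall-clock budget (thread->yield, default
 * THREAD_YIELD_TIME_SLOT) is spent, it reschedules itself as an event and
 * returns so other tasks can run. The walk_ctx type and helpers are
 * hypothetical.
 */
struct walk_ctx;				/* resumable iterator */
bool walk_ctx_has_more(struct walk_ctx *ctx);
void walk_ctx_do_one(struct walk_ctx *ctx);

static int table_walk(struct thread *t)
{
	struct walk_ctx *ctx = THREAD_ARG(t);

	while (walk_ctx_has_more(ctx)) {
		walk_ctx_do_one(ctx);

		if (thread_should_yield(t)) {
			/* resume from the same context on the next pass */
			thread_add_event(t->master, table_walk, ctx, 0, NULL);
			return 0;
		}
	}
	return 0;
}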
d62a17ae 1574void thread_getrusage(RUSAGE_T *r)
db9c0df9 1575{
231db9a6 DS 1576#if defined RUSAGE_THREAD
1577#define FRR_RUSAGE RUSAGE_THREAD
1578#else
1579#define FRR_RUSAGE RUSAGE_SELF
1580#endif
d62a17ae 1581 monotime(&r->real);
231db9a6 1582 getrusage(FRR_RUSAGE, &(r->cpu));
db9c0df9 PJ 1583}
1584
fbcac826 QY 1585/*
1586 * Call a thread.
1587 *
1588 * This function will atomically update the thread's usage history. At present
1589 * this is the only spot where usage history is written. Nevertheless the code
1590 * has been written such that the introduction of writers in the future should
1591 * not need to update it provided the writers atomically perform only the
1592 * operations done here, i.e. updating the total and maximum times. In
1593 * particular, the maximum real and cpu times must be monotonically increasing
1594 * or this code is not correct.
1595 */
d62a17ae 1596void thread_call(struct thread *thread)
718e3744 1597{
fbcac826 QY 1598 _Atomic unsigned long realtime, cputime;
1599 unsigned long exp;
1600 unsigned long helper;
d62a17ae 1601 RUSAGE_T before, after;
cc8b13a0 1602
d62a17ae 1603 GETRUSAGE(&before);
1604 thread->real = before.real;
718e3744 1605
d62a17ae 1606 pthread_setspecific(thread_current, thread);
1607 (*thread->func)(thread);
1608 pthread_setspecific(thread_current, NULL);
718e3744 1609
d62a17ae 1610 GETRUSAGE(&after);
718e3744 1611
fbcac826 QY 1612 realtime = thread_consumed_time(&after, &before, &helper);
1613 cputime = helper;
1614
1615 /* update realtime */
1616 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1617 memory_order_seq_cst);
1618 exp = atomic_load_explicit(&thread->hist->real.max,
1619 memory_order_seq_cst);
1620 while (exp < realtime
1621 && !atomic_compare_exchange_weak_explicit(
1622 &thread->hist->real.max, &exp, realtime,
1623 memory_order_seq_cst, memory_order_seq_cst))
1624 ;
1625
1626 /* update cputime */
1627 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1628 memory_order_seq_cst);
1629 exp = atomic_load_explicit(&thread->hist->cpu.max,
1630 memory_order_seq_cst);
1631 while (exp < cputime
1632 && !atomic_compare_exchange_weak_explicit(
1633 &thread->hist->cpu.max, &exp, cputime,
1634 memory_order_seq_cst, memory_order_seq_cst))
1635 ;
1636
1637 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1638 memory_order_seq_cst);
1639 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1640 memory_order_seq_cst);
718e3744 1641
924b9229 1642#ifdef CONSUMED_TIME_CHECK
d62a17ae 1643 if (realtime > CONSUMED_TIME_CHECK) {
1644 /*
1645 * We have a CPU Hog on our hands.
1646 * Whinge about it now, so we're aware this is yet another task
1647 * to fix.
1648 */
9ef9495e 1649 flog_warn(
450971aa 1650 EC_LIB_SLOW_THREAD,
d62a17ae 1651 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1652 thread->funcname, (unsigned long)thread->func,
1653 realtime / 1000, cputime / 1000);
1654 }
924b9229 1655#endif /* CONSUMED_TIME_CHECK */
718e3744 1656}
1657
1658/* Execute thread */
d62a17ae 1659void funcname_thread_execute(struct thread_master *m,
1660 int (*func)(struct thread *), void *arg, int val,
1661 debugargdef)
718e3744 1662{
c4345fbf 1663 struct thread *thread;
718e3744 1664
c4345fbf RZ 1665 /* Get or allocate new thread to execute. */
1666 pthread_mutex_lock(&m->mtx);
1667 {
1668 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
9c7753e4 1669
c4345fbf RZ 1670 /* Set its event value. */
1671 pthread_mutex_lock(&thread->mtx);
1672 {
1673 thread->add_type = THREAD_EXECUTE;
1674 thread->u.val = val;
1675 thread->ref = &thread;
1676 }
1677 pthread_mutex_unlock(&thread->mtx);
1678 }
1679 pthread_mutex_unlock(&m->mtx);
f7c62e11 1680
c4345fbf RZ 1681 /* Execute thread doing all accounting. */
1682 thread_call(thread);
9c7753e4 1683
c4345fbf RZ 1684 /* Give back or free thread. */
1685 thread_add_unuse(m, thread);
718e3744 1686}