718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
28#include "log.h"
e04ab74d 29#include "hash.h"
8390828d 30#include "pqueue.h"
e04ab74d 31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
9ef9495e 36#include "lib_errors.h"
d6be5fb9 37
d62a17ae 38DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 39DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 40DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 42
43DECLARE_LIST(thread_list, struct thread, threaditem)
44
45#if defined(__APPLE__)
46#include <mach/mach.h>
47#include <mach/mach_time.h>
48#endif
49
d62a17ae 50#define AWAKEN(m) \
51 do { \
52 static unsigned char wakebyte = 0x01; \
53 write(m->io_pipe[1], &wakebyte, 1); \
54 } while (0);
3bf2673b 55
62f44022 56/* control variable for initializer */
c17faa4b 57static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 58pthread_key_t thread_current;
6b0655a2 59
c17faa4b 60static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
61static struct list *masters;
62
6655966d 63static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 64
62f44022 65/* CLI start ---------------------------------------------------------------- */
d8b87afe 66static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
e04ab74d 67{
883cc51d 68 int size = sizeof(a->func);
69
70 return jhash(&a->func, size, 0);
e04ab74d 71}
72
74df8d6d 73static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 74 const struct cpu_thread_history *b)
e04ab74d 75{
d62a17ae 76 return a->func == b->func;
e04ab74d 77}
78
d62a17ae 79static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 80{
d62a17ae 81 struct cpu_thread_history *new;
82 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
83 new->func = a->func;
84 new->funcname = a->funcname;
85 return new;
e04ab74d 86}
87
d62a17ae 88static void cpu_record_hash_free(void *a)
228da428 89{
d62a17ae 90 struct cpu_thread_history *hist = a;
91
92 XFREE(MTYPE_THREAD_STATS, hist);
93}
94
d62a17ae 95static void vty_out_cpu_thread_history(struct vty *vty,
96 struct cpu_thread_history *a)
e04ab74d 97{
9a8a7b0e 98 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
99 a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
100 a->total_calls, a->cpu.total / a->total_calls, a->cpu.max,
d62a17ae 101 a->real.total / a->total_calls, a->real.max);
102 vty_out(vty, " %c%c%c%c%c %s\n",
103 a->types & (1 << THREAD_READ) ? 'R' : ' ',
104 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
105 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
106 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
107 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 108}
109
e3b78da8 110static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 111{
d62a17ae 112 struct cpu_thread_history *totals = args[0];
fbcac826 113 struct cpu_thread_history copy;
d62a17ae 114 struct vty *vty = args[1];
fbcac826 115 uint8_t *filter = args[2];
d62a17ae 116
117 struct cpu_thread_history *a = bucket->data;
118
119 copy.total_active =
120 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
121 copy.total_calls =
122 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
123 copy.cpu.total =
124 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
125 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
126 copy.real.total =
127 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
128 copy.real.max =
129 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
130 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
131 copy.funcname = a->funcname;
132
133 if (!(copy.types & *filter))
d62a17ae 134 return;
135
136 vty_out_cpu_thread_history(vty, &copy);
137 totals->total_active += copy.total_active;
138 totals->total_calls += copy.total_calls;
139 totals->real.total += copy.real.total;
140 if (totals->real.max < copy.real.max)
141 totals->real.max = copy.real.max;
142 totals->cpu.total += copy.cpu.total;
143 if (totals->cpu.max < copy.cpu.max)
144 totals->cpu.max = copy.cpu.max;
e04ab74d 145}
146
fbcac826 147static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 148{
d62a17ae 149 struct cpu_thread_history tmp;
150 void *args[3] = {&tmp, vty, &filter};
151 struct thread_master *m;
152 struct listnode *ln;
153
154 memset(&tmp, 0, sizeof tmp);
155 tmp.funcname = "TOTAL";
156 tmp.types = filter;
157
158 pthread_mutex_lock(&masters_mtx);
159 {
160 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
161 const char *name = m->name ? m->name : "main";
162
163 char underline[strlen(name) + 1];
164 memset(underline, '-', sizeof(underline));
4f113d60 165 underline[sizeof(underline) - 1] = '\0';
d62a17ae 166
167 vty_out(vty, "\n");
168 vty_out(vty, "Showing statistics for pthread %s\n",
169 name);
170 vty_out(vty, "-------------------------------%s\n",
171 underline);
172 vty_out(vty, "%21s %18s %18s\n", "",
173 "CPU (user+system):", "Real (wall-clock):");
174 vty_out(vty,
175 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
176 vty_out(vty, " Avg uSec Max uSecs");
177 vty_out(vty, " Type Thread\n");
178
179 if (m->cpu_record->count)
180 hash_iterate(
181 m->cpu_record,
e3b78da8 182 (void (*)(struct hash_bucket *,
d62a17ae 183 void *))cpu_record_hash_print,
184 args);
185 else
186 vty_out(vty, "No data to display yet.\n");
187
188 vty_out(vty, "\n");
189 }
190 }
191 pthread_mutex_unlock(&masters_mtx);
192
193 vty_out(vty, "\n");
194 vty_out(vty, "Total thread statistics\n");
195 vty_out(vty, "-------------------------\n");
196 vty_out(vty, "%21s %18s %18s\n", "",
197 "CPU (user+system):", "Real (wall-clock):");
198 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
199 vty_out(vty, " Avg uSec Max uSecs");
200 vty_out(vty, " Type Thread\n");
201
202 if (tmp.total_calls > 0)
203 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 204}
205
e3b78da8 206static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 207{
fbcac826 208 uint8_t *filter = args[0];
d62a17ae 209 struct hash *cpu_record = args[1];
210
211 struct cpu_thread_history *a = bucket->data;
62f44022 212
d62a17ae 213 if (!(a->types & *filter))
214 return;
f48f65d2 215
d62a17ae 216 hash_release(cpu_record, bucket->data);
217}
218
fbcac826 219static void cpu_record_clear(uint8_t filter)
e276eb82 220{
fbcac826 221 uint8_t *tmp = &filter;
d62a17ae 222 struct thread_master *m;
223 struct listnode *ln;
224
225 pthread_mutex_lock(&masters_mtx);
226 {
227 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
228 pthread_mutex_lock(&m->mtx);
229 {
230 void *args[2] = {tmp, m->cpu_record};
231 hash_iterate(
232 m->cpu_record,
e3b78da8 233 (void (*)(struct hash_bucket *,
d62a17ae 234 void *))cpu_record_hash_clear,
235 args);
236 }
237 pthread_mutex_unlock(&m->mtx);
238 }
239 }
240 pthread_mutex_unlock(&masters_mtx);
241}
242
fbcac826 243static uint8_t parse_filter(const char *filterstr)
62f44022 244{
d62a17ae 245 int i = 0;
246 int filter = 0;
247
248 while (filterstr[i] != '\0') {
249 switch (filterstr[i]) {
250 case 'r':
251 case 'R':
252 filter |= (1 << THREAD_READ);
253 break;
254 case 'w':
255 case 'W':
256 filter |= (1 << THREAD_WRITE);
257 break;
258 case 't':
259 case 'T':
260 filter |= (1 << THREAD_TIMER);
261 break;
262 case 'e':
263 case 'E':
264 filter |= (1 << THREAD_EVENT);
265 break;
266 case 'x':
267 case 'X':
268 filter |= (1 << THREAD_EXECUTE);
269 break;
270 default:
271 break;
272 }
273 ++i;
274 }
275 return filter;
276}
277
278DEFUN (show_thread_cpu,
279 show_thread_cpu_cmd,
280 "show thread cpu [FILTER]",
281 SHOW_STR
282 "Thread information\n"
283 "Thread CPU usage\n"
61fa0b97 284 "Display filter (rwtex)\n")
62f44022 285{
fbcac826 286 uint8_t filter = (uint8_t)-1U;
d62a17ae 287 int idx = 0;
288
289 if (argv_find(argv, argc, "FILTER", &idx)) {
290 filter = parse_filter(argv[idx]->arg);
291 if (!filter) {
292 vty_out(vty,
 293 "Invalid filter \"%s\" specified; must contain at least "
 294 "one of 'RWTEX'\n",
295 argv[idx]->arg);
296 return CMD_WARNING;
297 }
298 }
299
300 cpu_record_print(vty, filter);
301 return CMD_SUCCESS;
302}
303
304static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
305{
306 const char *name = m->name ? m->name : "main";
307 char underline[strlen(name) + 1];
308 uint32_t i;
309
310 memset(underline, '-', sizeof(underline));
311 underline[sizeof(underline) - 1] = '\0';
312
313 vty_out(vty, "\nShowing poll FD's for %s\n", name);
314 vty_out(vty, "----------------------%s\n", underline);
315 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
316 m->fd_limit);
317 for (i = 0; i < m->handler.pfdcount; i++)
318 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
319 m->handler.pfds[i].fd,
320 m->handler.pfds[i].events,
321 m->handler.pfds[i].revents);
322}
323
324DEFUN (show_thread_poll,
325 show_thread_poll_cmd,
326 "show thread poll",
327 SHOW_STR
328 "Thread information\n"
329 "Show poll FD's and information\n")
330{
331 struct listnode *node;
332 struct thread_master *m;
333
334 pthread_mutex_lock(&masters_mtx);
335 {
336 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
337 show_thread_poll_helper(vty, m);
338 }
339 }
340 pthread_mutex_unlock(&masters_mtx);
341
342 return CMD_SUCCESS;
343}
344
345
346DEFUN (clear_thread_cpu,
347 clear_thread_cpu_cmd,
348 "clear thread cpu [FILTER]",
62f44022 349 "Clear stored data in all pthreads\n"
350 "Thread information\n"
351 "Thread CPU usage\n"
 352 "Display filter (rwtex)\n")
e276eb82 353{
fbcac826 354 uint8_t filter = (uint8_t)-1U;
d62a17ae 355 int idx = 0;
356
357 if (argv_find(argv, argc, "FILTER", &idx)) {
358 filter = parse_filter(argv[idx]->arg);
359 if (!filter) {
360 vty_out(vty,
 361 "Invalid filter \"%s\" specified; must contain at least "
 362 "one of 'RWTEX'\n",
363 argv[idx]->arg);
364 return CMD_WARNING;
365 }
366 }
367
368 cpu_record_clear(filter);
369 return CMD_SUCCESS;
e276eb82 370}
6b0655a2 371
d62a17ae 372void thread_cmd_init(void)
0b84f294 373{
d62a17ae 374 install_element(VIEW_NODE, &show_thread_cpu_cmd);
8872626b 375 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 376 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 377}
378/* CLI end ------------------------------------------------------------------ */
379
0b84f294 380
381static int thread_timer_cmp(void *a, void *b)
382{
383 struct thread *thread_a = a;
384 struct thread *thread_b = b;
385
386 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
387 return -1;
388 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
389 return 1;
390 return 0;
391}
392
393static void thread_timer_update(void *node, int actual_position)
394{
395 struct thread *thread = node;
396
397 thread->index = actual_position;
398}
399
d62a17ae 400static void cancelreq_del(void *cr)
63ccb9cb 401{
d62a17ae 402 XFREE(MTYPE_TMP, cr);
403}
404
e0bebc7c 405/* initializer, only ever called once */
4d762f26 406static void initializer(void)
e0bebc7c 407{
d62a17ae 408 pthread_key_create(&thread_current, NULL);
409}
410
d62a17ae 411struct thread_master *thread_master_create(const char *name)
718e3744 412{
d62a17ae 413 struct thread_master *rv;
414 struct rlimit limit;
415
416 pthread_once(&init_once, &initializer);
417
418 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 419
420 /* Initialize master mutex */
421 pthread_mutex_init(&rv->mtx, NULL);
422 pthread_cond_init(&rv->cancel_cond, NULL);
423
424 /* Set name */
425 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
426
427 /* Initialize I/O task data structures */
428 getrlimit(RLIMIT_NOFILE, &limit);
429 rv->fd_limit = (int)limit.rlim_cur;
430 rv->read = XCALLOC(MTYPE_THREAD_POLL,
431 sizeof(struct thread *) * rv->fd_limit);
432
433 rv->write = XCALLOC(MTYPE_THREAD_POLL,
434 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 435
bd74dc61 436 rv->cpu_record = hash_create_size(
d8b87afe 437 8, (unsigned int (*)(const void *))cpu_record_hash_key,
74df8d6d 438 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
bd74dc61 439 "Thread Hash");
d62a17ae 440
441 thread_list_init(&rv->event);
442 thread_list_init(&rv->ready);
443 thread_list_init(&rv->unuse);
444
445 /* Initialize the timer queues */
446 rv->timer = pqueue_create();
447 rv->timer->cmp = thread_timer_cmp;
448 rv->timer->update = thread_timer_update;
d62a17ae 449
450 /* Initialize thread_fetch() settings */
451 rv->spin = true;
452 rv->handle_signals = true;
453
454 /* Set pthread owner, should be updated by actual owner */
455 rv->owner = pthread_self();
456 rv->cancel_req = list_new();
457 rv->cancel_req->del = cancelreq_del;
458 rv->canceled = true;
459
460 /* Initialize pipe poker */
461 pipe(rv->io_pipe);
462 set_nonblocking(rv->io_pipe[0]);
463 set_nonblocking(rv->io_pipe[1]);
464
465 /* Initialize data structures for poll() */
466 rv->handler.pfdsize = rv->fd_limit;
467 rv->handler.pfdcount = 0;
468 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
469 sizeof(struct pollfd) * rv->handler.pfdsize);
470 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
471 sizeof(struct pollfd) * rv->handler.pfdsize);
472
eff09c66 473 /* add to list of threadmasters */
d62a17ae 474 pthread_mutex_lock(&masters_mtx);
475 {
476 if (!masters)
477 masters = list_new();
478
d62a17ae 479 listnode_add(masters, rv);
480 }
481 pthread_mutex_unlock(&masters_mtx);
482
483 return rv;
718e3744 484}
485
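/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * typical creation and teardown of a thread_master, e.g. for a worker
 * pthread.  The name is optional and only shows up in the "show thread"
 * CLI output above.
 */
#if 0
static void worker_setup_example(void)
{
	struct thread_master *worker = thread_master_create("worker");

	/* ... schedule tasks here, then drive the fetch/call loop
	 * (see thread_fetch() further down) ... */

	thread_master_free(worker);
}
#endif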
486void thread_master_set_name(struct thread_master *master, const char *name)
487{
488 pthread_mutex_lock(&master->mtx);
489 {
0a22ddfb 490 XFREE(MTYPE_THREAD_MASTER, master->name);
491 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
492 }
493 pthread_mutex_unlock(&master->mtx);
494}
495
496#define THREAD_UNUSED_DEPTH 10
497
718e3744 498/* Move thread to unuse list. */
d62a17ae 499static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 500{
501 pthread_mutex_t mtxc = thread->mtx;
502
d62a17ae 503 assert(m != NULL && thread != NULL);
d62a17ae 504
d62a17ae 505 thread->hist->total_active--;
506 memset(thread, 0, sizeof(struct thread));
507 thread->type = THREAD_UNUSED;
508
509 /* Restore the thread mutex context. */
510 thread->mtx = mtxc;
511
512 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
513 thread_list_add_tail(&m->unuse, thread);
514 return;
515 }
516
517 thread_free(m, thread);
718e3744 518}
519
520/* Free all unused threads. */
521static void thread_list_free(struct thread_master *m,
522 struct thread_list_head *list)
718e3744 523{
d62a17ae 524 struct thread *t;
d62a17ae 525
c284542b 526 while ((t = thread_list_pop(list)))
6655966d 527 thread_free(m, t);
718e3744 528}
529
d62a17ae 530static void thread_array_free(struct thread_master *m,
531 struct thread **thread_array)
308d14ae 532{
d62a17ae 533 struct thread *t;
534 int index;
535
536 for (index = 0; index < m->fd_limit; ++index) {
537 t = thread_array[index];
538 if (t) {
539 thread_array[index] = NULL;
6655966d 540 thread_free(m, t);
d62a17ae 541 }
542 }
a6f235f3 543 XFREE(MTYPE_THREAD_POLL, thread_array);
544}
545
546static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
547{
548 int i;
549
550 for (i = 0; i < queue->size; i++)
551 thread_free(m, queue->array[i]);
552
553 pqueue_delete(queue);
554}
555
556/*
557 * thread_master_free_unused
558 *
 559 * As threads are finished with, they are put on the
 560 * unuse list for later reuse.
 561 * If we are shutting down, free up the unused threads
 562 * so we can see if we forgot to shut anything off.
563 */
d62a17ae 564void thread_master_free_unused(struct thread_master *m)
495f0b13 565{
d62a17ae 566 pthread_mutex_lock(&m->mtx);
567 {
568 struct thread *t;
c284542b 569 while ((t = thread_list_pop(&m->unuse)))
6655966d 570 thread_free(m, t);
d62a17ae 571 }
572 pthread_mutex_unlock(&m->mtx);
573}
574
718e3744 575/* Stop thread scheduler. */
d62a17ae 576void thread_master_free(struct thread_master *m)
718e3744 577{
d62a17ae 578 pthread_mutex_lock(&masters_mtx);
579 {
580 listnode_delete(masters, m);
eff09c66 581 if (masters->count == 0) {
6a154c88 582 list_delete(&masters);
eff09c66 583 }
d62a17ae 584 }
585 pthread_mutex_unlock(&masters_mtx);
586
587 thread_array_free(m, m->read);
588 thread_array_free(m, m->write);
8390828d 589 thread_queue_free(m, m->timer);
d62a17ae 590 thread_list_free(m, &m->event);
591 thread_list_free(m, &m->ready);
592 thread_list_free(m, &m->unuse);
593 pthread_mutex_destroy(&m->mtx);
33844bbe 594 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 595 close(m->io_pipe[0]);
596 close(m->io_pipe[1]);
6a154c88 597 list_delete(&m->cancel_req);
1a0a92ea 598 m->cancel_req = NULL;
d62a17ae 599
600 hash_clean(m->cpu_record, cpu_record_hash_free);
601 hash_free(m->cpu_record);
602 m->cpu_record = NULL;
603
0a22ddfb 604 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 605 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
606 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
607 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 608}
609
 610 /* Return remaining time in milliseconds. */
611unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 612{
d62a17ae 613 int64_t remain;
1189d95f 614
d62a17ae 615 pthread_mutex_lock(&thread->mtx);
616 {
78ca0342 617 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 618 }
619 pthread_mutex_unlock(&thread->mtx);
1189d95f 620
d62a17ae 621 return remain < 0 ? 0 : remain;
718e3744 622}
623
 624 /* Return remaining time in seconds. */
625unsigned long thread_timer_remain_second(struct thread *thread)
626{
627 return thread_timer_remain_msec(thread) / 1000LL;
628}
629
630#define debugargdef const char *funcname, const char *schedfrom, int fromln
631#define debugargpass funcname, schedfrom, fromln
e04ab74d 632
d62a17ae 633struct timeval thread_timer_remain(struct thread *thread)
6ac44687 634{
d62a17ae 635 struct timeval remain;
636 pthread_mutex_lock(&thread->mtx);
637 {
638 monotime_until(&thread->u.sands, &remain);
639 }
640 pthread_mutex_unlock(&thread->mtx);
641 return remain;
642}
643
718e3744 644/* Get new thread. */
d7c0a89a 645static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 646 int (*func)(struct thread *), void *arg,
647 debugargdef)
718e3744 648{
c284542b 649 struct thread *thread = thread_list_pop(&m->unuse);
d62a17ae 650 struct cpu_thread_history tmp;
651
652 if (!thread) {
653 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
654 /* mutex only needs to be initialized at struct creation. */
655 pthread_mutex_init(&thread->mtx, NULL);
656 m->alloc++;
657 }
658
659 thread->type = type;
660 thread->add_type = type;
661 thread->master = m;
662 thread->arg = arg;
8390828d 663 thread->index = -1;
d62a17ae 664 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
665 thread->ref = NULL;
666
667 /*
668 * So if the passed in funcname is not what we have
669 * stored that means the thread->hist needs to be
670 * updated. We keep the last one around in unused
671 * under the assumption that we are probably
672 * going to immediately allocate the same
673 * type of thread.
674 * This hopefully saves us some serious
675 * hash_get lookups.
676 */
677 if (thread->funcname != funcname || thread->func != func) {
678 tmp.func = func;
679 tmp.funcname = funcname;
680 thread->hist =
681 hash_get(m->cpu_record, &tmp,
682 (void *(*)(void *))cpu_record_hash_alloc);
683 }
684 thread->hist->total_active++;
685 thread->func = func;
686 thread->funcname = funcname;
687 thread->schedfrom = schedfrom;
688 thread->schedfrom_line = fromln;
689
690 return thread;
718e3744 691}
692
693static void thread_free(struct thread_master *master, struct thread *thread)
694{
695 /* Update statistics. */
696 assert(master->alloc > 0);
697 master->alloc--;
698
699 /* Free allocated resources. */
700 pthread_mutex_destroy(&thread->mtx);
701 XFREE(MTYPE_THREAD, thread);
702}
703
d62a17ae 704static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
705 nfds_t count, const struct timeval *timer_wait)
209a72a6 706{
d62a17ae 707 /* If timer_wait is null here, that means poll() should block
 708 * indefinitely, unless the thread_master has overridden it by
 709 * setting ->selectpoll_timeout.
 710 * If that value is positive, it specifies the maximum number of
 711 * milliseconds to wait. If it is -1, we should never wait and
 712 * instead return immediately even if no event is detected. If it
 713 * is zero, the default behavior applies. */
718 int timeout = -1;
719
720 /* number of file descriptors with events */
721 int num;
722
723 if (timer_wait != NULL
724 && m->selectpoll_timeout == 0) // use the default value
725 timeout = (timer_wait->tv_sec * 1000)
726 + (timer_wait->tv_usec / 1000);
727 else if (m->selectpoll_timeout > 0) // use the user's timeout
728 timeout = m->selectpoll_timeout;
729 else if (m->selectpoll_timeout
730 < 0) // effect a poll (return immediately)
731 timeout = 0;
732
733 /* add poll pipe poker */
734 assert(count + 1 < pfdsize);
735 pfds[count].fd = m->io_pipe[0];
736 pfds[count].events = POLLIN;
737 pfds[count].revents = 0x00;
738
739 num = poll(pfds, count + 1, timeout);
740
741 unsigned char trash[64];
742 if (num > 0 && pfds[count].revents != 0 && num--)
743 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
744 ;
745
746 return num;
747}
748
718e3744 749/* Add new read thread. */
d62a17ae 750struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
751 int (*func)(struct thread *),
752 void *arg, int fd,
753 struct thread **t_ptr,
754 debugargdef)
718e3744 755{
d62a17ae 756 struct thread *thread = NULL;
757
9b864cd3 758 assert(fd >= 0 && fd < m->fd_limit);
d62a17ae 759 pthread_mutex_lock(&m->mtx);
760 {
761 if (t_ptr
762 && *t_ptr) // thread is already scheduled; don't reschedule
763 {
764 pthread_mutex_unlock(&m->mtx);
765 return NULL;
766 }
767
768 /* default to a new pollfd */
769 nfds_t queuepos = m->handler.pfdcount;
770
771 /* if we already have a pollfd for our file descriptor, find and
772 * use it */
773 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
774 if (m->handler.pfds[i].fd == fd) {
775 queuepos = i;
776 break;
777 }
778
779 /* make sure we have room for this fd + pipe poker fd */
780 assert(queuepos + 1 < m->handler.pfdsize);
781
782 thread = thread_get(m, dir, func, arg, debugargpass);
783
784 m->handler.pfds[queuepos].fd = fd;
785 m->handler.pfds[queuepos].events |=
786 (dir == THREAD_READ ? POLLIN : POLLOUT);
787
788 if (queuepos == m->handler.pfdcount)
789 m->handler.pfdcount++;
790
791 if (thread) {
792 pthread_mutex_lock(&thread->mtx);
793 {
794 thread->u.fd = fd;
795 if (dir == THREAD_READ)
796 m->read[thread->u.fd] = thread;
797 else
798 m->write[thread->u.fd] = thread;
799 }
800 pthread_mutex_unlock(&thread->mtx);
801
802 if (t_ptr) {
803 *t_ptr = thread;
804 thread->ref = t_ptr;
805 }
806 }
807
808 AWAKEN(m);
809 }
810 pthread_mutex_unlock(&m->mtx);
811
812 return thread;
718e3744 813}
814
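/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * scheduling a read task via the thread_add_read() convenience macro from
 * thread.h, which expands to the funcname_thread_add_read_write() call
 * above.  Read/write tasks are one-shot, so the handler re-arms itself;
 * keeping the back-reference in t_read lets the owner cancel it safely.
 */
#if 0
struct conn_example {
	int fd;
	struct thread *t_read;
};

static int conn_example_read(struct thread *t)
{
	struct conn_example *c = THREAD_ARG(t);
	char buf[512];

	if (read(c->fd, buf, sizeof(buf)) > 0)
		/* one-shot: re-arm for the next readable event */
		thread_add_read(t->master, conn_example_read, c, c->fd,
				&c->t_read);
	return 0;
}

static void conn_example_start(struct thread_master *m, struct conn_example *c)
{
	thread_add_read(m, conn_example_read, c, c->fd, &c->t_read);
}
#endif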
56a94b36 815static struct thread *
d62a17ae 816funcname_thread_add_timer_timeval(struct thread_master *m,
817 int (*func)(struct thread *), int type,
818 void *arg, struct timeval *time_relative,
819 struct thread **t_ptr, debugargdef)
718e3744 820{
d62a17ae 821 struct thread *thread;
8390828d 822 struct pqueue *queue;
d62a17ae 823
824 assert(m != NULL);
825
826 assert(type == THREAD_TIMER);
827 assert(time_relative);
828
829 pthread_mutex_lock(&m->mtx);
830 {
831 if (t_ptr
832 && *t_ptr) // thread is already scheduled; don't reschedule
833 {
834 pthread_mutex_unlock(&m->mtx);
835 return NULL;
836 }
837
8390828d 838 queue = m->timer;
d62a17ae 839 thread = thread_get(m, type, func, arg, debugargpass);
840
841 pthread_mutex_lock(&thread->mtx);
842 {
843 monotime(&thread->u.sands);
844 timeradd(&thread->u.sands, time_relative,
845 &thread->u.sands);
8390828d 846 pqueue_enqueue(thread, queue);
d62a17ae 847 if (t_ptr) {
848 *t_ptr = thread;
849 thread->ref = t_ptr;
850 }
851 }
852 pthread_mutex_unlock(&thread->mtx);
853
854 AWAKEN(m);
855 }
856 pthread_mutex_unlock(&m->mtx);
857
858 return thread;
9e867fe6 859}
860
98c91ac6 861
862/* Add timer event thread. */
d62a17ae 863struct thread *funcname_thread_add_timer(struct thread_master *m,
864 int (*func)(struct thread *),
865 void *arg, long timer,
866 struct thread **t_ptr, debugargdef)
9e867fe6 867{
d62a17ae 868 struct timeval trel;
9e867fe6 869
d62a17ae 870 assert(m != NULL);
9e867fe6 871
d62a17ae 872 trel.tv_sec = timer;
873 trel.tv_usec = 0;
9e867fe6 874
d62a17ae 875 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
876 &trel, t_ptr, debugargpass);
98c91ac6 877}
9e867fe6 878
98c91ac6 879/* Add timer event thread with "millisecond" resolution */
d62a17ae 880struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
881 int (*func)(struct thread *),
882 void *arg, long timer,
883 struct thread **t_ptr,
884 debugargdef)
98c91ac6 885{
d62a17ae 886 struct timeval trel;
9e867fe6 887
d62a17ae 888 assert(m != NULL);
718e3744 889
d62a17ae 890 trel.tv_sec = timer / 1000;
891 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 892
d62a17ae 893 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
894 &trel, t_ptr, debugargpass);
a48b4e6d 895}
896
d03c4cbd 897/* Add timer event thread with "millisecond" resolution */
d62a17ae 898struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
899 int (*func)(struct thread *),
900 void *arg, struct timeval *tv,
901 struct thread **t_ptr, debugargdef)
d03c4cbd 902{
d62a17ae 903 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
904 t_ptr, debugargpass);
905}
906
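/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * the timers added above are single-shot, so periodic behaviour is obtained
 * by having the handler reschedule itself.  Assumes the
 * thread_add_timer_msec() macro and THREAD_ARG() from thread.h.
 */
#if 0
struct poller_example {
	struct thread_master *master;
	struct thread *t_poll;
};

static int poller_example_run(struct thread *t)
{
	struct poller_example *p = THREAD_ARG(t);

	/* ... do the periodic work ... */

	/* re-arm for the next 500 ms interval */
	thread_add_timer_msec(p->master, poller_example_run, p, 500,
			      &p->t_poll);
	return 0;
}
#endif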
718e3744 907/* Add simple event thread. */
d62a17ae 908struct thread *funcname_thread_add_event(struct thread_master *m,
909 int (*func)(struct thread *),
910 void *arg, int val,
911 struct thread **t_ptr, debugargdef)
718e3744 912{
d62a17ae 913 struct thread *thread;
914
915 assert(m != NULL);
916
917 pthread_mutex_lock(&m->mtx);
918 {
919 if (t_ptr
920 && *t_ptr) // thread is already scheduled; don't reschedule
921 {
922 pthread_mutex_unlock(&m->mtx);
923 return NULL;
924 }
925
926 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
927 pthread_mutex_lock(&thread->mtx);
928 {
929 thread->u.val = val;
c284542b 930 thread_list_add_tail(&m->event, thread);
d62a17ae 931 }
932 pthread_mutex_unlock(&thread->mtx);
933
934 if (t_ptr) {
935 *t_ptr = thread;
936 thread->ref = t_ptr;
937 }
938
939 AWAKEN(m);
940 }
941 pthread_mutex_unlock(&m->mtx);
942
943 return thread;
718e3744 944}
945
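/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * event tasks run on the next pass through thread_fetch(), ahead of timers
 * and I/O.  Assumes the thread_add_event() macro and THREAD_VAL() from
 * thread.h.
 */
#if 0
static int wakeup_example(struct thread *t)
{
	zlog_info("event fired with value %d", THREAD_VAL(t));
	return 0;
}

/* elsewhere, on the owning pthread:
 *	thread_add_event(master, wakeup_example, NULL, 42, NULL);
 */
#endif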
946/* Thread cancellation ------------------------------------------------------ */
947
948/**
949 * NOT's out the .events field of pollfd corresponding to the given file
950 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
951 *
952 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
953 * implementation for details.
954 *
955 * @param master
956 * @param fd
957 * @param state the event to cancel. One or more (OR'd together) of the
958 * following:
959 * - POLLIN
960 * - POLLOUT
961 */
d62a17ae 962static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 963{
964 bool found = false;
965
d62a17ae 966 /* Cancel POLLHUP too just in case some bozo set it */
967 state |= POLLHUP;
968
969 /* find the index of corresponding pollfd */
970 nfds_t i;
971
972 for (i = 0; i < master->handler.pfdcount; i++)
973 if (master->handler.pfds[i].fd == fd) {
974 found = true;
d62a17ae 975 break;
976 }
977
978 if (!found) {
979 zlog_debug(
980 "[!] Received cancellation request for nonexistent rw job");
981 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 982 master->name ? master->name : "", fd);
983 return;
984 }
d62a17ae 985
986 /* NOT out event. */
987 master->handler.pfds[i].events &= ~(state);
988
989 /* If all events are canceled, delete / resize the pollfd array. */
990 if (master->handler.pfds[i].events == 0) {
991 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
992 (master->handler.pfdcount - i - 1)
993 * sizeof(struct pollfd));
994 master->handler.pfdcount--;
995 }
996
997 /* If we have the same pollfd in the copy, perform the same operations,
998 * otherwise return. */
999 if (i >= master->handler.copycount)
1000 return;
1001
1002 master->handler.copy[i].events &= ~(state);
1003
1004 if (master->handler.copy[i].events == 0) {
1005 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1006 (master->handler.copycount - i - 1)
1007 * sizeof(struct pollfd));
1008 master->handler.copycount--;
1009 }
1010}
1011
1189d95f 1012/**
63ccb9cb 1013 * Process cancellation requests.
1189d95f 1014 *
1015 * This may only be run from the pthread which owns the thread_master.
1016 *
1017 * @param master the thread master to process
1018 * @REQUIRE master->mtx
1189d95f 1019 */
d62a17ae 1020static void do_thread_cancel(struct thread_master *master)
718e3744 1021{
c284542b 1022 struct thread_list_head *list = NULL;
8390828d 1023 struct pqueue *queue = NULL;
d62a17ae 1024 struct thread **thread_array = NULL;
1025 struct thread *thread;
1026
1027 struct cancel_req *cr;
1028 struct listnode *ln;
1029 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
 1030 /* If this is an event object cancellation, linear search through
 1031 * the event list deleting any events which have the specified
 1032 * argument. We also need to check every thread in the ready
 1033 * queue. */
1035 if (cr->eventobj) {
1036 struct thread *t;
c284542b 1037
81fddbe7 1038 frr_each_safe(thread_list, &master->event, t) {
1039 if (t->arg != cr->eventobj)
1040 continue;
1041 thread_list_del(&master->event, t);
1042 if (t->ref)
1043 *t->ref = NULL;
1044 thread_add_unuse(master, t);
d62a17ae 1045 }
1046
81fddbe7 1047 frr_each_safe(thread_list, &master->ready, t) {
1048 if (t->arg != cr->eventobj)
1049 continue;
1050 thread_list_del(&master->ready, t);
1051 if (t->ref)
1052 *t->ref = NULL;
1053 thread_add_unuse(master, t);
d62a17ae 1054 }
1055 continue;
1056 }
1057
 1058 /* The pointer varies depending on whether the cancellation request
 1059 * was made asynchronously or not. If it was, we need to check
 1060 * whether the thread even exists anymore before cancelling it. */
1063 thread = (cr->thread) ? cr->thread : *cr->threadref;
1064
1065 if (!thread)
1066 continue;
1067
1068 /* Determine the appropriate queue to cancel the thread from */
1069 switch (thread->type) {
1070 case THREAD_READ:
1071 thread_cancel_rw(master, thread->u.fd, POLLIN);
1072 thread_array = master->read;
1073 break;
1074 case THREAD_WRITE:
1075 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1076 thread_array = master->write;
1077 break;
1078 case THREAD_TIMER:
8390828d 1079 queue = master->timer;
d62a17ae 1080 break;
1081 case THREAD_EVENT:
1082 list = &master->event;
1083 break;
1084 case THREAD_READY:
1085 list = &master->ready;
1086 break;
1087 default:
1088 continue;
1089 break;
1090 }
1091
1092 if (queue) {
1093 assert(thread->index >= 0);
1094 assert(thread == queue->array[thread->index]);
1095 pqueue_remove_at(thread->index, queue);
1096 } else if (list) {
c284542b 1097 thread_list_del(list, thread);
d62a17ae 1098 } else if (thread_array) {
1099 thread_array[thread->u.fd] = NULL;
1100 } else {
1101 assert(!"Thread should be either in queue or list or array!");
d62a17ae 1102 }
1103
1104 if (thread->ref)
1105 *thread->ref = NULL;
1106
1107 thread_add_unuse(thread->master, thread);
1108 }
1109
1110 /* Delete and free all cancellation requests */
1111 list_delete_all_node(master->cancel_req);
1112
1113 /* Wake up any threads which may be blocked in thread_cancel_async() */
1114 master->canceled = true;
1115 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1116}
1117
1118/**
1119 * Cancel any events which have the specified argument.
1120 *
1121 * MT-Unsafe
1122 *
1123 * @param m the thread_master to cancel from
1124 * @param arg the argument passed when creating the event
1125 */
d62a17ae 1126void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1127{
d62a17ae 1128 assert(master->owner == pthread_self());
1129
1130 pthread_mutex_lock(&master->mtx);
1131 {
1132 struct cancel_req *cr =
1133 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1134 cr->eventobj = arg;
1135 listnode_add(master->cancel_req, cr);
1136 do_thread_cancel(master);
1137 }
1138 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1139}
1189d95f 1140
1141/**
1142 * Cancel a specific task.
1143 *
1144 * MT-Unsafe
1145 *
1146 * @param thread task to cancel
1147 */
d62a17ae 1148void thread_cancel(struct thread *thread)
63ccb9cb 1149{
6ed04aa2 1150 struct thread_master *master = thread->master;
d62a17ae 1151
6ed04aa2
DS
1152 assert(master->owner == pthread_self());
1153
1154 pthread_mutex_lock(&master->mtx);
d62a17ae 1155 {
1156 struct cancel_req *cr =
1157 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1158 cr->thread = thread;
1159 listnode_add(master->cancel_req, cr);
1160 do_thread_cancel(master);
d62a17ae 1161 }
6ed04aa2 1162 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1163}
1189d95f 1164
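/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * cancelling a task from the pthread that owns it, reusing struct
 * conn_example from the read sketch above.  Because the scheduling call
 * stored a back-reference, the pointer is cleared both when the task runs
 * and when it is cancelled, so the NULL check stays accurate (this is the
 * same pattern the THREAD_OFF() macro in thread.h wraps up).
 */
#if 0
static void conn_example_stop(struct conn_example *c)
{
	if (c->t_read)
		thread_cancel(c->t_read);
	/* do_thread_cancel() has already set c->t_read back to NULL */
}
#endif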
1165/**
1166 * Asynchronous cancellation.
1167 *
1168 * Called with either a struct thread ** or void * to an event argument,
1169 * this function posts the correct cancellation request and blocks until it is
1170 * serviced.
1171 *
1172 * If the thread is currently running, execution blocks until it completes.
1173 *
1174 * The last two parameters are mutually exclusive, i.e. if you pass one the
1175 * other must be NULL.
1176 *
1177 * When the cancellation procedure executes on the target thread_master, the
1178 * thread * provided is checked for nullity. If it is null, the thread is
1179 * assumed to no longer exist and the cancellation request is a no-op. Thus
1180 * users of this API must pass a back-reference when scheduling the original
1181 * task.
1182 *
63ccb9cb
QY
1183 * MT-Safe
1184 *
1185 * @param master the thread master with the relevant event / task
1186 * @param thread pointer to thread to cancel
1187 * @param eventobj the event
63ccb9cb 1188 */
d62a17ae 1189void thread_cancel_async(struct thread_master *master, struct thread **thread,
1190 void *eventobj)
63ccb9cb 1191{
d62a17ae 1192 assert(!(thread && eventobj) && (thread || eventobj));
1193 assert(master->owner != pthread_self());
1194
1195 pthread_mutex_lock(&master->mtx);
1196 {
1197 master->canceled = false;
1198
1199 if (thread) {
1200 struct cancel_req *cr =
1201 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1202 cr->threadref = thread;
1203 listnode_add(master->cancel_req, cr);
1204 } else if (eventobj) {
1205 struct cancel_req *cr =
1206 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1207 cr->eventobj = eventobj;
1208 listnode_add(master->cancel_req, cr);
1209 }
1210 AWAKEN(master);
1211
1212 while (!master->canceled)
1213 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1214 }
1215 pthread_mutex_unlock(&master->mtx);
718e3744 1216}
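/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * cancelling work owned by another pthread, again reusing struct
 * conn_example from the read sketch above.  Exactly one of the last two
 * arguments may be non-NULL; the call blocks until the owning pthread has
 * serviced the request, so the caller may release the task's argument
 * afterwards.
 */
#if 0
static void remote_cancel_example(struct thread_master *owner,
				  struct conn_example *c, void *eventarg)
{
	/* cancel one task through its back-reference ... */
	thread_cancel_async(owner, &c->t_read, NULL);

	/* ... or cancel every event scheduled with a given argument */
	thread_cancel_async(owner, NULL, eventarg);
}
#endif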
63ccb9cb 1217/* ------------------------------------------------------------------------- */
718e3744 1218
8390828d 1219static struct timeval *thread_timer_wait(struct pqueue *queue,
d62a17ae 1220 struct timeval *timer_val)
718e3744 1221{
1222 if (queue->size) {
1223 struct thread *next_timer = queue->array[0];
1224 monotime_until(&next_timer->u.sands, timer_val);
1225 return timer_val;
1226 }
1227 return NULL;
718e3744 1228}
718e3744 1229
d62a17ae 1230static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1231 struct thread *fetch)
718e3744 1232{
d62a17ae 1233 *fetch = *thread;
1234 thread_add_unuse(m, thread);
1235 return fetch;
718e3744 1236}
1237
d62a17ae 1238static int thread_process_io_helper(struct thread_master *m,
1239 struct thread *thread, short state, int pos)
5d4ccd4e 1240{
d62a17ae 1241 struct thread **thread_array;
1242
1243 if (!thread)
1244 return 0;
1245
1246 if (thread->type == THREAD_READ)
1247 thread_array = m->read;
1248 else
1249 thread_array = m->write;
1250
1251 thread_array[thread->u.fd] = NULL;
c284542b 1252 thread_list_add_tail(&m->ready, thread);
d62a17ae 1253 thread->type = THREAD_READY;
1254 /* if another pthread scheduled this file descriptor for the event we're
1255 * responding to, no problem; we're getting to it now */
1256 thread->master->handler.pfds[pos].events &= ~(state);
1257 return 1;
1258}
1259
1260/**
1261 * Process I/O events.
1262 *
1263 * Walks through file descriptor array looking for those pollfds whose .revents
1264 * field has something interesting. Deletes any invalid file descriptors.
1265 *
1266 * @param m the thread master
1267 * @param num the number of active file descriptors (return value of poll())
1268 */
d62a17ae 1269static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1270{
d62a17ae 1271 unsigned int ready = 0;
1272 struct pollfd *pfds = m->handler.copy;
1273
1274 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1275 /* no event for current fd? immediately continue */
1276 if (pfds[i].revents == 0)
1277 continue;
1278
1279 ready++;
1280
 1281 /* Unless someone has called thread_cancel from another pthread,
 1282 * the only thing that could have changed in m->handler.pfds
 1283 * while we were asleep is the .events field in a given pollfd.
 1284 * Barring thread_cancel(), that value should be a superset of
 1285 * the values we have in our copy, so there's no need to update
 1286 * it. Similarly, barring deletion, the fd should still be a
 1287 * valid index into the master's pfds. */
1292 if (pfds[i].revents & (POLLIN | POLLHUP))
1293 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1294 i);
1295 if (pfds[i].revents & POLLOUT)
1296 thread_process_io_helper(m, m->write[pfds[i].fd],
1297 POLLOUT, i);
1298
1299 /* if one of our file descriptors is garbage, remove the same
1300 * from
1301 * both pfds + update sizes and index */
1302 if (pfds[i].revents & POLLNVAL) {
1303 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1304 (m->handler.pfdcount - i - 1)
1305 * sizeof(struct pollfd));
1306 m->handler.pfdcount--;
1307
1308 memmove(pfds + i, pfds + i + 1,
1309 (m->handler.copycount - i - 1)
1310 * sizeof(struct pollfd));
1311 m->handler.copycount--;
1312
1313 i--;
1314 }
1315 }
718e3744 1316}
1317
8b70d0b0 1318/* Add all timers that have popped to the ready list. */
8390828d 1319static unsigned int thread_process_timers(struct pqueue *queue,
d62a17ae 1320 struct timeval *timenow)
a48b4e6d 1321{
d62a17ae 1322 struct thread *thread;
1323 unsigned int ready = 0;
1324
1325 while (queue->size) {
1326 thread = queue->array[0];
d62a17ae 1327 if (timercmp(timenow, &thread->u.sands, <))
1328 return ready;
8390828d 1329 pqueue_dequeue(queue);
d62a17ae 1330 thread->type = THREAD_READY;
c284542b 1331 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1332 ready++;
1333 }
1334 return ready;
a48b4e6d 1335}
1336
2613abe6 1337/* process a list en masse, e.g. for event thread lists */
c284542b 1338static unsigned int thread_process(struct thread_list_head *list)
2613abe6 1339{
d62a17ae 1340 struct thread *thread;
d62a17ae 1341 unsigned int ready = 0;
1342
c284542b 1343 while ((thread = thread_list_pop(list))) {
d62a17ae 1344 thread->type = THREAD_READY;
c284542b 1345 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1346 ready++;
1347 }
1348 return ready;
1349}
1350
1351
718e3744 1352/* Fetch next ready thread. */
d62a17ae 1353struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1354{
d62a17ae 1355 struct thread *thread = NULL;
1356 struct timeval now;
1357 struct timeval zerotime = {0, 0};
1358 struct timeval tv;
1359 struct timeval *tw = NULL;
1360
1361 int num = 0;
1362
1363 do {
1364 /* Handle signals if any */
1365 if (m->handle_signals)
1366 quagga_sigevent_process();
1367
1368 pthread_mutex_lock(&m->mtx);
1369
1370 /* Process any pending cancellation requests */
1371 do_thread_cancel(m);
1372
1373 /*
1374 * Attempt to flush ready queue before going into poll().
1375 * This is performance-critical. Think twice before modifying.
1376 */
c284542b 1377 if ((thread = thread_list_pop(&m->ready))) {
1378 fetch = thread_run(m, thread, fetch);
1379 if (fetch->ref)
1380 *fetch->ref = NULL;
1381 pthread_mutex_unlock(&m->mtx);
1382 break;
1383 }
1384
1385 /* otherwise, tick through scheduling sequence */
1386
1387 /*
1388 * Post events to ready queue. This must come before the
1389 * following block since events should occur immediately
1390 */
d62a17ae 1391 thread_process(&m->event);
1392
1393 /*
1394 * If there are no tasks on the ready queue, we will poll()
1395 * until a timer expires or we receive I/O, whichever comes
1396 * first. The strategy for doing this is:
d62a17ae 1397 *
1398 * - If there are events pending, set the poll() timeout to zero
 1399 * - If there are no events pending, but there are timers pending,
 1400 * set the timeout to the smallest remaining time on any timer
 1401 * - If there are neither timers nor events pending, but there are
 1402 * file descriptors pending, block indefinitely in poll()
1405 * - If nothing is pending, it's time for the application to die
1406 *
1407 * In every case except the last, we need to hit poll() at least
1408 * once per loop to avoid starvation by events
1409 */
c284542b 1410 if (!thread_list_count(&m->ready))
8390828d 1411 tw = thread_timer_wait(m->timer, &tv);
d62a17ae 1412
1413 if (thread_list_count(&m->ready) ||
1414 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1415 tw = &zerotime;
1416
1417 if (!tw && m->handler.pfdcount == 0) { /* die */
1418 pthread_mutex_unlock(&m->mtx);
1419 fetch = NULL;
1420 break;
1421 }
1422
1423 /*
1424 * Copy pollfd array + # active pollfds in it. Not necessary to
1425 * copy the array size as this is fixed.
1426 */
d62a17ae 1427 m->handler.copycount = m->handler.pfdcount;
1428 memcpy(m->handler.copy, m->handler.pfds,
1429 m->handler.copycount * sizeof(struct pollfd));
1430
1431 pthread_mutex_unlock(&m->mtx);
1432 {
1433 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1434 m->handler.copycount, tw);
1435 }
1436 pthread_mutex_lock(&m->mtx);
d764d2cc 1437
1438 /* Handle any errors received in poll() */
1439 if (num < 0) {
1440 if (errno == EINTR) {
d62a17ae 1441 pthread_mutex_unlock(&m->mtx);
1442 /* loop around to signal handler */
1443 continue;
d62a17ae 1444 }
1445
e3c9529e 1446 /* else die */
450971aa 1447 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1448 safe_strerror(errno));
1449 pthread_mutex_unlock(&m->mtx);
1450 fetch = NULL;
1451 break;
bca37d17 1452 }
d62a17ae 1453
1454 /* Post timers to ready queue. */
1455 monotime(&now);
8390828d 1456 thread_process_timers(m->timer, &now);
d62a17ae 1457
1458 /* Post I/O to ready queue. */
1459 if (num > 0)
1460 thread_process_io(m, num);
1461
d62a17ae 1462 pthread_mutex_unlock(&m->mtx);
1463
1464 } while (!thread && m->spin);
1465
1466 return fetch;
718e3744 1467}
1468
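/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * the canonical event loop a daemon runs on top of thread_fetch() and
 * thread_call().  thread_fetch() blocks in poll() until something is ready
 * and returns NULL once there is nothing left to schedule, ending the loop.
 */
#if 0
static void event_loop_example(struct thread_master *m)
{
	struct thread fetch;

	while (thread_fetch(m, &fetch))
		thread_call(&fetch);
}
#endif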
d62a17ae 1469static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1470{
d62a17ae 1471 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1472 + (a.tv_usec - b.tv_usec));
1473}
1474
d62a17ae 1475unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1476 unsigned long *cputime)
718e3744 1477{
d62a17ae 1478 /* This is 'user + sys' time. */
1479 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1480 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1481 return timeval_elapsed(now->real, start->real);
8b70d0b0 1482}
1483
1484/* We should aim to yield after yield milliseconds, which defaults
 1485 to THREAD_YIELD_TIME_SLOT.
8b70d0b0 1486 Note: we are using real (wall clock) time for this calculation.
1487 It could be argued that CPU time may make more sense in certain
1488 contexts. The things to consider are whether the thread may have
1489 blocked (in which case wall time increases, but CPU time does not),
1490 or whether the system is heavily loaded with other processes competing
d62a17ae 1491 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1492 Plus it has the added benefit that gettimeofday should be faster
1493 than calling getrusage. */
d62a17ae 1494int thread_should_yield(struct thread *thread)
718e3744 1495{
d62a17ae 1496 int result;
1497 pthread_mutex_lock(&thread->mtx);
1498 {
1499 result = monotime_since(&thread->real, NULL)
1500 > (int64_t)thread->yield;
1501 }
1502 pthread_mutex_unlock(&thread->mtx);
1503 return result;
1504}
1505
d62a17ae 1506void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1507{
d62a17ae 1508 pthread_mutex_lock(&thread->mtx);
1509 {
1510 thread->yield = yield_time;
1511 }
1512 pthread_mutex_unlock(&thread->mtx);
718e3744 1513}
1514
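/*
 * A minimal usage sketch (illustrative only, guarded out of compilation):
 * a long-running task that cooperates with the scheduler by checking
 * thread_should_yield() and rescheduling itself instead of monopolising the
 * pthread.  The queue type and helpers are hypothetical; assumes the
 * thread_add_event() macro and THREAD_ARG() from thread.h.
 */
#if 0
static int drain_queue_example(struct thread *t)
{
	struct example_queue *q = THREAD_ARG(t);	/* hypothetical type */

	while (example_queue_nonempty(q)) {		/* hypothetical helper */
		example_queue_work_one(q);		/* hypothetical helper */
		if (thread_should_yield(t)) {
			/* come back on the next scheduling pass */
			thread_add_event(t->master, drain_queue_example, q, 0,
					 NULL);
			break;
		}
	}
	return 0;
}
#endif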
d62a17ae 1515void thread_getrusage(RUSAGE_T *r)
db9c0df9 1516{
1517#if defined RUSAGE_THREAD
1518#define FRR_RUSAGE RUSAGE_THREAD
1519#else
1520#define FRR_RUSAGE RUSAGE_SELF
1521#endif
d62a17ae 1522 monotime(&r->real);
231db9a6 1523 getrusage(FRR_RUSAGE, &(r->cpu));
1524}
1525
1526/*
1527 * Call a thread.
1528 *
1529 * This function will atomically update the thread's usage history. At present
1530 * this is the only spot where usage history is written. Nevertheless the code
1531 * has been written such that the introduction of writers in the future should
1532 * not need to update it provided the writers atomically perform only the
1533 * operations done here, i.e. updating the total and maximum times. In
1534 * particular, the maximum real and cpu times must be monotonically increasing
1535 * or this code is not correct.
1536 */
d62a17ae 1537void thread_call(struct thread *thread)
718e3744 1538{
1539 _Atomic unsigned long realtime, cputime;
1540 unsigned long exp;
1541 unsigned long helper;
d62a17ae 1542 RUSAGE_T before, after;
cc8b13a0 1543
d62a17ae 1544 GETRUSAGE(&before);
1545 thread->real = before.real;
718e3744 1546
d62a17ae 1547 pthread_setspecific(thread_current, thread);
1548 (*thread->func)(thread);
1549 pthread_setspecific(thread_current, NULL);
718e3744 1550
d62a17ae 1551 GETRUSAGE(&after);
718e3744 1552
1553 realtime = thread_consumed_time(&after, &before, &helper);
1554 cputime = helper;
1555
1556 /* update realtime */
1557 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1558 memory_order_seq_cst);
1559 exp = atomic_load_explicit(&thread->hist->real.max,
1560 memory_order_seq_cst);
1561 while (exp < realtime
1562 && !atomic_compare_exchange_weak_explicit(
1563 &thread->hist->real.max, &exp, realtime,
1564 memory_order_seq_cst, memory_order_seq_cst))
1565 ;
1566
1567 /* update cputime */
1568 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1569 memory_order_seq_cst);
1570 exp = atomic_load_explicit(&thread->hist->cpu.max,
1571 memory_order_seq_cst);
1572 while (exp < cputime
1573 && !atomic_compare_exchange_weak_explicit(
1574 &thread->hist->cpu.max, &exp, cputime,
1575 memory_order_seq_cst, memory_order_seq_cst))
1576 ;
1577
1578 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1579 memory_order_seq_cst);
1580 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1581 memory_order_seq_cst);
718e3744 1582
924b9229 1583#ifdef CONSUMED_TIME_CHECK
d62a17ae 1584 if (realtime > CONSUMED_TIME_CHECK) {
1585 /*
1586 * We have a CPU Hog on our hands.
1587 * Whinge about it now, so we're aware this is yet another task
1588 * to fix.
1589 */
9ef9495e 1590 flog_warn(
450971aa 1591 EC_LIB_SLOW_THREAD,
d62a17ae 1592 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1593 thread->funcname, (unsigned long)thread->func,
1594 realtime / 1000, cputime / 1000);
1595 }
924b9229 1596#endif /* CONSUMED_TIME_CHECK */
718e3744 1597}
1598
1599/* Execute thread */
d62a17ae 1600void funcname_thread_execute(struct thread_master *m,
1601 int (*func)(struct thread *), void *arg, int val,
1602 debugargdef)
718e3744 1603{
c4345fbf 1604 struct thread *thread;
718e3744 1605
1606 /* Get or allocate new thread to execute. */
1607 pthread_mutex_lock(&m->mtx);
1608 {
1609 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
9c7753e4 1610
1611 /* Set its event value. */
1612 pthread_mutex_lock(&thread->mtx);
1613 {
1614 thread->add_type = THREAD_EXECUTE;
1615 thread->u.val = val;
1616 thread->ref = &thread;
1617 }
1618 pthread_mutex_unlock(&thread->mtx);
1619 }
1620 pthread_mutex_unlock(&m->mtx);
f7c62e11 1621
1622 /* Execute thread doing all accounting. */
1623 thread_call(thread);
9c7753e4 1624
1625 /* Give back or free thread. */
1626 thread_add_unuse(m, thread);
718e3744 1627}
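/*
 * A minimal usage note (illustrative only): the thread_execute() macro in
 * thread.h wraps the function above and runs the handler synchronously in
 * the caller, while still recording CPU/wall-clock statistics for
 * "show thread cpu", e.g.:
 *
 *	thread_execute(master, wakeup_example, NULL, 0);
 */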