/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

/* #define DEBUG */

#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "frrcu.h"
#include "log.h"
#include "hash.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "frr_pthread.h"
#include "lib_errors.h"

DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

DECLARE_LIST(thread_list, struct thread, threaditem)

static int thread_timer_cmp(const struct thread *a, const struct thread *b)
{
	if (a->u.sands.tv_sec < b->u.sands.tv_sec)
		return -1;
	if (a->u.sands.tv_sec > b->u.sands.tv_sec)
		return 1;
	if (a->u.sands.tv_usec < b->u.sands.tv_usec)
		return -1;
	if (a->u.sands.tv_usec > b->u.sands.tv_usec)
		return 1;
	return 0;
}

DECLARE_HEAP(thread_timer_list, struct thread, timeritem,
	     thread_timer_cmp)

#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		const unsigned char wakebyte = 0x01;                           \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);

/* control variable for initializer */
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;

static void thread_free(struct thread_master *master, struct thread *thread);

/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}

static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}

static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}

static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}

#ifndef EXCLUDE_CPU_TIME
static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
		(size_t)a->total_active, a->cpu.total / 1000,
		a->cpu.total % 1000, (size_t)a->total_calls,
		(size_t)(a->cpu.total / a->total_calls), a->cpu.max,
		(size_t)(a->real.total / a->total_calls), a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}

static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}

static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof(tmp));
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active Runtime(ms) Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, " Type Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, " Type Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}
#endif

static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}

static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			frr_with_mutex(&m->mtx) {
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_bucket *,
						  void *))cpu_record_hash_clear,
					args);
			}
		}
	}
}

static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}
	return filter;
}

#ifndef EXCLUDE_CPU_TIME
DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}
#endif

static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	struct thread *thread;
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
		m->fd_limit);
	for (i = 0; i < m->handler.pfdcount; i++) {
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
			m->handler.pfds[i].fd, m->handler.pfds[i].events,
			m->handler.pfds[i].revents);

		if (m->handler.pfds[i].events & POLLIN) {
			thread = m->read[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR ");
			else
				vty_out(vty, "%s ", thread->funcname);
		} else
			vty_out(vty, " ");

		if (m->handler.pfds[i].events & POLLOUT) {
			thread = m->write[m->handler.pfds[i].fd];

			if (!thread)
				vty_out(vty, "ERROR\n");
			else
				vty_out(vty, "%s\n", thread->funcname);
		} else
			vty_out(vty, "\n");
	}
}

DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	frr_with_mutex(&masters_mtx) {
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}

	return CMD_SUCCESS;
}

DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}

void thread_cmd_init(void)
{
#ifndef EXCLUDE_CPU_TIME
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
#endif
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */


static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}

/* initializer, only ever called once */
static void initializer(void)
{
	pthread_key_create(&thread_current, NULL);
}

struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(const void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	thread_list_init(&rv->event);
	thread_list_init(&rv->ready);
	thread_list_init(&rv->unuse);
	thread_timer_list_init(&rv->timer);

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	frr_with_mutex(&masters_mtx) {
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}

	return rv;
}
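
/*
 * Hypothetical usage sketch (not part of this library): a daemon typically
 * creates one master, schedules tasks against it, and then spins in a
 * fetch/call loop.  Names other than the functions defined in this file and
 * the wrapper macros from thread.h are illustrative only.
 *
 *	struct thread_master *master = thread_master_create("example");
 *	struct thread fetch;
 *
 *	thread_add_read(master, my_read_handler, my_ctx, my_fd, NULL);
 *	thread_add_timer(master, my_timer_handler, my_ctx, 5, NULL);
 *
 *	while (thread_fetch(master, &fetch))
 *		thread_call(&fetch);
 *
 *	thread_master_free(master);
 */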

void thread_master_set_name(struct thread_master *master, const char *name)
{
	frr_with_mutex(&master->mtx) {
		XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
}

#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
		thread_list_add_tail(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}

/* Free all unused threads. */
static void thread_list_free(struct thread_master *m,
			     struct thread_list_head *list)
{
	struct thread *t;

	while ((t = thread_list_pop(list)))
		thread_free(m, t);
}

static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}

/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the unuse list for later
 * reuse.  If we are shutting down, free up the unused threads so we can see
 * if we forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	frr_with_mutex(&m->mtx) {
		struct thread *t;
		while ((t = thread_list_pop(&m->unuse)))
			thread_free(m, t);
	}
}

/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	struct thread *t;

	frr_with_mutex(&masters_mtx) {
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	while ((t = thread_timer_list_pop(&m->timer)))
		thread_free(m, t);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}

/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	frr_with_mutex(&thread->mtx) {
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}

	return remain < 0 ? 0 : remain;
}

/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}

#define debugargdef  const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	frr_with_mutex(&thread->mtx) {
		monotime_until(&thread->u.sands, &remain);
	}
	return remain;
}

static int time_hhmmss(char *buf, int buf_size, long sec)
{
	long hh;
	long mm;
	int wr;

	zassert(buf_size >= 8);

	hh = sec / 3600;
	sec %= 3600;
	mm = sec / 60;
	sec %= 60;

	wr = snprintf(buf, buf_size, "%02ld:%02ld:%02ld", hh, mm, sec);

	return wr != 8;
}

char *thread_timer_to_hhmmss(char *buf, int buf_size,
			     struct thread *t_timer)
{
	if (t_timer) {
		time_hhmmss(buf, buf_size,
			    thread_timer_remain_second(t_timer));
	} else {
		snprintf(buf, buf_size, "--:--:--");
	}
	return buf;
}

/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_list_pop(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated.  We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * type of thread.
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}

static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}

static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/*
	 * If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the value is positive, it specifies the maximum number of
	 * milliseconds to wait. If the timeout is -1, it specifies that
	 * we should never wait and always return immediately even if no
	 * event is detected. If the value is zero, the behavior is default.
	 */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	zlog_tls_buffer_flush();
	rcu_read_unlock();
	rcu_assert_read_unlocked();

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	rcu_read_lock();

	return num;
}

/* Add new read thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;
	struct thread **thread_array;

	assert(fd >= 0 && fd < m->fd_limit);
	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			// thread is already scheduled; don't reschedule
			break;

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		if (dir == THREAD_READ)
			thread_array = m->read;
		else
			thread_array = m->write;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;

#ifdef DEV_BUILD
				/*
				 * What happens if we have a thread already
				 * created for this event?
				 */
				if (thread_array[fd])
					assert(!"Thread already scheduled for file descriptor");
#endif
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			frr_with_mutex(&thread->mtx) {
				thread->u.fd = fd;
				thread_array[thread->u.fd] = thread;
			}

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}

	return thread;
}
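
/*
 * Illustrative sketch (assumed caller pattern, not part of this file): the
 * t_ptr back-reference makes rescheduling idempotent.  Callers normally go
 * through the thread_add_read()/thread_add_write() wrapper macros from
 * thread.h and keep a struct thread * in their own state:
 *
 *	struct thread *t_read;
 *
 *	// schedules only if t_read is currently NULL
 *	thread_add_read(master, my_read_handler, my_ctx, my_fd, &t_read);
 *
 *	// once my_read_handler() runs, the library clears t_read via ->ref,
 *	// so re-adding the task from inside the handler is safe.
 */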

static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			/* thread is already scheduled; don't reschedule */
			return NULL;

		thread = thread_get(m, type, func, arg, debugargpass);

		frr_with_mutex(&thread->mtx) {
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			thread_timer_list_add(&m->timer, thread);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}

	return thread;
}


/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with a struct timeval as the relative timeout */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}

/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread = NULL;

	assert(m != NULL);

	frr_with_mutex(&m->mtx) {
		if (t_ptr && *t_ptr)
			/* thread is already scheduled; don't reschedule */
			break;

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		frr_with_mutex(&thread->mtx) {
			thread->u.val = val;
			thread_list_add_tail(&m->event, thread);
		}

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}

	return thread;
}

/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
		master->handler.pfds[master->handler.pfdcount].fd = 0;
		master->handler.pfds[master->handler.pfdcount].events = 0;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
		master->handler.copy[master->handler.copycount].fd = 0;
		master->handler.copy[master->handler.copycount].events = 0;
	}
}

/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list_head *list = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/*
		 * If this is an event object cancellation, linear search
		 * through event list deleting any events which have the
		 * specified argument. We also need to check every thread
		 * in the ready queue.
		 */
		if (cr->eventobj) {
			struct thread *t;

			frr_each_safe(thread_list, &master->event, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->event, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}

			frr_each_safe(thread_list, &master->ready, t) {
				if (t->arg != cr->eventobj)
					continue;
				thread_list_del(&master->ready, t);
				if (t->ref)
					*t->ref = NULL;
				thread_add_unuse(master, t);
			}
			continue;
		}

		/*
		 * The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we
		 * need to check whether the thread even exists anymore
		 * before cancelling it.
		 */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			thread_timer_list_del(&master->timer, thread);
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
			break;
		}

		if (list) {
			thread_list_del(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	if (master->cancel_req)
		list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}

/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	frr_with_mutex(&master->mtx) {
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
}

/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	frr_with_mutex(&master->mtx) {
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
}

/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	frr_with_mutex(&master->mtx) {
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
}
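
/*
 * Hypothetical cross-pthread usage sketch (names other than the APIs defined
 * above are illustrative only): because the asynchronous cancellation is
 * resolved through the stored back-reference, the scheduling side must keep
 * that pointer around.
 *
 *	// on the owning pthread
 *	thread_add_timer(worker_master, worker_timer, worker_ctx, 10,
 *			 &worker_ctx->t_timer);
 *
 *	// later, from a different pthread; blocks until the request is
 *	// serviced (or is a no-op if t_timer was already cleared)
 *	thread_cancel_async(worker_master, &worker_ctx->t_timer, NULL);
 */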

/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
					 struct timeval *timer_val)
{
	if (!thread_timer_list_count(timers))
		return NULL;

	struct thread *next_timer = thread_timer_list_first(timers);
	monotime_until(&next_timer->u.sands, timer_val);
	return timer_val;
}

static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}

static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state,
				    short actual_state, int pos)
{
	struct thread **thread_array;

	/*
	 * poll() clears the .events field, but the pollfd array we
	 * pass to poll() is a copy of the one used to schedule threads.
	 * We need to synchronize state between the two here by applying
	 * the same changes poll() made on the copy of the "real" pollfd
	 * array.
	 *
	 * This cleans up a possible infinite loop where we refuse
	 * to respond to a poll event but poll is insistent that
	 * we should.
	 */
	m->handler.pfds[pos].events &= ~(state);

	if (!thread) {
		if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
			flog_err(EC_LIB_NO_THREAD,
				 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
				 m->handler.pfds[pos].fd, actual_state);
		return 0;
	}

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add_tail(&m->ready, thread);
	thread->type = THREAD_READY;

	return 1;
}

/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/*
		 * Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events
		 * field in a given pollfd. Barring thread_cancel() that
		 * value should be a superset of the values we have in our
		 * copy, so there's no need to update it. Similarly,
		 * barring deletion, the fd should still be a valid index
		 * into the master's pfds.
		 *
		 * We are including POLLERR here to do a READ event
		 * because the read should fail and the read function
		 * should handle it appropriately.
		 */
		if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 pfds[i].revents, i);
		}
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, pfds[i].revents, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;
			m->handler.pfds[m->handler.pfdcount].fd = 0;
			m->handler.pfds[m->handler.pfdcount].events = 0;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;
			m->handler.copy[m->handler.copycount].fd = 0;
			m->handler.copy[m->handler.copycount].events = 0;

			i--;
		}
	}
}

/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct thread_timer_list_head *timers,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_timer_list_first(timers))) {
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		thread_timer_list_pop(timers);
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}

/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list_head *list)
{
	struct thread *thread;
	unsigned int ready = 0;

	while ((thread = thread_list_pop(list))) {
		thread->type = THREAD_READY;
		thread_list_add_tail(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}


/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_list_pop(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time on
		 *   any timer.
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to die
		 *
		 * In every case except the last, we need to hit poll() at least
		 * once per loop to avoid starvation by events
		 */
		if (!thread_list_count(&m->ready))
			tw = thread_timer_wait(&m->timer, &tv);

		if (thread_list_count(&m->ready) ||
		    (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(&m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}

static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}

unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}

/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts.  The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time.  On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	frr_with_mutex(&thread->mtx) {
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	return result;
}

void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	frr_with_mutex(&thread->mtx) {
		thread->yield = yield_time;
	}
}
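
/*
 * Illustrative sketch (assumed caller code, not part of this file): a task
 * that drains a large work queue can yield cooperatively and reschedule
 * itself so other tasks on the same thread_master are not starved.  The
 * queue helpers and the ctx structure below are hypothetical.
 *
 *	static int my_queue_drain(struct thread *t)
 *	{
 *		struct my_ctx *ctx = THREAD_ARG(t);
 *
 *		while (my_queue_has_work(ctx)) {
 *			my_process_one(ctx);
 *			if (thread_should_yield(t)) {
 *				thread_add_event(t->master, my_queue_drain,
 *						 ctx, 0, &ctx->t_drain);
 *				break;
 *			}
 *		}
 *		return 0;
 *	}
 */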

void thread_getrusage(RUSAGE_T *r)
{
#if defined RUSAGE_THREAD
#define FRR_RUSAGE RUSAGE_THREAD
#else
#define FRR_RUSAGE RUSAGE_SELF
#endif
	monotime(&r->real);
#ifndef EXCLUDE_CPU_TIME
	getrusage(FRR_RUSAGE, &(r->cpu));
#endif
}

/*
 * Call a thread.
 *
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
#ifndef EXCLUDE_CPU_TIME
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
#endif
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

#ifndef EXCLUDE_CPU_TIME
	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->real.max, &exp, realtime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
		       &thread->hist->cpu.max, &exp, cputime,
		       memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
#endif /* Exclude CPU Time */
}

/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	frr_with_mutex(&m->mtx) {
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		frr_with_mutex(&thread->mtx) {
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
	}

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}
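
/*
 * Illustrative sketch (assumed caller code): the thread_execute() wrapper
 * macro from thread.h resolves to this function, so a handler can be run
 * synchronously while still getting the same CPU accounting as scheduled
 * tasks; my_handler and my_ctx are hypothetical.
 *
 *	thread_execute(master, my_handler, my_ctx, 0);
 */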