718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
896014f4
DL
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
28#include "log.h"
e04ab74d 29#include "hash.h"
8390828d 30#include "pqueue.h"
e04ab74d 31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
9ef9495e 36#include "lib_errors.h"
d6be5fb9 37
d62a17ae 38DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 39DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 40DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 42
c284542b
DL
43DECLARE_LIST(thread_list, struct thread, threaditem)
44
3b96b781
HT
45#if defined(__APPLE__)
46#include <mach/mach.h>
47#include <mach/mach_time.h>
48#endif
49
d62a17ae 50#define AWAKEN(m) \
51 do { \
52 static unsigned char wakebyte = 0x01; \
53 write(m->io_pipe[1], &wakebyte, 1); \
	} while (0)
3bf2673b 55
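/*
 * Illustrative sketch (not part of this file): AWAKEN() is what lets another
 * pthread interrupt a potentially long-blocking poll() inside thread_fetch().
 * Assuming the thread_add_event() wrapper macro from thread.h, a cross-thread
 * wakeup looks roughly like this; handle_notification, ctx and a_master are
 * placeholder names:
 *
 *     // runs on pthread B while pthread A is blocked in poll()
 *     thread_add_event(a_master, handle_notification, ctx, 0, NULL);
 *
 * thread_add_event() ends with AWAKEN(a_master): a single byte is written to
 * a_master->io_pipe[1], poll() on pthread A wakes up with POLLIN on
 * io_pipe[0], drains the pipe, and picks up the new event on its next pass.
 */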
62f44022 56/* control variable for initializer */
c17faa4b 57static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 58pthread_key_t thread_current;
6b0655a2 59
c17faa4b 60static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
62f44022
QY
61static struct list *masters;
62
6655966d 63static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 64
62f44022 65/* CLI start ---------------------------------------------------------------- */
d8b87afe 66static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
e04ab74d 67{
883cc51d 68 int size = sizeof(a->func);
bd74dc61
DS
69
70 return jhash(&a->func, size, 0);
e04ab74d 71}
72
74df8d6d 73static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 74 const struct cpu_thread_history *b)
e04ab74d 75{
d62a17ae 76 return a->func == b->func;
e04ab74d 77}
78
d62a17ae 79static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 80{
d62a17ae 81 struct cpu_thread_history *new;
82 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
83 new->func = a->func;
84 new->funcname = a->funcname;
85 return new;
e04ab74d 86}
87
d62a17ae 88static void cpu_record_hash_free(void *a)
228da428 89{
d62a17ae 90 struct cpu_thread_history *hist = a;
91
92 XFREE(MTYPE_THREAD_STATS, hist);
228da428
CC
93}
94
d62a17ae 95static void vty_out_cpu_thread_history(struct vty *vty,
96 struct cpu_thread_history *a)
e04ab74d 97{
9a8a7b0e 98 vty_out(vty, "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu",
051a0be4
DL
99 (size_t)a->total_active,
100 a->cpu.total / 1000, a->cpu.total % 1000,
101 (size_t)a->total_calls,
102 a->cpu.total / a->total_calls, a->cpu.max,
d62a17ae 103 a->real.total / a->total_calls, a->real.max);
104 vty_out(vty, " %c%c%c%c%c %s\n",
105 a->types & (1 << THREAD_READ) ? 'R' : ' ',
106 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
107 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
108 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
109 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 110}
111
e3b78da8 112static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 113{
d62a17ae 114 struct cpu_thread_history *totals = args[0];
fbcac826 115 struct cpu_thread_history copy;
d62a17ae 116 struct vty *vty = args[1];
fbcac826 117 uint8_t *filter = args[2];
d62a17ae 118
119 struct cpu_thread_history *a = bucket->data;
120
fbcac826
QY
121 copy.total_active =
122 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
123 copy.total_calls =
124 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
125 copy.cpu.total =
126 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
127 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
128 copy.real.total =
129 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
130 copy.real.max =
131 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
132 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
133 copy.funcname = a->funcname;
134
135 if (!(copy.types & *filter))
d62a17ae 136 return;
fbcac826
QY
137
138 vty_out_cpu_thread_history(vty, &copy);
139 totals->total_active += copy.total_active;
140 totals->total_calls += copy.total_calls;
141 totals->real.total += copy.real.total;
142 if (totals->real.max < copy.real.max)
143 totals->real.max = copy.real.max;
144 totals->cpu.total += copy.cpu.total;
145 if (totals->cpu.max < copy.cpu.max)
146 totals->cpu.max = copy.cpu.max;
e04ab74d 147}
148
fbcac826 149static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 150{
d62a17ae 151 struct cpu_thread_history tmp;
152 void *args[3] = {&tmp, vty, &filter};
153 struct thread_master *m;
154 struct listnode *ln;
155
156 memset(&tmp, 0, sizeof tmp);
157 tmp.funcname = "TOTAL";
158 tmp.types = filter;
159
160 pthread_mutex_lock(&masters_mtx);
161 {
162 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
163 const char *name = m->name ? m->name : "main";
164
165 char underline[strlen(name) + 1];
166 memset(underline, '-', sizeof(underline));
4f113d60 167 underline[sizeof(underline) - 1] = '\0';
d62a17ae 168
169 vty_out(vty, "\n");
170 vty_out(vty, "Showing statistics for pthread %s\n",
171 name);
172 vty_out(vty, "-------------------------------%s\n",
173 underline);
174 vty_out(vty, "%21s %18s %18s\n", "",
175 "CPU (user+system):", "Real (wall-clock):");
176 vty_out(vty,
177 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
178 vty_out(vty, " Avg uSec Max uSecs");
179 vty_out(vty, " Type Thread\n");
180
181 if (m->cpu_record->count)
182 hash_iterate(
183 m->cpu_record,
e3b78da8 184 (void (*)(struct hash_bucket *,
d62a17ae 185 void *))cpu_record_hash_print,
186 args);
187 else
188 vty_out(vty, "No data to display yet.\n");
189
190 vty_out(vty, "\n");
191 }
192 }
193 pthread_mutex_unlock(&masters_mtx);
194
195 vty_out(vty, "\n");
196 vty_out(vty, "Total thread statistics\n");
197 vty_out(vty, "-------------------------\n");
198 vty_out(vty, "%21s %18s %18s\n", "",
199 "CPU (user+system):", "Real (wall-clock):");
200 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
201 vty_out(vty, " Avg uSec Max uSecs");
202 vty_out(vty, " Type Thread\n");
203
204 if (tmp.total_calls > 0)
205 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 206}
207
e3b78da8 208static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 209{
fbcac826 210 uint8_t *filter = args[0];
d62a17ae 211 struct hash *cpu_record = args[1];
212
213 struct cpu_thread_history *a = bucket->data;
62f44022 214
d62a17ae 215 if (!(a->types & *filter))
216 return;
f48f65d2 217
d62a17ae 218 hash_release(cpu_record, bucket->data);
e276eb82
PJ
219}
220
fbcac826 221static void cpu_record_clear(uint8_t filter)
e276eb82 222{
fbcac826 223 uint8_t *tmp = &filter;
d62a17ae 224 struct thread_master *m;
225 struct listnode *ln;
226
227 pthread_mutex_lock(&masters_mtx);
228 {
229 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
230 pthread_mutex_lock(&m->mtx);
231 {
232 void *args[2] = {tmp, m->cpu_record};
233 hash_iterate(
234 m->cpu_record,
e3b78da8 235 (void (*)(struct hash_bucket *,
d62a17ae 236 void *))cpu_record_hash_clear,
237 args);
238 }
239 pthread_mutex_unlock(&m->mtx);
240 }
241 }
242 pthread_mutex_unlock(&masters_mtx);
62f44022
QY
243}
244
fbcac826 245static uint8_t parse_filter(const char *filterstr)
62f44022 246{
d62a17ae 247 int i = 0;
248 int filter = 0;
249
250 while (filterstr[i] != '\0') {
251 switch (filterstr[i]) {
252 case 'r':
253 case 'R':
254 filter |= (1 << THREAD_READ);
255 break;
256 case 'w':
257 case 'W':
258 filter |= (1 << THREAD_WRITE);
259 break;
260 case 't':
261 case 'T':
262 filter |= (1 << THREAD_TIMER);
263 break;
264 case 'e':
265 case 'E':
266 filter |= (1 << THREAD_EVENT);
267 break;
268 case 'x':
269 case 'X':
270 filter |= (1 << THREAD_EXECUTE);
271 break;
272 default:
273 break;
274 }
275 ++i;
276 }
277 return filter;
62f44022
QY
278}
279
280DEFUN (show_thread_cpu,
281 show_thread_cpu_cmd,
282 "show thread cpu [FILTER]",
283 SHOW_STR
284 "Thread information\n"
285 "Thread CPU usage\n"
61fa0b97 286 "Display filter (rwtex)\n")
62f44022 287{
fbcac826 288 uint8_t filter = (uint8_t)-1U;
d62a17ae 289 int idx = 0;
290
291 if (argv_find(argv, argc, "FILTER", &idx)) {
292 filter = parse_filter(argv[idx]->arg);
293 if (!filter) {
294 vty_out(vty,
295 "Invalid filter \"%s\" specified; must contain at least"
296 "one of 'RWTEXB'\n",
297 argv[idx]->arg);
298 return CMD_WARNING;
299 }
300 }
301
302 cpu_record_print(vty, filter);
303 return CMD_SUCCESS;
e276eb82
PJ
304}
305
8872626b
DS
306static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
307{
308 const char *name = m->name ? m->name : "main";
309 char underline[strlen(name) + 1];
a0b36ae6 310 struct thread *thread;
8872626b
DS
311 uint32_t i;
312
313 memset(underline, '-', sizeof(underline));
314 underline[sizeof(underline) - 1] = '\0';
315
316 vty_out(vty, "\nShowing poll FD's for %s\n", name);
317 vty_out(vty, "----------------------%s\n", underline);
6c19478a
DS
318 vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
319 m->fd_limit);
a0b36ae6
DS
320 for (i = 0; i < m->handler.pfdcount; i++) {
321 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
322 m->handler.pfds[i].fd, m->handler.pfds[i].events,
8872626b 323 m->handler.pfds[i].revents);
a0b36ae6
DS
324
325 if (m->handler.pfds[i].events & POLLIN) {
326 thread = m->read[m->handler.pfds[i].fd];
327
328 if (!thread)
329 vty_out(vty, "ERROR ");
330 else
331 vty_out(vty, "%s ", thread->funcname);
332 } else
333 vty_out(vty, " ");
334
335 if (m->handler.pfds[i].events & POLLOUT) {
336 thread = m->write[m->handler.pfds[i].fd];
337
338 if (!thread)
339 vty_out(vty, "ERROR\n");
340 else
341 vty_out(vty, "%s\n", thread->funcname);
342 } else
343 vty_out(vty, "\n");
344 }
8872626b
DS
345}
346
347DEFUN (show_thread_poll,
348 show_thread_poll_cmd,
349 "show thread poll",
350 SHOW_STR
351 "Thread information\n"
352 "Show poll FD's and information\n")
353{
354 struct listnode *node;
355 struct thread_master *m;
356
357 pthread_mutex_lock(&masters_mtx);
358 {
359 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
360 show_thread_poll_helper(vty, m);
361 }
362 }
363 pthread_mutex_unlock(&masters_mtx);
364
365 return CMD_SUCCESS;
366}
367
368
49d41a26
DS
369DEFUN (clear_thread_cpu,
370 clear_thread_cpu_cmd,
371 "clear thread cpu [FILTER]",
62f44022 372 "Clear stored data in all pthreads\n"
49d41a26
DS
373 "Thread information\n"
374 "Thread CPU usage\n"
375 "Display filter (rwtexb)\n")
e276eb82 376{
fbcac826 377 uint8_t filter = (uint8_t)-1U;
d62a17ae 378 int idx = 0;
379
380 if (argv_find(argv, argc, "FILTER", &idx)) {
381 filter = parse_filter(argv[idx]->arg);
382 if (!filter) {
383 vty_out(vty,
384 "Invalid filter \"%s\" specified; must contain at least"
385 "one of 'RWTEXB'\n",
386 argv[idx]->arg);
387 return CMD_WARNING;
388 }
389 }
390
391 cpu_record_clear(filter);
392 return CMD_SUCCESS;
e276eb82 393}
6b0655a2 394
d62a17ae 395void thread_cmd_init(void)
0b84f294 396{
d62a17ae 397 install_element(VIEW_NODE, &show_thread_cpu_cmd);
8872626b 398 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 399 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 400}
62f44022
QY
401/* CLI end ------------------------------------------------------------------ */
402
0b84f294 403
8390828d
DL
404static int thread_timer_cmp(void *a, void *b)
405{
406 struct thread *thread_a = a;
407 struct thread *thread_b = b;
408
409 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
410 return -1;
411 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
412 return 1;
413 return 0;
414}
415
416static void thread_timer_update(void *node, int actual_position)
417{
418 struct thread *thread = node;
419
420 thread->index = actual_position;
421}
422
d62a17ae 423static void cancelreq_del(void *cr)
63ccb9cb 424{
d62a17ae 425 XFREE(MTYPE_TMP, cr);
63ccb9cb
QY
426}
427
e0bebc7c 428/* initializer, only ever called once */
4d762f26 429static void initializer(void)
e0bebc7c 430{
d62a17ae 431 pthread_key_create(&thread_current, NULL);
e0bebc7c
QY
432}
433
d62a17ae 434struct thread_master *thread_master_create(const char *name)
718e3744 435{
d62a17ae 436 struct thread_master *rv;
437 struct rlimit limit;
438
439 pthread_once(&init_once, &initializer);
440
441 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 442
443 /* Initialize master mutex */
444 pthread_mutex_init(&rv->mtx, NULL);
445 pthread_cond_init(&rv->cancel_cond, NULL);
446
447 /* Set name */
448 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
449
450 /* Initialize I/O task data structures */
451 getrlimit(RLIMIT_NOFILE, &limit);
452 rv->fd_limit = (int)limit.rlim_cur;
a6f235f3
DS
453 rv->read = XCALLOC(MTYPE_THREAD_POLL,
454 sizeof(struct thread *) * rv->fd_limit);
455
456 rv->write = XCALLOC(MTYPE_THREAD_POLL,
457 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 458
bd74dc61 459 rv->cpu_record = hash_create_size(
d8b87afe 460 8, (unsigned int (*)(const void *))cpu_record_hash_key,
74df8d6d 461 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
bd74dc61 462 "Thread Hash");
d62a17ae 463
c284542b
DL
464 thread_list_init(&rv->event);
465 thread_list_init(&rv->ready);
466 thread_list_init(&rv->unuse);
8390828d
DL
467
468 /* Initialize the timer queues */
469 rv->timer = pqueue_create();
470 rv->timer->cmp = thread_timer_cmp;
471 rv->timer->update = thread_timer_update;
d62a17ae 472
473 /* Initialize thread_fetch() settings */
474 rv->spin = true;
475 rv->handle_signals = true;
476
477 /* Set pthread owner, should be updated by actual owner */
478 rv->owner = pthread_self();
479 rv->cancel_req = list_new();
480 rv->cancel_req->del = cancelreq_del;
481 rv->canceled = true;
482
483 /* Initialize pipe poker */
484 pipe(rv->io_pipe);
485 set_nonblocking(rv->io_pipe[0]);
486 set_nonblocking(rv->io_pipe[1]);
487
488 /* Initialize data structures for poll() */
489 rv->handler.pfdsize = rv->fd_limit;
490 rv->handler.pfdcount = 0;
491 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
492 sizeof(struct pollfd) * rv->handler.pfdsize);
493 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
494 sizeof(struct pollfd) * rv->handler.pfdsize);
495
eff09c66 496 /* add to list of threadmasters */
d62a17ae 497 pthread_mutex_lock(&masters_mtx);
498 {
eff09c66
QY
499 if (!masters)
500 masters = list_new();
501
d62a17ae 502 listnode_add(masters, rv);
503 }
504 pthread_mutex_unlock(&masters_mtx);
505
506 return rv;
718e3744 507}
508
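/*
 * Usage sketch (assumptions noted): a daemon normally creates one
 * thread_master per pthread and drives it with the fetch/call loop below.
 * In FRR proper this loop lives in libfrr's frr_run(); "bgpd" and
 * read_config() here are just placeholders.
 *
 *     struct thread_master *master = thread_master_create("bgpd");
 *     struct thread thread;
 *
 *     read_config(master);              // schedules initial timers/reads
 *     while (thread_fetch(master, &thread))
 *             thread_call(&thread);     // runs the handler + CPU accounting
 *     thread_master_free(master);
 */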
d8a8a8de
QY
509void thread_master_set_name(struct thread_master *master, const char *name)
510{
511 pthread_mutex_lock(&master->mtx);
512 {
0a22ddfb 513 XFREE(MTYPE_THREAD_MASTER, master->name);
d8a8a8de
QY
514 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
515 }
516 pthread_mutex_unlock(&master->mtx);
517}
518
6ed04aa2
DS
519#define THREAD_UNUSED_DEPTH 10
520
718e3744 521/* Move thread to unuse list. */
d62a17ae 522static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 523{
6655966d
RZ
524 pthread_mutex_t mtxc = thread->mtx;
525
d62a17ae 526 assert(m != NULL && thread != NULL);
d62a17ae 527
d62a17ae 528 thread->hist->total_active--;
6ed04aa2
DS
529 memset(thread, 0, sizeof(struct thread));
530 thread->type = THREAD_UNUSED;
531
6655966d
RZ
532 /* Restore the thread mutex context. */
533 thread->mtx = mtxc;
534
c284542b
DL
535 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
536 thread_list_add_tail(&m->unuse, thread);
6655966d
RZ
537 return;
538 }
539
540 thread_free(m, thread);
718e3744 541}
542
/* Free all unused threads. */
c284542b
DL
544static void thread_list_free(struct thread_master *m,
545 struct thread_list_head *list)
718e3744 546{
d62a17ae 547 struct thread *t;
d62a17ae 548
c284542b 549 while ((t = thread_list_pop(list)))
6655966d 550 thread_free(m, t);
718e3744 551}
552
d62a17ae 553static void thread_array_free(struct thread_master *m,
554 struct thread **thread_array)
308d14ae 555{
d62a17ae 556 struct thread *t;
557 int index;
558
559 for (index = 0; index < m->fd_limit; ++index) {
560 t = thread_array[index];
561 if (t) {
562 thread_array[index] = NULL;
6655966d 563 thread_free(m, t);
d62a17ae 564 }
565 }
a6f235f3 566 XFREE(MTYPE_THREAD_POLL, thread_array);
308d14ae
DV
567}
568
8390828d
DL
569static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
570{
571 int i;
572
573 for (i = 0; i < queue->size; i++)
574 thread_free(m, queue->array[i]);
575
576 pqueue_delete(queue);
577}
578
495f0b13
DS
/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the
 * unuse list for later reuse.
 * If we are shutting down, free up the unused threads
 * so we can see if we forgot to shut anything off.
 */
d62a17ae 587void thread_master_free_unused(struct thread_master *m)
495f0b13 588{
d62a17ae 589 pthread_mutex_lock(&m->mtx);
590 {
591 struct thread *t;
c284542b 592 while ((t = thread_list_pop(&m->unuse)))
6655966d 593 thread_free(m, t);
d62a17ae 594 }
595 pthread_mutex_unlock(&m->mtx);
495f0b13
DS
596}
597
718e3744 598/* Stop thread scheduler. */
d62a17ae 599void thread_master_free(struct thread_master *m)
718e3744 600{
d62a17ae 601 pthread_mutex_lock(&masters_mtx);
602 {
603 listnode_delete(masters, m);
eff09c66 604 if (masters->count == 0) {
6a154c88 605 list_delete(&masters);
eff09c66 606 }
d62a17ae 607 }
608 pthread_mutex_unlock(&masters_mtx);
609
610 thread_array_free(m, m->read);
611 thread_array_free(m, m->write);
8390828d 612 thread_queue_free(m, m->timer);
d62a17ae 613 thread_list_free(m, &m->event);
614 thread_list_free(m, &m->ready);
615 thread_list_free(m, &m->unuse);
616 pthread_mutex_destroy(&m->mtx);
33844bbe 617 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 618 close(m->io_pipe[0]);
619 close(m->io_pipe[1]);
6a154c88 620 list_delete(&m->cancel_req);
1a0a92ea 621 m->cancel_req = NULL;
d62a17ae 622
623 hash_clean(m->cpu_record, cpu_record_hash_free);
624 hash_free(m->cpu_record);
625 m->cpu_record = NULL;
626
0a22ddfb 627 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 628 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
629 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
630 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 631}
632
78ca0342
CF
/* Return remaining time in milliseconds. */
634unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 635{
d62a17ae 636 int64_t remain;
1189d95f 637
d62a17ae 638 pthread_mutex_lock(&thread->mtx);
639 {
78ca0342 640 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 641 }
642 pthread_mutex_unlock(&thread->mtx);
1189d95f 643
d62a17ae 644 return remain < 0 ? 0 : remain;
718e3744 645}
646
78ca0342
CF
/* Return remaining time in seconds. */
648unsigned long thread_timer_remain_second(struct thread *thread)
649{
650 return thread_timer_remain_msec(thread) / 1000LL;
651}
652
9c7753e4
DL
653#define debugargdef const char *funcname, const char *schedfrom, int fromln
654#define debugargpass funcname, schedfrom, fromln
e04ab74d 655
d62a17ae 656struct timeval thread_timer_remain(struct thread *thread)
6ac44687 657{
d62a17ae 658 struct timeval remain;
659 pthread_mutex_lock(&thread->mtx);
660 {
661 monotime_until(&thread->u.sands, &remain);
662 }
663 pthread_mutex_unlock(&thread->mtx);
664 return remain;
6ac44687
CF
665}
666
718e3744 667/* Get new thread. */
d7c0a89a 668static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 669 int (*func)(struct thread *), void *arg,
670 debugargdef)
718e3744 671{
c284542b 672 struct thread *thread = thread_list_pop(&m->unuse);
d62a17ae 673 struct cpu_thread_history tmp;
674
675 if (!thread) {
676 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
677 /* mutex only needs to be initialized at struct creation. */
678 pthread_mutex_init(&thread->mtx, NULL);
679 m->alloc++;
680 }
681
682 thread->type = type;
683 thread->add_type = type;
684 thread->master = m;
685 thread->arg = arg;
8390828d 686 thread->index = -1;
d62a17ae 687 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
688 thread->ref = NULL;
689
	/*
	 * If the passed-in funcname is not what we have stored, the
	 * thread->hist entry needs to be updated. We keep the last one
	 * around on threads in the unuse list, under the assumption that
	 * we are probably going to immediately allocate the same type of
	 * thread again.
	 * This hopefully saves us some serious hash_get lookups.
	 */
700 if (thread->funcname != funcname || thread->func != func) {
701 tmp.func = func;
702 tmp.funcname = funcname;
703 thread->hist =
704 hash_get(m->cpu_record, &tmp,
705 (void *(*)(void *))cpu_record_hash_alloc);
706 }
707 thread->hist->total_active++;
708 thread->func = func;
709 thread->funcname = funcname;
710 thread->schedfrom = schedfrom;
711 thread->schedfrom_line = fromln;
712
713 return thread;
718e3744 714}
715
6655966d
RZ
716static void thread_free(struct thread_master *master, struct thread *thread)
717{
718 /* Update statistics. */
719 assert(master->alloc > 0);
720 master->alloc--;
721
722 /* Free allocated resources. */
723 pthread_mutex_destroy(&thread->mtx);
724 XFREE(MTYPE_THREAD, thread);
725}
726
d62a17ae 727static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
728 nfds_t count, const struct timeval *timer_wait)
209a72a6 729{
d62a17ae 730 /* If timer_wait is null here, that means poll() should block
731 * indefinitely,
732 * unless the thread_master has overriden it by setting
733 * ->selectpoll_timeout.
734 * If the value is positive, it specifies the maximum number of
735 * milliseconds
736 * to wait. If the timeout is -1, it specifies that we should never wait
737 * and
738 * always return immediately even if no event is detected. If the value
739 * is
740 * zero, the behavior is default. */
741 int timeout = -1;
742
743 /* number of file descriptors with events */
744 int num;
745
746 if (timer_wait != NULL
747 && m->selectpoll_timeout == 0) // use the default value
748 timeout = (timer_wait->tv_sec * 1000)
749 + (timer_wait->tv_usec / 1000);
750 else if (m->selectpoll_timeout > 0) // use the user's timeout
751 timeout = m->selectpoll_timeout;
752 else if (m->selectpoll_timeout
753 < 0) // effect a poll (return immediately)
754 timeout = 0;
755
756 /* add poll pipe poker */
757 assert(count + 1 < pfdsize);
758 pfds[count].fd = m->io_pipe[0];
759 pfds[count].events = POLLIN;
760 pfds[count].revents = 0x00;
761
762 num = poll(pfds, count + 1, timeout);
763
764 unsigned char trash[64];
765 if (num > 0 && pfds[count].revents != 0 && num--)
766 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
767 ;
768
769 return num;
209a72a6
DS
770}
771
/* Add new read/write thread. */
d62a17ae 773struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
774 int (*func)(struct thread *),
775 void *arg, int fd,
776 struct thread **t_ptr,
777 debugargdef)
718e3744 778{
d62a17ae 779 struct thread *thread = NULL;
1ef14bee 780 struct thread **thread_array;
d62a17ae 781
9b864cd3 782 assert(fd >= 0 && fd < m->fd_limit);
d62a17ae 783 pthread_mutex_lock(&m->mtx);
784 {
785 if (t_ptr
786 && *t_ptr) // thread is already scheduled; don't reschedule
787 {
788 pthread_mutex_unlock(&m->mtx);
789 return NULL;
790 }
791
792 /* default to a new pollfd */
793 nfds_t queuepos = m->handler.pfdcount;
794
1ef14bee
DS
795 if (dir == THREAD_READ)
796 thread_array = m->read;
797 else
798 thread_array = m->write;
799
d62a17ae 800 /* if we already have a pollfd for our file descriptor, find and
801 * use it */
802 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
803 if (m->handler.pfds[i].fd == fd) {
804 queuepos = i;
1ef14bee
DS
805
806#ifdef DEV_BUILD
807 /*
808 * What happens if we have a thread already
809 * created for this event?
810 */
811 if (thread_array[fd])
812 assert(!"Thread already scheduled for file descriptor");
813#endif
d62a17ae 814 break;
815 }
816
817 /* make sure we have room for this fd + pipe poker fd */
818 assert(queuepos + 1 < m->handler.pfdsize);
819
820 thread = thread_get(m, dir, func, arg, debugargpass);
821
822 m->handler.pfds[queuepos].fd = fd;
823 m->handler.pfds[queuepos].events |=
824 (dir == THREAD_READ ? POLLIN : POLLOUT);
825
826 if (queuepos == m->handler.pfdcount)
827 m->handler.pfdcount++;
828
829 if (thread) {
830 pthread_mutex_lock(&thread->mtx);
831 {
832 thread->u.fd = fd;
1ef14bee 833 thread_array[thread->u.fd] = thread;
d62a17ae 834 }
835 pthread_mutex_unlock(&thread->mtx);
836
837 if (t_ptr) {
838 *t_ptr = thread;
839 thread->ref = t_ptr;
840 }
841 }
842
843 AWAKEN(m);
844 }
845 pthread_mutex_unlock(&m->mtx);
846
847 return thread;
718e3744 848}
849
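/*
 * Usage sketch (assuming the thread_add_read() wrapper macro from thread.h;
 * vty_accept, accept_sock and the vty pointer are placeholders): I/O tasks
 * are one-shot, so a read handler typically re-arms itself, and callers that
 * may need to cancel later pass a back-reference.
 *
 *     static int vty_accept(struct thread *thread)
 *     {
 *             int sock = THREAD_FD(thread);
 *             struct vty *vty = THREAD_ARG(thread);
 *
 *             // re-arm for the next readable event
 *             thread_add_read(thread->master, vty_accept, vty, sock, NULL);
 *             return 0;
 *     }
 *
 *     // initial scheduling, keeping a back-reference for cancellation
 *     thread_add_read(master, vty_accept, vty, accept_sock, &vty->t_read);
 */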
56a94b36 850static struct thread *
d62a17ae 851funcname_thread_add_timer_timeval(struct thread_master *m,
852 int (*func)(struct thread *), int type,
853 void *arg, struct timeval *time_relative,
854 struct thread **t_ptr, debugargdef)
718e3744 855{
d62a17ae 856 struct thread *thread;
8390828d 857 struct pqueue *queue;
d62a17ae 858
859 assert(m != NULL);
860
861 assert(type == THREAD_TIMER);
862 assert(time_relative);
863
864 pthread_mutex_lock(&m->mtx);
865 {
866 if (t_ptr
867 && *t_ptr) // thread is already scheduled; don't reschedule
868 {
869 pthread_mutex_unlock(&m->mtx);
870 return NULL;
871 }
872
8390828d 873 queue = m->timer;
d62a17ae 874 thread = thread_get(m, type, func, arg, debugargpass);
875
876 pthread_mutex_lock(&thread->mtx);
877 {
878 monotime(&thread->u.sands);
879 timeradd(&thread->u.sands, time_relative,
880 &thread->u.sands);
8390828d 881 pqueue_enqueue(thread, queue);
d62a17ae 882 if (t_ptr) {
883 *t_ptr = thread;
884 thread->ref = t_ptr;
885 }
886 }
887 pthread_mutex_unlock(&thread->mtx);
888
889 AWAKEN(m);
890 }
891 pthread_mutex_unlock(&m->mtx);
892
893 return thread;
9e867fe6 894}
895
98c91ac6 896
897/* Add timer event thread. */
d62a17ae 898struct thread *funcname_thread_add_timer(struct thread_master *m,
899 int (*func)(struct thread *),
900 void *arg, long timer,
901 struct thread **t_ptr, debugargdef)
9e867fe6 902{
d62a17ae 903 struct timeval trel;
9e867fe6 904
d62a17ae 905 assert(m != NULL);
9e867fe6 906
d62a17ae 907 trel.tv_sec = timer;
908 trel.tv_usec = 0;
9e867fe6 909
d62a17ae 910 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
911 &trel, t_ptr, debugargpass);
98c91ac6 912}
9e867fe6 913
98c91ac6 914/* Add timer event thread with "millisecond" resolution */
d62a17ae 915struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
916 int (*func)(struct thread *),
917 void *arg, long timer,
918 struct thread **t_ptr,
919 debugargdef)
98c91ac6 920{
d62a17ae 921 struct timeval trel;
9e867fe6 922
d62a17ae 923 assert(m != NULL);
718e3744 924
d62a17ae 925 trel.tv_sec = timer / 1000;
926 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 927
d62a17ae 928 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
929 &trel, t_ptr, debugargpass);
a48b4e6d 930}
931
/* Add timer event thread with "struct timeval" resolution */
d62a17ae 933struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
934 int (*func)(struct thread *),
935 void *arg, struct timeval *tv,
936 struct thread **t_ptr, debugargdef)
d03c4cbd 937{
d62a17ae 938 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
939 t_ptr, debugargpass);
d03c4cbd
DL
940}
941
718e3744 942/* Add simple event thread. */
d62a17ae 943struct thread *funcname_thread_add_event(struct thread_master *m,
944 int (*func)(struct thread *),
945 void *arg, int val,
946 struct thread **t_ptr, debugargdef)
718e3744 947{
d62a17ae 948 struct thread *thread;
949
950 assert(m != NULL);
951
952 pthread_mutex_lock(&m->mtx);
953 {
954 if (t_ptr
955 && *t_ptr) // thread is already scheduled; don't reschedule
956 {
957 pthread_mutex_unlock(&m->mtx);
958 return NULL;
959 }
960
961 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
962 pthread_mutex_lock(&thread->mtx);
963 {
964 thread->u.val = val;
c284542b 965 thread_list_add_tail(&m->event, thread);
d62a17ae 966 }
967 pthread_mutex_unlock(&thread->mtx);
968
969 if (t_ptr) {
970 *t_ptr = thread;
971 thread->ref = t_ptr;
972 }
973
974 AWAKEN(m);
975 }
976 pthread_mutex_unlock(&m->mtx);
977
978 return thread;
718e3744 979}
980
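/*
 * Usage sketch for the timer/event variants (wrapper macros from thread.h;
 * bgp_holdtime_expire, bgp_process_queue and the peer/bgp arguments are
 * placeholders). The struct thread ** back-reference is what later
 * cancellation, e.g. via the THREAD_OFF() macro, keys off of.
 *
 *     struct thread *t_holdtime = NULL;
 *     struct thread *t_process = NULL;
 *
 *     // fire once after 90 seconds, unless cancelled first
 *     thread_add_timer(master, bgp_holdtime_expire, peer, 90, &t_holdtime);
 *
 *     // run "as soon as possible", ahead of timers and I/O
 *     thread_add_event(master, bgp_process_queue, bgp, 0, &t_process);
 */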
63ccb9cb
QY
981/* Thread cancellation ------------------------------------------------------ */
982
8797240e
QY
983/**
 * Clears out (bitwise NOTs away) the .events bits of the pollfd corresponding
 * to the given file descriptor. The event(s) to clear are passed in the
 * 'state' parameter.
986 *
987 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
988 * implementation for details.
989 *
990 * @param master
991 * @param fd
992 * @param state the event to cancel. One or more (OR'd together) of the
993 * following:
994 * - POLLIN
995 * - POLLOUT
996 */
d62a17ae 997static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 998{
42d74538
QY
999 bool found = false;
1000
d62a17ae 1001 /* Cancel POLLHUP too just in case some bozo set it */
1002 state |= POLLHUP;
1003
1004 /* find the index of corresponding pollfd */
1005 nfds_t i;
1006
1007 for (i = 0; i < master->handler.pfdcount; i++)
42d74538
QY
1008 if (master->handler.pfds[i].fd == fd) {
1009 found = true;
d62a17ae 1010 break;
42d74538
QY
1011 }
1012
1013 if (!found) {
1014 zlog_debug(
1015 "[!] Received cancellation request for nonexistent rw job");
1016 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 1017 master->name ? master->name : "", fd);
42d74538
QY
1018 return;
1019 }
d62a17ae 1020
1021 /* NOT out event. */
1022 master->handler.pfds[i].events &= ~(state);
1023
1024 /* If all events are canceled, delete / resize the pollfd array. */
1025 if (master->handler.pfds[i].events == 0) {
1026 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1027 (master->handler.pfdcount - i - 1)
1028 * sizeof(struct pollfd));
1029 master->handler.pfdcount--;
1030 }
1031
1032 /* If we have the same pollfd in the copy, perform the same operations,
1033 * otherwise return. */
1034 if (i >= master->handler.copycount)
1035 return;
1036
1037 master->handler.copy[i].events &= ~(state);
1038
1039 if (master->handler.copy[i].events == 0) {
1040 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1041 (master->handler.copycount - i - 1)
1042 * sizeof(struct pollfd));
1043 master->handler.copycount--;
1044 }
0a95a0d0
DS
1045}
1046
1189d95f 1047/**
63ccb9cb 1048 * Process cancellation requests.
1189d95f 1049 *
63ccb9cb
QY
1050 * This may only be run from the pthread which owns the thread_master.
1051 *
1052 * @param master the thread master to process
1053 * @REQUIRE master->mtx
1189d95f 1054 */
d62a17ae 1055static void do_thread_cancel(struct thread_master *master)
718e3744 1056{
c284542b 1057 struct thread_list_head *list = NULL;
8390828d 1058 struct pqueue *queue = NULL;
d62a17ae 1059 struct thread **thread_array = NULL;
1060 struct thread *thread;
1061
1062 struct cancel_req *cr;
1063 struct listnode *ln;
1064 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/*
		 * If this is an event object cancellation, linear search
		 * through the event list deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue.
		 */
1070 if (cr->eventobj) {
1071 struct thread *t;
c284542b 1072
81fddbe7 1073 frr_each_safe(thread_list, &master->event, t) {
c284542b
DL
1074 if (t->arg != cr->eventobj)
1075 continue;
1076 thread_list_del(&master->event, t);
1077 if (t->ref)
1078 *t->ref = NULL;
1079 thread_add_unuse(master, t);
d62a17ae 1080 }
1081
81fddbe7 1082 frr_each_safe(thread_list, &master->ready, t) {
c284542b
DL
1083 if (t->arg != cr->eventobj)
1084 continue;
1085 thread_list_del(&master->ready, t);
1086 if (t->ref)
1087 *t->ref = NULL;
1088 thread_add_unuse(master, t);
d62a17ae 1089 }
1090 continue;
1091 }
1092
		/*
		 * The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it.
		 */
1098 thread = (cr->thread) ? cr->thread : *cr->threadref;
1099
1100 if (!thread)
1101 continue;
1102
1103 /* Determine the appropriate queue to cancel the thread from */
1104 switch (thread->type) {
1105 case THREAD_READ:
1106 thread_cancel_rw(master, thread->u.fd, POLLIN);
1107 thread_array = master->read;
1108 break;
1109 case THREAD_WRITE:
1110 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1111 thread_array = master->write;
1112 break;
1113 case THREAD_TIMER:
8390828d 1114 queue = master->timer;
d62a17ae 1115 break;
1116 case THREAD_EVENT:
1117 list = &master->event;
1118 break;
1119 case THREAD_READY:
1120 list = &master->ready;
1121 break;
1122 default:
1123 continue;
1124 break;
1125 }
1126
8390828d
DL
1127 if (queue) {
1128 assert(thread->index >= 0);
1129 assert(thread == queue->array[thread->index]);
1130 pqueue_remove_at(thread->index, queue);
1131 } else if (list) {
c284542b 1132 thread_list_del(list, thread);
d62a17ae 1133 } else if (thread_array) {
1134 thread_array[thread->u.fd] = NULL;
8390828d
DL
1135 } else {
1136 assert(!"Thread should be either in queue or list or array!");
d62a17ae 1137 }
1138
1139 if (thread->ref)
1140 *thread->ref = NULL;
1141
1142 thread_add_unuse(thread->master, thread);
1143 }
1144
1145 /* Delete and free all cancellation requests */
1146 list_delete_all_node(master->cancel_req);
1147
1148 /* Wake up any threads which may be blocked in thread_cancel_async() */
1149 master->canceled = true;
1150 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1151}
1152
63ccb9cb
QY
1153/**
1154 * Cancel any events which have the specified argument.
1155 *
1156 * MT-Unsafe
1157 *
1158 * @param m the thread_master to cancel from
1159 * @param arg the argument passed when creating the event
1160 */
d62a17ae 1161void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1162{
d62a17ae 1163 assert(master->owner == pthread_self());
1164
1165 pthread_mutex_lock(&master->mtx);
1166 {
1167 struct cancel_req *cr =
1168 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1169 cr->eventobj = arg;
1170 listnode_add(master->cancel_req, cr);
1171 do_thread_cancel(master);
1172 }
1173 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1174}
1189d95f 1175
63ccb9cb
QY
1176/**
1177 * Cancel a specific task.
1178 *
1179 * MT-Unsafe
1180 *
1181 * @param thread task to cancel
1182 */
d62a17ae 1183void thread_cancel(struct thread *thread)
63ccb9cb 1184{
6ed04aa2 1185 struct thread_master *master = thread->master;
d62a17ae 1186
6ed04aa2
DS
1187 assert(master->owner == pthread_self());
1188
1189 pthread_mutex_lock(&master->mtx);
d62a17ae 1190 {
1191 struct cancel_req *cr =
1192 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1193 cr->thread = thread;
6ed04aa2
DS
1194 listnode_add(master->cancel_req, cr);
1195 do_thread_cancel(master);
d62a17ae 1196 }
6ed04aa2 1197 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1198}
1189d95f 1199
63ccb9cb
QY
1200/**
1201 * Asynchronous cancellation.
1202 *
8797240e
QY
1203 * Called with either a struct thread ** or void * to an event argument,
1204 * this function posts the correct cancellation request and blocks until it is
1205 * serviced.
63ccb9cb
QY
1206 *
1207 * If the thread is currently running, execution blocks until it completes.
1208 *
8797240e
QY
1209 * The last two parameters are mutually exclusive, i.e. if you pass one the
1210 * other must be NULL.
1211 *
1212 * When the cancellation procedure executes on the target thread_master, the
1213 * thread * provided is checked for nullity. If it is null, the thread is
1214 * assumed to no longer exist and the cancellation request is a no-op. Thus
1215 * users of this API must pass a back-reference when scheduling the original
1216 * task.
1217 *
63ccb9cb
QY
1218 * MT-Safe
1219 *
8797240e
QY
1220 * @param master the thread master with the relevant event / task
1221 * @param thread pointer to thread to cancel
1222 * @param eventobj the event
63ccb9cb 1223 */
d62a17ae 1224void thread_cancel_async(struct thread_master *master, struct thread **thread,
1225 void *eventobj)
63ccb9cb 1226{
d62a17ae 1227 assert(!(thread && eventobj) && (thread || eventobj));
1228 assert(master->owner != pthread_self());
1229
1230 pthread_mutex_lock(&master->mtx);
1231 {
1232 master->canceled = false;
1233
1234 if (thread) {
1235 struct cancel_req *cr =
1236 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1237 cr->threadref = thread;
1238 listnode_add(master->cancel_req, cr);
1239 } else if (eventobj) {
1240 struct cancel_req *cr =
1241 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1242 cr->eventobj = eventobj;
1243 listnode_add(master->cancel_req, cr);
1244 }
1245 AWAKEN(master);
1246
1247 while (!master->canceled)
1248 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1249 }
1250 pthread_mutex_unlock(&master->mtx);
718e3744 1251}
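/*
 * Usage sketch (names are placeholders): asynchronous cancellation is for
 * tearing down work owned by a *different* pthread. The caller must hold the
 * back-reference that was registered when the task was scheduled.
 *
 *     // on the owning pthread, earlier:
 *     thread_add_read(peer->master, peer_readable, peer, peer->fd,
 *                     &peer->t_read);
 *
 *     // later, on another pthread shutting the peer down:
 *     thread_cancel_async(peer->master, &peer->t_read, NULL);
 *     // blocks until the owning pthread has serviced the request;
 *     // peer->t_read is NULL afterwards.
 *
 * Contrast with thread_cancel()/thread_cancel_event(), which may only be
 * called from the pthread that owns the thread_master.
 */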
63ccb9cb 1252/* ------------------------------------------------------------------------- */
718e3744 1253
8390828d 1254static struct timeval *thread_timer_wait(struct pqueue *queue,
d62a17ae 1255 struct timeval *timer_val)
718e3744 1256{
8390828d
DL
1257 if (queue->size) {
1258 struct thread *next_timer = queue->array[0];
1259 monotime_until(&next_timer->u.sands, timer_val);
1260 return timer_val;
1261 }
1262 return NULL;
718e3744 1263}
718e3744 1264
d62a17ae 1265static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1266 struct thread *fetch)
718e3744 1267{
d62a17ae 1268 *fetch = *thread;
1269 thread_add_unuse(m, thread);
1270 return fetch;
718e3744 1271}
1272
d62a17ae 1273static int thread_process_io_helper(struct thread_master *m,
45f3d590
DS
1274 struct thread *thread, short state,
1275 short actual_state, int pos)
5d4ccd4e 1276{
d62a17ae 1277 struct thread **thread_array;
1278
45f3d590
DS
1279 /*
1280 * poll() clears the .events field, but the pollfd array we
1281 * pass to poll() is a copy of the one used to schedule threads.
1282 * We need to synchronize state between the two here by applying
1283 * the same changes poll() made on the copy of the "real" pollfd
1284 * array.
1285 *
1286 * This cleans up a possible infinite loop where we refuse
1287 * to respond to a poll event but poll is insistent that
1288 * we should.
1289 */
1290 m->handler.pfds[pos].events &= ~(state);
1291
1292 if (!thread) {
1293 if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1294 flog_err(EC_LIB_NO_THREAD,
1295 "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
1296 m->handler.pfds[pos].fd, actual_state);
d62a17ae 1297 return 0;
45f3d590 1298 }
d62a17ae 1299
1300 if (thread->type == THREAD_READ)
1301 thread_array = m->read;
1302 else
1303 thread_array = m->write;
1304
1305 thread_array[thread->u.fd] = NULL;
c284542b 1306 thread_list_add_tail(&m->ready, thread);
d62a17ae 1307 thread->type = THREAD_READY;
45f3d590 1308
d62a17ae 1309 return 1;
5d4ccd4e
DS
1310}
1311
8797240e
QY
1312/**
1313 * Process I/O events.
1314 *
1315 * Walks through file descriptor array looking for those pollfds whose .revents
1316 * field has something interesting. Deletes any invalid file descriptors.
1317 *
1318 * @param m the thread master
1319 * @param num the number of active file descriptors (return value of poll())
1320 */
d62a17ae 1321static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1322{
d62a17ae 1323 unsigned int ready = 0;
1324 struct pollfd *pfds = m->handler.copy;
1325
1326 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1327 /* no event for current fd? immediately continue */
1328 if (pfds[i].revents == 0)
1329 continue;
1330
1331 ready++;
1332
		/*
		 * Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel() that value
		 * should be a superset of the values we have in our copy, so
		 * there's no need to update it. Similarly, barring deletion,
		 * the fd should still be a valid index into the master's
		 * pfds.
		 */
45f3d590 1344 if (pfds[i].revents & (POLLIN | POLLHUP)) {
d62a17ae 1345 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
45f3d590
DS
1346 pfds[i].revents, i);
1347 }
d62a17ae 1348 if (pfds[i].revents & POLLOUT)
1349 thread_process_io_helper(m, m->write[pfds[i].fd],
45f3d590 1350 POLLOUT, pfds[i].revents, i);
d62a17ae 1351
		/*
		 * If one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index.
		 */
1355 if (pfds[i].revents & POLLNVAL) {
1356 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1357 (m->handler.pfdcount - i - 1)
1358 * sizeof(struct pollfd));
1359 m->handler.pfdcount--;
1360
1361 memmove(pfds + i, pfds + i + 1,
1362 (m->handler.copycount - i - 1)
1363 * sizeof(struct pollfd));
1364 m->handler.copycount--;
1365
1366 i--;
1367 }
1368 }
718e3744 1369}
1370
8b70d0b0 1371/* Add all timers that have popped to the ready list. */
8390828d 1372static unsigned int thread_process_timers(struct pqueue *queue,
d62a17ae 1373 struct timeval *timenow)
a48b4e6d 1374{
d62a17ae 1375 struct thread *thread;
1376 unsigned int ready = 0;
1377
8390828d
DL
1378 while (queue->size) {
1379 thread = queue->array[0];
d62a17ae 1380 if (timercmp(timenow, &thread->u.sands, <))
1381 return ready;
8390828d 1382 pqueue_dequeue(queue);
d62a17ae 1383 thread->type = THREAD_READY;
c284542b 1384 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1385 ready++;
1386 }
1387 return ready;
a48b4e6d 1388}
1389
2613abe6 1390/* process a list en masse, e.g. for event thread lists */
c284542b 1391static unsigned int thread_process(struct thread_list_head *list)
2613abe6 1392{
d62a17ae 1393 struct thread *thread;
d62a17ae 1394 unsigned int ready = 0;
1395
c284542b 1396 while ((thread = thread_list_pop(list))) {
d62a17ae 1397 thread->type = THREAD_READY;
c284542b 1398 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1399 ready++;
1400 }
1401 return ready;
2613abe6
PJ
1402}
1403
1404
718e3744 1405/* Fetch next ready thread. */
d62a17ae 1406struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1407{
d62a17ae 1408 struct thread *thread = NULL;
1409 struct timeval now;
1410 struct timeval zerotime = {0, 0};
1411 struct timeval tv;
1412 struct timeval *tw = NULL;
1413
1414 int num = 0;
1415
1416 do {
1417 /* Handle signals if any */
1418 if (m->handle_signals)
1419 quagga_sigevent_process();
1420
1421 pthread_mutex_lock(&m->mtx);
1422
1423 /* Process any pending cancellation requests */
1424 do_thread_cancel(m);
1425
e3c9529e
QY
1426 /*
1427 * Attempt to flush ready queue before going into poll().
1428 * This is performance-critical. Think twice before modifying.
1429 */
c284542b 1430 if ((thread = thread_list_pop(&m->ready))) {
e3c9529e
QY
1431 fetch = thread_run(m, thread, fetch);
1432 if (fetch->ref)
1433 *fetch->ref = NULL;
1434 pthread_mutex_unlock(&m->mtx);
1435 break;
1436 }
1437
1438 /* otherwise, tick through scheduling sequence */
1439
bca37d17
QY
1440 /*
1441 * Post events to ready queue. This must come before the
1442 * following block since events should occur immediately
1443 */
d62a17ae 1444 thread_process(&m->event);
1445
bca37d17
QY
		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to
		 *   zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time
		 *   on any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in
		 *   poll()
		 * - If nothing is pending, it's time for the application to
		 *   die
		 *
		 * In every case except the last, we need to hit poll() at
		 * least once per loop to avoid starvation by events.
		 */
c284542b 1463 if (!thread_list_count(&m->ready))
8390828d 1464 tw = thread_timer_wait(m->timer, &tv);
d62a17ae 1465
c284542b
DL
1466 if (thread_list_count(&m->ready) ||
1467 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1468 tw = &zerotime;
1469
1470 if (!tw && m->handler.pfdcount == 0) { /* die */
1471 pthread_mutex_unlock(&m->mtx);
1472 fetch = NULL;
1473 break;
1474 }
1475
bca37d17
QY
1476 /*
1477 * Copy pollfd array + # active pollfds in it. Not necessary to
1478 * copy the array size as this is fixed.
1479 */
d62a17ae 1480 m->handler.copycount = m->handler.pfdcount;
1481 memcpy(m->handler.copy, m->handler.pfds,
1482 m->handler.copycount * sizeof(struct pollfd));
1483
e3c9529e
QY
1484 pthread_mutex_unlock(&m->mtx);
1485 {
1486 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1487 m->handler.copycount, tw);
1488 }
1489 pthread_mutex_lock(&m->mtx);
d764d2cc 1490
e3c9529e
QY
1491 /* Handle any errors received in poll() */
1492 if (num < 0) {
1493 if (errno == EINTR) {
d62a17ae 1494 pthread_mutex_unlock(&m->mtx);
e3c9529e
QY
1495 /* loop around to signal handler */
1496 continue;
d62a17ae 1497 }
1498
e3c9529e 1499 /* else die */
450971aa 1500 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1501 safe_strerror(errno));
e3c9529e
QY
1502 pthread_mutex_unlock(&m->mtx);
1503 fetch = NULL;
1504 break;
bca37d17 1505 }
d62a17ae 1506
1507 /* Post timers to ready queue. */
1508 monotime(&now);
8390828d 1509 thread_process_timers(m->timer, &now);
d62a17ae 1510
1511 /* Post I/O to ready queue. */
1512 if (num > 0)
1513 thread_process_io(m, num);
1514
d62a17ae 1515 pthread_mutex_unlock(&m->mtx);
1516
1517 } while (!thread && m->spin);
1518
1519 return fetch;
718e3744 1520}
1521
d62a17ae 1522static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1523{
d62a17ae 1524 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1525 + (a.tv_usec - b.tv_usec));
62f44022
QY
1526}
1527
d62a17ae 1528unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1529 unsigned long *cputime)
718e3744 1530{
d62a17ae 1531 /* This is 'user + sys' time. */
1532 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1533 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1534 return timeval_elapsed(now->real, start->real);
8b70d0b0 1535}
1536
50596be0
DS
/* We should aim to yield after 'yield' milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall-clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts. The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time. On balance, wall-clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
d62a17ae 1547int thread_should_yield(struct thread *thread)
718e3744 1548{
d62a17ae 1549 int result;
1550 pthread_mutex_lock(&thread->mtx);
1551 {
1552 result = monotime_since(&thread->real, NULL)
1553 > (int64_t)thread->yield;
1554 }
1555 pthread_mutex_unlock(&thread->mtx);
1556 return result;
50596be0
DS
1557}
1558
d62a17ae 1559void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1560{
d62a17ae 1561 pthread_mutex_lock(&thread->mtx);
1562 {
1563 thread->yield = yield_time;
1564 }
1565 pthread_mutex_unlock(&thread->mtx);
718e3744 1566}
1567
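/*
 * Usage sketch (work_queue_run, wq, work_remains and do_one_unit are
 * placeholders): a long-running task checks thread_should_yield()
 * periodically and reschedules itself instead of hogging the event loop.
 *
 *     static int work_queue_run(struct thread *thread)
 *     {
 *             struct work_queue *wq = THREAD_ARG(thread);
 *
 *             while (work_remains(wq)) {
 *                     do_one_unit(wq);
 *                     if (thread_should_yield(thread)) {
 *                             // come back on a later loop iteration
 *                             thread_add_event(thread->master, work_queue_run,
 *                                              wq, 0, &wq->t_run);
 *                             return 0;
 *                     }
 *             }
 *             return 0;
 *     }
 */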
d62a17ae 1568void thread_getrusage(RUSAGE_T *r)
db9c0df9 1569{
231db9a6
DS
1570#if defined RUSAGE_THREAD
1571#define FRR_RUSAGE RUSAGE_THREAD
1572#else
1573#define FRR_RUSAGE RUSAGE_SELF
1574#endif
d62a17ae 1575 monotime(&r->real);
231db9a6 1576 getrusage(FRR_RUSAGE, &(r->cpu));
db9c0df9
PJ
1577}
1578
fbcac826
QY
1579/*
1580 * Call a thread.
1581 *
1582 * This function will atomically update the thread's usage history. At present
1583 * this is the only spot where usage history is written. Nevertheless the code
1584 * has been written such that the introduction of writers in the future should
1585 * not need to update it provided the writers atomically perform only the
1586 * operations done here, i.e. updating the total and maximum times. In
1587 * particular, the maximum real and cpu times must be monotonically increasing
1588 * or this code is not correct.
1589 */
d62a17ae 1590void thread_call(struct thread *thread)
718e3744 1591{
fbcac826
QY
1592 _Atomic unsigned long realtime, cputime;
1593 unsigned long exp;
1594 unsigned long helper;
d62a17ae 1595 RUSAGE_T before, after;
cc8b13a0 1596
d62a17ae 1597 GETRUSAGE(&before);
1598 thread->real = before.real;
718e3744 1599
d62a17ae 1600 pthread_setspecific(thread_current, thread);
1601 (*thread->func)(thread);
1602 pthread_setspecific(thread_current, NULL);
718e3744 1603
d62a17ae 1604 GETRUSAGE(&after);
718e3744 1605
fbcac826
QY
1606 realtime = thread_consumed_time(&after, &before, &helper);
1607 cputime = helper;
1608
1609 /* update realtime */
1610 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1611 memory_order_seq_cst);
1612 exp = atomic_load_explicit(&thread->hist->real.max,
1613 memory_order_seq_cst);
1614 while (exp < realtime
1615 && !atomic_compare_exchange_weak_explicit(
1616 &thread->hist->real.max, &exp, realtime,
1617 memory_order_seq_cst, memory_order_seq_cst))
1618 ;
1619
1620 /* update cputime */
1621 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1622 memory_order_seq_cst);
1623 exp = atomic_load_explicit(&thread->hist->cpu.max,
1624 memory_order_seq_cst);
1625 while (exp < cputime
1626 && !atomic_compare_exchange_weak_explicit(
1627 &thread->hist->cpu.max, &exp, cputime,
1628 memory_order_seq_cst, memory_order_seq_cst))
1629 ;
1630
1631 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1632 memory_order_seq_cst);
1633 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1634 memory_order_seq_cst);
718e3744 1635
924b9229 1636#ifdef CONSUMED_TIME_CHECK
d62a17ae 1637 if (realtime > CONSUMED_TIME_CHECK) {
1638 /*
1639 * We have a CPU Hog on our hands.
1640 * Whinge about it now, so we're aware this is yet another task
1641 * to fix.
1642 */
9ef9495e 1643 flog_warn(
450971aa 1644 EC_LIB_SLOW_THREAD,
d62a17ae 1645 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1646 thread->funcname, (unsigned long)thread->func,
1647 realtime / 1000, cputime / 1000);
1648 }
924b9229 1649#endif /* CONSUMED_TIME_CHECK */
718e3744 1650}
1651
1652/* Execute thread */
d62a17ae 1653void funcname_thread_execute(struct thread_master *m,
1654 int (*func)(struct thread *), void *arg, int val,
1655 debugargdef)
718e3744 1656{
c4345fbf 1657 struct thread *thread;
718e3744 1658
c4345fbf
RZ
1659 /* Get or allocate new thread to execute. */
1660 pthread_mutex_lock(&m->mtx);
1661 {
1662 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
9c7753e4 1663
c4345fbf
RZ
1664 /* Set its event value. */
1665 pthread_mutex_lock(&thread->mtx);
1666 {
1667 thread->add_type = THREAD_EXECUTE;
1668 thread->u.val = val;
1669 thread->ref = &thread;
1670 }
1671 pthread_mutex_unlock(&thread->mtx);
1672 }
1673 pthread_mutex_unlock(&m->mtx);
f7c62e11 1674
c4345fbf
RZ
1675 /* Execute thread doing all accounting. */
1676 thread_call(thread);
9c7753e4 1677
c4345fbf
RZ
1678 /* Give back or free thread. */
1679 thread_add_unuse(m, thread);
718e3744 1680}