718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
28#include "log.h"
e04ab74d 29#include "hash.h"
4becea72 30#include "pqueue.h"
e04ab74d 31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
9ef9495e 36#include "lib_errors.h"
d6be5fb9 37
d62a17ae 38DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 39DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 40DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 42
43#if defined(__APPLE__)
44#include <mach/mach.h>
45#include <mach/mach_time.h>
46#endif
47
d62a17ae 48#define AWAKEN(m) \
49 do { \
50 static unsigned char wakebyte = 0x01; \
51 write(m->io_pipe[1], &wakebyte, 1); \
52 } while (0);
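/*
 * Note on AWAKEN(): this is the "pipe poker". Writing one byte to the write
 * end of m->io_pipe makes a poll() that is blocked on the read end (added as
 * an extra pollfd in fd_poll() below) return immediately, so a task scheduled
 * from another pthread is noticed without waiting for the current timeout.
 * The byte itself carries no information; fd_poll() drains and discards it.
 */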
3bf2673b 53
62f44022 54/* control variable for initializer */
c17faa4b 55static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 56pthread_key_t thread_current;
6b0655a2 57
c17faa4b 58static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
59static struct list *masters;
60
6655966d 61static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 62
62f44022 63/* CLI start ---------------------------------------------------------------- */
d62a17ae 64static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
e04ab74d 65{
883cc51d 66 int size = sizeof(a->func);
67
68 return jhash(&a->func, size, 0);
e04ab74d 69}
70
74df8d6d 71static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 72 const struct cpu_thread_history *b)
e04ab74d 73{
d62a17ae 74 return a->func == b->func;
e04ab74d 75}
76
d62a17ae 77static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 78{
d62a17ae 79 struct cpu_thread_history *new;
80 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
81 new->func = a->func;
82 new->funcname = a->funcname;
83 return new;
e04ab74d 84}
85
d62a17ae 86static void cpu_record_hash_free(void *a)
228da428 87{
d62a17ae 88 struct cpu_thread_history *hist = a;
89
90 XFREE(MTYPE_THREAD_STATS, hist);
91}
92
d62a17ae 93static void vty_out_cpu_thread_history(struct vty *vty,
94 struct cpu_thread_history *a)
e04ab74d 95{
96 vty_out(vty, "%5zu %10zu.%03lu %9zu %8zu %9zu %8lu %9lu",
97 a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
98 a->total_calls, a->cpu.total / a->total_calls, a->cpu.max,
d62a17ae 99 a->real.total / a->total_calls, a->real.max);
100 vty_out(vty, " %c%c%c%c%c %s\n",
101 a->types & (1 << THREAD_READ) ? 'R' : ' ',
102 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
103 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
104 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
105 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 106}
107
e3b78da8 108static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 109{
d62a17ae 110 struct cpu_thread_history *totals = args[0];
fbcac826 111 struct cpu_thread_history copy;
d62a17ae 112 struct vty *vty = args[1];
fbcac826 113 uint8_t *filter = args[2];
d62a17ae 114
115 struct cpu_thread_history *a = bucket->data;
116
117 copy.total_active =
118 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
119 copy.total_calls =
120 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
121 copy.cpu.total =
122 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
123 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
124 copy.real.total =
125 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
126 copy.real.max =
127 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
128 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
129 copy.funcname = a->funcname;
130
131 if (!(copy.types & *filter))
d62a17ae 132 return;
133
134 vty_out_cpu_thread_history(vty, &copy);
135 totals->total_active += copy.total_active;
136 totals->total_calls += copy.total_calls;
137 totals->real.total += copy.real.total;
138 if (totals->real.max < copy.real.max)
139 totals->real.max = copy.real.max;
140 totals->cpu.total += copy.cpu.total;
141 if (totals->cpu.max < copy.cpu.max)
142 totals->cpu.max = copy.cpu.max;
e04ab74d 143}
144
fbcac826 145static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 146{
d62a17ae 147 struct cpu_thread_history tmp;
148 void *args[3] = {&tmp, vty, &filter};
149 struct thread_master *m;
150 struct listnode *ln;
151
152 memset(&tmp, 0, sizeof tmp);
153 tmp.funcname = "TOTAL";
154 tmp.types = filter;
155
156 pthread_mutex_lock(&masters_mtx);
157 {
158 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
159 const char *name = m->name ? m->name : "main";
160
161 char underline[strlen(name) + 1];
162 memset(underline, '-', sizeof(underline));
4f113d60 163 underline[sizeof(underline) - 1] = '\0';
d62a17ae 164
165 vty_out(vty, "\n");
166 vty_out(vty, "Showing statistics for pthread %s\n",
167 name);
168 vty_out(vty, "-------------------------------%s\n",
169 underline);
170 vty_out(vty, "%21s %18s %18s\n", "",
171 "CPU (user+system):", "Real (wall-clock):");
172 vty_out(vty,
173 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
174 vty_out(vty, " Avg uSec Max uSecs");
175 vty_out(vty, " Type Thread\n");
176
177 if (m->cpu_record->count)
178 hash_iterate(
179 m->cpu_record,
e3b78da8 180 (void (*)(struct hash_bucket *,
d62a17ae 181 void *))cpu_record_hash_print,
182 args);
183 else
184 vty_out(vty, "No data to display yet.\n");
185
186 vty_out(vty, "\n");
187 }
188 }
189 pthread_mutex_unlock(&masters_mtx);
190
191 vty_out(vty, "\n");
192 vty_out(vty, "Total thread statistics\n");
193 vty_out(vty, "-------------------------\n");
194 vty_out(vty, "%21s %18s %18s\n", "",
195 "CPU (user+system):", "Real (wall-clock):");
196 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
197 vty_out(vty, " Avg uSec Max uSecs");
198 vty_out(vty, " Type Thread\n");
199
200 if (tmp.total_calls > 0)
201 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 202}
203
e3b78da8 204static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 205{
fbcac826 206 uint8_t *filter = args[0];
d62a17ae 207 struct hash *cpu_record = args[1];
208
209 struct cpu_thread_history *a = bucket->data;
62f44022 210
d62a17ae 211 if (!(a->types & *filter))
212 return;
f48f65d2 213
d62a17ae 214 hash_release(cpu_record, bucket->data);
215}
216
fbcac826 217static void cpu_record_clear(uint8_t filter)
e276eb82 218{
fbcac826 219 uint8_t *tmp = &filter;
d62a17ae 220 struct thread_master *m;
221 struct listnode *ln;
222
223 pthread_mutex_lock(&masters_mtx);
224 {
225 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
226 pthread_mutex_lock(&m->mtx);
227 {
228 void *args[2] = {tmp, m->cpu_record};
229 hash_iterate(
230 m->cpu_record,
e3b78da8 231 (void (*)(struct hash_bucket *,
d62a17ae 232 void *))cpu_record_hash_clear,
233 args);
234 }
235 pthread_mutex_unlock(&m->mtx);
236 }
237 }
238 pthread_mutex_unlock(&masters_mtx);
239}
240
fbcac826 241static uint8_t parse_filter(const char *filterstr)
62f44022 242{
d62a17ae 243 int i = 0;
244 int filter = 0;
245
246 while (filterstr[i] != '\0') {
247 switch (filterstr[i]) {
248 case 'r':
249 case 'R':
250 filter |= (1 << THREAD_READ);
251 break;
252 case 'w':
253 case 'W':
254 filter |= (1 << THREAD_WRITE);
255 break;
256 case 't':
257 case 'T':
258 filter |= (1 << THREAD_TIMER);
259 break;
260 case 'e':
261 case 'E':
262 filter |= (1 << THREAD_EVENT);
263 break;
264 case 'x':
265 case 'X':
266 filter |= (1 << THREAD_EXECUTE);
267 break;
268 default:
269 break;
270 }
271 ++i;
272 }
273 return filter;
274}
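/*
 * Illustrative result: parse_filter("rt") returns
 * (1 << THREAD_READ) | (1 << THREAD_TIMER), so "show thread cpu rt" limits
 * the statistics to read and timer tasks; a string with no recognized
 * characters yields 0, which the callers below reject as an invalid filter.
 */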
275
276DEFUN (show_thread_cpu,
277 show_thread_cpu_cmd,
278 "show thread cpu [FILTER]",
279 SHOW_STR
280 "Thread information\n"
281 "Thread CPU usage\n"
282 "Display filter (rwtexb)\n")
283{
fbcac826 284 uint8_t filter = (uint8_t)-1U;
d62a17ae 285 int idx = 0;
286
287 if (argv_find(argv, argc, "FILTER", &idx)) {
288 filter = parse_filter(argv[idx]->arg);
289 if (!filter) {
290 vty_out(vty,
291 "Invalid filter \"%s\" specified; must contain at least"
292 "one of 'RWTEXB'\n",
293 argv[idx]->arg);
294 return CMD_WARNING;
295 }
296 }
297
298 cpu_record_print(vty, filter);
299 return CMD_SUCCESS;
300}
301
302static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
303{
304 const char *name = m->name ? m->name : "main";
305 char underline[strlen(name) + 1];
306 uint32_t i;
307
308 memset(underline, '-', sizeof(underline));
309 underline[sizeof(underline) - 1] = '\0';
310
311 vty_out(vty, "\nShowing poll FD's for %s\n", name);
312 vty_out(vty, "----------------------%s\n", underline);
313 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
314 for (i = 0; i < m->handler.pfdcount; i++)
315 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
316 m->handler.pfds[i].fd,
317 m->handler.pfds[i].events,
318 m->handler.pfds[i].revents);
319}
320
321DEFUN (show_thread_poll,
322 show_thread_poll_cmd,
323 "show thread poll",
324 SHOW_STR
325 "Thread information\n"
326 "Show poll FD's and information\n")
327{
328 struct listnode *node;
329 struct thread_master *m;
330
331 pthread_mutex_lock(&masters_mtx);
332 {
333 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
334 show_thread_poll_helper(vty, m);
335 }
336 }
337 pthread_mutex_unlock(&masters_mtx);
338
339 return CMD_SUCCESS;
340}
341
342
343DEFUN (clear_thread_cpu,
344 clear_thread_cpu_cmd,
345 "clear thread cpu [FILTER]",
62f44022 346 "Clear stored data in all pthreads\n"
347 "Thread information\n"
348 "Thread CPU usage\n"
349 "Display filter (rwtexb)\n")
e276eb82 350{
fbcac826 351 uint8_t filter = (uint8_t)-1U;
d62a17ae 352 int idx = 0;
353
354 if (argv_find(argv, argc, "FILTER", &idx)) {
355 filter = parse_filter(argv[idx]->arg);
356 if (!filter) {
357 vty_out(vty,
358 "Invalid filter \"%s\" specified; must contain at least"
359 "one of 'RWTEXB'\n",
360 argv[idx]->arg);
361 return CMD_WARNING;
362 }
363 }
364
365 cpu_record_clear(filter);
366 return CMD_SUCCESS;
e276eb82 367}
6b0655a2 368
d62a17ae 369void thread_cmd_init(void)
0b84f294 370{
d62a17ae 371 install_element(VIEW_NODE, &show_thread_cpu_cmd);
8872626b 372 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 373 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 374}
375/* CLI end ------------------------------------------------------------------ */
376
0b84f294 377
d62a17ae 378static int thread_timer_cmp(void *a, void *b)
4becea72 379{
d62a17ae 380 struct thread *thread_a = a;
381 struct thread *thread_b = b;
382
383 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
384 return -1;
385 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
386 return 1;
387 return 0;
388}
389
d62a17ae 390static void thread_timer_update(void *node, int actual_position)
4becea72 391{
d62a17ae 392 struct thread *thread = node;
4becea72 393
d62a17ae 394 thread->index = actual_position;
395}
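/*
 * The timer queue is a priority queue (min-heap) ordered by thread_timer_cmp,
 * i.e. the thread with the earliest u.sands expiry sits at array[0];
 * thread_timer_update() records each thread's current heap slot in
 * thread->index so do_thread_cancel() can remove an arbitrary timer with
 * pqueue_remove_at() instead of scanning the whole heap.
 */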
396
d62a17ae 397static void cancelreq_del(void *cr)
63ccb9cb 398{
d62a17ae 399 XFREE(MTYPE_TMP, cr);
400}
401
e0bebc7c 402/* initializer, only ever called once */
4d762f26 403static void initializer(void)
e0bebc7c 404{
d62a17ae 405 pthread_key_create(&thread_current, NULL);
406}
407
d62a17ae 408struct thread_master *thread_master_create(const char *name)
718e3744 409{
d62a17ae 410 struct thread_master *rv;
411 struct rlimit limit;
412
413 pthread_once(&init_once, &initializer);
414
415 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 416
417 /* Initialize master mutex */
418 pthread_mutex_init(&rv->mtx, NULL);
419 pthread_cond_init(&rv->cancel_cond, NULL);
420
421 /* Set name */
422 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
423
424 /* Initialize I/O task data structures */
425 getrlimit(RLIMIT_NOFILE, &limit);
426 rv->fd_limit = (int)limit.rlim_cur;
427 rv->read = XCALLOC(MTYPE_THREAD_POLL,
428 sizeof(struct thread *) * rv->fd_limit);
429
430 rv->write = XCALLOC(MTYPE_THREAD_POLL,
431 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 432
bd74dc61 433 rv->cpu_record = hash_create_size(
996c9314 434 8, (unsigned int (*)(void *))cpu_record_hash_key,
74df8d6d 435 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
bd74dc61 436 "Thread Hash");
d62a17ae 437
438
439 /* Initialize the timer queues */
440 rv->timer = pqueue_create();
441 rv->timer->cmp = thread_timer_cmp;
442 rv->timer->update = thread_timer_update;
443
444 /* Initialize thread_fetch() settings */
445 rv->spin = true;
446 rv->handle_signals = true;
447
448 /* Set pthread owner, should be updated by actual owner */
449 rv->owner = pthread_self();
450 rv->cancel_req = list_new();
451 rv->cancel_req->del = cancelreq_del;
452 rv->canceled = true;
453
454 /* Initialize pipe poker */
455 pipe(rv->io_pipe);
456 set_nonblocking(rv->io_pipe[0]);
457 set_nonblocking(rv->io_pipe[1]);
458
459 /* Initialize data structures for poll() */
460 rv->handler.pfdsize = rv->fd_limit;
461 rv->handler.pfdcount = 0;
462 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
463 sizeof(struct pollfd) * rv->handler.pfdsize);
464 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
465 sizeof(struct pollfd) * rv->handler.pfdsize);
466
eff09c66 467 /* add to list of threadmasters */
d62a17ae 468 pthread_mutex_lock(&masters_mtx);
469 {
470 if (!masters)
471 masters = list_new();
472
d62a17ae 473 listnode_add(masters, rv);
474 }
475 pthread_mutex_unlock(&masters_mtx);
476
477 return rv;
718e3744 478}
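/*
 * Minimal lifecycle sketch (illustrative, not part of this file): each
 * pthread that runs an event loop owns exactly one thread_master.
 *
 *     struct thread_master *master = thread_master_create("example");
 *     ...schedule tasks, run the fetch/call loop shown after thread_fetch()...
 *     thread_master_free_unused(master);
 *     thread_master_free(master);
 */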
479
480void thread_master_set_name(struct thread_master *master, const char *name)
481{
482 pthread_mutex_lock(&master->mtx);
483 {
0a22ddfb 484 XFREE(MTYPE_THREAD_MASTER, master->name);
485 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
486 }
487 pthread_mutex_unlock(&master->mtx);
488}
489
718e3744 490/* Add a new thread to the list. */
d62a17ae 491static void thread_list_add(struct thread_list *list, struct thread *thread)
718e3744 492{
d62a17ae 493 thread->next = NULL;
494 thread->prev = list->tail;
495 if (list->tail)
496 list->tail->next = thread;
497 else
498 list->head = thread;
499 list->tail = thread;
500 list->count++;
718e3744 501}
502
718e3744 503/* Delete a thread from the list. */
d62a17ae 504static struct thread *thread_list_delete(struct thread_list *list,
505 struct thread *thread)
718e3744 506{
d62a17ae 507 if (thread->next)
508 thread->next->prev = thread->prev;
509 else
510 list->tail = thread->prev;
511 if (thread->prev)
512 thread->prev->next = thread->next;
513 else
514 list->head = thread->next;
515 thread->next = thread->prev = NULL;
516 list->count--;
517 return thread;
718e3744 518}
519
495f0b13 520/* Is the thread list empty? */
d62a17ae 521static int thread_empty(struct thread_list *list)
495f0b13 522{
d62a17ae 523 return list->head ? 0 : 1;
524}
525
526/* Delete top of the list and return it. */
d62a17ae 527static struct thread *thread_trim_head(struct thread_list *list)
495f0b13 528{
d62a17ae 529 if (!thread_empty(list))
530 return thread_list_delete(list, list->head);
531 return NULL;
532}
533
534#define THREAD_UNUSED_DEPTH 10
535
718e3744 536/* Move thread to unuse list. */
d62a17ae 537static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 538{
539 pthread_mutex_t mtxc = thread->mtx;
540
d62a17ae 541 assert(m != NULL && thread != NULL);
542 assert(thread->next == NULL);
543 assert(thread->prev == NULL);
d62a17ae 544
d62a17ae 545 thread->hist->total_active--;
546 memset(thread, 0, sizeof(struct thread));
547 thread->type = THREAD_UNUSED;
548
6655966d
RZ
549 /* Restore the thread mutex context. */
550 thread->mtx = mtxc;
551
552 if (m->unuse.count < THREAD_UNUSED_DEPTH) {
6ed04aa2 553 thread_list_add(&m->unuse, thread);
554 return;
555 }
556
557 thread_free(m, thread);
718e3744 558}
559
 560/* Free all threads on the given list. */
d62a17ae 561static void thread_list_free(struct thread_master *m, struct thread_list *list)
718e3744 562{
d62a17ae 563 struct thread *t;
564 struct thread *next;
565
566 for (t = list->head; t; t = next) {
567 next = t->next;
6655966d 568 thread_free(m, t);
d62a17ae 569 list->count--;
d62a17ae 570 }
718e3744 571}
572
d62a17ae 573static void thread_array_free(struct thread_master *m,
574 struct thread **thread_array)
308d14ae 575{
d62a17ae 576 struct thread *t;
577 int index;
578
579 for (index = 0; index < m->fd_limit; ++index) {
580 t = thread_array[index];
581 if (t) {
582 thread_array[index] = NULL;
6655966d 583 thread_free(m, t);
d62a17ae 584 }
585 }
a6f235f3 586 XFREE(MTYPE_THREAD_POLL, thread_array);
587}
588
d62a17ae 589static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
4becea72 590{
d62a17ae 591 int i;
4becea72 592
d62a17ae 593 for (i = 0; i < queue->size; i++)
6655966d 594 thread_free(m, queue->array[i]);
4becea72 595
d62a17ae 596 pqueue_delete(queue);
597}
598
599/*
600 * thread_master_free_unused
601 *
 602 * As threads are finished with, they are put on the
 603 * unuse list for later reuse.
 604 * If we are shutting down, free up the unused threads
 605 * so we can see if we forgot to shut anything off.
606 */
d62a17ae 607void thread_master_free_unused(struct thread_master *m)
495f0b13 608{
d62a17ae 609 pthread_mutex_lock(&m->mtx);
610 {
611 struct thread *t;
612 while ((t = thread_trim_head(&m->unuse)) != NULL) {
6655966d 613 thread_free(m, t);
d62a17ae 614 }
615 }
616 pthread_mutex_unlock(&m->mtx);
617}
618
718e3744 619/* Stop thread scheduler. */
d62a17ae 620void thread_master_free(struct thread_master *m)
718e3744 621{
d62a17ae 622 pthread_mutex_lock(&masters_mtx);
623 {
624 listnode_delete(masters, m);
eff09c66 625 if (masters->count == 0) {
6a154c88 626 list_delete(&masters);
eff09c66 627 }
d62a17ae 628 }
629 pthread_mutex_unlock(&masters_mtx);
630
631 thread_array_free(m, m->read);
632 thread_array_free(m, m->write);
633 thread_queue_free(m, m->timer);
634 thread_list_free(m, &m->event);
635 thread_list_free(m, &m->ready);
636 thread_list_free(m, &m->unuse);
637 pthread_mutex_destroy(&m->mtx);
33844bbe 638 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 639 close(m->io_pipe[0]);
640 close(m->io_pipe[1]);
6a154c88 641 list_delete(&m->cancel_req);
1a0a92ea 642 m->cancel_req = NULL;
d62a17ae 643
644 hash_clean(m->cpu_record, cpu_record_hash_free);
645 hash_free(m->cpu_record);
646 m->cpu_record = NULL;
647
0a22ddfb 648 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 649 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
650 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
651 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 652}
653
 654/* Return remaining time in milliseconds. */
655unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 656{
d62a17ae 657 int64_t remain;
1189d95f 658
d62a17ae 659 pthread_mutex_lock(&thread->mtx);
660 {
78ca0342 661 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 662 }
663 pthread_mutex_unlock(&thread->mtx);
1189d95f 664
d62a17ae 665 return remain < 0 ? 0 : remain;
718e3744 666}
667
 668/* Return remaining time in seconds. */
669unsigned long thread_timer_remain_second(struct thread *thread)
670{
671 return thread_timer_remain_msec(thread) / 1000LL;
672}
673
674#define debugargdef const char *funcname, const char *schedfrom, int fromln
675#define debugargpass funcname, schedfrom, fromln
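/*
 * These two macros carry call-site information through every funcname_*
 * scheduling function below. The public thread_add_* wrappers in thread.h
 * supply them (roughly the stringified callback name plus __FILE__ and
 * __LINE__), which is what lets "show thread cpu" attribute CPU usage to the
 * function that was scheduled rather than to this library.
 */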
e04ab74d 676
d62a17ae 677struct timeval thread_timer_remain(struct thread *thread)
6ac44687 678{
d62a17ae 679 struct timeval remain;
680 pthread_mutex_lock(&thread->mtx);
681 {
682 monotime_until(&thread->u.sands, &remain);
683 }
684 pthread_mutex_unlock(&thread->mtx);
685 return remain;
686}
687
718e3744 688/* Get new thread. */
d7c0a89a 689static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 690 int (*func)(struct thread *), void *arg,
691 debugargdef)
718e3744 692{
d62a17ae 693 struct thread *thread = thread_trim_head(&m->unuse);
694 struct cpu_thread_history tmp;
695
696 if (!thread) {
697 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
698 /* mutex only needs to be initialized at struct creation. */
699 pthread_mutex_init(&thread->mtx, NULL);
700 m->alloc++;
701 }
702
703 thread->type = type;
704 thread->add_type = type;
705 thread->master = m;
706 thread->arg = arg;
707 thread->index = -1;
708 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
709 thread->ref = NULL;
710
711 /*
712 * So if the passed in funcname is not what we have
713 * stored that means the thread->hist needs to be
714 * updated. We keep the last one around in unused
715 * under the assumption that we are probably
716 * going to immediately allocate the same
717 * type of thread.
718 * This hopefully saves us some serious
719 * hash_get lookups.
720 */
721 if (thread->funcname != funcname || thread->func != func) {
722 tmp.func = func;
723 tmp.funcname = funcname;
724 thread->hist =
725 hash_get(m->cpu_record, &tmp,
726 (void *(*)(void *))cpu_record_hash_alloc);
727 }
728 thread->hist->total_active++;
729 thread->func = func;
730 thread->funcname = funcname;
731 thread->schedfrom = schedfrom;
732 thread->schedfrom_line = fromln;
733
734 return thread;
718e3744 735}
736
737static void thread_free(struct thread_master *master, struct thread *thread)
738{
739 /* Update statistics. */
740 assert(master->alloc > 0);
741 master->alloc--;
742
743 /* Free allocated resources. */
744 pthread_mutex_destroy(&thread->mtx);
745 XFREE(MTYPE_THREAD, thread);
746}
747
d62a17ae 748static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
749 nfds_t count, const struct timeval *timer_wait)
209a72a6 750{
d62a17ae 751 /* If timer_wait is null here, that means poll() should block
752 * indefinitely,
 753 * unless the thread_master has overridden it by setting
754 * ->selectpoll_timeout.
755 * If the value is positive, it specifies the maximum number of
756 * milliseconds
757 * to wait. If the timeout is -1, it specifies that we should never wait
758 * and
759 * always return immediately even if no event is detected. If the value
760 * is
761 * zero, the behavior is default. */
762 int timeout = -1;
763
764 /* number of file descriptors with events */
765 int num;
766
767 if (timer_wait != NULL
768 && m->selectpoll_timeout == 0) // use the default value
769 timeout = (timer_wait->tv_sec * 1000)
770 + (timer_wait->tv_usec / 1000);
771 else if (m->selectpoll_timeout > 0) // use the user's timeout
772 timeout = m->selectpoll_timeout;
773 else if (m->selectpoll_timeout
774 < 0) // effect a poll (return immediately)
775 timeout = 0;
776
777 /* add poll pipe poker */
778 assert(count + 1 < pfdsize);
779 pfds[count].fd = m->io_pipe[0];
780 pfds[count].events = POLLIN;
781 pfds[count].revents = 0x00;
782
783 num = poll(pfds, count + 1, timeout);
784
785 unsigned char trash[64];
786 if (num > 0 && pfds[count].revents != 0 && num--)
787 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
788 ;
789
790 return num;
791}
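/*
 * Worked example of the timeout mapping above, assuming the default
 * m->selectpoll_timeout == 0: timer_wait = { .tv_sec = 2, .tv_usec = 500000 }
 * becomes poll(..., 2500) milliseconds; timer_wait == NULL leaves timeout at
 * -1 (block until I/O or the pipe poker fires); m->selectpoll_timeout < 0
 * forces timeout = 0, i.e. a strictly non-blocking poll.
 */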
792
718e3744 793/* Add new read thread. */
d62a17ae 794struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
795 int (*func)(struct thread *),
796 void *arg, int fd,
797 struct thread **t_ptr,
798 debugargdef)
718e3744 799{
d62a17ae 800 struct thread *thread = NULL;
801
9b864cd3 802 assert(fd >= 0 && fd < m->fd_limit);
d62a17ae 803 pthread_mutex_lock(&m->mtx);
804 {
805 if (t_ptr
806 && *t_ptr) // thread is already scheduled; don't reschedule
807 {
808 pthread_mutex_unlock(&m->mtx);
809 return NULL;
810 }
811
812 /* default to a new pollfd */
813 nfds_t queuepos = m->handler.pfdcount;
814
815 /* if we already have a pollfd for our file descriptor, find and
816 * use it */
817 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
818 if (m->handler.pfds[i].fd == fd) {
819 queuepos = i;
820 break;
821 }
822
823 /* make sure we have room for this fd + pipe poker fd */
824 assert(queuepos + 1 < m->handler.pfdsize);
825
826 thread = thread_get(m, dir, func, arg, debugargpass);
827
828 m->handler.pfds[queuepos].fd = fd;
829 m->handler.pfds[queuepos].events |=
830 (dir == THREAD_READ ? POLLIN : POLLOUT);
831
832 if (queuepos == m->handler.pfdcount)
833 m->handler.pfdcount++;
834
835 if (thread) {
836 pthread_mutex_lock(&thread->mtx);
837 {
838 thread->u.fd = fd;
839 if (dir == THREAD_READ)
840 m->read[thread->u.fd] = thread;
841 else
842 m->write[thread->u.fd] = thread;
843 }
844 pthread_mutex_unlock(&thread->mtx);
845
846 if (t_ptr) {
847 *t_ptr = thread;
848 thread->ref = t_ptr;
849 }
850 }
851
852 AWAKEN(m);
853 }
854 pthread_mutex_unlock(&m->mtx);
855
856 return thread;
718e3744 857}
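/*
 * Illustrative call site, assuming the usual thread_add_read() wrapper macro
 * from thread.h (which fills in the debugargdef arguments); "peer_ctx" and
 * "sock_readable" are made-up names:
 *
 *     static int sock_readable(struct thread *t)
 *     {
 *             struct peer_ctx *ctx = THREAD_ARG(t);
 *             int fd = THREAD_FD(t);
 *
 *             ...consume data from fd...
 *
 *             // read tasks are one-shot: re-arm for the next readable event
 *             thread_add_read(t->master, sock_readable, ctx, fd, &ctx->t_read);
 *             return 0;
 *     }
 *
 * Passing &ctx->t_read as the back-reference lets the library NULL it when
 * the task runs or is cancelled, and the t_ptr && *t_ptr check above then
 * prevents accidental double-scheduling.
 */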
858
56a94b36 859static struct thread *
d62a17ae 860funcname_thread_add_timer_timeval(struct thread_master *m,
861 int (*func)(struct thread *), int type,
862 void *arg, struct timeval *time_relative,
863 struct thread **t_ptr, debugargdef)
718e3744 864{
d62a17ae 865 struct thread *thread;
866 struct pqueue *queue;
867
868 assert(m != NULL);
869
870 assert(type == THREAD_TIMER);
871 assert(time_relative);
872
873 pthread_mutex_lock(&m->mtx);
874 {
875 if (t_ptr
876 && *t_ptr) // thread is already scheduled; don't reschedule
877 {
878 pthread_mutex_unlock(&m->mtx);
879 return NULL;
880 }
881
882 queue = m->timer;
883 thread = thread_get(m, type, func, arg, debugargpass);
884
885 pthread_mutex_lock(&thread->mtx);
886 {
887 monotime(&thread->u.sands);
888 timeradd(&thread->u.sands, time_relative,
889 &thread->u.sands);
890 pqueue_enqueue(thread, queue);
891 if (t_ptr) {
892 *t_ptr = thread;
893 thread->ref = t_ptr;
894 }
895 }
896 pthread_mutex_unlock(&thread->mtx);
897
898 AWAKEN(m);
899 }
900 pthread_mutex_unlock(&m->mtx);
901
902 return thread;
9e867fe6 903}
904
98c91ac6 905
906/* Add timer event thread. */
d62a17ae 907struct thread *funcname_thread_add_timer(struct thread_master *m,
908 int (*func)(struct thread *),
909 void *arg, long timer,
910 struct thread **t_ptr, debugargdef)
9e867fe6 911{
d62a17ae 912 struct timeval trel;
9e867fe6 913
d62a17ae 914 assert(m != NULL);
9e867fe6 915
d62a17ae 916 trel.tv_sec = timer;
917 trel.tv_usec = 0;
9e867fe6 918
d62a17ae 919 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
920 &trel, t_ptr, debugargpass);
98c91ac6 921}
9e867fe6 922
98c91ac6 923/* Add timer event thread with "millisecond" resolution */
d62a17ae 924struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
925 int (*func)(struct thread *),
926 void *arg, long timer,
927 struct thread **t_ptr,
928 debugargdef)
98c91ac6 929{
d62a17ae 930 struct timeval trel;
9e867fe6 931
d62a17ae 932 assert(m != NULL);
718e3744 933
d62a17ae 934 trel.tv_sec = timer / 1000;
935 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 936
d62a17ae 937 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
938 &trel, t_ptr, debugargpass);
a48b4e6d 939}
940
d03c4cbd 941/* Add timer event thread with "millisecond" resolution */
d62a17ae 942struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
943 int (*func)(struct thread *),
944 void *arg, struct timeval *tv,
945 struct thread **t_ptr, debugargdef)
d03c4cbd 946{
d62a17ae 947 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
948 t_ptr, debugargpass);
949}
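/*
 * Scheduling sketch using the thread_add_timer_msec() wrapper from thread.h
 * (ctx, expire_cb and t_expire are illustrative names): the timer is armed
 * only if ctx->t_expire is currently NULL, fires once, and the back-reference
 * is cleared before expire_cb runs.
 *
 *     thread_add_timer_msec(master, expire_cb, ctx, 500, &ctx->t_expire);
 */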
950
718e3744 951/* Add simple event thread. */
d62a17ae 952struct thread *funcname_thread_add_event(struct thread_master *m,
953 int (*func)(struct thread *),
954 void *arg, int val,
955 struct thread **t_ptr, debugargdef)
718e3744 956{
d62a17ae 957 struct thread *thread;
958
959 assert(m != NULL);
960
961 pthread_mutex_lock(&m->mtx);
962 {
963 if (t_ptr
964 && *t_ptr) // thread is already scheduled; don't reschedule
965 {
966 pthread_mutex_unlock(&m->mtx);
967 return NULL;
968 }
969
970 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
971 pthread_mutex_lock(&thread->mtx);
972 {
973 thread->u.val = val;
974 thread_list_add(&m->event, thread);
975 }
976 pthread_mutex_unlock(&thread->mtx);
977
978 if (t_ptr) {
979 *t_ptr = thread;
980 thread->ref = t_ptr;
981 }
982
983 AWAKEN(m);
984 }
985 pthread_mutex_unlock(&m->mtx);
986
987 return thread;
718e3744 988}
989
990/* Thread cancellation ------------------------------------------------------ */
991
992/**
993 * NOT's out the .events field of pollfd corresponding to the given file
994 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
995 *
996 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
997 * implementation for details.
998 *
999 * @param master
1000 * @param fd
1001 * @param state the event to cancel. One or more (OR'd together) of the
1002 * following:
1003 * - POLLIN
1004 * - POLLOUT
1005 */
d62a17ae 1006static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 1007{
1008 bool found = false;
1009
d62a17ae 1010 /* Cancel POLLHUP too just in case some bozo set it */
1011 state |= POLLHUP;
1012
1013 /* find the index of corresponding pollfd */
1014 nfds_t i;
1015
1016 for (i = 0; i < master->handler.pfdcount; i++)
1017 if (master->handler.pfds[i].fd == fd) {
1018 found = true;
d62a17ae 1019 break;
1020 }
1021
1022 if (!found) {
1023 zlog_debug(
1024 "[!] Received cancellation request for nonexistent rw job");
1025 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 1026 master->name ? master->name : "", fd);
1027 return;
1028 }
d62a17ae 1029
1030 /* NOT out event. */
1031 master->handler.pfds[i].events &= ~(state);
1032
1033 /* If all events are canceled, delete / resize the pollfd array. */
1034 if (master->handler.pfds[i].events == 0) {
1035 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1036 (master->handler.pfdcount - i - 1)
1037 * sizeof(struct pollfd));
1038 master->handler.pfdcount--;
1039 }
1040
1041 /* If we have the same pollfd in the copy, perform the same operations,
1042 * otherwise return. */
1043 if (i >= master->handler.copycount)
1044 return;
1045
1046 master->handler.copy[i].events &= ~(state);
1047
1048 if (master->handler.copy[i].events == 0) {
1049 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1050 (master->handler.copycount - i - 1)
1051 * sizeof(struct pollfd));
1052 master->handler.copycount--;
1053 }
1054}
1055
1189d95f 1056/**
63ccb9cb 1057 * Process cancellation requests.
1189d95f 1058 *
1059 * This may only be run from the pthread which owns the thread_master.
1060 *
1061 * @param master the thread master to process
1062 * @REQUIRE master->mtx
1189d95f 1063 */
d62a17ae 1064static void do_thread_cancel(struct thread_master *master)
718e3744 1065{
d62a17ae 1066 struct thread_list *list = NULL;
1067 struct pqueue *queue = NULL;
1068 struct thread **thread_array = NULL;
1069 struct thread *thread;
1070
1071 struct cancel_req *cr;
1072 struct listnode *ln;
1073 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1074 /* If this is an event object cancellation, linear search
1075 * through event
1076 * list deleting any events which have the specified argument.
1077 * We also
1078 * need to check every thread in the ready queue. */
1079 if (cr->eventobj) {
1080 struct thread *t;
1081 thread = master->event.head;
1082
1083 while (thread) {
1084 t = thread;
1085 thread = t->next;
1086
1087 if (t->arg == cr->eventobj) {
1088 thread_list_delete(&master->event, t);
1089 if (t->ref)
1090 *t->ref = NULL;
1091 thread_add_unuse(master, t);
1092 }
1093 }
1094
1095 thread = master->ready.head;
1096 while (thread) {
1097 t = thread;
1098 thread = t->next;
1099
1100 if (t->arg == cr->eventobj) {
1101 thread_list_delete(&master->ready, t);
1102 if (t->ref)
1103 *t->ref = NULL;
1104 thread_add_unuse(master, t);
1105 }
1106 }
1107 continue;
1108 }
1109
1110 /* The pointer varies depending on whether the cancellation
1111 * request was
1112 * made asynchronously or not. If it was, we need to check
1113 * whether the
1114 * thread even exists anymore before cancelling it. */
1115 thread = (cr->thread) ? cr->thread : *cr->threadref;
1116
1117 if (!thread)
1118 continue;
1119
1120 /* Determine the appropriate queue to cancel the thread from */
1121 switch (thread->type) {
1122 case THREAD_READ:
1123 thread_cancel_rw(master, thread->u.fd, POLLIN);
1124 thread_array = master->read;
1125 break;
1126 case THREAD_WRITE:
1127 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1128 thread_array = master->write;
1129 break;
1130 case THREAD_TIMER:
1131 queue = master->timer;
1132 break;
1133 case THREAD_EVENT:
1134 list = &master->event;
1135 break;
1136 case THREAD_READY:
1137 list = &master->ready;
1138 break;
1139 default:
1140 continue;
1141 break;
1142 }
1143
1144 if (queue) {
1145 assert(thread->index >= 0);
1146 assert(thread == queue->array[thread->index]);
1147 pqueue_remove_at(thread->index, queue);
d62a17ae 1148 } else if (list) {
1149 thread_list_delete(list, thread);
1150 } else if (thread_array) {
1151 thread_array[thread->u.fd] = NULL;
1152 } else {
1153 assert(!"Thread should be either in queue or list or array!");
1154 }
1155
1156 if (thread->ref)
1157 *thread->ref = NULL;
1158
1159 thread_add_unuse(thread->master, thread);
1160 }
1161
1162 /* Delete and free all cancellation requests */
1163 list_delete_all_node(master->cancel_req);
1164
1165 /* Wake up any threads which may be blocked in thread_cancel_async() */
1166 master->canceled = true;
1167 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1168}
1169
1170/**
1171 * Cancel any events which have the specified argument.
1172 *
1173 * MT-Unsafe
1174 *
1175 * @param m the thread_master to cancel from
1176 * @param arg the argument passed when creating the event
1177 */
d62a17ae 1178void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1179{
d62a17ae 1180 assert(master->owner == pthread_self());
1181
1182 pthread_mutex_lock(&master->mtx);
1183 {
1184 struct cancel_req *cr =
1185 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1186 cr->eventobj = arg;
1187 listnode_add(master->cancel_req, cr);
1188 do_thread_cancel(master);
1189 }
1190 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1191}
1189d95f 1192
1193/**
1194 * Cancel a specific task.
1195 *
1196 * MT-Unsafe
1197 *
1198 * @param thread task to cancel
1199 */
d62a17ae 1200void thread_cancel(struct thread *thread)
63ccb9cb 1201{
6ed04aa2 1202 struct thread_master *master = thread->master;
d62a17ae 1203
1204 assert(master->owner == pthread_self());
1205
1206 pthread_mutex_lock(&master->mtx);
d62a17ae 1207 {
1208 struct cancel_req *cr =
1209 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1210 cr->thread = thread;
1211 listnode_add(master->cancel_req, cr);
1212 do_thread_cancel(master);
d62a17ae 1213 }
6ed04aa2 1214 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1215}
1189d95f 1216
1217/**
1218 * Asynchronous cancellation.
1219 *
1220 * Called with either a struct thread ** or void * to an event argument,
1221 * this function posts the correct cancellation request and blocks until it is
1222 * serviced.
1223 *
1224 * If the thread is currently running, execution blocks until it completes.
1225 *
1226 * The last two parameters are mutually exclusive, i.e. if you pass one the
1227 * other must be NULL.
1228 *
1229 * When the cancellation procedure executes on the target thread_master, the
1230 * thread * provided is checked for nullity. If it is null, the thread is
1231 * assumed to no longer exist and the cancellation request is a no-op. Thus
1232 * users of this API must pass a back-reference when scheduling the original
1233 * task.
1234 *
63ccb9cb
QY
1235 * MT-Safe
1236 *
1237 * @param master the thread master with the relevant event / task
1238 * @param thread pointer to thread to cancel
1239 * @param eventobj the event
63ccb9cb 1240 */
d62a17ae 1241void thread_cancel_async(struct thread_master *master, struct thread **thread,
1242 void *eventobj)
63ccb9cb 1243{
d62a17ae 1244 assert(!(thread && eventobj) && (thread || eventobj));
1245 assert(master->owner != pthread_self());
1246
1247 pthread_mutex_lock(&master->mtx);
1248 {
1249 master->canceled = false;
1250
1251 if (thread) {
1252 struct cancel_req *cr =
1253 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1254 cr->threadref = thread;
1255 listnode_add(master->cancel_req, cr);
1256 } else if (eventobj) {
1257 struct cancel_req *cr =
1258 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1259 cr->eventobj = eventobj;
1260 listnode_add(master->cancel_req, cr);
1261 }
1262 AWAKEN(master);
1263
1264 while (!master->canceled)
1265 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1266 }
1267 pthread_mutex_unlock(&master->mtx);
718e3744 1268}
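/*
 * Usage sketch: from a pthread that does NOT own "master", either form below
 * blocks until the owning pthread has drained its cancel_req list
 * (illustrative names):
 *
 *     thread_cancel_async(master, &ctx->t_read, NULL);   // by task reference
 *     thread_cancel_async(master, NULL, ctx);            // by event argument
 *
 * Because the request is resolved on the owning pthread, the back-reference
 * form is safe even if the task has already run and *(&ctx->t_read) is NULL
 * by the time the request is serviced.
 */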
63ccb9cb 1269/* ------------------------------------------------------------------------- */
718e3744 1270
d62a17ae 1271static struct timeval *thread_timer_wait(struct pqueue *queue,
1272 struct timeval *timer_val)
718e3744 1273{
d62a17ae 1274 if (queue->size) {
1275 struct thread *next_timer = queue->array[0];
1276 monotime_until(&next_timer->u.sands, timer_val);
1277 return timer_val;
1278 }
1279 return NULL;
718e3744 1280}
718e3744 1281
d62a17ae 1282static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1283 struct thread *fetch)
718e3744 1284{
d62a17ae 1285 *fetch = *thread;
1286 thread_add_unuse(m, thread);
1287 return fetch;
718e3744 1288}
1289
d62a17ae 1290static int thread_process_io_helper(struct thread_master *m,
1291 struct thread *thread, short state, int pos)
5d4ccd4e 1292{
d62a17ae 1293 struct thread **thread_array;
1294
1295 if (!thread)
1296 return 0;
1297
1298 if (thread->type == THREAD_READ)
1299 thread_array = m->read;
1300 else
1301 thread_array = m->write;
1302
1303 thread_array[thread->u.fd] = NULL;
1304 thread_list_add(&m->ready, thread);
1305 thread->type = THREAD_READY;
1306 /* if another pthread scheduled this file descriptor for the event we're
1307 * responding to, no problem; we're getting to it now */
1308 thread->master->handler.pfds[pos].events &= ~(state);
1309 return 1;
1310}
1311
1312/**
1313 * Process I/O events.
1314 *
1315 * Walks through file descriptor array looking for those pollfds whose .revents
1316 * field has something interesting. Deletes any invalid file descriptors.
1317 *
1318 * @param m the thread master
1319 * @param num the number of active file descriptors (return value of poll())
1320 */
d62a17ae 1321static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1322{
d62a17ae 1323 unsigned int ready = 0;
1324 struct pollfd *pfds = m->handler.copy;
1325
1326 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1327 /* no event for current fd? immediately continue */
1328 if (pfds[i].revents == 0)
1329 continue;
1330
1331 ready++;
1332
1333 /* Unless someone has called thread_cancel from another pthread,
1334 * the only
1335 * thing that could have changed in m->handler.pfds while we
1336 * were
1337 * asleep is the .events field in a given pollfd. Barring
1338 * thread_cancel()
1339 * that value should be a superset of the values we have in our
1340 * copy, so
 1341 * there's no need to update it. Similarly, barring deletion,
1342 * the fd
1343 * should still be a valid index into the master's pfds. */
1344 if (pfds[i].revents & (POLLIN | POLLHUP))
1345 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1346 i);
1347 if (pfds[i].revents & POLLOUT)
1348 thread_process_io_helper(m, m->write[pfds[i].fd],
1349 POLLOUT, i);
1350
1351 /* if one of our file descriptors is garbage, remove the same
1352 * from
1353 * both pfds + update sizes and index */
1354 if (pfds[i].revents & POLLNVAL) {
1355 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1356 (m->handler.pfdcount - i - 1)
1357 * sizeof(struct pollfd));
1358 m->handler.pfdcount--;
1359
1360 memmove(pfds + i, pfds + i + 1,
1361 (m->handler.copycount - i - 1)
1362 * sizeof(struct pollfd));
1363 m->handler.copycount--;
1364
1365 i--;
1366 }
1367 }
718e3744 1368}
1369
8b70d0b0 1370/* Add all timers that have popped to the ready list. */
d62a17ae 1371static unsigned int thread_process_timers(struct pqueue *queue,
1372 struct timeval *timenow)
a48b4e6d 1373{
d62a17ae 1374 struct thread *thread;
1375 unsigned int ready = 0;
1376
1377 while (queue->size) {
1378 thread = queue->array[0];
1379 if (timercmp(timenow, &thread->u.sands, <))
1380 return ready;
1381 pqueue_dequeue(queue);
1382 thread->type = THREAD_READY;
1383 thread_list_add(&thread->master->ready, thread);
1384 ready++;
1385 }
1386 return ready;
a48b4e6d 1387}
1388
2613abe6 1389/* process a list en masse, e.g. for event thread lists */
d62a17ae 1390static unsigned int thread_process(struct thread_list *list)
2613abe6 1391{
d62a17ae 1392 struct thread *thread;
1393 struct thread *next;
1394 unsigned int ready = 0;
1395
1396 for (thread = list->head; thread; thread = next) {
1397 next = thread->next;
1398 thread_list_delete(list, thread);
1399 thread->type = THREAD_READY;
1400 thread_list_add(&thread->master->ready, thread);
1401 ready++;
1402 }
1403 return ready;
1404}
1405
1406
718e3744 1407/* Fetch next ready thread. */
d62a17ae 1408struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1409{
d62a17ae 1410 struct thread *thread = NULL;
1411 struct timeval now;
1412 struct timeval zerotime = {0, 0};
1413 struct timeval tv;
1414 struct timeval *tw = NULL;
1415
1416 int num = 0;
1417
1418 do {
1419 /* Handle signals if any */
1420 if (m->handle_signals)
1421 quagga_sigevent_process();
1422
1423 pthread_mutex_lock(&m->mtx);
1424
1425 /* Process any pending cancellation requests */
1426 do_thread_cancel(m);
1427
1428 /*
1429 * Attempt to flush ready queue before going into poll().
1430 * This is performance-critical. Think twice before modifying.
1431 */
1432 if ((thread = thread_trim_head(&m->ready))) {
1433 fetch = thread_run(m, thread, fetch);
1434 if (fetch->ref)
1435 *fetch->ref = NULL;
1436 pthread_mutex_unlock(&m->mtx);
1437 break;
1438 }
1439
1440 /* otherwise, tick through scheduling sequence */
1441
1442 /*
1443 * Post events to ready queue. This must come before the
1444 * following block since events should occur immediately
1445 */
d62a17ae 1446 thread_process(&m->event);
1447
1448 /*
1449 * If there are no tasks on the ready queue, we will poll()
1450 * until a timer expires or we receive I/O, whichever comes
1451 * first. The strategy for doing this is:
d62a17ae 1452 *
1453 * - If there are events pending, set the poll() timeout to zero
1454 * - If there are no events pending, but there are timers
1455 * pending, set the
1456 * timeout to the smallest remaining time on any timer
1457 * - If there are neither timers nor events pending, but there
1458 * are file
1459 * descriptors pending, block indefinitely in poll()
1460 * - If nothing is pending, it's time for the application to die
1461 *
1462 * In every case except the last, we need to hit poll() at least
1463 * once per loop to avoid starvation by events
1464 */
d62a17ae 1465 if (m->ready.count == 0)
1466 tw = thread_timer_wait(m->timer, &tv);
1467
1468 if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
1469 tw = &zerotime;
1470
1471 if (!tw && m->handler.pfdcount == 0) { /* die */
1472 pthread_mutex_unlock(&m->mtx);
1473 fetch = NULL;
1474 break;
1475 }
1476
1477 /*
1478 * Copy pollfd array + # active pollfds in it. Not necessary to
1479 * copy the array size as this is fixed.
1480 */
d62a17ae 1481 m->handler.copycount = m->handler.pfdcount;
1482 memcpy(m->handler.copy, m->handler.pfds,
1483 m->handler.copycount * sizeof(struct pollfd));
1484
1485 pthread_mutex_unlock(&m->mtx);
1486 {
1487 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1488 m->handler.copycount, tw);
1489 }
1490 pthread_mutex_lock(&m->mtx);
d764d2cc 1491
1492 /* Handle any errors received in poll() */
1493 if (num < 0) {
1494 if (errno == EINTR) {
d62a17ae 1495 pthread_mutex_unlock(&m->mtx);
1496 /* loop around to signal handler */
1497 continue;
d62a17ae 1498 }
1499
e3c9529e 1500 /* else die */
450971aa 1501 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1502 safe_strerror(errno));
1503 pthread_mutex_unlock(&m->mtx);
1504 fetch = NULL;
1505 break;
bca37d17 1506 }
d62a17ae 1507
1508 /* Post timers to ready queue. */
1509 monotime(&now);
1510 thread_process_timers(m->timer, &now);
1511
1512 /* Post I/O to ready queue. */
1513 if (num > 0)
1514 thread_process_io(m, num);
1515
d62a17ae 1516 pthread_mutex_unlock(&m->mtx);
1517
1518 } while (!thread && m->spin);
1519
1520 return fetch;
718e3744 1521}
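/*
 * The canonical consumer of thread_fetch() is the per-pthread event loop
 * (sketch, using a master created with thread_master_create() above):
 *
 *     struct thread task;
 *
 *     while (thread_fetch(master, &task))
 *             thread_call(&task);
 *
 * thread_fetch() returns NULL once nothing is left to schedule (no ready
 * tasks, events, timers or file descriptors), which ends the loop.
 */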
1522
d62a17ae 1523static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1524{
d62a17ae 1525 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1526 + (a.tv_usec - b.tv_usec));
1527}
1528
d62a17ae 1529unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1530 unsigned long *cputime)
718e3744 1531{
d62a17ae 1532 /* This is 'user + sys' time. */
1533 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1534 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1535 return timeval_elapsed(now->real, start->real);
8b70d0b0 1536}
1537
1538/* We should aim to yield after yield milliseconds, which defaults
 1539 to THREAD_YIELD_TIME_SLOT.
8b70d0b0 1540 Note: we are using real (wall clock) time for this calculation.
1541 It could be argued that CPU time may make more sense in certain
1542 contexts. The things to consider are whether the thread may have
1543 blocked (in which case wall time increases, but CPU time does not),
1544 or whether the system is heavily loaded with other processes competing
d62a17ae 1545 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1546 Plus it has the added benefit that gettimeofday should be faster
1547 than calling getrusage. */
d62a17ae 1548int thread_should_yield(struct thread *thread)
718e3744 1549{
d62a17ae 1550 int result;
1551 pthread_mutex_lock(&thread->mtx);
1552 {
1553 result = monotime_since(&thread->real, NULL)
1554 > (int64_t)thread->yield;
1555 }
1556 pthread_mutex_unlock(&thread->mtx);
1557 return result;
1558}
1559
d62a17ae 1560void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1561{
d62a17ae 1562 pthread_mutex_lock(&thread->mtx);
1563 {
1564 thread->yield = yield_time;
1565 }
1566 pthread_mutex_unlock(&thread->mtx);
718e3744 1567}
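/*
 * Cooperative-yield sketch (illustrative names): a long-running job checks
 * thread_should_yield() periodically and reschedules itself as an event so
 * that other tasks on the same pthread are not starved:
 *
 *     static int bulk_work(struct thread *t)
 *     {
 *             struct work *w = THREAD_ARG(t);
 *
 *             while (work_remains(w)) {
 *                     do_one_chunk(w);
 *                     if (thread_should_yield(t)) {
 *                             thread_add_event(t->master, bulk_work, w, 0,
 *                                              NULL);
 *                             return 0;
 *                     }
 *             }
 *             return 0;
 *     }
 */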
1568
d62a17ae 1569void thread_getrusage(RUSAGE_T *r)
db9c0df9 1570{
1571#if defined RUSAGE_THREAD
1572#define FRR_RUSAGE RUSAGE_THREAD
1573#else
1574#define FRR_RUSAGE RUSAGE_SELF
1575#endif
d62a17ae 1576 monotime(&r->real);
231db9a6 1577 getrusage(FRR_RUSAGE, &(r->cpu));
1578}
1579
1580/*
1581 * Call a thread.
1582 *
1583 * This function will atomically update the thread's usage history. At present
1584 * this is the only spot where usage history is written. Nevertheless the code
1585 * has been written such that the introduction of writers in the future should
1586 * not need to update it provided the writers atomically perform only the
1587 * operations done here, i.e. updating the total and maximum times. In
1588 * particular, the maximum real and cpu times must be monotonically increasing
1589 * or this code is not correct.
1590 */
d62a17ae 1591void thread_call(struct thread *thread)
718e3744 1592{
1593 _Atomic unsigned long realtime, cputime;
1594 unsigned long exp;
1595 unsigned long helper;
d62a17ae 1596 RUSAGE_T before, after;
cc8b13a0 1597
d62a17ae 1598 GETRUSAGE(&before);
1599 thread->real = before.real;
718e3744 1600
d62a17ae 1601 pthread_setspecific(thread_current, thread);
1602 (*thread->func)(thread);
1603 pthread_setspecific(thread_current, NULL);
718e3744 1604
d62a17ae 1605 GETRUSAGE(&after);
718e3744 1606
fbcac826
QY
1607 realtime = thread_consumed_time(&after, &before, &helper);
1608 cputime = helper;
1609
1610 /* update realtime */
1611 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1612 memory_order_seq_cst);
1613 exp = atomic_load_explicit(&thread->hist->real.max,
1614 memory_order_seq_cst);
1615 while (exp < realtime
1616 && !atomic_compare_exchange_weak_explicit(
1617 &thread->hist->real.max, &exp, realtime,
1618 memory_order_seq_cst, memory_order_seq_cst))
1619 ;
1620
1621 /* update cputime */
1622 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1623 memory_order_seq_cst);
1624 exp = atomic_load_explicit(&thread->hist->cpu.max,
1625 memory_order_seq_cst);
1626 while (exp < cputime
1627 && !atomic_compare_exchange_weak_explicit(
1628 &thread->hist->cpu.max, &exp, cputime,
1629 memory_order_seq_cst, memory_order_seq_cst))
1630 ;
1631
1632 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1633 memory_order_seq_cst);
1634 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1635 memory_order_seq_cst);
718e3744 1636
924b9229 1637#ifdef CONSUMED_TIME_CHECK
d62a17ae 1638 if (realtime > CONSUMED_TIME_CHECK) {
1639 /*
1640 * We have a CPU Hog on our hands.
1641 * Whinge about it now, so we're aware this is yet another task
1642 * to fix.
1643 */
9ef9495e 1644 flog_warn(
450971aa 1645 EC_LIB_SLOW_THREAD,
d62a17ae 1646 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1647 thread->funcname, (unsigned long)thread->func,
1648 realtime / 1000, cputime / 1000);
1649 }
924b9229 1650#endif /* CONSUMED_TIME_CHECK */
718e3744 1651}
1652
1653/* Execute thread */
d62a17ae 1654void funcname_thread_execute(struct thread_master *m,
1655 int (*func)(struct thread *), void *arg, int val,
1656 debugargdef)
718e3744 1657{
c4345fbf 1658 struct thread *thread;
718e3744 1659
1660 /* Get or allocate new thread to execute. */
1661 pthread_mutex_lock(&m->mtx);
1662 {
1663 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
9c7753e4 1664
1665 /* Set its event value. */
1666 pthread_mutex_lock(&thread->mtx);
1667 {
1668 thread->add_type = THREAD_EXECUTE;
1669 thread->u.val = val;
1670 thread->ref = &thread;
1671 }
1672 pthread_mutex_unlock(&thread->mtx);
1673 }
1674 pthread_mutex_unlock(&m->mtx);
f7c62e11 1675
1676 /* Execute thread doing all accounting. */
1677 thread_call(thread);
9c7753e4 1678
1679 /* Give back or free thread. */
1680 thread_add_unuse(m, thread);
718e3744 1681}