/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

/* #define DEBUG */

#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"

DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

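/*
 * AWAKEN pokes the "io pipe" of a thread_master: fd_poll() always includes
 * m->io_pipe[0] in the pollfd set it hands to poll(), so writing a single
 * byte to the other end of the pipe is enough to wake a pthread that is
 * blocked in poll() whenever another pthread schedules or cancels work on
 * its master.
 */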
#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0);

/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;


/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}

static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
			       const struct cpu_thread_history *b)
{
	return a->func == b->func;
}

static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}

static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}

static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}

static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}

static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active Runtime(ms) Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, " Type Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, " Type Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}

static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}

static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}

static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}
	return filter;
}

DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}

static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}

DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}


DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtexb)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least "
				"one of 'RWTEXB'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}

void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
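/*
 * Illustrative vtysh usage of the commands registered above (a sketch, not
 * exhaustive): "show thread cpu tw" prints statistics only for tasks that
 * have run as timers or write tasks, "clear thread cpu" with no filter
 * resets the history for every task type, and "show thread poll" dumps the
 * pollfd array of each registered thread_master.
 */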
/* CLI end ------------------------------------------------------------------ */


static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}

static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}

static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}

/* initializer, only ever called once */
static void initializer()
{
	pthread_key_create(&thread_current, NULL);
}

struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
	if (rv == NULL)
		return NULL;

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(int (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");


	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
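/*
 * Typical usage sketch: a daemon creates one master per pthread, schedules
 * tasks against it and then spins a fetch/call loop until nothing is left
 * to run.  (thread_add_read() below is the scheduling macro from thread.h;
 * handle_packet and sock_fd are placeholders.)
 *
 *	struct thread_master *master = thread_master_create("main");
 *	struct thread fetched;
 *
 *	thread_add_read(master, handle_packet, NULL, sock_fd, NULL);
 *	while (thread_fetch(master, &fetched))
 *		thread_call(&fetched);
 *
 * thread_fetch() returns NULL once no timers, events or file descriptors
 * remain, which terminates the loop.
 */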

void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		if (master->name)
			XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}

/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}

/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}

/* Return whether the thread list is empty. */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}

/* Delete top of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}

#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	if (m->unuse.count < THREAD_UNUSED_DEPTH)
		thread_list_add(&m->unuse, thread);
	else
		XFREE(MTYPE_THREAD, thread);
}

/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		XFREE(MTYPE_THREAD, t);
		list->count--;
		m->alloc--;
	}
}

static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			XFREE(MTYPE_THREAD, t);
			m->alloc--;
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}

static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		XFREE(MTYPE_THREAD, queue->array[i]);

	m->alloc -= queue->size;
	pqueue_delete(queue);
}

/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the unuse list for later
 * reuse.  When shutting down, free the unused threads so we can see if we
 * forgot to shut anything off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			pthread_mutex_destroy(&t->mtx);
			XFREE(MTYPE_THREAD, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}

/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete_and_null(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete_and_null(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	if (m->name)
		XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}

/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}

#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);
	return remain;
}

/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have
	 * stored that means the thread->hist needs to be
	 * updated. We keep the last one around in unused
	 * under the assumption that we are probably
	 * going to immediately allocate the same
	 * type of thread.
	 * This hopefully saves us some serious
	 * hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}

static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, poll() should block indefinitely,
	 * unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.  A positive ->selectpoll_timeout gives the
	 * maximum number of milliseconds to wait; a negative value means
	 * never wait and return immediately even if no event is detected;
	 * zero keeps the default behavior described above. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}

/* Add new read/write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find and
		 * use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
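/*
 * Scheduling sketch: callers normally go through the thread_add_read() /
 * thread_add_write() macros from thread.h rather than calling the function
 * above directly.  Passing a back-reference keeps at most one task
 * scheduled per pointer (handle_read and fd are placeholders):
 *
 *	static struct thread *t_read;
 *
 *	thread_add_read(master, handle_read, NULL, fd, &t_read);
 *	thread_add_read(master, handle_read, NULL, fd, &t_read); // no-op
 *
 * The second call returns NULL because *t_ptr is still non-NULL; the
 * reference is cleared when the task runs or is cancelled.
 */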

static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;
		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}


/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with the delay specified as a struct timeval */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}

/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
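/*
 * Usage sketch: every timer fires exactly once, so periodic work is
 * expressed by having the callback reschedule itself.  The macros
 * thread_add_timer_msec() / thread_add_event() from thread.h expand to the
 * funcname_* functions above; send_hello and ctx are placeholders.
 *
 *	static struct thread *t_hello;
 *
 *	static int send_hello(struct thread *t)
 *	{
 *		void *ctx = THREAD_ARG(t);
 *
 *		// ... transmit using ctx ...
 *		thread_add_timer_msec(t->master, send_hello, ctx, 1000,
 *				      &t_hello);
 *		return 0;
 *	}
 *
 *	thread_add_event(master, send_hello, ctx, 0, &t_hello);
 */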

/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}

/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through event list deleting any events which have the
		 * specified argument. We also need to check every thread
		 * in the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we
		 * need to check whether the thread even exists anymore
		 * before cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
			break;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}

/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param m the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}

/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}

/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
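/*
 * Cross-pthread cancellation sketch (names are placeholders): the pthread
 * that scheduled a task keeps the back-reference, and another pthread may
 * then cancel it safely.
 *
 *	// on the owning pthread, at scheduling time
 *	thread_add_read(peer_master, handle_read, peer, fd, &peer->t_read);
 *
 *	// later, from a different pthread
 *	thread_cancel_async(peer_master, &peer->t_read, NULL);
 *
 * The caller blocks on ->cancel_cond until the owning pthread has serviced
 * the request in do_thread_cancel(), so the task will not run afterwards.
 */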
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}

static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}

static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event we're
	 * responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}

/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel() that value
		 * should be a superset of the values we have in our copy,
		 * so there's no need to update it. Similarly, barring
		 * deletion, the fd should still be a valid index into the
		 * master's pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}

/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}

/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}


/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to
		 *   zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time
		 *   on any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in poll()
		 * - If nothing is pending, it's time for the application to
		 *   die
		 *
		 * In every case except the last, we need to hit poll() at
		 * least once per loop to avoid starvation by events
		 */
		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}

static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}

unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}

/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts. The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time. On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);
	return result;
}
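/*
 * Usage sketch: a long-running callback can give up its time slot instead
 * of starving the event loop, by rescheduling itself where it left off
 * (struct work_state and do_some_work are placeholders):
 *
 *	static int process_queue(struct thread *t)
 *	{
 *		struct work_state *ws = THREAD_ARG(t);
 *
 *		while (do_some_work(ws))
 *			if (thread_should_yield(t)) {
 *				thread_add_event(t->master, process_queue,
 *						 ws, 0, NULL);
 *				break;
 *			}
 *		return 0;
 *	}
 */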

void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}

void thread_getrusage(RUSAGE_T *r)
{
	monotime(&r->real);
	getrusage(RUSAGE_SELF, &(r->cpu));
}

/*
 * Call a thread.
 *
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}

/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct cpu_thread_history tmp;
	struct thread dummy;

	memset(&dummy, 0, sizeof(struct thread));

	pthread_mutex_init(&dummy.mtx, NULL);
	dummy.type = THREAD_EVENT;
	dummy.add_type = THREAD_EXECUTE;
	dummy.master = NULL;
	dummy.arg = arg;
	dummy.u.val = val;

	tmp.func = dummy.func = func;
	tmp.funcname = dummy.funcname = funcname;
	dummy.hist = hash_get(m->cpu_record, &tmp,
			      (void *(*)(void *))cpu_record_hash_alloc);

	dummy.schedfrom = schedfrom;
	dummy.schedfrom_line = fromln;

	thread_call(&dummy);
}