/* Thread management routine
 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

/* #define DEBUG */

#include <zebra.h>
#include <sys/resource.h>

#include "thread.h"
#include "memory.h"
#include "log.h"
#include "hash.h"
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
#include "network.h"
#include "jhash.h"
#include "frratomic.h"
#include "lib_errors.h"

DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")

#if defined(__APPLE__)
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#define AWAKEN(m)                                                              \
	do {                                                                   \
		static unsigned char wakebyte = 0x01;                          \
		write(m->io_pipe[1], &wakebyte, 1);                            \
	} while (0)
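
/*
 * AWAKEN() is the self-pipe trick: writing one byte to io_pipe[1] makes
 * io_pipe[0] readable, which knocks a concurrent poll() in fd_poll() out of
 * its sleep so the owning pthread re-evaluates its queues. fd_poll() drains
 * the pipe again after poll() returns.
 */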

/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;

pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;

static void thread_free(struct thread_master *master, struct thread *thread);

/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
{
	int size = sizeof(a->func);

	return jhash(&a->func, size, 0);
}

static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
				const struct cpu_thread_history *b)
{
	return a->func == b->func;
}

static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
{
	struct cpu_thread_history *new;
	new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
	new->func = a->func;
	new->funcname = a->funcname;
	return new;
}

static void cpu_record_hash_free(void *a)
{
	struct cpu_thread_history *hist = a;

	XFREE(MTYPE_THREAD_STATS, hist);
}

static void vty_out_cpu_thread_history(struct vty *vty,
				       struct cpu_thread_history *a)
{
	vty_out(vty, "%5d %10lu.%03lu %9u %8lu %9lu %8lu %9lu", a->total_active,
		a->cpu.total / 1000, a->cpu.total % 1000, a->total_calls,
		a->cpu.total / a->total_calls, a->cpu.max,
		a->real.total / a->total_calls, a->real.max);
	vty_out(vty, " %c%c%c%c%c %s\n",
		a->types & (1 << THREAD_READ) ? 'R' : ' ',
		a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
		a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
		a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
		a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
}

static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
	struct cpu_thread_history *totals = args[0];
	struct cpu_thread_history copy;
	struct vty *vty = args[1];
	uint8_t *filter = args[2];

	struct cpu_thread_history *a = bucket->data;

	copy.total_active =
		atomic_load_explicit(&a->total_active, memory_order_seq_cst);
	copy.total_calls =
		atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
	copy.cpu.total =
		atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
	copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
	copy.real.total =
		atomic_load_explicit(&a->real.total, memory_order_seq_cst);
	copy.real.max =
		atomic_load_explicit(&a->real.max, memory_order_seq_cst);
	copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
	copy.funcname = a->funcname;

	if (!(copy.types & *filter))
		return;

	vty_out_cpu_thread_history(vty, &copy);
	totals->total_active += copy.total_active;
	totals->total_calls += copy.total_calls;
	totals->real.total += copy.real.total;
	if (totals->real.max < copy.real.max)
		totals->real.max = copy.real.max;
	totals->cpu.total += copy.cpu.total;
	if (totals->cpu.max < copy.cpu.max)
		totals->cpu.max = copy.cpu.max;
}

static void cpu_record_print(struct vty *vty, uint8_t filter)
{
	struct cpu_thread_history tmp;
	void *args[3] = {&tmp, vty, &filter};
	struct thread_master *m;
	struct listnode *ln;

	memset(&tmp, 0, sizeof tmp);
	tmp.funcname = "TOTAL";
	tmp.types = filter;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			const char *name = m->name ? m->name : "main";

			char underline[strlen(name) + 1];
			memset(underline, '-', sizeof(underline));
			underline[sizeof(underline) - 1] = '\0';

			vty_out(vty, "\n");
			vty_out(vty, "Showing statistics for pthread %s\n",
				name);
			vty_out(vty, "-------------------------------%s\n",
				underline);
			vty_out(vty, "%21s %18s %18s\n", "",
				"CPU (user+system):", "Real (wall-clock):");
			vty_out(vty,
				"Active Runtime(ms) Invoked Avg uSec Max uSecs");
			vty_out(vty, " Avg uSec Max uSecs");
			vty_out(vty, " Type Thread\n");

			if (m->cpu_record->count)
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_print,
					args);
			else
				vty_out(vty, "No data to display yet.\n");

			vty_out(vty, "\n");
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	vty_out(vty, "\n");
	vty_out(vty, "Total thread statistics\n");
	vty_out(vty, "-------------------------\n");
	vty_out(vty, "%21s %18s %18s\n", "",
		"CPU (user+system):", "Real (wall-clock):");
	vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
	vty_out(vty, " Avg uSec Max uSecs");
	vty_out(vty, " Type Thread\n");

	if (tmp.total_calls > 0)
		vty_out_cpu_thread_history(vty, &tmp);
}

static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{
	uint8_t *filter = args[0];
	struct hash *cpu_record = args[1];

	struct cpu_thread_history *a = bucket->data;

	if (!(a->types & *filter))
		return;

	hash_release(cpu_record, bucket->data);
}

static void cpu_record_clear(uint8_t filter)
{
	uint8_t *tmp = &filter;
	struct thread_master *m;
	struct listnode *ln;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
			pthread_mutex_lock(&m->mtx);
			{
				void *args[2] = {tmp, m->cpu_record};
				hash_iterate(
					m->cpu_record,
					(void (*)(struct hash_backet *,
						  void *))cpu_record_hash_clear,
					args);
			}
			pthread_mutex_unlock(&m->mtx);
		}
	}
	pthread_mutex_unlock(&masters_mtx);
}

static uint8_t parse_filter(const char *filterstr)
{
	int i = 0;
	int filter = 0;

	while (filterstr[i] != '\0') {
		switch (filterstr[i]) {
		case 'r':
		case 'R':
			filter |= (1 << THREAD_READ);
			break;
		case 'w':
		case 'W':
			filter |= (1 << THREAD_WRITE);
			break;
		case 't':
		case 'T':
			filter |= (1 << THREAD_TIMER);
			break;
		case 'e':
		case 'E':
			filter |= (1 << THREAD_EVENT);
			break;
		case 'x':
		case 'X':
			filter |= (1 << THREAD_EXECUTE);
			break;
		default:
			break;
		}
		++i;
	}
	return filter;
}
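
/*
 * Worked example (illustrative only): parse_filter("rw") yields
 * (1 << THREAD_READ) | (1 << THREAD_WRITE); parse_filter("T") yields
 * (1 << THREAD_TIMER); parse_filter("q") yields 0, which callers below
 * treat as an invalid filter.
 */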

DEFUN (show_thread_cpu,
       show_thread_cpu_cmd,
       "show thread cpu [FILTER]",
       SHOW_STR
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEX'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_print(vty, filter);
	return CMD_SUCCESS;
}

static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
	const char *name = m->name ? m->name : "main";
	char underline[strlen(name) + 1];
	uint32_t i;

	memset(underline, '-', sizeof(underline));
	underline[sizeof(underline) - 1] = '\0';

	vty_out(vty, "\nShowing poll FD's for %s\n", name);
	vty_out(vty, "----------------------%s\n", underline);
	vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
	for (i = 0; i < m->handler.pfdcount; i++)
		vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
			m->handler.pfds[i].fd,
			m->handler.pfds[i].events,
			m->handler.pfds[i].revents);
}

DEFUN (show_thread_poll,
       show_thread_poll_cmd,
       "show thread poll",
       SHOW_STR
       "Thread information\n"
       "Show poll FD's and information\n")
{
	struct listnode *node;
	struct thread_master *m;

	pthread_mutex_lock(&masters_mtx);
	{
		for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
			show_thread_poll_helper(vty, m);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	return CMD_SUCCESS;
}

DEFUN (clear_thread_cpu,
       clear_thread_cpu_cmd,
       "clear thread cpu [FILTER]",
       "Clear stored data in all pthreads\n"
       "Thread information\n"
       "Thread CPU usage\n"
       "Display filter (rwtex)\n")
{
	uint8_t filter = (uint8_t)-1U;
	int idx = 0;

	if (argv_find(argv, argc, "FILTER", &idx)) {
		filter = parse_filter(argv[idx]->arg);
		if (!filter) {
			vty_out(vty,
				"Invalid filter \"%s\" specified; must contain at least"
				" one of 'RWTEX'\n",
				argv[idx]->arg);
			return CMD_WARNING;
		}
	}

	cpu_record_clear(filter);
	return CMD_SUCCESS;
}

void thread_cmd_init(void)
{
	install_element(VIEW_NODE, &show_thread_cpu_cmd);
	install_element(VIEW_NODE, &show_thread_poll_cmd);
	install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
}
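
/*
 * With thread_cmd_init() wired into a daemon, the statistics above are
 * reachable from the VTY. A sketch of a session (prompt and output shape
 * illustrative, not verbatim):
 *
 *   frr# show thread cpu TE      <- only timer and event tasks
 *   frr# show thread poll        <- dump each pthread's pollfd array
 *   frr# clear thread cpu        <- reset all counters
 */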
/* CLI end ------------------------------------------------------------------ */

static int thread_timer_cmp(void *a, void *b)
{
	struct thread *thread_a = a;
	struct thread *thread_b = b;

	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
		return -1;
	if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
		return 1;
	return 0;
}

static void thread_timer_update(void *node, int actual_position)
{
	struct thread *thread = node;

	thread->index = actual_position;
}

static void cancelreq_del(void *cr)
{
	XFREE(MTYPE_TMP, cr);
}

/* initializer, only ever called once */
static void initializer()
{
	pthread_key_create(&thread_current, NULL);
}

struct thread_master *thread_master_create(const char *name)
{
	struct thread_master *rv;
	struct rlimit limit;

	pthread_once(&init_once, &initializer);

	rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
	if (rv == NULL)
		return NULL;

	/* Initialize master mutex */
	pthread_mutex_init(&rv->mtx, NULL);
	pthread_cond_init(&rv->cancel_cond, NULL);

	/* Set name */
	rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;

	/* Initialize I/O task data structures */
	getrlimit(RLIMIT_NOFILE, &limit);
	rv->fd_limit = (int)limit.rlim_cur;
	rv->read = XCALLOC(MTYPE_THREAD_POLL,
			   sizeof(struct thread *) * rv->fd_limit);

	rv->write = XCALLOC(MTYPE_THREAD_POLL,
			    sizeof(struct thread *) * rv->fd_limit);

	rv->cpu_record = hash_create_size(
		8, (unsigned int (*)(void *))cpu_record_hash_key,
		(bool (*)(const void *, const void *))cpu_record_hash_cmp,
		"Thread Hash");

	/* Initialize the timer queues */
	rv->timer = pqueue_create();
	rv->timer->cmp = thread_timer_cmp;
	rv->timer->update = thread_timer_update;

	/* Initialize thread_fetch() settings */
	rv->spin = true;
	rv->handle_signals = true;

	/* Set pthread owner, should be updated by actual owner */
	rv->owner = pthread_self();
	rv->cancel_req = list_new();
	rv->cancel_req->del = cancelreq_del;
	rv->canceled = true;

	/* Initialize pipe poker */
	pipe(rv->io_pipe);
	set_nonblocking(rv->io_pipe[0]);
	set_nonblocking(rv->io_pipe[1]);

	/* Initialize data structures for poll() */
	rv->handler.pfdsize = rv->fd_limit;
	rv->handler.pfdcount = 0;
	rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);
	rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
				   sizeof(struct pollfd) * rv->handler.pfdsize);

	/* add to list of threadmasters */
	pthread_mutex_lock(&masters_mtx);
	{
		if (!masters)
			masters = list_new();

		listnode_add(masters, rv);
	}
	pthread_mutex_unlock(&masters_mtx);

	return rv;
}
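
/*
 * Minimal usage sketch for a single-pthread consumer of this API; the
 * thread_add_read()/thread_add_timer() convenience macros are assumed to be
 * the thread.h wrappers that fill in the debugargdef arguments:
 *
 *   struct thread_master *master = thread_master_create("main");
 *
 *   thread_add_read(master, handle_sock_cb, ctx, sock_fd, NULL);
 *   thread_add_timer(master, timeout_cb, ctx, 5, NULL);
 *
 * The master is then driven by the fetch/call loop shown after
 * thread_fetch() below.
 */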

void thread_master_set_name(struct thread_master *master, const char *name)
{
	pthread_mutex_lock(&master->mtx);
	{
		if (master->name)
			XFREE(MTYPE_THREAD_MASTER, master->name);
		master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
	}
	pthread_mutex_unlock(&master->mtx);
}

/* Add a new thread to the list. */
static void thread_list_add(struct thread_list *list, struct thread *thread)
{
	thread->next = NULL;
	thread->prev = list->tail;
	if (list->tail)
		list->tail->next = thread;
	else
		list->head = thread;
	list->tail = thread;
	list->count++;
}

/* Delete a thread from the list. */
static struct thread *thread_list_delete(struct thread_list *list,
					 struct thread *thread)
{
	if (thread->next)
		thread->next->prev = thread->prev;
	else
		list->tail = thread->prev;
	if (thread->prev)
		thread->prev->next = thread->next;
	else
		list->head = thread->next;
	thread->next = thread->prev = NULL;
	list->count--;
	return thread;
}

/* Is the thread list empty? */
static int thread_empty(struct thread_list *list)
{
	return list->head ? 0 : 1;
}

/* Delete the head of the list and return it. */
static struct thread *thread_trim_head(struct thread_list *list)
{
	if (!thread_empty(list))
		return thread_list_delete(list, list->head);
	return NULL;
}

#define THREAD_UNUSED_DEPTH 10

/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
	pthread_mutex_t mtxc = thread->mtx;

	assert(m != NULL && thread != NULL);
	assert(thread->next == NULL);
	assert(thread->prev == NULL);

	thread->hist->total_active--;
	memset(thread, 0, sizeof(struct thread));
	thread->type = THREAD_UNUSED;

	/* Restore the thread mutex context. */
	thread->mtx = mtxc;

	if (m->unuse.count < THREAD_UNUSED_DEPTH) {
		thread_list_add(&m->unuse, thread);
		return;
	}

	thread_free(m, thread);
}

/* Free all unused threads. */
static void thread_list_free(struct thread_master *m, struct thread_list *list)
{
	struct thread *t;
	struct thread *next;

	for (t = list->head; t; t = next) {
		next = t->next;
		thread_free(m, t);
		list->count--;
	}
}

static void thread_array_free(struct thread_master *m,
			      struct thread **thread_array)
{
	struct thread *t;
	int index;

	for (index = 0; index < m->fd_limit; ++index) {
		t = thread_array[index];
		if (t) {
			thread_array[index] = NULL;
			thread_free(m, t);
		}
	}
	XFREE(MTYPE_THREAD_POLL, thread_array);
}

static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
{
	int i;

	for (i = 0; i < queue->size; i++)
		thread_free(m, queue->array[i]);

	pqueue_delete(queue);
}

/*
 * thread_master_free_unused
 *
 * As threads are finished with, they are put on the unuse list for later
 * reuse. If we are shutting down, free the unused threads so we can see
 * whether anything was forgotten and never shut off.
 */
void thread_master_free_unused(struct thread_master *m)
{
	pthread_mutex_lock(&m->mtx);
	{
		struct thread *t;
		while ((t = thread_trim_head(&m->unuse)) != NULL) {
			thread_free(m, t);
		}
	}
	pthread_mutex_unlock(&m->mtx);
}

/* Stop thread scheduler. */
void thread_master_free(struct thread_master *m)
{
	pthread_mutex_lock(&masters_mtx);
	{
		listnode_delete(masters, m);
		if (masters->count == 0) {
			list_delete(&masters);
		}
	}
	pthread_mutex_unlock(&masters_mtx);

	thread_array_free(m, m->read);
	thread_array_free(m, m->write);
	thread_queue_free(m, m->timer);
	thread_list_free(m, &m->event);
	thread_list_free(m, &m->ready);
	thread_list_free(m, &m->unuse);
	pthread_mutex_destroy(&m->mtx);
	pthread_cond_destroy(&m->cancel_cond);
	close(m->io_pipe[0]);
	close(m->io_pipe[1]);
	list_delete(&m->cancel_req);
	m->cancel_req = NULL;

	hash_clean(m->cpu_record, cpu_record_hash_free);
	hash_free(m->cpu_record);
	m->cpu_record = NULL;

	if (m->name)
		XFREE(MTYPE_THREAD_MASTER, m->name);
	XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
	XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
	XFREE(MTYPE_THREAD_MASTER, m);
}

/* Return remaining time in milliseconds. */
unsigned long thread_timer_remain_msec(struct thread *thread)
{
	int64_t remain;

	pthread_mutex_lock(&thread->mtx);
	{
		remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
	}
	pthread_mutex_unlock(&thread->mtx);

	return remain < 0 ? 0 : remain;
}

/* Return remaining time in seconds. */
unsigned long thread_timer_remain_second(struct thread *thread)
{
	return thread_timer_remain_msec(thread) / 1000LL;
}

#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln

struct timeval thread_timer_remain(struct thread *thread)
{
	struct timeval remain;
	pthread_mutex_lock(&thread->mtx);
	{
		monotime_until(&thread->u.sands, &remain);
	}
	pthread_mutex_unlock(&thread->mtx);
	return remain;
}

/* Get new thread. */
static struct thread *thread_get(struct thread_master *m, uint8_t type,
				 int (*func)(struct thread *), void *arg,
				 debugargdef)
{
	struct thread *thread = thread_trim_head(&m->unuse);
	struct cpu_thread_history tmp;

	if (!thread) {
		thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
		/* mutex only needs to be initialized at struct creation. */
		pthread_mutex_init(&thread->mtx, NULL);
		m->alloc++;
	}

	thread->type = type;
	thread->add_type = type;
	thread->master = m;
	thread->arg = arg;
	thread->index = -1;
	thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
	thread->ref = NULL;

	/*
	 * So if the passed in funcname is not what we have stored that means
	 * the thread->hist needs to be updated. We keep the last one around
	 * in unused under the assumption that we are probably going to
	 * immediately allocate the same type of thread.
	 * This hopefully saves us some serious hash_get lookups.
	 */
	if (thread->funcname != funcname || thread->func != func) {
		tmp.func = func;
		tmp.funcname = funcname;
		thread->hist =
			hash_get(m->cpu_record, &tmp,
				 (void *(*)(void *))cpu_record_hash_alloc);
	}
	thread->hist->total_active++;
	thread->func = func;
	thread->funcname = funcname;
	thread->schedfrom = schedfrom;
	thread->schedfrom_line = fromln;

	return thread;
}

static void thread_free(struct thread_master *master, struct thread *thread)
{
	/* Update statistics. */
	assert(master->alloc > 0);
	master->alloc--;

	/* Free allocated resources. */
	pthread_mutex_destroy(&thread->mtx);
	XFREE(MTYPE_THREAD, thread);
}

static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
		   nfds_t count, const struct timeval *timer_wait)
{
	/* If timer_wait is null here, that means poll() should block
	 * indefinitely, unless the thread_master has overridden it by setting
	 * ->selectpoll_timeout.
	 *
	 * If the selectpoll_timeout value is positive, it specifies the
	 * maximum number of milliseconds to wait. If it is -1, poll() should
	 * not wait at all and return immediately even if no event is
	 * detected. If it is zero, the default behavior applies. */
	int timeout = -1;

	/* number of file descriptors with events */
	int num;

	if (timer_wait != NULL
	    && m->selectpoll_timeout == 0) // use the default value
		timeout = (timer_wait->tv_sec * 1000)
			  + (timer_wait->tv_usec / 1000);
	else if (m->selectpoll_timeout > 0) // use the user's timeout
		timeout = m->selectpoll_timeout;
	else if (m->selectpoll_timeout
		 < 0) // effect a poll (return immediately)
		timeout = 0;

	/* add poll pipe poker */
	assert(count + 1 < pfdsize);
	pfds[count].fd = m->io_pipe[0];
	pfds[count].events = POLLIN;
	pfds[count].revents = 0x00;

	num = poll(pfds, count + 1, timeout);

	unsigned char trash[64];
	if (num > 0 && pfds[count].revents != 0 && num--)
		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
			;

	return num;
}

/* Add new read/write thread. */
struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, int fd,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct thread *thread = NULL;

	assert(fd >= 0 && fd < m->fd_limit);
	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		/* default to a new pollfd */
		nfds_t queuepos = m->handler.pfdcount;

		/* if we already have a pollfd for our file descriptor, find
		 * and use it */
		for (nfds_t i = 0; i < m->handler.pfdcount; i++)
			if (m->handler.pfds[i].fd == fd) {
				queuepos = i;
				break;
			}

		/* make sure we have room for this fd + pipe poker fd */
		assert(queuepos + 1 < m->handler.pfdsize);

		thread = thread_get(m, dir, func, arg, debugargpass);

		m->handler.pfds[queuepos].fd = fd;
		m->handler.pfds[queuepos].events |=
			(dir == THREAD_READ ? POLLIN : POLLOUT);

		if (queuepos == m->handler.pfdcount)
			m->handler.pfdcount++;

		if (thread) {
			pthread_mutex_lock(&thread->mtx);
			{
				thread->u.fd = fd;
				if (dir == THREAD_READ)
					m->read[thread->u.fd] = thread;
				else
					m->write[thread->u.fd] = thread;
			}
			pthread_mutex_unlock(&thread->mtx);

			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
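
/*
 * The t_ptr back-reference doubles as a reschedule guard: while *t_ptr is
 * non-NULL the task is still pending and another call here is a no-op.
 * Sketch (thread_add_read is assumed to be the thread.h wrapper):
 *
 *   static struct thread *t_read;
 *
 *   thread_add_read(master, vty_read_cb, vty, vty_sock, &t_read);
 *   // calling again before vty_read_cb fires will not double-schedule
 */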

static struct thread *
funcname_thread_add_timer_timeval(struct thread_master *m,
				  int (*func)(struct thread *), int type,
				  void *arg, struct timeval *time_relative,
				  struct thread **t_ptr, debugargdef)
{
	struct thread *thread;
	struct pqueue *queue;

	assert(m != NULL);

	assert(type == THREAD_TIMER);
	assert(time_relative);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		queue = m->timer;
		thread = thread_get(m, type, func, arg, debugargpass);

		pthread_mutex_lock(&thread->mtx);
		{
			monotime(&thread->u.sands);
			timeradd(&thread->u.sands, time_relative,
				 &thread->u.sands);
			pqueue_enqueue(thread, queue);
			if (t_ptr) {
				*t_ptr = thread;
				thread->ref = t_ptr;
			}
		}
		pthread_mutex_unlock(&thread->mtx);

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}

/* Add timer event thread. */
struct thread *funcname_thread_add_timer(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, long timer,
					 struct thread **t_ptr, debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer;
	trel.tv_usec = 0;

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with "millisecond" resolution */
struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
					      int (*func)(struct thread *),
					      void *arg, long timer,
					      struct thread **t_ptr,
					      debugargdef)
{
	struct timeval trel;

	assert(m != NULL);

	trel.tv_sec = timer / 1000;
	trel.tv_usec = 1000 * (timer % 1000);

	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
						 &trel, t_ptr, debugargpass);
}

/* Add timer event thread with a specific timeval */
struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
					    int (*func)(struct thread *),
					    void *arg, struct timeval *tv,
					    struct thread **t_ptr, debugargdef)
{
	return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
						 t_ptr, debugargpass);
}

/* Add simple event thread. */
struct thread *funcname_thread_add_event(struct thread_master *m,
					 int (*func)(struct thread *),
					 void *arg, int val,
					 struct thread **t_ptr, debugargdef)
{
	struct thread *thread;

	assert(m != NULL);

	pthread_mutex_lock(&m->mtx);
	{
		if (t_ptr
		    && *t_ptr) // thread is already scheduled; don't reschedule
		{
			pthread_mutex_unlock(&m->mtx);
			return NULL;
		}

		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
		pthread_mutex_lock(&thread->mtx);
		{
			thread->u.val = val;
			thread_list_add(&m->event, thread);
		}
		pthread_mutex_unlock(&thread->mtx);

		if (t_ptr) {
			*t_ptr = thread;
			thread->ref = t_ptr;
		}

		AWAKEN(m);
	}
	pthread_mutex_unlock(&m->mtx);

	return thread;
}
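
/*
 * Events are one-shot: thread_process() moves the entire event list onto the
 * ready queue on every scheduling pass, so an event added here runs on the
 * next trip through thread_fetch(). Sketch (thread_add_event assumed to be
 * the thread.h wrapper):
 *
 *   thread_add_event(master, reconfig_cb, cfg, 0, NULL);
 */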

/* Thread cancellation ------------------------------------------------------ */

/**
 * NOT's out the .events field of pollfd corresponding to the given file
 * descriptor. The event to be NOT'd is passed in the 'state' parameter.
 *
 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
 * implementation for details.
 *
 * @param master
 * @param fd
 * @param state the event to cancel. One or more (OR'd together) of the
 * following:
 *   - POLLIN
 *   - POLLOUT
 */
static void thread_cancel_rw(struct thread_master *master, int fd, short state)
{
	bool found = false;

	/* Cancel POLLHUP too just in case some bozo set it */
	state |= POLLHUP;

	/* find the index of corresponding pollfd */
	nfds_t i;

	for (i = 0; i < master->handler.pfdcount; i++)
		if (master->handler.pfds[i].fd == fd) {
			found = true;
			break;
		}

	if (!found) {
		zlog_debug(
			"[!] Received cancellation request for nonexistent rw job");
		zlog_debug("[!] threadmaster: %s | fd: %d",
			   master->name ? master->name : "", fd);
		return;
	}

	/* NOT out event. */
	master->handler.pfds[i].events &= ~(state);

	/* If all events are canceled, delete / resize the pollfd array. */
	if (master->handler.pfds[i].events == 0) {
		memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
			(master->handler.pfdcount - i - 1)
				* sizeof(struct pollfd));
		master->handler.pfdcount--;
	}

	/* If we have the same pollfd in the copy, perform the same operations,
	 * otherwise return. */
	if (i >= master->handler.copycount)
		return;

	master->handler.copy[i].events &= ~(state);

	if (master->handler.copy[i].events == 0) {
		memmove(master->handler.copy + i, master->handler.copy + i + 1,
			(master->handler.copycount - i - 1)
				* sizeof(struct pollfd));
		master->handler.copycount--;
	}
}

/**
 * Process cancellation requests.
 *
 * This may only be run from the pthread which owns the thread_master.
 *
 * @param master the thread master to process
 * @REQUIRE master->mtx
 */
static void do_thread_cancel(struct thread_master *master)
{
	struct thread_list *list = NULL;
	struct pqueue *queue = NULL;
	struct thread **thread_array = NULL;
	struct thread *thread;

	struct cancel_req *cr;
	struct listnode *ln;
	for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
		/* If this is an event object cancellation, linear search
		 * through the event list, deleting any events which have the
		 * specified argument. We also need to check every thread in
		 * the ready queue. */
		if (cr->eventobj) {
			struct thread *t;
			thread = master->event.head;

			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->event, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}

			thread = master->ready.head;
			while (thread) {
				t = thread;
				thread = t->next;

				if (t->arg == cr->eventobj) {
					thread_list_delete(&master->ready, t);
					if (t->ref)
						*t->ref = NULL;
					thread_add_unuse(master, t);
				}
			}
			continue;
		}

		/* The pointer varies depending on whether the cancellation
		 * request was made asynchronously or not. If it was, we need
		 * to check whether the thread even exists anymore before
		 * cancelling it. */
		thread = (cr->thread) ? cr->thread : *cr->threadref;

		if (!thread)
			continue;

		/* Determine the appropriate queue to cancel the thread from */
		switch (thread->type) {
		case THREAD_READ:
			thread_cancel_rw(master, thread->u.fd, POLLIN);
			thread_array = master->read;
			break;
		case THREAD_WRITE:
			thread_cancel_rw(master, thread->u.fd, POLLOUT);
			thread_array = master->write;
			break;
		case THREAD_TIMER:
			queue = master->timer;
			break;
		case THREAD_EVENT:
			list = &master->event;
			break;
		case THREAD_READY:
			list = &master->ready;
			break;
		default:
			continue;
		}

		if (queue) {
			assert(thread->index >= 0);
			assert(thread == queue->array[thread->index]);
			pqueue_remove_at(thread->index, queue);
		} else if (list) {
			thread_list_delete(list, thread);
		} else if (thread_array) {
			thread_array[thread->u.fd] = NULL;
		} else {
			assert(!"Thread should be either in queue or list or array!");
		}

		if (thread->ref)
			*thread->ref = NULL;

		thread_add_unuse(thread->master, thread);
	}

	/* Delete and free all cancellation requests */
	list_delete_all_node(master->cancel_req);

	/* Wake up any threads which may be blocked in thread_cancel_async() */
	master->canceled = true;
	pthread_cond_broadcast(&master->cancel_cond);
}

/**
 * Cancel any events which have the specified argument.
 *
 * MT-Unsafe
 *
 * @param master the thread_master to cancel from
 * @param arg the argument passed when creating the event
 */
void thread_cancel_event(struct thread_master *master, void *arg)
{
	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->eventobj = arg;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}

/**
 * Cancel a specific task.
 *
 * MT-Unsafe
 *
 * @param thread task to cancel
 */
void thread_cancel(struct thread *thread)
{
	struct thread_master *master = thread->master;

	assert(master->owner == pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		struct cancel_req *cr =
			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
		cr->thread = thread;
		listnode_add(master->cancel_req, cr);
		do_thread_cancel(master);
	}
	pthread_mutex_unlock(&master->mtx);
}

/**
 * Asynchronous cancellation.
 *
 * Called with either a struct thread ** or a void * to an event argument,
 * this function posts the correct cancellation request and blocks until it is
 * serviced.
 *
 * If the thread is currently running, execution blocks until it completes.
 *
 * The last two parameters are mutually exclusive, i.e. if you pass one the
 * other must be NULL.
 *
 * When the cancellation procedure executes on the target thread_master, the
 * thread * provided is checked for nullity. If it is null, the thread is
 * assumed to no longer exist and the cancellation request is a no-op. Thus
 * users of this API must pass a back-reference when scheduling the original
 * task.
 *
 * MT-Safe
 *
 * @param master the thread master with the relevant event / task
 * @param thread pointer to thread to cancel
 * @param eventobj the event
 */
void thread_cancel_async(struct thread_master *master, struct thread **thread,
			 void *eventobj)
{
	assert(!(thread && eventobj) && (thread || eventobj));
	assert(master->owner != pthread_self());

	pthread_mutex_lock(&master->mtx);
	{
		master->canceled = false;

		if (thread) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->threadref = thread;
			listnode_add(master->cancel_req, cr);
		} else if (eventobj) {
			struct cancel_req *cr =
				XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
			cr->eventobj = eventobj;
			listnode_add(master->cancel_req, cr);
		}
		AWAKEN(master);

		while (!master->canceled)
			pthread_cond_wait(&master->cancel_cond, &master->mtx);
	}
	pthread_mutex_unlock(&master->mtx);
}
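
/*
 * Sketch of cross-pthread cancellation under the rules above. The owning
 * pthread keeps a back-reference; another pthread then blocks in
 * thread_cancel_async() until the owner services the request:
 *
 *   // on the pthread owning 'master'
 *   static struct thread *t_work;
 *   thread_add_timer(master, work_cb, NULL, 10, &t_work);
 *
 *   // on any other pthread; exactly one of the last two args is non-NULL
 *   thread_cancel_async(master, &t_work, NULL);  // t_work is NULL on return
 *   thread_cancel_async(master, NULL, eventobj); // or cancel by event arg
 */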
/* ------------------------------------------------------------------------- */

static struct timeval *thread_timer_wait(struct pqueue *queue,
					 struct timeval *timer_val)
{
	if (queue->size) {
		struct thread *next_timer = queue->array[0];
		monotime_until(&next_timer->u.sands, timer_val);
		return timer_val;
	}
	return NULL;
}

static struct thread *thread_run(struct thread_master *m, struct thread *thread,
				 struct thread *fetch)
{
	*fetch = *thread;
	thread_add_unuse(m, thread);
	return fetch;
}

static int thread_process_io_helper(struct thread_master *m,
				    struct thread *thread, short state, int pos)
{
	struct thread **thread_array;

	if (!thread)
		return 0;

	if (thread->type == THREAD_READ)
		thread_array = m->read;
	else
		thread_array = m->write;

	thread_array[thread->u.fd] = NULL;
	thread_list_add(&m->ready, thread);
	thread->type = THREAD_READY;
	/* if another pthread scheduled this file descriptor for the event
	 * we're responding to, no problem; we're getting to it now */
	thread->master->handler.pfds[pos].events &= ~(state);
	return 1;
}

/**
 * Process I/O events.
 *
 * Walks through file descriptor array looking for those pollfds whose .revents
 * field has something interesting. Deletes any invalid file descriptors.
 *
 * @param m the thread master
 * @param num the number of active file descriptors (return value of poll())
 */
static void thread_process_io(struct thread_master *m, unsigned int num)
{
	unsigned int ready = 0;
	struct pollfd *pfds = m->handler.copy;

	for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
		/* no event for current fd? immediately continue */
		if (pfds[i].revents == 0)
			continue;

		ready++;

		/* Unless someone has called thread_cancel from another
		 * pthread, the only thing that could have changed in
		 * m->handler.pfds while we were asleep is the .events field
		 * in a given pollfd. Barring thread_cancel() that value
		 * should be a superset of the values we have in our copy, so
		 * there's no need to update it. Similarly, barring deletion,
		 * the fd should still be a valid index into the master's
		 * pfds. */
		if (pfds[i].revents & (POLLIN | POLLHUP))
			thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
						 i);
		if (pfds[i].revents & POLLOUT)
			thread_process_io_helper(m, m->write[pfds[i].fd],
						 POLLOUT, i);

		/* if one of our file descriptors is garbage, remove the same
		 * from both pfds + update sizes and index */
		if (pfds[i].revents & POLLNVAL) {
			memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
				(m->handler.pfdcount - i - 1)
					* sizeof(struct pollfd));
			m->handler.pfdcount--;

			memmove(pfds + i, pfds + i + 1,
				(m->handler.copycount - i - 1)
					* sizeof(struct pollfd));
			m->handler.copycount--;

			i--;
		}
	}
}

/* Add all timers that have popped to the ready list. */
static unsigned int thread_process_timers(struct pqueue *queue,
					  struct timeval *timenow)
{
	struct thread *thread;
	unsigned int ready = 0;

	while (queue->size) {
		thread = queue->array[0];
		if (timercmp(timenow, &thread->u.sands, <))
			return ready;
		pqueue_dequeue(queue);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}

/* process a list en masse, e.g. for event thread lists */
static unsigned int thread_process(struct thread_list *list)
{
	struct thread *thread;
	struct thread *next;
	unsigned int ready = 0;

	for (thread = list->head; thread; thread = next) {
		next = thread->next;
		thread_list_delete(list, thread);
		thread->type = THREAD_READY;
		thread_list_add(&thread->master->ready, thread);
		ready++;
	}
	return ready;
}

/* Fetch next ready thread. */
struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
{
	struct thread *thread = NULL;
	struct timeval now;
	struct timeval zerotime = {0, 0};
	struct timeval tv;
	struct timeval *tw = NULL;

	int num = 0;

	do {
		/* Handle signals if any */
		if (m->handle_signals)
			quagga_sigevent_process();

		pthread_mutex_lock(&m->mtx);

		/* Process any pending cancellation requests */
		do_thread_cancel(m);

		/*
		 * Attempt to flush ready queue before going into poll().
		 * This is performance-critical. Think twice before modifying.
		 */
		if ((thread = thread_trim_head(&m->ready))) {
			fetch = thread_run(m, thread, fetch);
			if (fetch->ref)
				*fetch->ref = NULL;
			pthread_mutex_unlock(&m->mtx);
			break;
		}

		/* otherwise, tick through scheduling sequence */

		/*
		 * Post events to ready queue. This must come before the
		 * following block since events should occur immediately
		 */
		thread_process(&m->event);

		/*
		 * If there are no tasks on the ready queue, we will poll()
		 * until a timer expires or we receive I/O, whichever comes
		 * first. The strategy for doing this is:
		 *
		 * - If there are events pending, set the poll() timeout to
		 *   zero
		 * - If there are no events pending, but there are timers
		 *   pending, set the timeout to the smallest remaining time
		 *   on any timer
		 * - If there are neither timers nor events pending, but there
		 *   are file descriptors pending, block indefinitely in
		 *   poll()
		 * - If nothing is pending, it's time for the application to
		 *   die
		 *
		 * In every case except the last, we need to hit poll() at
		 * least once per loop to avoid starvation by events
		 */
		if (m->ready.count == 0)
			tw = thread_timer_wait(m->timer, &tv);

		if (m->ready.count != 0 || (tw && !timercmp(tw, &zerotime, >)))
			tw = &zerotime;

		if (!tw && m->handler.pfdcount == 0) { /* die */
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/*
		 * Copy pollfd array + # active pollfds in it. Not necessary to
		 * copy the array size as this is fixed.
		 */
		m->handler.copycount = m->handler.pfdcount;
		memcpy(m->handler.copy, m->handler.pfds,
		       m->handler.copycount * sizeof(struct pollfd));

		pthread_mutex_unlock(&m->mtx);
		{
			num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
				      m->handler.copycount, tw);
		}
		pthread_mutex_lock(&m->mtx);

		/* Handle any errors received in poll() */
		if (num < 0) {
			if (errno == EINTR) {
				pthread_mutex_unlock(&m->mtx);
				/* loop around to signal handler */
				continue;
			}

			/* else die */
			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
				 safe_strerror(errno));
			pthread_mutex_unlock(&m->mtx);
			fetch = NULL;
			break;
		}

		/* Post timers to ready queue. */
		monotime(&now);
		thread_process_timers(m->timer, &now);

		/* Post I/O to ready queue. */
		if (num > 0)
			thread_process_io(m, num);

		pthread_mutex_unlock(&m->mtx);

	} while (!thread && m->spin);

	return fetch;
}
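
/*
 * The canonical driver loop pairs thread_fetch() with thread_call():
 * thread_fetch() copies the next ready task into a caller-owned struct
 * thread and recycles the original; a NULL return means there is nothing
 * left to wait for (or poll() failed fatally) and the loop can exit.
 *
 *   struct thread task;
 *
 *   while (thread_fetch(master, &task))
 *           thread_call(&task);
 */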

static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
{
	return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
		+ (a.tv_usec - b.tv_usec));
}

unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
				   unsigned long *cputime)
{
	/* This is 'user + sys' time. */
	*cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
		   + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
	return timeval_elapsed(now->real, start->real);
}

/* We should aim to yield after yield milliseconds, which defaults
   to THREAD_YIELD_TIME_SLOT.
   Note: we are using real (wall clock) time for this calculation.
   It could be argued that CPU time may make more sense in certain
   contexts. The things to consider are whether the thread may have
   blocked (in which case wall time increases, but CPU time does not),
   or whether the system is heavily loaded with other processes competing
   for CPU time. On balance, wall clock time seems to make sense.
   Plus it has the added benefit that gettimeofday should be faster
   than calling getrusage. */
int thread_should_yield(struct thread *thread)
{
	int result;
	pthread_mutex_lock(&thread->mtx);
	{
		result = monotime_since(&thread->real, NULL)
			 > (int64_t)thread->yield;
	}
	pthread_mutex_unlock(&thread->mtx);
	return result;
}
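
/*
 * Long-running callbacks can use this to cooperate with the scheduler: do a
 * bounded chunk of work, then reschedule themselves and return once the time
 * slot is spent. Sketch only; walk_ctx and process_one() are hypothetical:
 *
 *   static int table_walk_cb(struct thread *t)
 *   {
 *           struct walk_ctx *ctx = THREAD_ARG(t);
 *
 *           while (ctx->cursor) {
 *                   process_one(ctx->cursor);
 *                   ctx->cursor = ctx->cursor->next;
 *                   if (thread_should_yield(t)) {
 *                           thread_add_event(t->master, table_walk_cb, ctx,
 *                                            0, NULL);
 *                           break;
 *                   }
 *           }
 *           return 0;
 *   }
 */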

void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
{
	pthread_mutex_lock(&thread->mtx);
	{
		thread->yield = yield_time;
	}
	pthread_mutex_unlock(&thread->mtx);
}

void thread_getrusage(RUSAGE_T *r)
{
	monotime(&r->real);
	getrusage(RUSAGE_SELF, &(r->cpu));
}

/*
 * Call a thread.
 *
 * This function will atomically update the thread's usage history. At present
 * this is the only spot where usage history is written. Nevertheless the code
 * has been written such that the introduction of writers in the future should
 * not need to update it provided the writers atomically perform only the
 * operations done here, i.e. updating the total and maximum times. In
 * particular, the maximum real and cpu times must be monotonically increasing
 * or this code is not correct.
 */
void thread_call(struct thread *thread)
{
	_Atomic unsigned long realtime, cputime;
	unsigned long exp;
	unsigned long helper;
	RUSAGE_T before, after;

	GETRUSAGE(&before);
	thread->real = before.real;

	pthread_setspecific(thread_current, thread);
	(*thread->func)(thread);
	pthread_setspecific(thread_current, NULL);

	GETRUSAGE(&after);

	realtime = thread_consumed_time(&after, &before, &helper);
	cputime = helper;

	/* update realtime */
	atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->real.max,
				   memory_order_seq_cst);
	while (exp < realtime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->real.max, &exp, realtime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	/* update cputime */
	atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
				  memory_order_seq_cst);
	exp = atomic_load_explicit(&thread->hist->cpu.max,
				   memory_order_seq_cst);
	while (exp < cputime
	       && !atomic_compare_exchange_weak_explicit(
			  &thread->hist->cpu.max, &exp, cputime,
			  memory_order_seq_cst, memory_order_seq_cst))
		;

	atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
				  memory_order_seq_cst);
	atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
				 memory_order_seq_cst);

#ifdef CONSUMED_TIME_CHECK
	if (realtime > CONSUMED_TIME_CHECK) {
		/*
		 * We have a CPU Hog on our hands.
		 * Whinge about it now, so we're aware this is yet another task
		 * to fix.
		 */
		flog_warn(
			EC_LIB_SLOW_THREAD,
			"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
			thread->funcname, (unsigned long)thread->func,
			realtime / 1000, cputime / 1000);
	}
#endif /* CONSUMED_TIME_CHECK */
}

/* Execute thread */
void funcname_thread_execute(struct thread_master *m,
			     int (*func)(struct thread *), void *arg, int val,
			     debugargdef)
{
	struct thread *thread;

	/* Get or allocate new thread to execute. */
	pthread_mutex_lock(&m->mtx);
	{
		thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);

		/* Set its event value. */
		pthread_mutex_lock(&thread->mtx);
		{
			thread->add_type = THREAD_EXECUTE;
			thread->u.val = val;
			thread->ref = &thread;
		}
		pthread_mutex_unlock(&thread->mtx);
	}
	pthread_mutex_unlock(&m->mtx);

	/* Execute thread doing all accounting. */
	thread_call(thread);

	/* Give back or free thread. */
	thread_add_unuse(m, thread);
}
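
/*
 * Unlike the thread_add_* family, the function above never queues anything:
 * func runs synchronously on the calling pthread, with full CPU accounting
 * through thread_call(), and the struct thread is recycled before returning.
 * That is why no back-reference is handed out to the caller.
 */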