718e3744 1/* Thread management routine
2 * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 *
4 * This file is part of GNU Zebra.
5 *
6 * GNU Zebra is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * GNU Zebra is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
896014f4
DL
16 * You should have received a copy of the GNU General Public License along
17 * with this program; see the file COPYING; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 19 */
20
21/* #define DEBUG */
22
23#include <zebra.h>
308d14ae 24#include <sys/resource.h>
718e3744 25
26#include "thread.h"
27#include "memory.h"
28#include "log.h"
e04ab74d 29#include "hash.h"
8390828d 30#include "pqueue.h"
e04ab74d 31#include "command.h"
05c447dd 32#include "sigevent.h"
3bf2673b 33#include "network.h"
bd74dc61 34#include "jhash.h"
fbcac826 35#include "frratomic.h"
9ef9495e 36#include "lib_errors.h"
d6be5fb9 37
d62a17ae 38DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
4a1ab8e4 39DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
a6f235f3 40DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
d62a17ae 41DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
4a1ab8e4 42
c284542b
DL
43DECLARE_LIST(thread_list, struct thread, threaditem)
44
3b96b781
HT
45#if defined(__APPLE__)
46#include <mach/mach.h>
47#include <mach/mach_time.h>
48#endif
49
d62a17ae 50#define AWAKEN(m) \
51 do { \
52 static unsigned char wakebyte = 0x01; \
53 write(m->io_pipe[1], &wakebyte, 1); \
54 } while (0);
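/*
 * The wakeup byte lands on m->io_pipe[0], which fd_poll() always includes in
 * its pollfd set and drains afterwards, so a scheduling call made from a
 * different pthread knocks the owning pthread out of poll() right away.
 * Minimal sketch of the pattern (hypothetical helper, not part of this file):
 *
 *   static void schedule_and_wake(struct thread_master *m)
 *   {
 *           pthread_mutex_lock(&m->mtx);
 *           // ... add a task to one of m's queues ...
 *           AWAKEN(m);
 *           pthread_mutex_unlock(&m->mtx);
 *   }
 */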
3bf2673b 55
62f44022 56/* control variable for initializer */
c17faa4b 57static pthread_once_t init_once = PTHREAD_ONCE_INIT;
e0bebc7c 58pthread_key_t thread_current;
6b0655a2 59
c17faa4b 60static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
62f44022
QY
61static struct list *masters;
62
6655966d 63static void thread_free(struct thread_master *master, struct thread *thread);
6b0655a2 64
62f44022 65/* CLI start ---------------------------------------------------------------- */
d62a17ae 66static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
e04ab74d 67{
883cc51d 68 int size = sizeof(a->func);
bd74dc61
DS
69
70 return jhash(&a->func, size, 0);
e04ab74d 71}
72
74df8d6d 73static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
d62a17ae 74 const struct cpu_thread_history *b)
e04ab74d 75{
d62a17ae 76 return a->func == b->func;
e04ab74d 77}
78
d62a17ae 79static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
e04ab74d 80{
d62a17ae 81 struct cpu_thread_history *new;
82 new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
83 new->func = a->func;
84 new->funcname = a->funcname;
85 return new;
e04ab74d 86}
87
d62a17ae 88static void cpu_record_hash_free(void *a)
228da428 89{
d62a17ae 90 struct cpu_thread_history *hist = a;
91
92 XFREE(MTYPE_THREAD_STATS, hist);
228da428
CC
93}
94
d62a17ae 95static void vty_out_cpu_thread_history(struct vty *vty,
96 struct cpu_thread_history *a)
e04ab74d 97{
fa0069c6
DS
98 vty_out(vty, "%5zu %10zu.%03lu %9zu %8zu %9zu %8lu %9lu",
99 a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
100 a->total_calls, a->cpu.total / a->total_calls, a->cpu.max,
d62a17ae 101 a->real.total / a->total_calls, a->real.max);
102 vty_out(vty, " %c%c%c%c%c %s\n",
103 a->types & (1 << THREAD_READ) ? 'R' : ' ',
104 a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
105 a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
106 a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
107 a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
e04ab74d 108}
109
e3b78da8 110static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
e04ab74d 111{
d62a17ae 112 struct cpu_thread_history *totals = args[0];
fbcac826 113 struct cpu_thread_history copy;
d62a17ae 114 struct vty *vty = args[1];
fbcac826 115 uint8_t *filter = args[2];
d62a17ae 116
117 struct cpu_thread_history *a = bucket->data;
118
fbcac826
QY
119 copy.total_active =
120 atomic_load_explicit(&a->total_active, memory_order_seq_cst);
121 copy.total_calls =
122 atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
123 copy.cpu.total =
124 atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
125 copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
126 copy.real.total =
127 atomic_load_explicit(&a->real.total, memory_order_seq_cst);
128 copy.real.max =
129 atomic_load_explicit(&a->real.max, memory_order_seq_cst);
130 copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
131 copy.funcname = a->funcname;
132
133 if (!(copy.types & *filter))
d62a17ae 134 return;
fbcac826
QY
135
136 vty_out_cpu_thread_history(vty, &copy);
137 totals->total_active += copy.total_active;
138 totals->total_calls += copy.total_calls;
139 totals->real.total += copy.real.total;
140 if (totals->real.max < copy.real.max)
141 totals->real.max = copy.real.max;
142 totals->cpu.total += copy.cpu.total;
143 if (totals->cpu.max < copy.cpu.max)
144 totals->cpu.max = copy.cpu.max;
e04ab74d 145}
146
fbcac826 147static void cpu_record_print(struct vty *vty, uint8_t filter)
e04ab74d 148{
d62a17ae 149 struct cpu_thread_history tmp;
150 void *args[3] = {&tmp, vty, &filter};
151 struct thread_master *m;
152 struct listnode *ln;
153
154 memset(&tmp, 0, sizeof tmp);
155 tmp.funcname = "TOTAL";
156 tmp.types = filter;
157
158 pthread_mutex_lock(&masters_mtx);
159 {
160 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
161 const char *name = m->name ? m->name : "main";
162
163 char underline[strlen(name) + 1];
164 memset(underline, '-', sizeof(underline));
4f113d60 165 underline[sizeof(underline) - 1] = '\0';
d62a17ae 166
167 vty_out(vty, "\n");
168 vty_out(vty, "Showing statistics for pthread %s\n",
169 name);
170 vty_out(vty, "-------------------------------%s\n",
171 underline);
172 vty_out(vty, "%21s %18s %18s\n", "",
173 "CPU (user+system):", "Real (wall-clock):");
174 vty_out(vty,
175 "Active Runtime(ms) Invoked Avg uSec Max uSecs");
176 vty_out(vty, " Avg uSec Max uSecs");
177 vty_out(vty, " Type Thread\n");
178
179 if (m->cpu_record->count)
180 hash_iterate(
181 m->cpu_record,
e3b78da8 182 (void (*)(struct hash_bucket *,
d62a17ae 183 void *))cpu_record_hash_print,
184 args);
185 else
186 vty_out(vty, "No data to display yet.\n");
187
188 vty_out(vty, "\n");
189 }
190 }
191 pthread_mutex_unlock(&masters_mtx);
192
193 vty_out(vty, "\n");
194 vty_out(vty, "Total thread statistics\n");
195 vty_out(vty, "-------------------------\n");
196 vty_out(vty, "%21s %18s %18s\n", "",
197 "CPU (user+system):", "Real (wall-clock):");
198 vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
199 vty_out(vty, " Avg uSec Max uSecs");
200 vty_out(vty, " Type Thread\n");
201
202 if (tmp.total_calls > 0)
203 vty_out_cpu_thread_history(vty, &tmp);
e04ab74d 204}
205
e3b78da8 206static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
e276eb82 207{
fbcac826 208 uint8_t *filter = args[0];
d62a17ae 209 struct hash *cpu_record = args[1];
210
211 struct cpu_thread_history *a = bucket->data;
62f44022 212
d62a17ae 213 if (!(a->types & *filter))
214 return;
f48f65d2 215
d62a17ae 216 hash_release(cpu_record, bucket->data);
e276eb82
PJ
217}
218
fbcac826 219static void cpu_record_clear(uint8_t filter)
e276eb82 220{
fbcac826 221 uint8_t *tmp = &filter;
d62a17ae 222 struct thread_master *m;
223 struct listnode *ln;
224
225 pthread_mutex_lock(&masters_mtx);
226 {
227 for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
228 pthread_mutex_lock(&m->mtx);
229 {
230 void *args[2] = {tmp, m->cpu_record};
231 hash_iterate(
232 m->cpu_record,
e3b78da8 233 (void (*)(struct hash_bucket *,
d62a17ae 234 void *))cpu_record_hash_clear,
235 args);
236 }
237 pthread_mutex_unlock(&m->mtx);
238 }
239 }
240 pthread_mutex_unlock(&masters_mtx);
62f44022
QY
241}
242
fbcac826 243static uint8_t parse_filter(const char *filterstr)
62f44022 244{
d62a17ae 245 int i = 0;
246 int filter = 0;
247
248 while (filterstr[i] != '\0') {
249 switch (filterstr[i]) {
250 case 'r':
251 case 'R':
252 filter |= (1 << THREAD_READ);
253 break;
254 case 'w':
255 case 'W':
256 filter |= (1 << THREAD_WRITE);
257 break;
258 case 't':
259 case 'T':
260 filter |= (1 << THREAD_TIMER);
261 break;
262 case 'e':
263 case 'E':
264 filter |= (1 << THREAD_EVENT);
265 break;
266 case 'x':
267 case 'X':
268 filter |= (1 << THREAD_EXECUTE);
269 break;
270 default:
271 break;
272 }
273 ++i;
274 }
275 return filter;
62f44022
QY
276}
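/*
 * Worked example: parse_filter("rt") returns
 * (1 << THREAD_READ) | (1 << THREAD_TIMER), while a string with no
 * recognized characters returns 0, which the CLI handlers below reject as an
 * invalid filter.
 */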
277
278DEFUN (show_thread_cpu,
279 show_thread_cpu_cmd,
280 "show thread cpu [FILTER]",
281 SHOW_STR
282 "Thread information\n"
283 "Thread CPU usage\n"
284 "Display filter (rwtexb)\n")
285{
fbcac826 286 uint8_t filter = (uint8_t)-1U;
d62a17ae 287 int idx = 0;
288
289 if (argv_find(argv, argc, "FILTER", &idx)) {
290 filter = parse_filter(argv[idx]->arg);
291 if (!filter) {
292 vty_out(vty,
293 "Invalid filter \"%s\" specified; must contain at least"
294 "one of 'RWTEXB'\n",
295 argv[idx]->arg);
296 return CMD_WARNING;
297 }
298 }
299
300 cpu_record_print(vty, filter);
301 return CMD_SUCCESS;
e276eb82
PJ
302}
303
8872626b
DS
304static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
305{
306 const char *name = m->name ? m->name : "main";
307 char underline[strlen(name) + 1];
308 uint32_t i;
309
310 memset(underline, '-', sizeof(underline));
311 underline[sizeof(underline) - 1] = '\0';
312
313 vty_out(vty, "\nShowing poll FD's for %s\n", name);
314 vty_out(vty, "----------------------%s\n", underline);
315 vty_out(vty, "Count: %u\n", (uint32_t)m->handler.pfdcount);
316 for (i = 0; i < m->handler.pfdcount; i++)
317 vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
318 m->handler.pfds[i].fd,
319 m->handler.pfds[i].events,
320 m->handler.pfds[i].revents);
321}
322
323DEFUN (show_thread_poll,
324 show_thread_poll_cmd,
325 "show thread poll",
326 SHOW_STR
327 "Thread information\n"
328 "Show poll FD's and information\n")
329{
330 struct listnode *node;
331 struct thread_master *m;
332
333 pthread_mutex_lock(&masters_mtx);
334 {
335 for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
336 show_thread_poll_helper(vty, m);
337 }
338 }
339 pthread_mutex_unlock(&masters_mtx);
340
341 return CMD_SUCCESS;
342}
343
344
49d41a26
DS
345DEFUN (clear_thread_cpu,
346 clear_thread_cpu_cmd,
347 "clear thread cpu [FILTER]",
62f44022 348 "Clear stored data in all pthreads\n"
49d41a26
DS
349 "Thread information\n"
350 "Thread CPU usage\n"
351 "Display filter (rwtexb)\n")
e276eb82 352{
fbcac826 353 uint8_t filter = (uint8_t)-1U;
d62a17ae 354 int idx = 0;
355
356 if (argv_find(argv, argc, "FILTER", &idx)) {
357 filter = parse_filter(argv[idx]->arg);
358 if (!filter) {
359 vty_out(vty,
360 "Invalid filter \"%s\" specified; must contain at least"
361 "one of 'RWTEXB'\n",
362 argv[idx]->arg);
363 return CMD_WARNING;
364 }
365 }
366
367 cpu_record_clear(filter);
368 return CMD_SUCCESS;
e276eb82 369}
6b0655a2 370
d62a17ae 371void thread_cmd_init(void)
0b84f294 372{
d62a17ae 373 install_element(VIEW_NODE, &show_thread_cpu_cmd);
8872626b 374 install_element(VIEW_NODE, &show_thread_poll_cmd);
d62a17ae 375 install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
0b84f294 376}
62f44022
QY
377/* CLI end ------------------------------------------------------------------ */
378
0b84f294 379
8390828d
DL
380static int thread_timer_cmp(void *a, void *b)
381{
382 struct thread *thread_a = a;
383 struct thread *thread_b = b;
384
385 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, <))
386 return -1;
387 if (timercmp(&thread_a->u.sands, &thread_b->u.sands, >))
388 return 1;
389 return 0;
390}
391
392static void thread_timer_update(void *node, int actual_position)
393{
394 struct thread *thread = node;
395
396 thread->index = actual_position;
397}
398
d62a17ae 399static void cancelreq_del(void *cr)
63ccb9cb 400{
d62a17ae 401 XFREE(MTYPE_TMP, cr);
63ccb9cb
QY
402}
403
e0bebc7c 404/* initializer, only ever called once */
4d762f26 405static void initializer(void)
e0bebc7c 406{
d62a17ae 407 pthread_key_create(&thread_current, NULL);
e0bebc7c
QY
408}
409
d62a17ae 410struct thread_master *thread_master_create(const char *name)
718e3744 411{
d62a17ae 412 struct thread_master *rv;
413 struct rlimit limit;
414
415 pthread_once(&init_once, &initializer);
416
417 rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
d62a17ae 418
419 /* Initialize master mutex */
420 pthread_mutex_init(&rv->mtx, NULL);
421 pthread_cond_init(&rv->cancel_cond, NULL);
422
423 /* Set name */
424 rv->name = name ? XSTRDUP(MTYPE_THREAD_MASTER, name) : NULL;
425
426 /* Initialize I/O task data structures */
427 getrlimit(RLIMIT_NOFILE, &limit);
428 rv->fd_limit = (int)limit.rlim_cur;
a6f235f3
DS
429 rv->read = XCALLOC(MTYPE_THREAD_POLL,
430 sizeof(struct thread *) * rv->fd_limit);
431
432 rv->write = XCALLOC(MTYPE_THREAD_POLL,
433 sizeof(struct thread *) * rv->fd_limit);
d62a17ae 434
bd74dc61 435 rv->cpu_record = hash_create_size(
996c9314 436 8, (unsigned int (*)(void *))cpu_record_hash_key,
74df8d6d 437 (bool (*)(const void *, const void *))cpu_record_hash_cmp,
bd74dc61 438 "Thread Hash");
d62a17ae 439
c284542b
DL
440 thread_list_init(&rv->event);
441 thread_list_init(&rv->ready);
442 thread_list_init(&rv->unuse);
8390828d
DL
443
444 /* Initialize the timer queues */
445 rv->timer = pqueue_create();
446 rv->timer->cmp = thread_timer_cmp;
447 rv->timer->update = thread_timer_update;
d62a17ae 448
449 /* Initialize thread_fetch() settings */
450 rv->spin = true;
451 rv->handle_signals = true;
452
453 /* Set pthread owner, should be updated by actual owner */
454 rv->owner = pthread_self();
455 rv->cancel_req = list_new();
456 rv->cancel_req->del = cancelreq_del;
457 rv->canceled = true;
458
459 /* Initialize pipe poker */
460 pipe(rv->io_pipe);
461 set_nonblocking(rv->io_pipe[0]);
462 set_nonblocking(rv->io_pipe[1]);
463
464 /* Initialize data structures for poll() */
465 rv->handler.pfdsize = rv->fd_limit;
466 rv->handler.pfdcount = 0;
467 rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
468 sizeof(struct pollfd) * rv->handler.pfdsize);
469 rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
470 sizeof(struct pollfd) * rv->handler.pfdsize);
471
eff09c66 472 /* add to list of threadmasters */
d62a17ae 473 pthread_mutex_lock(&masters_mtx);
474 {
eff09c66
QY
475 if (!masters)
476 masters = list_new();
477
d62a17ae 478 listnode_add(masters, rv);
479 }
480 pthread_mutex_unlock(&masters_mtx);
481
482 return rv;
718e3744 483}
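/*
 * Typical lifecycle, as a sketch: a daemon creates one master per event-loop
 * pthread, drives it with the fetch/call loop, and frees it on shutdown.
 * Names here are illustrative only.
 *
 *   struct thread_master *m = thread_master_create("example");
 *   struct thread fetched;
 *
 *   // schedule initial tasks on m, then:
 *   while (thread_fetch(m, &fetched))
 *           thread_call(&fetched);
 *
 *   thread_master_free(m);
 */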
484
d8a8a8de
QY
485void thread_master_set_name(struct thread_master *master, const char *name)
486{
487 pthread_mutex_lock(&master->mtx);
488 {
0a22ddfb 489 XFREE(MTYPE_THREAD_MASTER, master->name);
d8a8a8de
QY
490 master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
491 }
492 pthread_mutex_unlock(&master->mtx);
493}
494
6ed04aa2
DS
495#define THREAD_UNUSED_DEPTH 10
496
718e3744 497/* Move thread to unuse list. */
d62a17ae 498static void thread_add_unuse(struct thread_master *m, struct thread *thread)
718e3744 499{
6655966d
RZ
500 pthread_mutex_t mtxc = thread->mtx;
501
d62a17ae 502 assert(m != NULL && thread != NULL);
d62a17ae 503
d62a17ae 504 thread->hist->total_active--;
6ed04aa2
DS
505 memset(thread, 0, sizeof(struct thread));
506 thread->type = THREAD_UNUSED;
507
6655966d
RZ
508 /* Restore the thread mutex context. */
509 thread->mtx = mtxc;
510
c284542b
DL
511 if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
512 thread_list_add_tail(&m->unuse, thread);
6655966d
RZ
513 return;
514 }
515
516 thread_free(m, thread);
718e3744 517}
518
519/* Free all threads on the given list. */
c284542b
DL
520static void thread_list_free(struct thread_master *m,
521 struct thread_list_head *list)
718e3744 522{
d62a17ae 523 struct thread *t;
d62a17ae 524
c284542b 525 while ((t = thread_list_pop(list)))
6655966d 526 thread_free(m, t);
718e3744 527}
528
d62a17ae 529static void thread_array_free(struct thread_master *m,
530 struct thread **thread_array)
308d14ae 531{
d62a17ae 532 struct thread *t;
533 int index;
534
535 for (index = 0; index < m->fd_limit; ++index) {
536 t = thread_array[index];
537 if (t) {
538 thread_array[index] = NULL;
6655966d 539 thread_free(m, t);
d62a17ae 540 }
541 }
a6f235f3 542 XFREE(MTYPE_THREAD_POLL, thread_array);
308d14ae
DV
543}
544
8390828d
DL
545static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
546{
547 int i;
548
549 for (i = 0; i < queue->size; i++)
550 thread_free(m, queue->array[i]);
551
552 pqueue_delete(queue);
553}
554
495f0b13
DS
555/*
556 * thread_master_free_unused
557 *
558 * As threads are finished with, they are put on the
559 * unuse list for later reuse.
560 * If we are shutting down, free up the unused threads
561 * so we can see if we forgot to shut anything off.
562 */
d62a17ae 563void thread_master_free_unused(struct thread_master *m)
495f0b13 564{
d62a17ae 565 pthread_mutex_lock(&m->mtx);
566 {
567 struct thread *t;
c284542b 568 while ((t = thread_list_pop(&m->unuse)))
6655966d 569 thread_free(m, t);
d62a17ae 570 }
571 pthread_mutex_unlock(&m->mtx);
495f0b13
DS
572}
573
718e3744 574/* Stop thread scheduler. */
d62a17ae 575void thread_master_free(struct thread_master *m)
718e3744 576{
d62a17ae 577 pthread_mutex_lock(&masters_mtx);
578 {
579 listnode_delete(masters, m);
eff09c66 580 if (masters->count == 0) {
6a154c88 581 list_delete(&masters);
eff09c66 582 }
d62a17ae 583 }
584 pthread_mutex_unlock(&masters_mtx);
585
586 thread_array_free(m, m->read);
587 thread_array_free(m, m->write);
8390828d 588 thread_queue_free(m, m->timer);
d62a17ae 589 thread_list_free(m, &m->event);
590 thread_list_free(m, &m->ready);
591 thread_list_free(m, &m->unuse);
592 pthread_mutex_destroy(&m->mtx);
33844bbe 593 pthread_cond_destroy(&m->cancel_cond);
d62a17ae 594 close(m->io_pipe[0]);
595 close(m->io_pipe[1]);
6a154c88 596 list_delete(&m->cancel_req);
1a0a92ea 597 m->cancel_req = NULL;
d62a17ae 598
599 hash_clean(m->cpu_record, cpu_record_hash_free);
600 hash_free(m->cpu_record);
601 m->cpu_record = NULL;
602
0a22ddfb 603 XFREE(MTYPE_THREAD_MASTER, m->name);
d62a17ae 604 XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
605 XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
606 XFREE(MTYPE_THREAD_MASTER, m);
718e3744 607}
608
78ca0342
CF
609/* Return remaining time in milliseconds. */
610unsigned long thread_timer_remain_msec(struct thread *thread)
718e3744 611{
d62a17ae 612 int64_t remain;
1189d95f 613
d62a17ae 614 pthread_mutex_lock(&thread->mtx);
615 {
78ca0342 616 remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
d62a17ae 617 }
618 pthread_mutex_unlock(&thread->mtx);
1189d95f 619
d62a17ae 620 return remain < 0 ? 0 : remain;
718e3744 621}
622
78ca0342
CF
623/* Return remaining time in seconds. */
624unsigned long thread_timer_remain_second(struct thread *thread)
625{
626 return thread_timer_remain_msec(thread) / 1000LL;
627}
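/*
 * Example use (sketch): a CLI handler could report how long a hypothetical
 * t_refresh timer has left with
 *
 *   vty_out(vty, "refresh in %lu seconds\n",
 *           thread_timer_remain_second(t_refresh));
 */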
628
9c7753e4
DL
629#define debugargdef const char *funcname, const char *schedfrom, int fromln
630#define debugargpass funcname, schedfrom, fromln
e04ab74d 631
d62a17ae 632struct timeval thread_timer_remain(struct thread *thread)
6ac44687 633{
d62a17ae 634 struct timeval remain;
635 pthread_mutex_lock(&thread->mtx);
636 {
637 monotime_until(&thread->u.sands, &remain);
638 }
639 pthread_mutex_unlock(&thread->mtx);
640 return remain;
6ac44687
CF
641}
642
718e3744 643/* Get new thread. */
d7c0a89a 644static struct thread *thread_get(struct thread_master *m, uint8_t type,
d62a17ae 645 int (*func)(struct thread *), void *arg,
646 debugargdef)
718e3744 647{
c284542b 648 struct thread *thread = thread_list_pop(&m->unuse);
d62a17ae 649 struct cpu_thread_history tmp;
650
651 if (!thread) {
652 thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
653 /* mutex only needs to be initialized at struct creation. */
654 pthread_mutex_init(&thread->mtx, NULL);
655 m->alloc++;
656 }
657
658 thread->type = type;
659 thread->add_type = type;
660 thread->master = m;
661 thread->arg = arg;
8390828d 662 thread->index = -1;
d62a17ae 663 thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
664 thread->ref = NULL;
665
666 /*
667 * So if the passed in funcname is not what we have
668 * stored that means the thread->hist needs to be
669 * updated. We keep the last one around in unused
670 * under the assumption that we are probably
671 * going to immediately allocate the same
672 * type of thread.
673 * This hopefully saves us some serious
674 * hash_get lookups.
675 */
676 if (thread->funcname != funcname || thread->func != func) {
677 tmp.func = func;
678 tmp.funcname = funcname;
679 thread->hist =
680 hash_get(m->cpu_record, &tmp,
681 (void *(*)(void *))cpu_record_hash_alloc);
682 }
683 thread->hist->total_active++;
684 thread->func = func;
685 thread->funcname = funcname;
686 thread->schedfrom = schedfrom;
687 thread->schedfrom_line = fromln;
688
689 return thread;
718e3744 690}
691
6655966d
RZ
692static void thread_free(struct thread_master *master, struct thread *thread)
693{
694 /* Update statistics. */
695 assert(master->alloc > 0);
696 master->alloc--;
697
698 /* Free allocated resources. */
699 pthread_mutex_destroy(&thread->mtx);
700 XFREE(MTYPE_THREAD, thread);
701}
702
d62a17ae 703static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
704 nfds_t count, const struct timeval *timer_wait)
209a72a6 705{
d62a17ae 706 /* If timer_wait is null here, that means poll() should block
 707 * indefinitely, unless the thread_master has overridden it by
 708 * setting ->selectpoll_timeout.
 709 * If that value is positive, it specifies the maximum number of
 710 * milliseconds to wait.
 711 * If it is -1, we should never wait and always return immediately
 712 * even if no event is detected.
 713 * If it is zero, the default behavior applies. */
717 int timeout = -1;
718
719 /* number of file descriptors with events */
720 int num;
721
722 if (timer_wait != NULL
723 && m->selectpoll_timeout == 0) // use the default value
724 timeout = (timer_wait->tv_sec * 1000)
725 + (timer_wait->tv_usec / 1000);
726 else if (m->selectpoll_timeout > 0) // use the user's timeout
727 timeout = m->selectpoll_timeout;
728 else if (m->selectpoll_timeout
729 < 0) // effect a poll (return immediately)
730 timeout = 0;
731
732 /* add poll pipe poker */
733 assert(count + 1 < pfdsize);
734 pfds[count].fd = m->io_pipe[0];
735 pfds[count].events = POLLIN;
736 pfds[count].revents = 0x00;
737
738 num = poll(pfds, count + 1, timeout);
739
740 unsigned char trash[64];
741 if (num > 0 && pfds[count].revents != 0 && num--)
742 while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
743 ;
744
745 return num;
209a72a6
DS
746}
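/*
 * Worked examples of the timeout selection above:
 *   timer_wait = {2, 500000}, selectpoll_timeout == 0  -> timeout = 2500 ms
 *   any timer_wait,           selectpoll_timeout == 10 -> timeout = 10 ms
 *   any timer_wait,           selectpoll_timeout == -1 -> timeout = 0 (no wait)
 *   timer_wait == NULL,       selectpoll_timeout == 0  -> timeout = -1 (block)
 */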
747
718e3744 748/* Add new read or write thread. */
d62a17ae 749struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
750 int (*func)(struct thread *),
751 void *arg, int fd,
752 struct thread **t_ptr,
753 debugargdef)
718e3744 754{
d62a17ae 755 struct thread *thread = NULL;
756
9b864cd3 757 assert(fd >= 0 && fd < m->fd_limit);
d62a17ae 758 pthread_mutex_lock(&m->mtx);
759 {
760 if (t_ptr
761 && *t_ptr) // thread is already scheduled; don't reschedule
762 {
763 pthread_mutex_unlock(&m->mtx);
764 return NULL;
765 }
766
767 /* default to a new pollfd */
768 nfds_t queuepos = m->handler.pfdcount;
769
770 /* if we already have a pollfd for our file descriptor, find and
771 * use it */
772 for (nfds_t i = 0; i < m->handler.pfdcount; i++)
773 if (m->handler.pfds[i].fd == fd) {
774 queuepos = i;
775 break;
776 }
777
778 /* make sure we have room for this fd + pipe poker fd */
779 assert(queuepos + 1 < m->handler.pfdsize);
780
781 thread = thread_get(m, dir, func, arg, debugargpass);
782
783 m->handler.pfds[queuepos].fd = fd;
784 m->handler.pfds[queuepos].events |=
785 (dir == THREAD_READ ? POLLIN : POLLOUT);
786
787 if (queuepos == m->handler.pfdcount)
788 m->handler.pfdcount++;
789
790 if (thread) {
791 pthread_mutex_lock(&thread->mtx);
792 {
793 thread->u.fd = fd;
794 if (dir == THREAD_READ)
795 m->read[thread->u.fd] = thread;
796 else
797 m->write[thread->u.fd] = thread;
798 }
799 pthread_mutex_unlock(&thread->mtx);
800
801 if (t_ptr) {
802 *t_ptr = thread;
803 thread->ref = t_ptr;
804 }
805 }
806
807 AWAKEN(m);
808 }
809 pthread_mutex_unlock(&m->mtx);
810
811 return thread;
718e3744 812}
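/*
 * Callers normally reach this through the thread_add_read()/thread_add_write()
 * wrappers in thread.h, which supply the debugargdef arguments. Read and write
 * tasks are one-shot, so handlers re-arm themselves. Sketch with hypothetical
 * names:
 *
 *   static int sock_read_cb(struct thread *t)
 *   {
 *           struct server *srv = THREAD_ARG(t);
 *           int fd = THREAD_FD(t);
 *
 *           // ... consume data from fd ...
 *           thread_add_read(srv->master, sock_read_cb, srv, fd, &srv->t_read);
 *           return 0;
 *   }
 */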
813
56a94b36 814static struct thread *
d62a17ae 815funcname_thread_add_timer_timeval(struct thread_master *m,
816 int (*func)(struct thread *), int type,
817 void *arg, struct timeval *time_relative,
818 struct thread **t_ptr, debugargdef)
718e3744 819{
d62a17ae 820 struct thread *thread;
8390828d 821 struct pqueue *queue;
d62a17ae 822
823 assert(m != NULL);
824
825 assert(type == THREAD_TIMER);
826 assert(time_relative);
827
828 pthread_mutex_lock(&m->mtx);
829 {
830 if (t_ptr
831 && *t_ptr) // thread is already scheduled; don't reschedule
832 {
833 pthread_mutex_unlock(&m->mtx);
834 return NULL;
835 }
836
8390828d 837 queue = m->timer;
d62a17ae 838 thread = thread_get(m, type, func, arg, debugargpass);
839
840 pthread_mutex_lock(&thread->mtx);
841 {
842 monotime(&thread->u.sands);
843 timeradd(&thread->u.sands, time_relative,
844 &thread->u.sands);
8390828d 845 pqueue_enqueue(thread, queue);
d62a17ae 846 if (t_ptr) {
847 *t_ptr = thread;
848 thread->ref = t_ptr;
849 }
850 }
851 pthread_mutex_unlock(&thread->mtx);
852
853 AWAKEN(m);
854 }
855 pthread_mutex_unlock(&m->mtx);
856
857 return thread;
9e867fe6 858}
859
98c91ac6 860
861/* Add timer event thread. */
d62a17ae 862struct thread *funcname_thread_add_timer(struct thread_master *m,
863 int (*func)(struct thread *),
864 void *arg, long timer,
865 struct thread **t_ptr, debugargdef)
9e867fe6 866{
d62a17ae 867 struct timeval trel;
9e867fe6 868
d62a17ae 869 assert(m != NULL);
9e867fe6 870
d62a17ae 871 trel.tv_sec = timer;
872 trel.tv_usec = 0;
9e867fe6 873
d62a17ae 874 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
875 &trel, t_ptr, debugargpass);
98c91ac6 876}
9e867fe6 877
98c91ac6 878/* Add timer event thread with "millisecond" resolution */
d62a17ae 879struct thread *funcname_thread_add_timer_msec(struct thread_master *m,
880 int (*func)(struct thread *),
881 void *arg, long timer,
882 struct thread **t_ptr,
883 debugargdef)
98c91ac6 884{
d62a17ae 885 struct timeval trel;
9e867fe6 886
d62a17ae 887 assert(m != NULL);
718e3744 888
d62a17ae 889 trel.tv_sec = timer / 1000;
890 trel.tv_usec = 1000 * (timer % 1000);
98c91ac6 891
d62a17ae 892 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg,
893 &trel, t_ptr, debugargpass);
a48b4e6d 894}
895
d03c4cbd 896/* Add timer event thread with a relative struct timeval */
d62a17ae 897struct thread *funcname_thread_add_timer_tv(struct thread_master *m,
898 int (*func)(struct thread *),
899 void *arg, struct timeval *tv,
900 struct thread **t_ptr, debugargdef)
d03c4cbd 901{
d62a17ae 902 return funcname_thread_add_timer_timeval(m, func, THREAD_TIMER, arg, tv,
903 t_ptr, debugargpass);
d03c4cbd
DL
904}
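/*
 * Timers are also one-shot; a periodic task re-adds itself each time it
 * fires. Sketch with hypothetical names, using the thread_add_timer()
 * wrapper from thread.h:
 *
 *   static int keepalive_cb(struct thread *t)
 *   {
 *           struct peer *peer = THREAD_ARG(t);
 *
 *           // ... send keepalive ...
 *           thread_add_timer(peer->master, keepalive_cb, peer, 5,
 *                            &peer->t_keepalive);
 *           return 0;
 *   }
 */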
905
718e3744 906/* Add simple event thread. */
d62a17ae 907struct thread *funcname_thread_add_event(struct thread_master *m,
908 int (*func)(struct thread *),
909 void *arg, int val,
910 struct thread **t_ptr, debugargdef)
718e3744 911{
d62a17ae 912 struct thread *thread;
913
914 assert(m != NULL);
915
916 pthread_mutex_lock(&m->mtx);
917 {
918 if (t_ptr
919 && *t_ptr) // thread is already scheduled; don't reschedule
920 {
921 pthread_mutex_unlock(&m->mtx);
922 return NULL;
923 }
924
925 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
926 pthread_mutex_lock(&thread->mtx);
927 {
928 thread->u.val = val;
c284542b 929 thread_list_add_tail(&m->event, thread);
d62a17ae 930 }
931 pthread_mutex_unlock(&thread->mtx);
932
933 if (t_ptr) {
934 *t_ptr = thread;
935 thread->ref = t_ptr;
936 }
937
938 AWAKEN(m);
939 }
940 pthread_mutex_unlock(&m->mtx);
941
942 return thread;
718e3744 943}
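/*
 * Events are processed on the next pass through thread_fetch(), ahead of
 * timers and I/O. Sketch (hypothetical names):
 *
 *   thread_add_event(m, process_queue, queue, 0, &queue->t_process);
 */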
944
63ccb9cb
QY
945/* Thread cancellation ------------------------------------------------------ */
946
8797240e
QY
947/**
948 * Clears events out of the .events field of the pollfd corresponding to
949 * the given file descriptor. The events to clear are passed in 'state'.
950 *
951 * This needs to happen for both copies of pollfd's. See 'thread_fetch'
952 * implementation for details.
953 *
954 * @param master
955 * @param fd
956 * @param state the event to cancel. One or more (OR'd together) of the
957 * following:
958 * - POLLIN
959 * - POLLOUT
960 */
d62a17ae 961static void thread_cancel_rw(struct thread_master *master, int fd, short state)
0a95a0d0 962{
42d74538
QY
963 bool found = false;
964
d62a17ae 965 /* Cancel POLLHUP too just in case some bozo set it */
966 state |= POLLHUP;
967
968 /* find the index of corresponding pollfd */
969 nfds_t i;
970
971 for (i = 0; i < master->handler.pfdcount; i++)
42d74538
QY
972 if (master->handler.pfds[i].fd == fd) {
973 found = true;
d62a17ae 974 break;
42d74538
QY
975 }
976
977 if (!found) {
978 zlog_debug(
979 "[!] Received cancellation request for nonexistent rw job");
980 zlog_debug("[!] threadmaster: %s | fd: %d",
996c9314 981 master->name ? master->name : "", fd);
42d74538
QY
982 return;
983 }
d62a17ae 984
985 /* NOT out event. */
986 master->handler.pfds[i].events &= ~(state);
987
988 /* If all events are canceled, delete / resize the pollfd array. */
989 if (master->handler.pfds[i].events == 0) {
990 memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
991 (master->handler.pfdcount - i - 1)
992 * sizeof(struct pollfd));
993 master->handler.pfdcount--;
994 }
995
996 /* If we have the same pollfd in the copy, perform the same operations,
997 * otherwise return. */
998 if (i >= master->handler.copycount)
999 return;
1000
1001 master->handler.copy[i].events &= ~(state);
1002
1003 if (master->handler.copy[i].events == 0) {
1004 memmove(master->handler.copy + i, master->handler.copy + i + 1,
1005 (master->handler.copycount - i - 1)
1006 * sizeof(struct pollfd));
1007 master->handler.copycount--;
1008 }
0a95a0d0
DS
1009}
1010
1189d95f 1011/**
63ccb9cb 1012 * Process cancellation requests.
1189d95f 1013 *
63ccb9cb
QY
1014 * This may only be run from the pthread which owns the thread_master.
1015 *
1016 * @param master the thread master to process
1017 * @REQUIRE master->mtx
1189d95f 1018 */
d62a17ae 1019static void do_thread_cancel(struct thread_master *master)
718e3744 1020{
c284542b 1021 struct thread_list_head *list = NULL;
8390828d 1022 struct pqueue *queue = NULL;
d62a17ae 1023 struct thread **thread_array = NULL;
1024 struct thread *thread;
1025
1026 struct cancel_req *cr;
1027 struct listnode *ln;
1028 for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1029 /* If this is an event object cancellation, linear search
1030 * through the event list, deleting any events which have the
1031 * specified argument. We also need to check every thread in
1032 * the ready queue. */
1034 if (cr->eventobj) {
1035 struct thread *t;
c284542b
DL
1036
1037 for_each_safe(thread_list, &master->event, t) {
1038 if (t->arg != cr->eventobj)
1039 continue;
1040 thread_list_del(&master->event, t);
1041 if (t->ref)
1042 *t->ref = NULL;
1043 thread_add_unuse(master, t);
d62a17ae 1044 }
1045
c284542b
DL
1046 for_each_safe(thread_list, &master->ready, t) {
1047 if (t->arg != cr->eventobj)
1048 continue;
1049 thread_list_del(&master->ready, t);
1050 if (t->ref)
1051 *t->ref = NULL;
1052 thread_add_unuse(master, t);
d62a17ae 1053 }
1054 continue;
1055 }
1056
1057 /* The pointer varies depending on whether the cancellation
1058 * request was made asynchronously or not. If it was, we need
1059 * to check whether the thread even exists anymore before
1060 * cancelling it. */
1062 thread = (cr->thread) ? cr->thread : *cr->threadref;
1063
1064 if (!thread)
1065 continue;
1066
1067 /* Determine the appropriate queue to cancel the thread from */
1068 switch (thread->type) {
1069 case THREAD_READ:
1070 thread_cancel_rw(master, thread->u.fd, POLLIN);
1071 thread_array = master->read;
1072 break;
1073 case THREAD_WRITE:
1074 thread_cancel_rw(master, thread->u.fd, POLLOUT);
1075 thread_array = master->write;
1076 break;
1077 case THREAD_TIMER:
8390828d 1078 queue = master->timer;
d62a17ae 1079 break;
1080 case THREAD_EVENT:
1081 list = &master->event;
1082 break;
1083 case THREAD_READY:
1084 list = &master->ready;
1085 break;
1086 default:
1087 continue;
1088 break;
1089 }
1090
8390828d
DL
1091 if (queue) {
1092 assert(thread->index >= 0);
1093 assert(thread == queue->array[thread->index]);
1094 pqueue_remove_at(thread->index, queue);
1095 } else if (list) {
c284542b 1096 thread_list_del(list, thread);
d62a17ae 1097 } else if (thread_array) {
1098 thread_array[thread->u.fd] = NULL;
8390828d
DL
1099 } else {
1100 assert(!"Thread should be either in queue or list or array!");
d62a17ae 1101 }
1102
1103 if (thread->ref)
1104 *thread->ref = NULL;
1105
1106 thread_add_unuse(thread->master, thread);
1107 }
1108
1109 /* Delete and free all cancellation requests */
1110 list_delete_all_node(master->cancel_req);
1111
1112 /* Wake up any threads which may be blocked in thread_cancel_async() */
1113 master->canceled = true;
1114 pthread_cond_broadcast(&master->cancel_cond);
718e3744 1115}
1116
63ccb9cb
QY
1117/**
1118 * Cancel any events which have the specified argument.
1119 *
1120 * MT-Unsafe
1121 *
1122 * @param m the thread_master to cancel from
1123 * @param arg the argument passed when creating the event
1124 */
d62a17ae 1125void thread_cancel_event(struct thread_master *master, void *arg)
718e3744 1126{
d62a17ae 1127 assert(master->owner == pthread_self());
1128
1129 pthread_mutex_lock(&master->mtx);
1130 {
1131 struct cancel_req *cr =
1132 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1133 cr->eventobj = arg;
1134 listnode_add(master->cancel_req, cr);
1135 do_thread_cancel(master);
1136 }
1137 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1138}
1189d95f 1139
63ccb9cb
QY
1140/**
1141 * Cancel a specific task.
1142 *
1143 * MT-Unsafe
1144 *
1145 * @param thread task to cancel
1146 */
d62a17ae 1147void thread_cancel(struct thread *thread)
63ccb9cb 1148{
6ed04aa2 1149 struct thread_master *master = thread->master;
d62a17ae 1150
6ed04aa2
DS
1151 assert(master->owner == pthread_self());
1152
1153 pthread_mutex_lock(&master->mtx);
d62a17ae 1154 {
1155 struct cancel_req *cr =
1156 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1157 cr->thread = thread;
6ed04aa2
DS
1158 listnode_add(master->cancel_req, cr);
1159 do_thread_cancel(master);
d62a17ae 1160 }
6ed04aa2 1161 pthread_mutex_unlock(&master->mtx);
63ccb9cb 1162}
1189d95f 1163
63ccb9cb
QY
1164/**
1165 * Asynchronous cancellation.
1166 *
8797240e
QY
1167 * Called with either a struct thread ** or void * to an event argument,
1168 * this function posts the correct cancellation request and blocks until it is
1169 * serviced.
63ccb9cb
QY
1170 *
1171 * If the thread is currently running, execution blocks until it completes.
1172 *
8797240e
QY
1173 * The last two parameters are mutually exclusive, i.e. if you pass one the
1174 * other must be NULL.
1175 *
1176 * When the cancellation procedure executes on the target thread_master, the
1177 * thread * provided is checked for nullity. If it is null, the thread is
1178 * assumed to no longer exist and the cancellation request is a no-op. Thus
1179 * users of this API must pass a back-reference when scheduling the original
1180 * task.
1181 *
63ccb9cb
QY
1182 * MT-Safe
1183 *
8797240e
QY
1184 * @param master the thread master with the relevant event / task
1185 * @param thread pointer to thread to cancel
1186 * @param eventobj the event
63ccb9cb 1187 */
d62a17ae 1188void thread_cancel_async(struct thread_master *master, struct thread **thread,
1189 void *eventobj)
63ccb9cb 1190{
d62a17ae 1191 assert(!(thread && eventobj) && (thread || eventobj));
1192 assert(master->owner != pthread_self());
1193
1194 pthread_mutex_lock(&master->mtx);
1195 {
1196 master->canceled = false;
1197
1198 if (thread) {
1199 struct cancel_req *cr =
1200 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1201 cr->threadref = thread;
1202 listnode_add(master->cancel_req, cr);
1203 } else if (eventobj) {
1204 struct cancel_req *cr =
1205 XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1206 cr->eventobj = eventobj;
1207 listnode_add(master->cancel_req, cr);
1208 }
1209 AWAKEN(master);
1210
1211 while (!master->canceled)
1212 pthread_cond_wait(&master->cancel_cond, &master->mtx);
1213 }
1214 pthread_mutex_unlock(&master->mtx);
718e3744 1215}
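/*
 * Sketch of the back-reference requirement described above (hypothetical
 * names): the task is scheduled with a t_ptr that the canceling pthread can
 * use later.
 *
 *   // owning pthread:
 *   thread_add_read(peer->master, peer_read_cb, peer, peer->fd, &peer->t_read);
 *
 *   // another pthread, later:
 *   thread_cancel_async(peer->master, &peer->t_read, NULL);
 *   // on return, peer->t_read is NULL and peer_read_cb will not run
 */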
63ccb9cb 1216/* ------------------------------------------------------------------------- */
718e3744 1217
8390828d 1218static struct timeval *thread_timer_wait(struct pqueue *queue,
d62a17ae 1219 struct timeval *timer_val)
718e3744 1220{
8390828d
DL
1221 if (queue->size) {
1222 struct thread *next_timer = queue->array[0];
1223 monotime_until(&next_timer->u.sands, timer_val);
1224 return timer_val;
1225 }
1226 return NULL;
718e3744 1227}
718e3744 1228
d62a17ae 1229static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1230 struct thread *fetch)
718e3744 1231{
d62a17ae 1232 *fetch = *thread;
1233 thread_add_unuse(m, thread);
1234 return fetch;
718e3744 1235}
1236
d62a17ae 1237static int thread_process_io_helper(struct thread_master *m,
1238 struct thread *thread, short state, int pos)
5d4ccd4e 1239{
d62a17ae 1240 struct thread **thread_array;
1241
1242 if (!thread)
1243 return 0;
1244
1245 if (thread->type == THREAD_READ)
1246 thread_array = m->read;
1247 else
1248 thread_array = m->write;
1249
1250 thread_array[thread->u.fd] = NULL;
c284542b 1251 thread_list_add_tail(&m->ready, thread);
d62a17ae 1252 thread->type = THREAD_READY;
1253 /* if another pthread scheduled this file descriptor for the event we're
1254 * responding to, no problem; we're getting to it now */
1255 thread->master->handler.pfds[pos].events &= ~(state);
1256 return 1;
5d4ccd4e
DS
1257}
1258
8797240e
QY
1259/**
1260 * Process I/O events.
1261 *
1262 * Walks through file descriptor array looking for those pollfds whose .revents
1263 * field has something interesting. Deletes any invalid file descriptors.
1264 *
1265 * @param m the thread master
1266 * @param num the number of active file descriptors (return value of poll())
1267 */
d62a17ae 1268static void thread_process_io(struct thread_master *m, unsigned int num)
0a95a0d0 1269{
d62a17ae 1270 unsigned int ready = 0;
1271 struct pollfd *pfds = m->handler.copy;
1272
1273 for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1274 /* no event for current fd? immediately continue */
1275 if (pfds[i].revents == 0)
1276 continue;
1277
1278 ready++;
1279
1280 /* Unless someone has called thread_cancel from another
1281 * pthread, the only thing that could have changed in
1282 * m->handler.pfds while we were asleep is the .events
1283 * field in a given pollfd. Barring thread_cancel(),
1284 * that value should be a superset of the values we have
1285 * in our copy, so there's no need to update it. Similarly,
1286 * barring deletion, the fd should still be a valid index
1287 * into the master's pfds. */
1291 if (pfds[i].revents & (POLLIN | POLLHUP))
1292 thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1293 i);
1294 if (pfds[i].revents & POLLOUT)
1295 thread_process_io_helper(m, m->write[pfds[i].fd],
1296 POLLOUT, i);
1297
1298 /* if one of our file descriptors is garbage, remove the same
1299 * from
1300 * both pfds + update sizes and index */
1301 if (pfds[i].revents & POLLNVAL) {
1302 memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1303 (m->handler.pfdcount - i - 1)
1304 * sizeof(struct pollfd));
1305 m->handler.pfdcount--;
1306
1307 memmove(pfds + i, pfds + i + 1,
1308 (m->handler.copycount - i - 1)
1309 * sizeof(struct pollfd));
1310 m->handler.copycount--;
1311
1312 i--;
1313 }
1314 }
718e3744 1315}
1316
8b70d0b0 1317/* Add all timers that have popped to the ready list. */
8390828d 1318static unsigned int thread_process_timers(struct pqueue *queue,
d62a17ae 1319 struct timeval *timenow)
a48b4e6d 1320{
d62a17ae 1321 struct thread *thread;
1322 unsigned int ready = 0;
1323
8390828d
DL
1324 while (queue->size) {
1325 thread = queue->array[0];
d62a17ae 1326 if (timercmp(timenow, &thread->u.sands, <))
1327 return ready;
8390828d 1328 pqueue_dequeue(queue);
d62a17ae 1329 thread->type = THREAD_READY;
c284542b 1330 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1331 ready++;
1332 }
1333 return ready;
a48b4e6d 1334}
1335
2613abe6 1336/* process a list en masse, e.g. for event thread lists */
c284542b 1337static unsigned int thread_process(struct thread_list_head *list)
2613abe6 1338{
d62a17ae 1339 struct thread *thread;
d62a17ae 1340 unsigned int ready = 0;
1341
c284542b 1342 while ((thread = thread_list_pop(list))) {
d62a17ae 1343 thread->type = THREAD_READY;
c284542b 1344 thread_list_add_tail(&thread->master->ready, thread);
d62a17ae 1345 ready++;
1346 }
1347 return ready;
2613abe6
PJ
1348}
1349
1350
718e3744 1351/* Fetch next ready thread. */
d62a17ae 1352struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
718e3744 1353{
d62a17ae 1354 struct thread *thread = NULL;
1355 struct timeval now;
1356 struct timeval zerotime = {0, 0};
1357 struct timeval tv;
1358 struct timeval *tw = NULL;
1359
1360 int num = 0;
1361
1362 do {
1363 /* Handle signals if any */
1364 if (m->handle_signals)
1365 quagga_sigevent_process();
1366
1367 pthread_mutex_lock(&m->mtx);
1368
1369 /* Process any pending cancellation requests */
1370 do_thread_cancel(m);
1371
e3c9529e
QY
1372 /*
1373 * Attempt to flush ready queue before going into poll().
1374 * This is performance-critical. Think twice before modifying.
1375 */
c284542b 1376 if ((thread = thread_list_pop(&m->ready))) {
e3c9529e
QY
1377 fetch = thread_run(m, thread, fetch);
1378 if (fetch->ref)
1379 *fetch->ref = NULL;
1380 pthread_mutex_unlock(&m->mtx);
1381 break;
1382 }
1383
1384 /* otherwise, tick through scheduling sequence */
1385
bca37d17
QY
1386 /*
1387 * Post events to ready queue. This must come before the
1388 * following block since events should occur immediately
1389 */
d62a17ae 1390 thread_process(&m->event);
1391
bca37d17
QY
1392 /*
1393 * If there are no tasks on the ready queue, we will poll()
1394 * until a timer expires or we receive I/O, whichever comes
1395 * first. The strategy for doing this is:
d62a17ae 1396 *
1397 * - If there are events pending, set the poll() timeout to zero
1398 * - If there are no events pending, but there are timers
1399 * pending, set the timeout to the smallest remaining time on
1400 * any timer
1401 * - If there are neither timers nor events pending, but there
1402 * are file descriptors pending, block indefinitely in poll()
1403 * - If nothing is pending, it's time for the application to die
1405 *
1406 * In every case except the last, we need to hit poll() at least
bca37d17
QY
1407 * once per loop to avoid starvation by events
1408 */
c284542b 1409 if (!thread_list_count(&m->ready))
8390828d 1410 tw = thread_timer_wait(m->timer, &tv);
d62a17ae 1411
c284542b
DL
1412 if (thread_list_count(&m->ready) ||
1413 (tw && !timercmp(tw, &zerotime, >)))
d62a17ae 1414 tw = &zerotime;
1415
1416 if (!tw && m->handler.pfdcount == 0) { /* die */
1417 pthread_mutex_unlock(&m->mtx);
1418 fetch = NULL;
1419 break;
1420 }
1421
bca37d17
QY
1422 /*
1423 * Copy pollfd array + # active pollfds in it. Not necessary to
1424 * copy the array size as this is fixed.
1425 */
d62a17ae 1426 m->handler.copycount = m->handler.pfdcount;
1427 memcpy(m->handler.copy, m->handler.pfds,
1428 m->handler.copycount * sizeof(struct pollfd));
1429
e3c9529e
QY
1430 pthread_mutex_unlock(&m->mtx);
1431 {
1432 num = fd_poll(m, m->handler.copy, m->handler.pfdsize,
1433 m->handler.copycount, tw);
1434 }
1435 pthread_mutex_lock(&m->mtx);
d764d2cc 1436
e3c9529e
QY
1437 /* Handle any errors received in poll() */
1438 if (num < 0) {
1439 if (errno == EINTR) {
d62a17ae 1440 pthread_mutex_unlock(&m->mtx);
e3c9529e
QY
1441 /* loop around to signal handler */
1442 continue;
d62a17ae 1443 }
1444
e3c9529e 1445 /* else die */
450971aa 1446 flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
9ef9495e 1447 safe_strerror(errno));
e3c9529e
QY
1448 pthread_mutex_unlock(&m->mtx);
1449 fetch = NULL;
1450 break;
bca37d17 1451 }
d62a17ae 1452
1453 /* Post timers to ready queue. */
1454 monotime(&now);
8390828d 1455 thread_process_timers(m->timer, &now);
d62a17ae 1456
1457 /* Post I/O to ready queue. */
1458 if (num > 0)
1459 thread_process_io(m, num);
1460
d62a17ae 1461 pthread_mutex_unlock(&m->mtx);
1462
1463 } while (!thread && m->spin);
1464
1465 return fetch;
718e3744 1466}
1467
d62a17ae 1468static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
62f44022 1469{
d62a17ae 1470 return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1471 + (a.tv_usec - b.tv_usec));
62f44022
QY
1472}
1473
d62a17ae 1474unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1475 unsigned long *cputime)
718e3744 1476{
d62a17ae 1477 /* This is 'user + sys' time. */
1478 *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1479 + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1480 return timeval_elapsed(now->real, start->real);
8b70d0b0 1481}
1482
50596be0
DS
1483/* We should aim to yield after 'yield' milliseconds, which defaults
1484 to THREAD_YIELD_TIME_SLOT.
8b70d0b0 1485 Note: we are using real (wall clock) time for this calculation.
1486 It could be argued that CPU time may make more sense in certain
1487 contexts. The things to consider are whether the thread may have
1488 blocked (in which case wall time increases, but CPU time does not),
1489 or whether the system is heavily loaded with other processes competing
d62a17ae 1490 for CPU time. On balance, wall clock time seems to make sense.
8b70d0b0 1491 Plus it has the added benefit that gettimeofday should be faster
1492 than calling getrusage. */
d62a17ae 1493int thread_should_yield(struct thread *thread)
718e3744 1494{
d62a17ae 1495 int result;
1496 pthread_mutex_lock(&thread->mtx);
1497 {
1498 result = monotime_since(&thread->real, NULL)
1499 > (int64_t)thread->yield;
1500 }
1501 pthread_mutex_unlock(&thread->mtx);
1502 return result;
50596be0
DS
1503}
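/*
 * Long-running handlers use this to stay cooperative: work in a loop until
 * the time slot is exhausted, then reschedule themselves and return. Sketch
 * with hypothetical names:
 *
 *   static int table_walk_cb(struct thread *t)
 *   {
 *           struct walk_ctx *ctx = THREAD_ARG(t);
 *
 *           while (walk_ctx_has_more(ctx) && !thread_should_yield(t))
 *                   walk_ctx_do_one(ctx);
 *
 *           if (walk_ctx_has_more(ctx))
 *                   thread_add_event(ctx->master, table_walk_cb, ctx, 0,
 *                                    &ctx->t_walk);
 *           return 0;
 *   }
 */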
1504
d62a17ae 1505void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
50596be0 1506{
d62a17ae 1507 pthread_mutex_lock(&thread->mtx);
1508 {
1509 thread->yield = yield_time;
1510 }
1511 pthread_mutex_unlock(&thread->mtx);
718e3744 1512}
1513
d62a17ae 1514void thread_getrusage(RUSAGE_T *r)
db9c0df9 1515{
231db9a6
DS
1516#if defined RUSAGE_THREAD
1517#define FRR_RUSAGE RUSAGE_THREAD
1518#else
1519#define FRR_RUSAGE RUSAGE_SELF
1520#endif
d62a17ae 1521 monotime(&r->real);
231db9a6 1522 getrusage(FRR_RUSAGE, &(r->cpu));
db9c0df9
PJ
1523}
1524
fbcac826
QY
1525/*
1526 * Call a thread.
1527 *
1528 * This function will atomically update the thread's usage history. At present
1529 * this is the only spot where usage history is written. Nevertheless the code
1530 * has been written such that the introduction of writers in the future should
1531 * not need to update it provided the writers atomically perform only the
1532 * operations done here, i.e. updating the total and maximum times. In
1533 * particular, the maximum real and cpu times must be monotonically increasing
1534 * or this code is not correct.
1535 */
d62a17ae 1536void thread_call(struct thread *thread)
718e3744 1537{
fbcac826
QY
1538 _Atomic unsigned long realtime, cputime;
1539 unsigned long exp;
1540 unsigned long helper;
d62a17ae 1541 RUSAGE_T before, after;
cc8b13a0 1542
d62a17ae 1543 GETRUSAGE(&before);
1544 thread->real = before.real;
718e3744 1545
d62a17ae 1546 pthread_setspecific(thread_current, thread);
1547 (*thread->func)(thread);
1548 pthread_setspecific(thread_current, NULL);
718e3744 1549
d62a17ae 1550 GETRUSAGE(&after);
718e3744 1551
fbcac826
QY
1552 realtime = thread_consumed_time(&after, &before, &helper);
1553 cputime = helper;
1554
1555 /* update realtime */
1556 atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
1557 memory_order_seq_cst);
1558 exp = atomic_load_explicit(&thread->hist->real.max,
1559 memory_order_seq_cst);
1560 while (exp < realtime
1561 && !atomic_compare_exchange_weak_explicit(
1562 &thread->hist->real.max, &exp, realtime,
1563 memory_order_seq_cst, memory_order_seq_cst))
1564 ;
1565
1566 /* update cputime */
1567 atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
1568 memory_order_seq_cst);
1569 exp = atomic_load_explicit(&thread->hist->cpu.max,
1570 memory_order_seq_cst);
1571 while (exp < cputime
1572 && !atomic_compare_exchange_weak_explicit(
1573 &thread->hist->cpu.max, &exp, cputime,
1574 memory_order_seq_cst, memory_order_seq_cst))
1575 ;
1576
1577 atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
1578 memory_order_seq_cst);
1579 atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
1580 memory_order_seq_cst);
718e3744 1581
924b9229 1582#ifdef CONSUMED_TIME_CHECK
d62a17ae 1583 if (realtime > CONSUMED_TIME_CHECK) {
1584 /*
1585 * We have a CPU Hog on our hands.
1586 * Whinge about it now, so we're aware this is yet another task
1587 * to fix.
1588 */
9ef9495e 1589 flog_warn(
450971aa 1590 EC_LIB_SLOW_THREAD,
d62a17ae 1591 "SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
1592 thread->funcname, (unsigned long)thread->func,
1593 realtime / 1000, cputime / 1000);
1594 }
924b9229 1595#endif /* CONSUMED_TIME_CHECK */
718e3744 1596}
1597
1598/* Execute thread */
d62a17ae 1599void funcname_thread_execute(struct thread_master *m,
1600 int (*func)(struct thread *), void *arg, int val,
1601 debugargdef)
718e3744 1602{
c4345fbf 1603 struct thread *thread;
718e3744 1604
c4345fbf
RZ
1605 /* Get or allocate new thread to execute. */
1606 pthread_mutex_lock(&m->mtx);
1607 {
1608 thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
9c7753e4 1609
c4345fbf
RZ
1610 /* Set its event value. */
1611 pthread_mutex_lock(&thread->mtx);
1612 {
1613 thread->add_type = THREAD_EXECUTE;
1614 thread->u.val = val;
1615 thread->ref = &thread;
1616 }
1617 pthread_mutex_unlock(&thread->mtx);
1618 }
1619 pthread_mutex_unlock(&m->mtx);
f7c62e11 1620
c4345fbf
RZ
1621 /* Execute thread doing all accounting. */
1622 thread_call(thread);
9c7753e4 1623
c4345fbf
RZ
1624 /* Give back or free thread. */
1625 thread_add_unuse(m, thread);
718e3744 1626}
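/*
 * Callers use the thread_execute() wrapper from thread.h. The callback runs
 * synchronously on the calling pthread, is accounted in the CPU statistics
 * like any other task, and the struct thread is recycled before this
 * returns. Sketch (hypothetical names):
 *
 *   thread_execute(m, flush_cb, flush_ctx, 0);
 */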