#include "network.h"
#include "jhash.h"
#include "frratomic.h"
+#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
+DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#if defined(__APPLE__)
pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
+static void thread_free(struct thread_master *master, struct thread *thread);
/* CLI start ---------------------------------------------------------------- */
static unsigned int cpu_record_hash_key(struct cpu_thread_history *a)
return jhash(&a->func, size, 0);
}
-static int cpu_record_hash_cmp(const struct cpu_thread_history *a,
+static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
const struct cpu_thread_history *b)
{
return a->func == b->func;
/* Initialize I/O task data structures */
getrlimit(RLIMIT_NOFILE, &limit);
rv->fd_limit = (int)limit.rlim_cur;
- rv->read =
- XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
- if (rv->read == NULL) {
- XFREE(MTYPE_THREAD_MASTER, rv);
- return NULL;
- }
- rv->write =
- XCALLOC(MTYPE_THREAD, sizeof(struct thread *) * rv->fd_limit);
- if (rv->write == NULL) {
- XFREE(MTYPE_THREAD, rv->read);
- XFREE(MTYPE_THREAD_MASTER, rv);
- return NULL;
- }
+ rv->read = XCALLOC(MTYPE_THREAD_POLL,
+ sizeof(struct thread *) * rv->fd_limit);
+
+ rv->write = XCALLOC(MTYPE_THREAD_POLL,
+ sizeof(struct thread *) * rv->fd_limit);
rv->cpu_record = hash_create_size(
8, (unsigned int (*)(void *))cpu_record_hash_key,
- (int (*)(const void *, const void *))cpu_record_hash_cmp,
+ (bool (*)(const void *, const void *))cpu_record_hash_cmp,
"Thread Hash");
return NULL;
}
+#define THREAD_UNUSED_DEPTH 10
+
/* Move thread to unuse list. */
static void thread_add_unuse(struct thread_master *m, struct thread *thread)
{
+ pthread_mutex_t mtxc = thread->mtx;
+
assert(m != NULL && thread != NULL);
assert(thread->next == NULL);
assert(thread->prev == NULL);
- thread->ref = NULL;
- thread->type = THREAD_UNUSED;
thread->hist->total_active--;
- thread_list_add(&m->unuse, thread);
+ memset(thread, 0, sizeof(struct thread));
+ thread->type = THREAD_UNUSED;
+
+ /* Restore the thread mutex context. */
+ thread->mtx = mtxc;
+
+ if (m->unuse.count < THREAD_UNUSED_DEPTH) {
+ thread_list_add(&m->unuse, thread);
+ return;
+ }
+
+ thread_free(m, thread);
}
/* Free all unused thread. */
for (t = list->head; t; t = next) {
next = t->next;
- XFREE(MTYPE_THREAD, t);
+ thread_free(m, t);
list->count--;
- m->alloc--;
}
}
t = thread_array[index];
if (t) {
thread_array[index] = NULL;
- XFREE(MTYPE_THREAD, t);
- m->alloc--;
+ thread_free(m, t);
}
}
- XFREE(MTYPE_THREAD, thread_array);
+ XFREE(MTYPE_THREAD_POLL, thread_array);
}
static void thread_queue_free(struct thread_master *m, struct pqueue *queue)
int i;
for (i = 0; i < queue->size; i++)
- XFREE(MTYPE_THREAD, queue->array[i]);
+ thread_free(m, queue->array[i]);
- m->alloc -= queue->size;
pqueue_delete(queue);
}
{
struct thread *t;
while ((t = thread_trim_head(&m->unuse)) != NULL) {
- pthread_mutex_destroy(&t->mtx);
- XFREE(MTYPE_THREAD, t);
+ thread_free(m, t);
}
}
pthread_mutex_unlock(&m->mtx);
{
listnode_delete(masters, m);
if (masters->count == 0) {
- list_delete_and_null(&masters);
+ list_delete(&masters);
}
}
pthread_mutex_unlock(&masters_mtx);
pthread_cond_destroy(&m->cancel_cond);
close(m->io_pipe[0]);
close(m->io_pipe[1]);
- list_delete_and_null(&m->cancel_req);
+ list_delete(&m->cancel_req);
m->cancel_req = NULL;
hash_clean(m->cpu_record, cpu_record_hash_free);
XFREE(MTYPE_THREAD_MASTER, m);
}
-/* Return remain time in second. */
-unsigned long thread_timer_remain_second(struct thread *thread)
+/* Return remain time in miliseconds. */
+unsigned long thread_timer_remain_msec(struct thread *thread)
{
int64_t remain;
pthread_mutex_lock(&thread->mtx);
{
- remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
+ remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
}
pthread_mutex_unlock(&thread->mtx);
return remain < 0 ? 0 : remain;
}
+/* Return remain time in seconds. */
+unsigned long thread_timer_remain_second(struct thread *thread)
+{
+ return thread_timer_remain_msec(thread) / 1000LL;
+}
+
#define debugargdef const char *funcname, const char *schedfrom, int fromln
#define debugargpass funcname, schedfrom, fromln
return thread;
}
+static void thread_free(struct thread_master *master, struct thread *thread)
+{
+ /* Update statistics. */
+ assert(master->alloc > 0);
+ master->alloc--;
+
+ /* Free allocated resources. */
+ pthread_mutex_destroy(&thread->mtx);
+ XFREE(MTYPE_THREAD, thread);
+}
+
static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
nfds_t count, const struct timeval *timer_wait)
{
{
struct thread *thread = NULL;
+ assert(fd >= 0 && fd < m->fd_limit);
pthread_mutex_lock(&m->mtx);
{
if (t_ptr
*/
void thread_cancel(struct thread *thread)
{
- assert(thread->master->owner == pthread_self());
+ struct thread_master *master = thread->master;
+
+ assert(master->owner == pthread_self());
- pthread_mutex_lock(&thread->master->mtx);
+ pthread_mutex_lock(&master->mtx);
{
struct cancel_req *cr =
XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
cr->thread = thread;
- listnode_add(thread->master->cancel_req, cr);
- do_thread_cancel(thread->master);
+ listnode_add(master->cancel_req, cr);
+ do_thread_cancel(master);
}
- pthread_mutex_unlock(&thread->master->mtx);
+ pthread_mutex_unlock(&master->mtx);
}
/**
}
/* else die */
- zlog_warn("poll() error: %s", safe_strerror(errno));
+ flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
+ safe_strerror(errno));
pthread_mutex_unlock(&m->mtx);
fetch = NULL;
break;
* Whinge about it now, so we're aware this is yet another task
* to fix.
*/
- zlog_warn(
+ flog_warn(
+ EC_LIB_SLOW_THREAD,
"SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
thread->funcname, (unsigned long)thread->func,
realtime / 1000, cputime / 1000);
int (*func)(struct thread *), void *arg, int val,
debugargdef)
{
- struct cpu_thread_history tmp;
- struct thread dummy;
-
- memset(&dummy, 0, sizeof(struct thread));
+ struct thread *thread;
- pthread_mutex_init(&dummy.mtx, NULL);
- dummy.type = THREAD_EVENT;
- dummy.add_type = THREAD_EXECUTE;
- dummy.master = NULL;
- dummy.arg = arg;
- dummy.u.val = val;
+ /* Get or allocate new thread to execute. */
+ pthread_mutex_lock(&m->mtx);
+ {
+ thread = thread_get(m, THREAD_EVENT, func, arg, debugargpass);
- tmp.func = dummy.func = func;
- tmp.funcname = dummy.funcname = funcname;
- dummy.hist = hash_get(m->cpu_record, &tmp,
- (void *(*)(void *))cpu_record_hash_alloc);
+ /* Set its event value. */
+ pthread_mutex_lock(&thread->mtx);
+ {
+ thread->add_type = THREAD_EXECUTE;
+ thread->u.val = val;
+ thread->ref = &thread;
+ }
+ pthread_mutex_unlock(&thread->mtx);
+ }
+ pthread_mutex_unlock(&m->mtx);
- dummy.schedfrom = schedfrom;
- dummy.schedfrom_line = fromln;
+ /* Execute thread doing all accounting. */
+ thread_call(thread);
- thread_call(&dummy);
+ /* Give back or free thread. */
+ thread_add_unuse(m, thread);
}