rv->background = pqueue_create();
rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
rv->timer->update = rv->background->update = thread_timer_update;
+ rv->spin = true;
+ rv->handle_signals = true;
- #if defined(HAVE_POLL)
+ #if defined(HAVE_POLL_CALL)
rv->handler.pfdsize = rv->fd_limit;
rv->handler.pfdcount = 0;
rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
thread_list_free (m, &m->ready);
thread_list_free (m, &m->unuse);
thread_queue_free (m, m->background);
+ pthread_mutex_destroy (&m->mtx);
- #if defined(HAVE_POLL)
+ #if defined(HAVE_POLL_CALL)
XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
#endif
XFREE (MTYPE_THREAD_MASTER, m);
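The pthread_mutex_destroy() added to the free path implies a matching pthread_mutex_init() in thread_master_create(), which this excerpt does not show. A minimal sketch of the expected pairing, using only standard pthreads calls; the field name mtx comes from the diff, everything else is illustrative:

    #include <pthread.h>

    struct master_stub              /* illustrative stand-in for struct thread_master */
    {
      pthread_mutex_t mtx;          /* guards the master's thread lists */
    };

    static void
    master_stub_init (struct master_stub *m)
    {
      pthread_mutex_init (&m->mtx, NULL);   /* default mutex attributes */
    }

    static void
    master_stub_free (struct master_stub *m)
    {
      /* destroying a mutex another thread still holds is undefined, so this
       * must run only after every user of the master has finished */
      pthread_mutex_destroy (&m->mtx);
    }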
fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
{
int num;
- #if defined(HAVE_POLL)
+
+ /* If timer_wait is null here, that means either select() or poll() should
+ * block indefinitely, unless the thread_master has overridden it. select()
+ * and poll() differ in the timeout values they interpret as an indefinite
+ * block; select() requires a null pointer, while poll() takes a millisecond
+ * value of -1.
+ *
+ * The thread_master owner has the option of overriding the default behavior
+ * by setting ->selectpoll_timeout. If the value is positive, it specifies
+ * the maximum number of milliseconds to wait. If the value is negative, it
+ * specifies that we should never wait and always return immediately even if
+ * no event is detected. If the value is zero, the default behavior described
+ * above applies.
+ */
+
- /* recalc timeout for poll. Attention NULL pointer is no timeout with
- select, where with poll no timeount is -1 */
+ #if defined(HAVE_POLL_CALL)
int timeout = -1;
- if (timer_wait != NULL)
+
+ if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value
timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
+ else if (m->selectpoll_timeout > 0) // use the user's timeout
+ timeout = m->selectpoll_timeout;
+ else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
+ timeout = 0;
num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
#else
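The comment above spells out the contract for ->selectpoll_timeout: positive caps the wait in milliseconds, negative means never block, zero keeps the default select()/poll() behavior. A short caller-side sketch of how the owner of a thread_master might use it; selectpoll_timeout is the field this diff introduces, while the header name and helper functions are assumptions for illustration:

    #include "thread.h"   /* assumed header declaring struct thread_master */

    /* Make the event loop effectively non-blocking, e.g. to drive it from an
     * external loop: fd_select() will then pass a timeout of 0 to poll(). */
    static void
    loop_set_nonblocking (struct thread_master *master)
    {
      master->selectpoll_timeout = -1;
    }

    /* Cap each wait at 100 ms instead, so the loop wakes up regularly even
     * when no fd or timer is ready. */
    static void
    loop_cap_wait (struct thread_master *master)
    {
      master->selectpoll_timeout = 100;
    }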
{
struct thread *thread = NULL;
-#if !defined(HAVE_POLL_CALL)
- thread_fd_set *fdset = NULL;
- if (dir == THREAD_READ)
- fdset = &m->handler.readfd;
- else
- fdset = &m->handler.writefd;
-#endif
-
+ pthread_mutex_lock (&m->mtx);
+ {
- #if defined (HAVE_POLL)
+ #if defined (HAVE_POLL_CALL)
- thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
-
- if (thread == NULL)
- return NULL;
+ thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
#else
- if (FD_ISSET (fd, fdset))
- {
- zlog_warn ("There is already %s fd [%d]",
- (dir == THREAD_READ) ? "read" : "write", fd);
- return NULL;
- }
+ if (fd >= FD_SETSIZE)
+ {
+ zlog_err ("File descriptor %d is >= FD_SETSIZE (%d). Please recompile"
+ "with --enable-poll=yes", fd, FD_SETSIZE);
+ assert (fd < FD_SETSIZE && !"fd >= FD_SETSIZE");
+ }
+ thread_fd_set *fdset = NULL;
+ if (dir == THREAD_READ)
+ fdset = &m->handler.readfd;
+ else
+ fdset = &m->handler.writefd;
- FD_SET (fd, fdset);
- thread = thread_get (m, dir, func, arg, debugargpass);
+ if (FD_ISSET (fd, fdset))
+ {
+ zlog_warn ("There is already %s fd [%d]",
+ (dir == THREAD_READ) ? "read" : "write", fd);
+ }
+ else
+ {
+ FD_SET (fd, fdset);
+ thread = thread_get (m, dir, func, arg, debugargpass);
+ }
#endif
- thread->u.fd = fd;
- if (dir == THREAD_READ)
- thread_add_fd (m->read, thread);
- else
- thread_add_fd (m->write, thread);
+ if (thread)
+ {
+ pthread_mutex_lock (&thread->mtx);
+ {
+ thread->u.fd = fd;
+ if (dir == THREAD_READ)
+ thread_add_fd (m->read, thread);
+ else
+ thread_add_fd (m->write, thread);
+ }
+ pthread_mutex_unlock (&thread->mtx);
+ }
+ }
+ pthread_mutex_unlock (&m->mtx);
return thread;
}
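With m->mtx now serializing registration, read and write events can be added from a pthread other than the one running the event loop. A caller-side sketch, assuming the usual Quagga conventions for the wrapper macro and callback shape (thread_add_read, THREAD_FD, THREAD_ARG); the callback and helper names are hypothetical:

    #include "thread.h"   /* assumed Quagga threading header */

    /* Illustrative handler following the int (struct thread *) convention. */
    static int
    peer_readable (struct thread *thread)
    {
      int fd = THREAD_FD (thread);       /* fd registered in watch_peer() */
      void *ctx = THREAD_ARG (thread);   /* opaque context passed below */

      /* ... read from fd, then re-register if more data is expected ... */
      (void) ctx;
      (void) fd;
      return 0;
    }

    /* Safe to call from any pthread: funcname_thread_add_read_write() above
     * takes m->mtx before it touches the fd sets and thread lists. */
    static struct thread *
    watch_peer (struct thread_master *master, int peer_fd, void *ctx)
    {
      /* In a select() build this returns NULL if peer_fd already has a read
       * handler registered, so callers should check the result. */
      return thread_add_read (master, peer_readable, ctx, peer_fd);
    }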