static void reportWindowsError( char const * const apiName, int slot );
/* closes a Windows HANDLE and resets its variable to 0. */
static void closeWinHandle( HANDLE * const handle );
+/* Adds the job index to the list of currently active jobs. */
+static void register_wait( int job_id );
/* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
/* The list of commands we run. */
-static struct
+static struct _cmdtab_t
{
/* Temporary command file used to execute the action when needed. */
string command_file[ 1 ];
PROCESS_INFORMATION pi; /* running process information */
+ HANDLE wait_handle;
+
/* Function called when the command completes. */
ExecCmdCallback func;
/* Opaque data passed back to the 'func' callback. */
void * closure;
-}
-cmdtab[ MAXJOBS ] = { { 0 } };
+} * cmdtab = NULL;
+static int cmdtab_size = 0;
+/* A thread-safe single element queue. Used by the worker threads
+ * to signal the main thread that a process is completed.
+ */
+struct
+{
+ int job_index;
+ HANDLE read_okay;
+ HANDLE write_okay;
+} process_queue;
/*
* Execution unit tests.
#endif
}
+/*
+ * exec_init() - global initialization
+ */
+void exec_init( void )
+{
+ if ( globs.jobs > cmdtab_size )
+ {
+ cmdtab = BJAM_REALLOC( cmdtab, globs.jobs * sizeof( *cmdtab ) );
+ memset( cmdtab + cmdtab_size, 0, ( globs.jobs - cmdtab_size ) * sizeof( *cmdtab ) );
+ cmdtab_size = globs.jobs;
+ }
+ if ( globs.jobs > MAXIMUM_WAIT_OBJECTS && !process_queue.read_okay )
+ {
+ process_queue.read_okay = CreateEvent( NULL, FALSE, FALSE, NULL );
+ process_queue.write_okay = CreateEvent( NULL, FALSE, TRUE, NULL );
+ }
+}
+
+/*
+ * exec_done - free resources.
+ */
+void exec_done( void )
+{
+ if ( process_queue.read_okay )
+ {
+ CloseHandle( process_queue.read_okay );
+ }
+ if ( process_queue.write_okay )
+ {
+ CloseHandle( process_queue.write_okay );
+ }
+ BJAM_FREE( cmdtab );
+}
/*
* exec_check() - preprocess and validate the command
reportWindowsError( "CreateProcessA", slot );
return;
}
+
+ register_wait( slot );
}
static int maxline()
{
- static result;
+ static int result;
if ( !result ) result = raw_maxline();
return result;
}
}
}
+static void CALLBACK try_wait_callback( void * data, BOOLEAN is_timeout )
+{
+ struct _cmdtab_t * slot = ( struct _cmdtab_t * )data;
+ WaitForSingleObject( process_queue.write_okay, INFINITE );
+ process_queue.job_index = slot - cmdtab;
+ assert( !is_timeout );
+ SetEvent( process_queue.read_okay );
+ /* Okay. Non-blocking. */
+ UnregisterWait( slot->wait_handle );
+}
+
+static int try_wait_impl( DWORD timeout )
+{
+ int job_index;
+ int timed_out;
+ int res = WaitForSingleObject( process_queue.read_okay, timeout );
+ if ( res != WAIT_OBJECT_0 )
+ return -1;
+ job_index = process_queue.job_index;
+ SetEvent( process_queue.write_okay );
+ return job_index;
+}
+
+static void register_wait( int job_id )
+{
+ if ( globs.jobs > MAXIMUM_WAIT_OBJECTS )
+ {
+ HANDLE ignore;
+ RegisterWaitForSingleObject( &cmdtab[ job_id ].wait_handle,
+ cmdtab[ job_id ].pi.hProcess,
+ &try_wait_callback, &cmdtab[ job_id ], INFINITE,
+ WT_EXECUTEDEFAULT | WT_EXECUTEONLYONCE );
+ }
+}
/*
* Waits for a single child process command to complete, or the timeout,
* cmdtab array, or -1.
*/
-typedef struct _twh_params
-{
- int * active_procs;
- HANDLE * active_handles;
- DWORD num_active;
- DWORD timeoutMillis;
-} twh_params;
-
-static int try_wait_helper( twh_params * );
-
static int try_wait( int const timeoutMillis )
{
-#define MAX_THREADS MAXJOBS/(MAXIMUM_WAIT_OBJECTS - 1) + 1
- int i;
- int num_active;
- int wait_api_result;
- HANDLE active_handles[ MAXJOBS + MAX_THREADS ];
- int active_procs[ MAXJOBS + MAX_THREADS ];
- unsigned int num_threads;
- unsigned int num_handles;
- unsigned int last_chunk_size;
- unsigned int last_chunk_offset;
- HANDLE completed_event = INVALID_HANDLE_VALUE;
- HANDLE thread_handles[MAXIMUM_WAIT_OBJECTS];
- twh_params thread_params[MAX_THREADS];
- int result = -1;
- BOOL success;
-
- /* Prepare a list of all active processes to wait for. */
- for ( num_active = 0, i = 0; i < globs.jobs; ++i )
- if ( cmdtab[ i ].pi.hProcess )
+ if ( globs.jobs <= MAXIMUM_WAIT_OBJECTS )
+ {
+ int i;
+ HANDLE active_handles[ MAXIMUM_WAIT_OBJECTS ];
+ int job_ids[ MAXIMUM_WAIT_OBJECTS ];
+ DWORD num_handles = 0;
+ DWORD wait_api_result;
+ for ( i = 0; i < globs.jobs; ++i )
{
- if ( num_active == MAXIMUM_WAIT_OBJECTS )
- {
- /*
- * We surpassed MAXIMUM_WAIT_OBJECTS, so we need to use threads
- * to wait for this set. Create an event object which will
- * notify threads to stop waiting. Every handle set chunk should
- * have this event as its last element.
- */
- assert( completed_event == INVALID_HANDLE_VALUE );
- completed_event = CreateEvent(NULL, FALSE, FALSE, NULL);
- active_handles[ num_active ] = active_handles[ num_active - 1 ];
- active_procs[ num_active ] = active_procs[ num_active - 1 ];
- active_handles[ num_active - 1 ] = completed_event;
- active_procs[ num_active - 1 ] = -1;
- ++num_active;
- }
- else if ( ( completed_event != INVALID_HANDLE_VALUE ) &&
- !((num_active + 1) % MAXIMUM_WAIT_OBJECTS) )
+ if( cmdtab[ i ].pi.hProcess )
{
- active_handles[ num_active ] = completed_event;
- active_procs[ num_active ] = -1;
- ++num_active;
+ job_ids[ num_handles ] = i;
+ active_handles[ num_handles ] = cmdtab[ i ].pi.hProcess;
+ ++num_handles;
}
- active_handles[ num_active ] = cmdtab[ i ].pi.hProcess;
- active_procs[ num_active ] = i;
- ++num_active;
}
-
- assert( (num_active <= MAXIMUM_WAIT_OBJECTS) ==
- (completed_event == INVALID_HANDLE_VALUE) );
- if ( num_active <= MAXIMUM_WAIT_OBJECTS )
- {
- twh_params twh;
- twh.active_procs = active_procs;
- twh.active_handles = active_handles;
- twh.num_active = num_active;
- twh.timeoutMillis = timeoutMillis;
- return try_wait_helper( &twh );
- }
-
- num_threads = num_active / MAXIMUM_WAIT_OBJECTS;
- last_chunk_size = num_active % MAXIMUM_WAIT_OBJECTS;
- num_handles = num_threads;
- if ( last_chunk_size )
- {
- /* Can we fit the last chunk in the outer WFMO call? */
- if ( last_chunk_size <= MAXIMUM_WAIT_OBJECTS - num_threads )
+ wait_api_result = WaitForMultipleObjects( num_handles, active_handles, FALSE, timeoutMillis );
+ if ( WAIT_OBJECT_0 <= wait_api_result && wait_api_result < WAIT_OBJECT_0 + globs.jobs )
{
- last_chunk_offset = num_threads * MAXIMUM_WAIT_OBJECTS;
- for ( i = 0; i < last_chunk_size; ++i )
- thread_handles[ i + num_threads ] =
- active_handles[ i + last_chunk_offset ];
- num_handles = num_threads + last_chunk_size;
+ return job_ids[ wait_api_result - WAIT_OBJECT_0 ];
}
else
{
- /* We need another thread for the remainder. */
- /* Add completed_event handle to the last chunk. */
- active_handles[ num_active ] = completed_event;
- active_procs[ num_active ] = -1;
- ++last_chunk_size;
- ++num_active;
- ++num_threads;
- num_handles = num_threads;
+ return -1;
}
}
-
- assert( num_threads <= MAX_THREADS );
-
- for ( i = 0; i < num_threads; ++i )
- {
- thread_params[i].active_procs = active_procs +
- i * MAXIMUM_WAIT_OBJECTS;
- thread_params[i].active_handles = active_handles +
- i * MAXIMUM_WAIT_OBJECTS;
- thread_params[i].timeoutMillis = INFINITE;
- thread_params[i].num_active = MAXIMUM_WAIT_OBJECTS;
- if ( ( i == num_threads - 1 ) && last_chunk_size &&
- ( num_handles == num_threads ) )
- thread_params[i].num_active = last_chunk_size;
- thread_handles[i] = CreateThread(NULL, 4 * 1024,
- (LPTHREAD_START_ROUTINE)&try_wait_helper, &thread_params[i],
- 0, NULL);
- }
- wait_api_result = WaitForMultipleObjects(num_handles, thread_handles,
- FALSE, timeoutMillis);
- if ( ( WAIT_OBJECT_0 <= wait_api_result ) &&
- ( wait_api_result < WAIT_OBJECT_0 + num_threads ) )
- {
- HANDLE thread_handle = thread_handles[wait_api_result - WAIT_OBJECT_0];
- success = GetExitCodeThread(thread_handle, (DWORD *)&result);
- assert( success );
- }
- else if ( ( WAIT_OBJECT_0 + num_threads <= wait_api_result ) &&
- ( wait_api_result < WAIT_OBJECT_0 + num_handles ) )
- {
- unsigned int offset = wait_api_result - num_threads - WAIT_OBJECT_0;
- result = active_procs[ last_chunk_offset + offset ];
- }
- SetEvent(completed_event);
- /* Should complete instantly. */
- WaitForMultipleObjects(num_threads, thread_handles, TRUE, INFINITE);
- CloseHandle(completed_event);
- for ( i = 0; i < num_threads; ++i )
- CloseHandle(thread_handles[i]);
- return result;
-#undef MAX_THREADS
-}
-
-static int try_wait_helper( twh_params * params )
-{
- int wait_api_result;
-
- assert( params->num_active <= MAXIMUM_WAIT_OBJECTS );
-
- /* Wait for a child to complete, or for our timeout window to expire. */
- wait_api_result = WaitForMultipleObjects( params->num_active,
- params->active_handles, FALSE, params->timeoutMillis );
- if ( ( WAIT_OBJECT_0 <= wait_api_result ) &&
- ( wait_api_result < WAIT_OBJECT_0 + params->num_active ) )
+ else
{
- /* Terminated process detected - return its index. */
- return params->active_procs[ wait_api_result - WAIT_OBJECT_0 ];
+ return try_wait_impl( timeoutMillis );
}
- /* Timeout. */
- return -1;
}
static int get_free_cmdtab_slot()
{
int slot;
- for ( slot = 0; slot < MAXJOBS; ++slot )
+ for ( slot = 0; slot < globs.jobs; ++slot )
if ( !cmdtab[ slot ].pi.hProcess )
return slot;
err_printf( "no slots for child!\n" );