3 # Copyright (C) 2007,2008,2009,2010,2011,2012,2013,2014 Ole Tange and
4 # Free Software Foundation, Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, see <http://www.gnu.org/licenses/>
18 # or write to the Free Software Foundation, Inc., 51 Franklin St,
19 # Fifth Floor, Boston, MA 02110-1301 USA
21 # open3 used in Job::start
23 # &WNOHANG used in reaper
24 use POSIX
qw(:sys_wait_h setsid ceil :errno_h);
25 # gensym used in Job::start
26 use Symbol
qw(gensym);
27 # tempfile used in Job::start
28 use File
::Temp
qw(tempfile tempdir);
29 # mkpath used in openresultsfile
31 # GetOptions used in get_options_from_array
33 # Used to ensure code quality
38 # $ENV{HOME} is sometimes not set if called from PHP
39 ::warning
("\$HOME not set. Using /tmp\n");
43 save_stdin_stdout_stderr
();
44 save_original_signal_handler
();
46 ::debug
("init", "Open file descriptors: ", join(" ",keys %Global::fd
), "\n");
48 if($Global::max_number_of_args
) {
49 $number_of_args=$Global::max_number_of_args
;
50 } elsif ($opt::X
or $opt::m
or $opt::xargs
) {
51 $number_of_args = undef;
61 @fhlist = map { open_or_exit
($_) } "/dev/null";
63 @fhlist = map { open_or_exit
($_) } @opt::a
;
64 if(not @fhlist and not $opt::pipe) {
69 if($opt::skip_first_line
) {
70 # Skip the first line for the first file handle
74 if($opt::header
and not $opt::pipe) {
76 # split with colsep or \t
77 # $header force $colsep = \t if undef?
78 my $delimiter = $opt::colsep
;
81 for my $fh (@fhlist) {
84 ::debug
("init", "Delimiter: '$delimiter'");
85 for my $s (split /$delimiter/o, $line) {
86 ::debug
("init", "Colname: '$s'");
87 # Replace {colname} with {2}
88 # TODO accept configurable short hands
89 # TODO how to deal with headers in {=...=}
91 s
:\
{$s(|/|//|\.|/\
.)\
}:\
{$id$1\}:g
;
93 $Global::input_source_header
{$id} = $s;
99 for my $fh (@fhlist) {
100 $Global::input_source_header
{$id} = $id;
105 if($opt::filter_hosts
and (@opt::sshlogin
or @opt::sshloginfile
)) {
106 # Parallel check all hosts are up. Remove hosts that are down
110 if($opt::nonall
or $opt::onall
) {
112 wait_and_exit
(min
(undef_as_zero
($Global::exitstatus
),254));
115 # TODO --transfer foo/./bar --cleanup
116 # multiple --transfer and --basefile with different /./
118 $Global::JobQueue
= JobQueue-
>new(
119 \
@command,\@fhlist,$Global::ContextReplace
,$number_of_args,\@Global::ret_files
);
121 if($opt::eta
or $opt::bar
) {
122 # Count the number of jobs before starting any
123 $Global::JobQueue-
>total_jobs();
126 @Global::cat_partials
= map { pipe_part_files
($_) } @opt::a
;
127 # Unget the command as many times as there are parts
128 $Global::JobQueue-
>{'commandlinequeue'}->unget(
129 map { $Global::JobQueue-
>{'commandlinequeue'}->get() } @Global::cat_partials
132 for my $sshlogin (values %Global::host
) {
133 $sshlogin->max_jobs_running();
138 if($Global::semaphore
) {
139 $sem = acquire_semaphore
();
141 $SIG{TERM
} = \
&start_no_new_jobs
;
144 if(not $opt::pipepart
) {
149 ::debug
("init", "Start draining\n");
151 ::debug
("init", "Done draining\n");
153 ::debug
("init", "Done reaping\n");
154 if($opt::pipe and @opt::a
) {
155 for my $job (@Global::tee_jobs
) {
156 unlink $job->fh(2,"name");
157 $job->set_fh(2,"name","");
159 unlink $job->fh(1,"name");
162 ::debug
("init", "Cleaning\n");
164 if($Global::semaphore
) {
167 for(keys %Global::sshmaster
) {
170 ::debug
("init", "Halt\n");
171 if($opt::halt_on_error
) {
172 wait_and_exit
($Global::halt_on_error_exitstatus
);
174 wait_and_exit
(min
(undef_as_zero
($Global::exitstatus
),254));
179 sub pipe_part_files
{
181 # $file = the file to read
183 # @commands that will cat_partial each part
186 my $header = find_header
(\
$buf,open_or_exit($file));
188 my @pos = find_split_positions
($file,$opt::blocksize
,length $header);
190 my @cat_partials = ();
191 for(my $i=0; $i<$#pos; $i++) {
192 push @cat_partials, cat_partial
($file, 0, length($header), $pos[$i], $pos[$i+1]);
194 # Remote exec should look like:
195 # ssh -oLogLevel=quiet lo 'eval `echo $SHELL | grep "/t\{0,1\}csh" > /dev/null && echo setenv PARALLEL_SEQ '$PARALLEL_SEQ'\; setenv PARALLEL_PID '$PARALLEL_PID' || echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' tty\ \>/dev/null\ \&\&\ stty\ isig\ -onlcr\ -echo\;echo\ \$SHELL\ \|\ grep\ \"/t\\\{0,1\\\}csh\"\ \>\ /dev/null\ \&\&\ setenv\ FOO\ /tmp/foo\ \|\|\ export\ FOO=/tmp/foo\; \(wc\ -\ \$FOO\)
196 # ssh -tt not allowed. Remote will die due to broken pipe anyway.
197 # TODO test remote with --fifo / --cat
198 return @cat_partials;
203 # $buf_ref = reference to read-in buffer
204 # $fh = filehandle to read from
210 my ($buf_ref, $fh) = @_;
213 if($opt::header
eq ":") { $opt::header
= "(.*\n)"; }
214 # Number = number of lines
215 $opt::header
=~ s/^(\d+)$/"(.*\n)"x$1/e;
216 while(read($fh,substr($$buf_ref,length $$buf_ref,0),$opt::blocksize
)) {
217 if($$buf_ref=~s/^($opt::header)//) {
226 sub find_split_positions
{
228 # $file = the file to read
229 # $block = (minimal) --block-size of each chunk
230 # $headerlen = length of header to be skipped
235 # @positions of block start/end
236 my($file, $block, $headerlen) = @_;
239 # The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
240 # The optimal dd blocksize for freebsd = 2^15..2^17
241 my $dd_block_size = 131072; # 2^17
243 my ($recstart,$recend) = recstartrecend
();
244 my $recendrecstart = $recend.$recstart;
245 my $fh = ::open_or_exit
($file);
246 push(@pos,$headerlen);
247 for(my $pos = $block+$headerlen; $pos < $size; $pos += $block) {
249 seek($fh, $pos, 0) || die;
250 while(read($fh,substr($buf,length $buf,0),$dd_block_size)) {
252 # If match /$recend$recstart/ => Record position
253 if($buf =~ /(.*$recend)$recstart/os) {
256 # Start looking for next record _after_ this match
261 # If match $recend$recstart => Record position
262 my $i = index($buf,$recendrecstart);
265 # Start looking for next record _after_ this match
279 # $file = the file to read
280 # ($start, $end, [$start2, $end2, ...]) = start byte, end byte
282 # Efficient perl command to copy $start..$end, $start2..$end2, ... to stdout
283 my($file, @start_end) = @_;
285 # Convert start_end to start_len
286 my @start_len = map { if(++$i % 2) { $start = $_; } else { $_-$start } } @start_end;
287 return "<". shell_quote_scalar
($file) .
288 q{ perl -e 'while(@ARGV) { sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 32768 ? 32768 : $left))){ $left -= $read; syswrite(STDOUT,$buf); } }' } .
294 # Spawn a job and print the record to it.
300 # $Global::max_number_of_args
302 # $Global::start_no_new_jobs
307 my ($recstart,$recend) = recstartrecend();
308 my $recendrecstart = $recend.$recstart;
309 my $chunk_number = 1;
310 my $one_time_through;
311 my $blocksize = $opt::blocksize;
313 my $header = find_header(\$buf,$in);
315 my $anything_written = 0;
316 if(not read($in,substr($buf,length $buf,0),$blocksize)) {
318 $chunk_number != 1 and last;
319 # Force the while-loop once if everything was read by header reading
320 $one_time_through++ and last;
324 $buf =~ s/^\s*\n//gm;
325 if(length $buf == 0) {
329 if($Global::max_lines and not $Global::max_number_of_args) {
330 # Read n-line records
331 my $n_lines = $buf =~ tr/\n/\n/;
332 my $last_newline_pos = rindex($buf,"\n");
333 while($n_lines % $Global::max_lines) {
335 $last_newline_pos = rindex($buf,"\n",$last_newline_pos-1);
337 # Chop at $last_newline_pos as that is where n-line record ends
339 write_record_to_pipe($chunk_number++,\$header,\$buf,
340 $recstart,$recend,$last_newline_pos+1);
341 substr($buf,0,$last_newline_pos+1) = "";
342 } elsif($opt::regexp) {
343 if($Global::max_number_of_args) {
344 # -N => (start..*?end){n}
345 # -L -N => (start..*?end){n*l}
346 my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
347 while($buf =~ s/((?:$recstart.*?$recend){$read_n_lines})($recstart.*)$/$2/os) {
348 # Copy to modifiable variable
351 write_record_to_pipe($chunk_number++,\$header,\$b,
352 $recstart,$recend,length $1);
355 # Find the last recend-recstart in $buf
356 if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
357 # Copy to modifiable variable
360 write_record_to_pipe($chunk_number++,\$header,\$b,
361 $recstart,$recend,length $1);
365 if($Global::max_number_of_args) {
366 # -N => (start..*?end){n}
368 my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
369 while(($i = nindex(\$buf,$recendrecstart,$read_n_lines)) != -1) {
370 $i += length $recend; # find the actual splitting location
372 write_record_to_pipe($chunk_number++,\$header,\$buf,
373 $recstart,$recend,$i);
374 substr($buf,0,$i) = "";
377 # Find the last recend-recstart in $buf
378 my $i = rindex($buf,$recendrecstart);
380 $i += length $recend; # find the actual splitting location
382 write_record_to_pipe($chunk_number++,\$header,\$buf,
383 $recstart,$recend,$i);
384 substr($buf,0,$i) = "";
388 if(not $anything_written and not eof($in)) {
389 # Nothing was written - maybe the block size < record size?
390 # Increase blocksize exponentially
391 my $old_blocksize = $blocksize;
392 $blocksize = ceil($blocksize * 1.3 + 1);
393 ::warning("A record was longer than $old_blocksize. " .
394 "Increasing to --blocksize $blocksize\n");
397 ::debug("init", "Done reading input\n");
399 # If there is anything left in the buffer write it
400 substr($buf,0,0) = "";
401 write_record_to_pipe($chunk_number++,\$header,\$buf,$recstart,$recend,length $buf);
403 $Global::start_no_new_jobs ||= 1;
404 if($opt::roundrobin) {
405 for my $job (values %Global::running) {
406 close $job->fh(0,"w");
408 my %incomplete_jobs = %Global::running;
410 while(keys %incomplete_jobs) {
411 my $something_written = 0;
412 for my $pid (keys %incomplete_jobs) {
413 my $job = $incomplete_jobs{$pid};
414 if($job->stdin_buffer_length()) {
415 $something_written += $job->non_block_write();
417 delete $incomplete_jobs{$pid}
420 if($something_written) {
421 $sleep = $sleep/2+0.001;
423 $sleep = ::reap_usleep($sleep);
433 # $recstart,$recend with default values and regexp conversion
434 my($recstart,$recend);
435 if(defined($opt::recstart) and defined($opt::recend)) {
436 # If both --recstart and --recend is given then both must match
437 $recstart = $opt::recstart;
438 $recend = $opt::recend;
439 } elsif(defined($opt::recstart)) {
440 # If --recstart is given it must match start of record
441 $recstart = $opt::recstart;
443 } elsif(defined($opt::recend)) {
444 # If --recend is given then it must match end of record
446 $recend = $opt::recend;
450 # If $recstart/$recend contains '|' this should only apply to the regexp
451 $recstart = "(?:".$recstart.")";
452 $recend = "(?:".$recend.")";
454 # $recstart/$recend = printf strings (\n)
455 $recstart =~ s/\\([0rnt\'\"\\])/"qq|\\$1|"/gee;
456 $recend =~ s/\\([0rnt\'\"\\])/"qq|\\$1|"/gee;
458 return ($recstart,$recend);
462 # See if string is in buffer N times
464 # the position where the Nth copy is found
465 my ($buf_ref, $str, $n) = @_;
468 $i = index($$buf_ref,$str,$i+1);
469 if($i == -1) { last }
477 sub round_robin_write {
479 # $header_ref = ref to $header string
480 # $block_ref = ref to $block to be written
481 # $recstart = record start string
482 # $recend = record end string
483 # $endpos = end position of $block
486 my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_;
487 my $something_written = 0;
488 my $block_passed = 0;
490 while(not $block_passed) {
491 # Continue flushing existing buffers
492 # until one is empty and a new block is passed
493 # Make a queue to spread the blocks evenly
494 if(not @robin_queue) {
495 push @robin_queue, values %Global::running;
497 while(my $job = shift @robin_queue) {
498 if($job->stdin_buffer_length() > 0) {
499 $something_written += $job->non_block_write();
501 $job->set_stdin_buffer($header_ref,$block_ref,$endpos,$recstart,$recend);
504 $something_written += $job->non_block_write();
508 $sleep = ::reap_usleep($sleep);
510 return $something_written;
514 sub write_record_to_pipe {
516 # Write record from pos 0 .. $endpos to pipe
518 # $chunk_number = sequence number - to see if already run
519 # $header_ref = reference to header string to prepend
520 # $record_ref = reference to record to write
521 # $recstart = start string of record
522 # $recend = end string of record
523 # $endpos = position in $record_ref where record ends
525 # $Global::job_already_run
527 # @Global::virgin_jobs
529 # Number of chunks written (0 or 1)
530 my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_;
531 if($endpos == 0) { return 0; }
532 if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
533 if($opt::roundrobin) {
534 return round_robin_write($header_ref,$record_ref,$recstart,$recend,$endpos);
536 # If no virgin found, backoff
537 my $sleep = 0.0001; # 0.01 ms - better performance on highend
538 while(not @Global::virgin_jobs) {
539 ::debug("pipe", "No virgin jobs");
540 $sleep = ::reap_usleep($sleep);
541 # Jobs may not be started because of loadavg
542 # or too little time between each ssh login.
545 my $job = shift @Global::virgin_jobs;
546 # Job is no longer virgin
551 # Chop of at $endpos as we do not know how many rec_sep will
553 substr($$record_ref,$endpos,length $$record_ref) = "";
555 if($opt::remove_rec_sep) {
556 Job::remove_rec_sep($record_ref,$recstart,$recend);
558 $job->write($header_ref);
559 $job->write($record_ref);
560 close $job->fh(0,"w");
563 close $job->fh(0,"w");
569 sub acquire_semaphore {
570 # Acquires semaphore. If needed: spawns to the background
574 # The semaphore to be released when jobs is complete
575 $Global::host{':'} = SSHLogin->new(":");
576 my $sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running());
581 # If run in the background, the PID will change
582 # therefore release and re-acquire the semaphore
588 # Get a semaphore for this pid
589 ::die_bug("Can't start a new session
: $!") if setsid() == -1;
590 $sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running());
597 sub __PARSE_OPTIONS__ {}
601 # %hash = the GetOptions config
603 ("debug
|D
=s
" => \$opt::D,
604 "xargs
" => \$opt::xargs,
608 "joblog
=s
" => \$opt::joblog,
609 "results
|result
|res
=s
" => \$opt::results,
610 "resume
" => \$opt::resume,
611 "resume-failed
|resumefailed
" => \$opt::resume_failed,
612 "silent
" => \$opt::silent,
613 #"silent-error
|silenterror
" => \$opt::silent_error,
614 "keep-order
|keeporder
|k
" => \$opt::keeporder,
615 "group
" => \$opt::group,
616 "g
" => \$opt::retired,
617 "ungroup
|u
" => \$opt::ungroup,
618 "linebuffer
|linebuffered
|line-buffer
|line-buffered
" => \$opt::linebuffer,
619 "tmux
" => \$opt::tmux,
620 "null
|0" => \$opt::0,
621 "quote
|q
" => \$opt::q,
622 # Replacement strings
623 "parens
=s
" => \$opt::parens,
624 "rpl
=s
" => \@opt::rpl,
625 "plus
" => \$opt::plus,
627 "extensionreplace
|er
=s
" => \$opt::U,
628 "U
=s
" => \$opt::retired,
629 "basenamereplace
|bnr
=s
" => \$opt::basenamereplace,
630 "dirnamereplace
|dnr
=s
" => \$opt::dirnamereplace,
631 "basenameextensionreplace
|bner
=s
" => \$opt::basenameextensionreplace,
632 "seqreplace
=s
" => \$opt::seqreplace,
633 "slotreplace
=s
" => \$opt::slotreplace,
634 "jobs
|j
=s
" => \$opt::jobs,
635 "delay
=f
" => \$opt::delay,
636 "sshdelay
=f
" => \$opt::sshdelay,
637 "load
=s
" => \$opt::load,
638 "noswap
" => \$opt::noswap,
639 "max-line-length-allowed
" => \$opt::max_line_length_allowed,
640 "number-of-cpus
" => \$opt::number_of_cpus,
641 "number-of-cores
" => \$opt::number_of_cores,
642 "use-cpus-instead-of-cores
" => \$opt::use_cpus_instead_of_cores,
643 "shellquote
|shell_quote
|shell-quote
" => \$opt::shellquote,
644 "nice
=i
" => \$opt::nice,
645 "timeout
=s
" => \$opt::timeout,
647 "tagstring
|tag-string
=s
" => \$opt::tagstring,
648 "onall
" => \$opt::onall,
649 "nonall
" => \$opt::nonall,
650 "filter-hosts
|filterhosts
|filter-host
" => \$opt::filter_hosts,
651 "sshlogin
|S
=s
" => \@opt::sshlogin,
652 "sshloginfile
|slf
=s
" => \@opt::sshloginfile,
653 "controlmaster
|M
" => \$opt::controlmaster,
654 "return=s
" => \@opt::return,
655 "trc
=s
" => \@opt::trc,
656 "transfer
" => \$opt::transfer,
657 "cleanup
" => \$opt::cleanup,
658 "basefile
|bf
=s
" => \@opt::basefile,
659 "B
=s
" => \$opt::retired,
660 "ctrlc
|ctrl-c
" => \$opt::ctrlc,
661 "noctrlc
|no-ctrlc
|no-ctrl-c
" => \$opt::noctrlc,
662 "workdir
|work-dir
|wd
=s
" => \$opt::workdir,
663 "W
=s
" => \$opt::retired,
664 "tmpdir
=s
" => \$opt::tmpdir,
665 "tempdir
=s
" => \$opt::tmpdir,
666 "use-compress-program
|compress-program
=s
" => \$opt::compress_program,
667 "use-decompress-program
|decompress-program
=s
" => \$opt::decompress_program,
668 "compress
" => \$opt::compress,
670 "T
" => \$opt::retired,
671 "halt-on-error
|halt
=s
" => \$opt::halt_on_error,
672 "H
=i
" => \$opt::retired,
673 "retries
=i
" => \$opt::retries,
674 "dry-run
|dryrun
" => \$opt::dryrun,
675 "progress
" => \$opt::progress,
678 "arg-sep
|argsep
=s
" => \$opt::arg_sep,
679 "arg-file-sep
|argfilesep
=s
" => \$opt::arg_file_sep,
680 "trim
=s
" => \$opt::trim,
681 "env
=s
" => \@opt::env,
682 "recordenv
|record-env
" => \$opt::record_env,
683 "plain
" => \$opt::plain,
684 "profile
|J
=s
" => \@opt::profile,
685 "pipe|spreadstdin
" => \$opt::pipe,
686 "robin
|round-robin
|roundrobin
" => \$opt::roundrobin,
687 "recstart
=s
" => \$opt::recstart,
688 "recend
=s
" => \$opt::recend,
689 "regexp
|regex
" => \$opt::regexp,
690 "remove-rec-sep
|removerecsep
|rrs
" => \$opt::remove_rec_sep,
691 "files
|output-as-files
|outputasfiles
" => \$opt::files,
692 "block
|block-size
|blocksize
=s
" => \$opt::blocksize,
693 "tollef
" => \$opt::retired,
695 "xapply
" => \$opt::xapply,
696 "bibtex
" => \$opt::bibtex,
697 "nn
|nonotice
|no-notice
" => \$opt::no_notice,
698 # xargs-compatibility - implemented, man, testsuite
699 "max-procs
|P
=s
" => \$opt::jobs,
700 "delimiter
|d
=s
" => \$opt::d,
701 "max-chars
|s
=i
" => \$opt::max_chars,
702 "arg-file
|a
=s
" => \@opt::a,
703 "no-run-if-empty
|r
" => \$opt::r,
704 "replace
|i
:s
" => \$opt::i,
706 "eof|e
:s
" => \$opt::eof,
707 "max-args
|n
=i
" => \$opt::max_args,
708 "max-replace-args
|N
=i
" => \$opt::max_replace_args,
709 "colsep
|col-sep
|C
=s
" => \$opt::colsep,
710 "help
|h
" => \$opt::help,
712 "max-lines
|l
:f
" => \$opt::max_lines,
713 "interactive
|p
" => \$opt::p,
714 "verbose
|t
" => \$opt::verbose,
715 "version
|V
" => \$opt::version,
716 "minversion
|min-version
=i
" => \$opt::minversion,
717 "show-limits
|showlimits
" => \$opt::show_limits,
718 "exit|x
" => \$opt::x,
720 "semaphore
" => \$opt::semaphore,
721 "semaphoretimeout
=i
" => \$opt::semaphoretimeout,
722 "semaphorename
|id
=s
" => \$opt::semaphorename,
725 "wait" => \$opt::wait,
726 # Shebang #!/usr/bin/parallel --shebang
727 "shebang
|hashbang
" => \$opt::shebang,
728 "internal-pipe-means-argfiles
" => \$opt::internal_pipe_means_argfiles,
729 "Y
" => \$opt::retired,
730 "skip-first-line
" => \$opt::skip_first_line,
731 "header
=s
" => \$opt::header,
733 "fifo
" => \$opt::fifo,
734 "pipepart
|pipe-part
" => \$opt::pipepart,
735 "hgrp
|hostgroup
|hostgroups
" => \$opt::hostgroups,
739 sub get_options_from_array {
740 # Run GetOptions on @array
742 # $array_ref = ref to @ARGV to parse
743 # @keep_only = Keep only these options
747 # true if parsing worked
748 # false if parsing failed
749 # @$array_ref is changed
750 my ($array_ref, @keep_only) = @_;
751 if(not @$array_ref) {
752 # Empty array: No need to look more at that
755 # A bit of shuffling of @ARGV needed as GetOptionsFromArray is not
756 # supported everywhere
758 my $this_is_ARGV = (\@::ARGV == $array_ref);
759 if(not $this_is_ARGV) {
760 @save_argv = @::ARGV;
761 @::ARGV = @{$array_ref};
763 # If @keep_only set: Ignore all values except @keep_only
764 my %options = options_hash();
767 @keep{@keep_only} = @keep_only;
768 for my $k (grep { not $keep{$_} } keys %options) {
769 # Store the value of the option in @dummy
770 $options{$k} = \@dummy;
773 my $retval = GetOptions(%options);
774 if(not $this_is_ARGV) {
775 @{$array_ref} = @::ARGV;
776 @::ARGV = @save_argv;
784 $Global::version = 20141122;
785 $Global::progname = 'parallel';
786 $Global::infinity = 2**31;
788 $Global::verbose = 0;
789 $Global::quoting = 0;
790 # Read only table with default --rpl values
794 '{#}' => '1 $_=$job->seq()',
795 '{%}' => '1 $_=$job->slot()',
797 '{//}' => '$Global::use{"File
::Basename
"} ||= eval "use File
::Basename
; 1;"; $_ = dirname($_);',
798 '{/.}' => 's:.*/::; s:\.[^/.]+$::;',
799 '{.}' => 's:\.[^/.]+$::',
804 # = {.}.{+.} = {+/}/{/.}.{+.}
805 # = {..}.{+..} = {+/}/{/..}.{+..}
806 # = {...}.{+...} = {+/}/{/...}.{+...}
807 '{+/}' => 's:/[^/]*$::',
808 '{+.}' => 's:.*\.::',
809 '{+..}' => 's:.*\.([^.]*\.):$1:',
810 '{+...}' => 's:.*\.([^.]*\.[^.]*\.):$1:',
811 '{..}' => 's:\.[^/.]+$::; s:\.[^/.]+$::',
812 '{...}' => 's:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::',
813 '{/..}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::',
814 '{/...}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::',
816 # Modifiable copy of %Global::replace
817 %Global::rpl = %Global::replace;
818 $Global::parens = "{==}";
820 $Global::ignore_empty = 0;
821 $Global::interactive = 0;
822 $Global::stderr_verbose = 0;
823 $Global::default_simultaneous_sshlogins = 9;
824 $Global::exitstatus = 0;
825 $Global::halt_on_error_exitstatus = 0;
826 $Global::arg_sep = ":::";
827 $Global::arg_file_sep = "::::";
829 $Global::max_jobs_running = 0;
830 $Global::job_already_run = '';
831 $ENV{'TMPDIR'} ||= "/tmp
";
833 @ARGV=read_options();
835 if(@opt::v) { $Global::verbose = $#opt::v+1; } # Convert -v -v to v=2
836 $Global::debug = $opt::D;
837 $Global::shell = $ENV{'PARALLEL_SHELL'} || parent_shell($$) || $ENV{'SHELL'} || "/bin/sh
";
838 if(defined $opt::X) { $Global::ContextReplace = 1; }
839 if(defined $opt::silent) { $Global::verbose = 0; }
840 if(defined $opt::0) { $/ = "\
0"; }
841 if(defined $opt::d) { my $e="sprintf \"$opt::d
\""; $/ = eval $e; }
842 if(defined $opt::p) { $Global::interactive = $opt::p; }
843 if(defined $opt::q) { $Global::quoting = 1; }
844 if(defined $opt::r) { $Global::ignore_empty = 1; }
845 if(defined $opt::verbose) { $Global::stderr_verbose = 1; }
848 # Modify %Global::rpl
849 # Replace $old with $new
852 $Global::rpl{$new} = $Global::rpl{$old};
853 delete $Global::rpl{$old};
856 if(defined $opt::parens) { $Global::parens = $opt::parens; }
857 my $parenslen = 0.5*length $Global::parens;
858 $Global::parensleft = substr($Global::parens,0,$parenslen);
859 $Global::parensright = substr($Global::parens,$parenslen);
860 if(defined $opt::plus) { %Global::rpl = (%Global::plus,%Global::rpl); }
861 if(defined $opt::I) { rpl('{}',$opt::I); }
862 if(defined $opt::U) { rpl('{.}',$opt::U); }
863 if(defined $opt::i and $opt::i) { rpl('{}',$opt::i); }
864 if(defined $opt::basenamereplace) { rpl('{/}',$opt::basenamereplace); }
865 if(defined $opt::dirnamereplace) { rpl('{//}',$opt::dirnamereplace); }
866 if(defined $opt::seqreplace) { rpl('{#}',$opt::seqreplace); }
867 if(defined $opt::slotreplace) { rpl('{%}',$opt::slotreplace); }
868 if(defined $opt::basenameextensionreplace) {
869 rpl('{/.}',$opt::basenameextensionreplace);
872 # Create $Global::rpl entries for --rpl options
873 # E.g: "{..} s
:\
.[^.]+$:;s
:\
.[^.]+$:;"
874 my ($shorthand,$long) = split/ /,$_,2;
875 $Global::rpl{$shorthand} = $long;
877 if(defined $opt::eof) { $Global::end_of_file_string = $opt::eof; }
878 if(defined $opt::max_args) { $Global::max_number_of_args = $opt::max_args; }
879 if(defined $opt::timeout) { $Global::timeoutq = TimeoutQueue->new($opt::timeout); }
880 if(defined $opt::tmpdir) { $ENV{'TMPDIR'} = $opt::tmpdir; }
881 if(defined $opt::help) { die_usage(); }
882 if(defined $opt::colsep) { $Global::trim = 'lr'; }
883 if(defined $opt::header) { $opt::colsep = defined $opt::colsep ? $opt::colsep : "\t"; }
884 if(defined $opt::trim) { $Global::trim = $opt::trim; }
885 if(defined $opt::arg_sep) { $Global::arg_sep = $opt::arg_sep; }
886 if(defined $opt::arg_file_sep) { $Global::arg_file_sep = $opt::arg_file_sep; }
887 if(defined $opt::number_of_cpus) { print SSHLogin::no_of_cpus(),"\n"; wait_and_exit(0); }
888 if(defined $opt::number_of_cores) {
889 print SSHLogin::no_of_cores(),"\n"; wait_and_exit(0);
891 if(defined $opt::max_line_length_allowed) {
892 print Limits::Command::real_max_length(),"\n"; wait_and_exit(0);
894 if(defined $opt::version) { version(); wait_and_exit(0); }
895 if(defined $opt::bibtex) { bibtex(); wait_and_exit(0); }
896 if(defined $opt::record_env) { record_env(); wait_and_exit(0); }
897 if(defined $opt::show_limits) { show_limits(); }
898 if(@opt::sshlogin) { @Global::sshlogin = @opt::sshlogin; }
899 if(@opt::sshloginfile) { read_sshloginfiles(@opt::sshloginfile); }
900 if(@opt::return) { push @Global::ret_files, @opt::return; }
901 if(not defined $opt::recstart and
902 not defined $opt::recend) { $opt::recend = "\n"; }
903 if(not defined $opt::blocksize) { $opt::blocksize = "1M
"; }
904 $opt::blocksize = multiply_binary_prefix($opt::blocksize);
905 if(defined $opt::controlmaster) { $opt::noctrlc = 1; }
906 if(defined $opt::semaphore) { $Global::semaphore = 1; }
907 if(defined $opt::semaphoretimeout) { $Global::semaphore = 1; }
908 if(defined $opt::semaphorename) { $Global::semaphore = 1; }
909 if(defined $opt::fg) { $Global::semaphore = 1; }
910 if(defined $opt::bg) { $Global::semaphore = 1; }
911 if(defined $opt::wait) { $Global::semaphore = 1; }
912 if(defined $opt::halt_on_error and
913 $opt::halt_on_error=~/%/) { $opt::halt_on_error /= 100; }
914 if(defined $opt::timeout and $opt::timeout !~ /^\d+(\.\d+)?%?$/) {
915 ::error("--timeout must be seconds
or percentage
\n");
918 if(defined $opt::minversion) {
919 print $Global::version,"\n";
920 if($Global::version < $opt::minversion) {
926 if(not defined $opt::delay) {
927 # Set --delay to --sshdelay if not set
928 $opt::delay = $opt::sshdelay;
930 if($opt::compress_program) {
932 $opt::decompress_program ||= $opt::compress_program." -dc
";
935 my ($compress, $decompress) = find_compression_program();
936 $opt::compress_program ||= $compress;
937 $opt::decompress_program ||= $decompress;
939 if(defined $opt::nonall) {
940 # Append a dummy empty argument
941 push @ARGV, $Global::arg_sep, "";
943 if(defined $opt::tty) {
944 # Defaults for --tty: -j1 -u
945 # Can be overridden with -jXXX -g
946 if(not defined $opt::jobs) {
949 if(not defined $opt::group) {
954 push @Global::ret_files, @opt::trc;
958 if(defined $opt::max_lines) {
959 if($opt::max_lines eq "-0") {
960 # -l -0 (swallowed -0)
964 } elsif ($opt::max_lines == 0) {
965 # If not given (or if 0 is given) => 1
968 $Global::max_lines = $opt::max_lines;
970 # --pipe -L means length of record - not max_number_of_args
971 $Global::max_number_of_args ||= $Global::max_lines;
975 # Read more than one arg at a time (-L, -N)
976 if(defined $opt::L) {
977 $Global::max_lines = $opt::L;
979 # --pipe -L means length of record - not max_number_of_args
980 $Global::max_number_of_args ||= $Global::max_lines;
983 if(defined $opt::max_replace_args) {
984 $Global::max_number_of_args = $opt::max_replace_args;
985 $Global::ContextReplace = 1;
987 if((defined $opt::L or defined $opt::max_replace_args)
989 not ($opt::xargs or $opt::m)) {
990 $Global::ContextReplace = 1;
992 if(defined $opt::tag and not defined $opt::tagstring) {
993 $opt::tagstring = "\257<\257>"; # Default = {}
995 if(defined $opt::pipepart and
996 (defined $opt::L or defined $opt::max_lines
997 or defined $opt::max_replace_args)) {
998 ::error("--pipepart
is incompatible with
--max-replace-args
, ",
999 "--max-lines
, and -L
.\n");
1002 if(grep /^$Global::arg_sep$|^$Global::arg_file_sep$/o, @ARGV) {
1003 # Deal with ::: and ::::
1004 @ARGV=read_args_from_command_line();
1007 # Semaphore defaults
1008 # Must be done before computing number of processes and max_line_length
1009 # because when running as a semaphore GNU Parallel does not read args
1010 $Global::semaphore ||= ($0 =~ m:(^|/)sem$:); # called as 'sem'
1011 if($Global::semaphore) {
1012 # A semaphore does not take input from neither stdin nor file
1013 @opt::a = ("/dev/null
");
1014 push(@Global::unget_argv, [Arg->new("")]);
1015 $Semaphore::timeout = $opt::semaphoretimeout || 0;
1016 if(defined $opt::semaphorename) {
1017 $Semaphore::name = $opt::semaphorename;
1019 $Semaphore::name = `tty`;
1020 chomp $Semaphore::name;
1022 $Semaphore::fg = $opt::fg;
1023 $Semaphore::wait = $opt::wait;
1024 $Global::default_simultaneous_sshlogins = 1;
1025 if(not defined $opt::jobs) {
1028 if($Global::interactive and $opt::bg) {
1029 ::error("Jobs running
in the
".
1030 "background cannot be interactive
.\n");
1031 ::wait_and_exit(255);
1034 if(defined $opt::eta) {
1035 $opt::progress = $opt::eta;
1037 if(defined $opt::bar) {
1038 $opt::progress = $opt::bar;
1040 if(defined $opt::retired) {
1041 ::error("-g
has been retired
. Use
--group
.\n");
1042 ::error("-B
has been retired
. Use
--bf
.\n");
1043 ::error("-T
has been retired
. Use
--tty
.\n");
1044 ::error("-U
has been retired
. Use
--er
.\n");
1045 ::error("-W
has been retired
. Use
--wd
.\n");
1046 ::error("-Y
has been retired
. Use
--shebang
.\n");
1047 ::error("-H
has been retired
. Use
--halt
.\n");
1048 ::error("--tollef
has been retired
. Use
-u
-q
--arg-sep
-- and --load
for -l
.\n");
1049 ::wait_and_exit(255);
1056 if(remote_hosts() and ($opt::X or $opt::m or $opt::xargs)) {
1057 # As we do not know the max line length on the remote machine
1058 # long commands generated by xargs may fail
1059 # If opt_N is set, it is probably safe
1060 ::warning("Using
-X
or -m with
--sshlogin may fail
.\n");
1063 if(not defined $opt::jobs) {
1064 $opt::jobs = "100%";
1071 # $v = value to quote
1073 # $v = value quoted as environment variable
1075 $v =~ s/([\\])/\\$1/g;
1076 $v =~ s/([\[\] \#\'\&\<\>\(\)\;\{\}\t\"\$\`\*\174\!\?\~])/\\$1/g;
1082 # Record current %ENV-keys in ~/.parallel/ignored_vars
1084 my $ignore_filename = $ENV{'HOME'} . "/.parallel/ignored_vars
";
1085 if(open(my $vars_fh, ">", $ignore_filename)) {
1086 print $vars_fh map { $_,"\n" } keys %ENV;
1088 ::error("Cannot
write to
$ignore_filename\n");
1089 ::wait_and_exit(255);
1094 # Parse --env and set $Global::envvar, $Global::envwarn and $Global::envvarlen
1096 # Bash functions must be parsed to export them remotely
1097 # Pre-shellshock style bash function:
1099 # Post-shellshock style bash function:
1100 # BASH_FUNC_myfunc()=() {...
1103 # $Global::envvar = eval string that will set variables in both bash and csh
1104 # $Global::envwarn = If functions are used: Give warning in csh
1105 # $Global::envvarlen = length of $Global::envvar
1110 $Global::envvar = "";
1111 $Global::envwarn = "";
1112 my @vars = ('parallel_bash_environment');
1113 for my $varstring (@opt::env) {
1114 # Split up --env VAR1,VAR2
1115 push @vars, split /,/, $varstring;
1117 if(grep { /^_$/ } @vars) {
1119 # Include all vars that are not in a clean environment
1120 if(open(my $vars_fh, "<", $ENV{'HOME'} . "/.parallel/ignored_vars
")) {
1121 my @ignore = <$vars_fh>;
1124 @ignore{@ignore} = @ignore;
1126 push @vars, grep { not defined $ignore{$_} } keys %ENV;
1127 @vars = grep { not /^_$/ } @vars;
1129 ::error("Run
'$Global::progname --record-env' in a clean environment first
.\n");
1130 ::wait_and_exit(255);
1133 # Duplicate vars as BASH functions to include post-shellshock functions.
1134 # So --env myfunc should also look for BASH_FUNC_myfunc()
1135 @vars = map { $_, "BASH_FUNC_
$_()" } @vars;
1136 # Keep only defined variables
1137 @vars = grep { defined($ENV{$_}) } @vars;
1138 # Pre-shellshock style bash function:
1139 # myfunc=() { echo myfunc
1141 # Post-shellshock style bash function:
1142 # BASH_FUNC_myfunc()=() { echo myfunc
1144 my @bash_functions = grep { substr($ENV{$_},0,4) eq "() {" } @vars;
1145 my @non_functions = grep { substr($ENV{$_},0,4) ne "() {" } @vars;
1146 if(@bash_functions) {
1147 # Functions are not supported for all shells
1148 if($Global::shell !~ m:/(bash|rbash|zsh|rzsh|dash|ksh):) {
1149 ::warning("Shell functions may
not be supported
in $Global::shell
\n");
1153 # Pre-shellshock names are without ()
1154 my @bash_pre_shellshock = grep { not /\(\)/ } @bash_functions;
1155 # Post-shellshock names are with ()
1156 my @bash_post_shellshock = grep { /\(\)/ } @bash_functions;
1158 my @qcsh = (map { my $a=$_; "setenv
$a " . env_quote($ENV{$a}) }
1159 grep { not /^parallel_bash_environment$/ } @non_functions);
1160 my @qbash = (map { my $a=$_; "export
$a=" . env_quote($ENV{$a}) }
1161 @non_functions, @bash_pre_shellshock);
1163 push @qbash, map { my $a=$_; "eval $a\"\$$a\"" } @bash_pre_shellshock;
1164 push @qbash, map { /BASH_FUNC_(.*)\(\)/; "$1 $ENV{$_}" } @bash_post_shellshock;
1166 #ssh -tt -oLogLevel=quiet lo 'eval `echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' tty\ \>/dev/null\ \&\&\ stty\ isig\ -onlcr\ -echo\;echo\ \$SHELL\ \|\ grep\ \"/t\\\{0,1\\\}csh\"\ \>\ /dev/null\ \&\&\ setenv\ BASH_FUNC_myfunc\ \\\(\\\)\\\ \\\{\\\ \\\ echo\\\ a\"'
1167 #'\"\\\}\ \|\|\ myfunc\(\)\ \{\ \ echo\ a'
1170 # Check if any variables contain \n
1171 if(my @v = map { s/BASH_FUNC_(.*)\(\)/$1/; $_ } grep { $ENV{$_}=~/\n/ } @vars) {
1172 # \n is bad for csh and will cause it to fail.
1173 $Global::envwarn = ::shell_quote_scalar(q{echo $SHELL | grep -E "/t?csh" > /dev
/null && echo CSH/TCSH DO NOT SUPPORT newlines IN VARIABLES
/FUNCTIONS
. Unset
}."@v".q{ && exec false;}."\n\n") . $Global::envwarn
;
1176 if(not @qcsh) { push @qcsh, "true"; }
1177 if(not @qbash) { push @qbash, "true"; }
1178 # Create lines like:
1179 # echo $SHELL | grep "/t\\{0,1\\}csh" >/dev/null && setenv V1 val1 && setenv V2 val2 || export V1=val1 && export V2=val2 ; echo "$V1$V2"
1183 (q{echo $SHELL | grep "/t\\{0,1\\}csh
" > /dev/null && }
1184 . join(" && ", @qcsh)
1186 . join(" && ", @qbash)
1188 if($ENV{'parallel_bash_environment'}) {
1189 $Global::envvar
.= 'eval "$parallel_bash_environment";'."\n";
1192 $Global::envvarlen
= length $Global::envvar
;
1196 # Open joblog as specified by --joblog
1199 # $opt::resume_failed
1202 # $Global::job_already_run
1205 if(($opt::resume
or $opt::resume_failed
)
1207 not ($opt::joblog
or $opt::results
)) {
1208 ::error
("--resume and --resume-failed require --joblog or --results.\n");
1209 ::wait_and_exit
(255);
1212 if($opt::resume
|| $opt::resume_failed
) {
1213 if(open(my $joblog_fh, "<", $opt::joblog
)) {
1215 $append = <$joblog_fh>; # If there is a header: Open as append later
1217 if($opt::resume_failed
) {
1218 # Make a regexp that only matches commands with exit+signal=0
1219 # 4 host 1360490623.067 3.445 1023 1222 0 0 command
1220 $joblog_regexp='^(\d+)(?:\t[^\t]+){5}\t0\t0\t';
1222 # Just match the job number
1223 $joblog_regexp='^(\d+)';
1225 while(<$joblog_fh>) {
1226 if(/$joblog_regexp/o) {
1227 # This is 30% faster than set_job_already_run($1);
1228 vec($Global::job_already_run
,($1||0),1) = 1;
1229 } elsif(not /\d+\s+[^\s]+\s+([0-9.]+\s+){6}/) {
1230 ::error
("Format of '$opt::joblog' is wrong: $_");
1231 ::wait_and_exit
(255);
1239 if(not open($Global::joblog
, ">>", $opt::joblog
)) {
1240 ::error
("Cannot append to --joblog $opt::joblog.\n");
1241 ::wait_and_exit
(255);
1244 if($opt::joblog
eq "-") {
1245 # Use STDOUT as joblog
1246 $Global::joblog
= $Global::fd
{1};
1247 } elsif(not open($Global::joblog
, ">", $opt::joblog
)) {
1248 # Overwrite the joblog
1249 ::error
("Cannot write to --joblog $opt::joblog.\n");
1250 ::wait_and_exit
(255);
1252 print $Global::joblog
1253 join("\t", "Seq", "Host", "Starttime", "JobRuntime",
1254 "Send", "Receive", "Exitval", "Signal", "Command"
1260 sub find_compression_program
{
1261 # Find a fast compression program
1263 # $compress_program = compress program with options
1264 # $decompress_program = decompress program with options
1266 # Search for these. Sorted by speed
1267 my @prg = qw(lzop pigz pxz gzip plzip pbzip2 lzma xz lzip bzip2);
1270 return ("$p -c -1","$p -dc");
1274 return ("cat","cat");
1279 # Read options from command line, profile and $PARALLEL
1281 # $opt::shebang_wrap
1289 # @ARGV_no_opt = @ARGV without --options
1291 # This must be done first as this may exec myself
1292 if(defined $ARGV[0] and ($ARGV[0] =~ /^--shebang/ or
1293 $ARGV[0] =~ /^--shebang-?wrap/ or
1294 $ARGV[0] =~ /^--hashbang/)) {
1295 # Program is called from #! line in script
1296 # remove --shebang-wrap if it is set
1297 $opt::shebang_wrap
= ($ARGV[0] =~ s/^--shebang-?wrap *//);
1298 # remove --shebang if it is set
1299 $opt::shebang
= ($ARGV[0] =~ s/^--shebang *//);
1300 # remove --hashbang if it is set
1301 $opt::shebang
.= ($ARGV[0] =~ s/^--hashbang *//);
1303 my $argfile = shell_quote_scalar
(pop @ARGV);
1304 # exec myself to split $ARGV[0] into separate fields
1305 exec "$0 --skip-first-line -a $argfile @ARGV";
1307 if($opt::shebang_wrap
) {
1310 if ($^O eq 'freebsd') {
1311 # FreeBSD's #! puts different values in @ARGV than Linux' does.
1312 my @nooptions = @ARGV;
1313 get_options_from_array
(\
@nooptions);
1314 while($#ARGV > $#nooptions) {
1315 push @options, shift @ARGV;
1317 while(@ARGV and $ARGV[0] ne ":::") {
1318 push @parser, shift @ARGV;
1320 if(@ARGV and $ARGV[0] eq ":::") {
1324 @options = shift @ARGV;
1326 my $script = shell_quote_scalar
(shift @ARGV);
1327 # exec myself to split $ARGV[0] into separate fields
1328 exec "$0 --internal-pipe-means-argfiles @options @parser $script ::: @ARGV";
1332 Getopt
::Long
::Configure
("bundling","require_order");
1333 my @ARGV_copy = @ARGV;
1334 # Check if there is a --profile to set @opt::profile
1335 get_options_from_array
(\
@ARGV_copy,"profile|J=s","plain") || die_usage
();
1336 my @ARGV_profile = ();
1338 if(not $opt::plain
) {
1339 # Add options from .parallel/config and other profiles
1340 my @config_profiles = (
1341 "/etc/parallel/config",
1342 $ENV{'HOME'}."/.parallel/config",
1343 $ENV{'HOME'}."/.parallelrc");
1344 my @profiles = @config_profiles;
1346 # --profile overrides default profiles
1348 for my $profile (@opt::profile
) {
1350 push @profiles, $profile;
1352 push @profiles, $ENV{'HOME'}."/.parallel/".$profile;
1356 for my $profile (@profiles) {
1358 open (my $in_fh, "<", $profile) || ::die_bug
("read-profile: $profile");
1362 push @ARGV_profile, shellwords
($_);
1366 if(grep /^$profile$/, @config_profiles) {
1367 # config file is not required to exist
1369 ::error
("$profile not readable.\n");
1374 # Add options from shell variable $PARALLEL
1375 if($ENV{'PARALLEL'}) {
1376 @ARGV_env = shellwords
($ENV{'PARALLEL'});
1379 Getopt
::Long
::Configure
("bundling","require_order");
1380 get_options_from_array
(\
@ARGV_profile) || die_usage
();
1381 get_options_from_array
(\
@ARGV_env) || die_usage
();
1382 get_options_from_array
(\
@ARGV) || die_usage
();
1384 # Prepend non-options to @ARGV (such as commands like 'nice')
1385 unshift @ARGV, @ARGV_profile, @ARGV_env;
1389 sub read_args_from_command_line
{
1390 # Arguments given on the command line after:
1391 # ::: ($Global::arg_sep)
1392 # :::: ($Global::arg_file_sep)
1393 # Removes the arguments from @ARGV and:
1394 # - puts filenames into -a
1395 # - puts arguments into files and adds the files to -a
1397 # @::ARGV = command option ::: arg arg arg :::: argfiles
1400 # $Global::arg_file_sep
1401 # $opt::internal_pipe_means_argfiles
1405 # @argv_no_argsep = @::ARGV without ::: and :::: and following args
1407 for(my $arg = shift @ARGV; @ARGV; $arg = shift @ARGV) {
1408 if($arg eq $Global::arg_sep
1410 $arg eq $Global::arg_file_sep
) {
1411 my $group = $arg; # This group of arguments is args or argfiles
1413 while(defined ($arg = shift @ARGV)) {
1414 if($arg eq $Global::arg_sep
1416 $arg eq $Global::arg_file_sep
) {
1417 # exit while loop if finding new separator
1420 # If not hitting ::: or ::::
1421 # Append it to the group
1426 if($group eq $Global::arg_file_sep
1427 or ($opt::internal_pipe_means_argfiles
and $opt::pipe)
1429 # Group of file names on the command line.
1430 # Append args into -a
1431 push @opt::a
, @group;
1432 } elsif($group eq $Global::arg_sep
) {
1433 # Group of arguments on the command line.
1434 # Put them into a file.
1436 my ($outfh,$name) = ::tmpfile
(SUFFIX
=> ".arg");
1438 # Put args into argfile
1439 print $outfh map { $_,$/ } @group;
1441 # Append filehandle to -a
1442 push @opt::a
, $outfh;
1444 ::die_bug
("Unknown command line group: $group");
1447 # $arg is ::: or ::::
1450 # $arg is undef -> @ARGV empty
1454 push @new_argv, $arg;
1456 # Output: @ARGV = command to run with options
1462 if(@opt::basefile
) { cleanup_basefile
(); }
# Empty sub (body is just {}): it does nothing when called and appears to
# serve only as a section heading for the shell-quoting subs that follow.
1465 sub __QUOTING_ARGUMENTS_FOR_SHELL__
{}
1469 # @strings = strings to be quoted
1471 # @shell_quoted_strings = string quoted with \ as needed by the shell
1473 for my $a (@strings) {
1474 $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g;
1475 $a =~ s/[\n]/'\n'/g; # filenames with '\n' is quoted using \'
1477 return wantarray ?
@strings : "@strings";
1480 sub shell_quote_empty
{
1482 # @strings = strings to be quoted
1484 # @quoted_strings = empty strings quoted as ''.
1485 my @strings = shell_quote
(@_);
1486 for my $a (@strings) {
1491 return wantarray ?
@strings : "@strings";
1494 sub shell_quote_scalar
{
1495 # Quote the string so shell will not expand any special chars
1497 # $string = string to be quoted
1499 # $shell_quoted = string quoted with \ as needed by the shell
1502 # $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g;
1503 # This is 1% faster than the above
1504 $a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go;
1505 $a =~ s/[\n]/'\n'/go; # filenames with '\n' is quoted using \'
1510 sub shell_quote_file
{
1511 # Quote the string so shell will not expand any special chars and prepend ./ if needed
1513 # $filename = filename to be shell quoted
1515 # $quoted_filename = filename quoted with \ as needed by the shell and ./ if needed
1516 my $a = shell_quote_scalar
(shift);
1518 if($a =~ m
:^/: or $a =~ m:^\./:) {
1519 # /abs/path or ./rel/path => skip
1521 # rel/path => ./rel/path
1530 # $string = shell line
1532 # @shell_words = $string split into words as shell would do
1533 $Global::use{"Text::ParseWords"} ||= eval "use Text::ParseWords; 1;";
1534 return Text
::ParseWords
::shellwords
(@_);
# Empty sub (body is just {}): it does nothing when called and appears to
# serve only as a section heading for the filehandle subs that follow.
1538 sub __FILEHANDLES__
{}
1541 sub save_stdin_stdout_stderr
{
1542 # Remember the original STDIN, STDOUT and STDERR
1543 # and file descriptors opened by the shell (e.g. 3>/tmp/foo)
1546 # $Global::original_stderr
1547 # $Global::original_stdin
1550 # Find file descriptors that are already opened (by the shell)
1551 for my $fdno (1..61) {
1552 # /dev/fd/62 and above are used by bash for <(cmd)
1554 # 2-argument-open is used to be compatible with old perl 5.8.0
1555 # bug #43570: Perl 5.8.0 creates 61 files
1556 if(open($fh,">&=$fdno")) {
1557 $Global::fd
{$fdno}=$fh;
1560 open $Global::original_stderr
, ">&", "STDERR" or
1561 ::die_bug
("Can't dup STDERR: $!");
1562 open $Global::original_stdin
, "<&", "STDIN" or
1563 ::die_bug
("Can't dup STDIN: $!");
1564 $Global::is_terminal
= (-t
$Global::original_stderr
) && !$ENV{'CIRCLECI'} && !$ENV{'TRAVIS'};
1567 sub enough_file_handles
{
1568 # Check that we have enough filehandles available for starting
1574 # 1 if ungrouped (thus not needing extra filehandles)
1575 # 0 if too few filehandles
1576 # 1 if enough filehandles
1577 if(not $opt::ungroup
) {
1579 my $enough_filehandles = 1;
1580 # perl uses 7 filehandles for something?
1581 # open3 uses 2 extra filehandles temporarily
1582 # We need a filehandle for each redirected file descriptor
1583 # (normally just STDOUT and STDERR)
1584 for my $i (1..(7+2+keys %Global::fd
)) {
1585 $enough_filehandles &&= open($fh{$i}, "<", "/dev/null");
1587 for (values %fh) { close $_; }
1588 return $enough_filehandles;
1590 # Ungrouped does not need extra file handles
1596 # Open a file name or exit if the file cannot be opened
1598 # $file = filehandle or filename to open
1600 # $Global::stdin_in_opt_a
1601 # $Global::original_stdin
1603 # $fh = file handle to read-opened file
1606 $Global::stdin_in_opt_a
= 1;
1607 return ($Global::original_stdin
|| *STDIN
);
1609 if(ref $file eq "GLOB") {
1610 # This is an open filehandle
1614 if(not open($fh, "<", $file)) {
1615 ::error
("Cannot open input file `$file': No such file or directory.\n");
# Empty sub (body is just {}): it does nothing when called and appears to
# serve only as a section heading for the job-running and progress subs
# that follow.
1621 sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__
{}
1623 # Variable structure:
1625 # $Global::running{$pid} = Pointer to Job-object
1626 # @Global::virgin_jobs = Pointer to Job-object that have received no input
1627 # $Global::host{$sshlogin} = Pointer to SSHLogin-object
1628 # $Global::total_running = total number of running jobs
1629 # $Global::total_started = total jobs started
1632 $Global::total_running
= 0;
1633 $Global::total_started
= 0;
1634 $Global::tty_taken
= 0;
1635 $SIG{USR1
} = \
&list_running_jobs
;
1636 $SIG{USR2
} = \
&toggle_progress
;
1637 if(@opt::basefile
) { setup_basefile
(); }
1644 sub start_more_jobs
{
1645 # Run start_another_job() but only if:
1646 # * not $Global::start_no_new_jobs set
1647 # * not JobQueue is empty
1648 # * not load on server is too high
1649 # * not server swapping
1650 # * not too short time since last remote login
1652 # $Global::max_procs_file
1653 # $Global::max_procs_file_last_mod
1655 # @opt::sshloginfile
1656 # $Global::start_no_new_jobs
1657 # $opt::filter_hosts
1663 # $Global::newest_starttime
1665 # $jobs_started = number of jobs started
1666 my $jobs_started = 0;
1667 my $jobs_started_this_round = 0;
1668 if($Global::start_no_new_jobs
) {
1669 return $jobs_started;
1671 if(time - ($last_time||0) > 1) {
1672 # At most do this every second
1674 if($Global::max_procs_file
) {
1676 my $mtime = (stat($Global::max_procs_file
))[9];
1677 if($mtime > $Global::max_procs_file_last_mod
) {
1678 # file changed: Force re-computing max_jobs_running
1679 $Global::max_procs_file_last_mod
= $mtime;
1680 for my $sshlogin (values %Global::host
) {
1681 $sshlogin->set_max_jobs_running(undef);
1685 if(@opt::sshloginfile
) {
1686 # Is --sshloginfile changed?
1687 for my $slf (@opt::sshloginfile
) {
1688 my $actual_file = expand_slf_shorthand
($slf);
1689 my $mtime = (stat($actual_file))[9];
1690 $last_mtime{$actual_file} ||= $mtime;
1691 if($mtime - $last_mtime{$actual_file} > 1) {
1692 ::debug
("run","--sshloginfile $actual_file changed. reload\n");
1693 $last_mtime{$actual_file} = $mtime;
1696 @Global::sshlogin
= ();
1697 for (values %Global::host
) {
1698 # Don't start new jobs on any host
1699 # except the ones added back later
1700 $_->set_max_jobs_running(0);
1702 # This will set max_jobs_running on the SSHlogins
1703 read_sshloginfile
($actual_file);
1705 $opt::filter_hosts
and filter_hosts
();
1712 $jobs_started_this_round = 0;
1713 # This will start 1 job on each --sshlogin (if possible)
1714 # thus distribute the jobs on the --sshlogins round robin
1716 for my $sshlogin (values %Global::host
) {
1717 if($Global::JobQueue-
>empty() and not $opt::pipe) {
1718 # No more jobs in the queue
1721 debug
("run", "Running jobs before on ", $sshlogin->string(), ": ",
1722 $sshlogin->jobs_running(), "\n");
1723 if ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
1724 if($opt::load
and $sshlogin->loadavg_too_high()) {
1725 # The load is too high or unknown
1728 if($opt::noswap
and $sshlogin->swapping()) {
1729 # The server is swapping
1732 if($sshlogin->too_fast_remote_login()) {
1733 # It has been too short since
1736 if($opt::delay
and $opt::delay
> ::now
() - $Global::newest_starttime
) {
1737 # It has been too short since last start
1740 debug
("run", $sshlogin->string(), " has ", $sshlogin->jobs_running(),
1741 " out of ", $sshlogin->max_jobs_running(),
1742 " jobs running. Start another.\n");
1743 if(start_another_job
($sshlogin) == 0) {
1744 # No more jobs to start on this $sshlogin
1745 debug
("run","No jobs started on ", $sshlogin->string(), "\n");
1748 $sshlogin->inc_jobs_running();
1749 $sshlogin->set_last_login_at(::now
());
1751 $jobs_started_this_round++;
1753 debug
("run","Running jobs after on ", $sshlogin->string(), ": ",
1754 $sshlogin->jobs_running(), " of ",
1755 $sshlogin->max_jobs_running(), "\n");
1757 } while($jobs_started_this_round);
1759 return $jobs_started;
1764 my $no_more_file_handles_warned;
1766 sub start_another_job
{
1767 # If there are enough filehandles
1768 # and JobQueue not empty
1769 # and not $job is in joblog
1770 # Then grab a job from Global::JobQueue,
1771 # start it at sshlogin
1772 # mark it as virgin_job
1774 # $sshlogin = the SSHLogin to start the job on
1780 # @Global::virgin_jobs
1782 # 1 if another job was started
1784 my $sshlogin = shift;
1785 # Do we have enough file handles to start another job?
1786 if(enough_file_handles
()) {
1787 if($Global::JobQueue-
>empty() and not $opt::pipe) {
1788 # No more commands to run
1789 debug
("start", "Not starting: JobQueue empty\n");
1793 # Skip jobs already in job log
1794 # Skip jobs already in results
1796 $job = get_job_with_sshlogin
($sshlogin);
1797 if(not defined $job) {
1798 # No command available for that sshlogin
1799 debug
("start", "Not starting: no jobs available for ",
1800 $sshlogin->string(), "\n");
1803 } while ($job->is_already_in_joblog()
1805 ($opt::results
and $opt::resume
and $job->is_already_in_results()));
1806 debug
("start", "Command to run on '", $job->sshlogin()->string(), "': '",
1807 $job->replaced(),"'\n");
1810 push(@Global::virgin_jobs
,$job);
1812 debug
("start", "Started as seq ", $job->seq(),
1813 " pid:", $job->pid(), "\n");
1816 # Not enough processes to run the job.
1817 # Put it back on the queue.
1818 $Global::JobQueue-
>unget($job);
1819 # Count down the number of jobs to run for this SSHLogin.
1820 my $max = $sshlogin->max_jobs_running();
1821 if($max > 1) { $max--; } else {
1822 ::error
("No more processes: cannot run a single job. Something is wrong.\n");
1823 ::wait_and_exit
(255);
1825 $sshlogin->set_max_jobs_running($max);
1826 # Sleep up to 300 ms to give other processes time to die
1827 ::usleep
(rand()*300);
1828 ::warning
("No more processes: ",
1829 "Decreasing number of running jobs to $max. ",
1830 "Raising ulimit -u or /etc/security/limits.conf may help.\n");
1835 # No more file handles
1836 $no_more_file_handles_warned++ or
1837 ::warning
("No more file handles. ",
1838 "Raising ulimit -n or /etc/security/limits.conf may help.\n");
1844 $opt::min_progress_interval
= 0;
1850 # list of computers for progress output
1852 if (not $Global::is_terminal
) {
1853 $opt::min_progress_interval
= 30;
1858 my %progress = progress
();
1859 return ("\nComputers / CPU cores / Max jobs to run\n",
1860 $progress{'workerlist'});
1863 sub drain_job_queue
{
1866 # $Global::original_stderr
1867 # $Global::total_running
1868 # $Global::max_jobs_running
1872 # $Global::start_no_new_jobs
1874 if($opt::progress
) {
1875 print $Global::original_stderr init_progress
();
1879 my $last_left = 1000000000;
1880 my $last_progress_time = 0;
1881 my $ps_reported = 0;
1883 while($Global::total_running
> 0) {
1884 debug
($Global::total_running
, "==", scalar
1885 keys %Global::running
," slots: ", $Global::max_jobs_running
);
1887 # When using --pipe sometimes file handles are not closed properly
1888 for my $job (values %Global::running
) {
1889 close $job->fh(0,"w");
1892 # When not connected to terminal, assume CI (e.g. CircleCI). In
1893 # that case we want occasional progress output to prevent abort
1894 # due to timeout with no output, but we also need to stop sending
1895 # progress output if there has been no actual progress, so that
1896 # the job can time out appropriately (CircleCI: 10m) in case of
1897 # a hung test. But without special output, it is extremely
1898 # annoying to diagnose which test is hung, so when there has been no progress for 60 seconds we run ps to show the running processes.
1900 if($opt::progress
and
1901 ($Global::is_terminal
or (time() - $last_progress_time) >= 30)) {
1902 my %progress = progress
();
1903 if($last_header ne $progress{'header'}) {
1904 print $Global::original_stderr
"\n", $progress{'header'}, "\n";
1905 $last_header = $progress{'header'};
1907 if ($Global::is_terminal
) {
1908 print $Global::original_stderr
"\r",$progress{'status'};
1910 if ($last_left > $Global::left
) {
1911 if (not $Global::is_terminal
) {
1912 print $Global::original_stderr
$progress{'status'},"\n";
1914 $last_progress_time = time();
1916 } elsif (not $ps_reported and (time() - $last_progress_time) >= 60) {
1917 # No progress in at least 60 seconds: run ps
1918 print $Global::original_stderr
"\n";
1919 my $script_dir = ::dirname
($0);
1920 system("$script_dir/ps_with_stack || ps -wwf");
1923 $last_left = $Global::left
;
1924 flush
$Global::original_stderr
;
1926 if($Global::total_running
< $Global::max_jobs_running
1927 and not $Global::JobQueue-
>empty()) {
1928 # These jobs may not be started because of loadavg
1929 # or too little time between each ssh login.
1930 if(start_more_jobs
() > 0) {
1931 # Exponential back-on if jobs were started
1932 $sleep = $sleep/2+0.001
;
1935 # Sometimes SIGCHLD is not registered, so force reaper
1936 $sleep = ::reap_usleep
($sleep);
1938 if(not $Global::JobQueue-
>empty()) {
1939 # These jobs may not be started:
1940 # * because --filter-hosts has removed all hosts
1941 if(not %Global::host
) {
1942 ::error
("There are no hosts left to run on.\n");
1943 ::wait_and_exit
(255);
1945 # * because of loadavg
1946 # * because of too little time between each ssh login.
1948 $sleep = ::reap_usleep
($sleep);
1949 if($Global::max_jobs_running
== 0) {
1950 ::warning
("There are no job slots available. Increase --jobs.\n");
1953 } while ($Global::total_running
> 0
1955 not $Global::start_no_new_jobs
and not $Global::JobQueue-
>empty());
1956 if($opt::progress
) {
1957 my %progress = progress
();
1958 print $Global::original_stderr
$opt::progress_sep
, $progress{'status'}, "\n";
1959 flush
$Global::original_stderr
;
1963 sub toggle_progress
{
1964 # Turn on/off progress view
1967 # $Global::original_stderr
1969 $opt::progress
= not $opt::progress
;
1970 if($opt::progress
) {
1971 print $Global::original_stderr init_progress
();
1980 # $Global::total_started
1982 # $workerlist = list of workers
1983 # $header = that will fit on the screen
1984 # $status = message that will fit on the screen
1986 return ("workerlist" => "", "header" => "", "status" => bar
());
1989 my ($status,$header)=("","");
1991 my($total, $completed, $left, $pctcomplete, $avgtime, $this_eta) =
1993 $eta = sprintf("ETA: %ds Left: %d AVG: %.2fs ",
1994 $this_eta, $left, $avgtime);
1995 $Global::left
= $left;
1997 my $termcols = terminal_columns
();
1998 my @workers = sort keys %Global::host
;
1999 my %sshlogin = map { $_ eq ":" ?
($_=>"local") : ($_=>$_) } @workers;
2001 my %workerno = map { ($_=>$workerno++) } @workers;
2002 my $workerlist = "";
2003 for my $w (@workers) {
2005 $workerno{$w}.":".$sshlogin{$w} ." / ".
2006 ($Global::host
{$w}->ncpus() || "-")." / ".
2007 $Global::host
{$w}->max_jobs_running()."\n";
2009 $status = "x"x
($termcols+1);
2010 if(length $status > $termcols) {
2011 # sshlogin1:XX/XX/XX%/XX.Xs sshlogin2:XX/XX/XX%/XX.Xs sshlogin3:XX/XX/XX%/XX.Xs
2012 $header = "Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete";
2016 if($Global::total_started
) {
2017 my $completed = ($Global::host
{$_}->jobs_completed()||0);
2018 my $running = $Global::host
{$_}->jobs_running();
2019 my $time = $completed ?
(time-
$^T)/($completed) : "0";
2020 sprintf("%s:%d/%d/%d%%/%.1fs ",
2021 $sshlogin{$_}, $running, $completed,
2022 ($running+$completed)*100
2023 / $Global::total_started
, $time);
2027 if(length $status > $termcols) {
2028 # 1:XX/XX/XX%/XX.Xs 2:XX/XX/XX%/XX.Xs 3:XX/XX/XX%/XX.Xs 4:XX/XX/XX%/XX.Xs
2029 $header = "Computer:jobs running/jobs completed/%of started jobs";
2033 my $completed = ($Global::host
{$_}->jobs_completed()||0);
2034 my $running = $Global::host
{$_}->jobs_running();
2035 my $time = $completed ?
(time-
$^T)/($completed) : "0";
2036 sprintf("%s:%d/%d/%d%%/%.1fs ",
2037 $workerno{$_}, $running, $completed,
2038 ($running+$completed)*100
2039 / $Global::total_started
, $time);
2042 if(length $status > $termcols) {
2043 # sshlogin1:XX/XX/XX% sshlogin2:XX/XX/XX% sshlogin3:XX/XX/XX%
2044 $header = "Computer:jobs running/jobs completed/%of started jobs";
2047 { sprintf("%s:%d/%d/%d%%",
2049 $Global::host
{$_}->jobs_running(),
2050 ($Global::host
{$_}->jobs_completed()||0),
2051 ($Global::host
{$_}->jobs_running()+
2052 ($Global::host
{$_}->jobs_completed()||0))*100
2053 / $Global::total_started
) }
2056 if(length $status > $termcols) {
2057 # 1:XX/XX/XX% 2:XX/XX/XX% 3:XX/XX/XX% 4:XX/XX/XX% 5:XX/XX/XX% 6:XX/XX/XX%
2058 $header = "Computer:jobs running/jobs completed/%of started jobs";
2061 { sprintf("%s:%d/%d/%d%%",
2063 $Global::host
{$_}->jobs_running(),
2064 ($Global::host
{$_}->jobs_completed()||0),
2065 ($Global::host
{$_}->jobs_running()+
2066 ($Global::host
{$_}->jobs_completed()||0))*100
2067 / $Global::total_started
) }
2070 if(length $status > $termcols) {
2071 # sshlogin1:XX/XX/XX% sshlogin2:XX/XX/XX% sshlogin3:XX/XX sshlogin4:XX/XX
2072 $header = "Computer:jobs running/jobs completed";
2075 { sprintf("%s:%d/%d",
2076 $sshlogin{$_}, $Global::host
{$_}->jobs_running(),
2077 ($Global::host
{$_}->jobs_completed()||0)) }
2080 if(length $status > $termcols) {
2081 # sshlogin1:XX/XX sshlogin2:XX/XX sshlogin3:XX/XX sshlogin4:XX/XX
2082 $header = "Computer:jobs running/jobs completed";
2085 { sprintf("%s:%d/%d",
2086 $sshlogin{$_}, $Global::host
{$_}->jobs_running(),
2087 ($Global::host
{$_}->jobs_completed()||0)) }
2090 if(length $status > $termcols) {
2091 # 1:XX/XX 2:XX/XX 3:XX/XX 4:XX/XX 5:XX/XX 6:XX/XX
2092 $header = "Computer:jobs running/jobs completed";
2095 { sprintf("%s:%d/%d",
2096 $workerno{$_}, $Global::host
{$_}->jobs_running(),
2097 ($Global::host
{$_}->jobs_completed()||0)) }
2100 if(length $status > $termcols) {
2101 # sshlogin1:XX sshlogin2:XX sshlogin3:XX sshlogin4:XX sshlogin5:XX
2102 $header = "Computer:jobs completed";
2107 ($Global::host
{$_}->jobs_completed()||0)) }
2110 if(length $status > $termcols) {
2111 # 1:XX 2:XX 3:XX 4:XX 5:XX 6:XX
2112 $header = "Computer:jobs completed";
2117 ($Global::host
{$_}->jobs_completed()||0)) }
2120 return ("workerlist" => $workerlist, "header" => $header, "status" => $status);
2124 my ($total, $first_completed, $smoothed_avg_time);
2127 # Calculate important numbers for ETA
2129 # $total = number of jobs in total
2130 # $completed = number of jobs completed
2131 # $left = number of jobs left
2132 # $pctcomplete = percent of jobs completed
2133 # $avgtime = averaged time
2134 # $eta = smoothed eta
2135 $total ||= $Global::JobQueue-
>total_jobs();
2137 for(values %Global::host
) { $completed += $_->jobs_completed() }
2138 my $left = $total - $completed;
2139 if(not $completed) {
2140 return($total, $completed, $left, 0, 0, 0);
2142 my $pctcomplete = $completed / $total;
2143 $first_completed ||= time;
2144 my $timepassed = (time - $first_completed);
2145 my $avgtime = $timepassed / $completed;
2146 $smoothed_avg_time ||= $avgtime;
2147 # Smooth the eta so it does not jump wildly
2148 $smoothed_avg_time = (1 - $pctcomplete) * $smoothed_avg_time +
2149 $pctcomplete * $avgtime;
2150 my $eta = int($left * $smoothed_avg_time);
2151 return($total, $completed, $left, $pctcomplete, $avgtime, $eta);
2160 # $status = bar with eta, completed jobs, arg and pct
2162 $reset ||= "\033[0m";
2163 my($total, $completed, $left, $pctcomplete, $avgtime, $eta) =
2165 my $arg = $Global::newest_job ?
2166 $Global::newest_job-
>{'commandline'}->replace_placeholders(["\257<\257>"],0,0) : "";
2167 # These chars mess up display in the terminal
2168 $arg =~ tr/[\011-\016\033\302-\365]//d;
2170 sprintf("%d%% %d:%d=%ds %s",
2171 $pctcomplete*100, $completed, $left, $eta, $arg);
2172 my $terminal_width = terminal_columns
();
2173 my $s = sprintf("%-${terminal_width}s",
2174 substr($bar_text." "x
$terminal_width,
2175 0,$terminal_width));
2176 my $width = int($terminal_width * $pctcomplete);
2177 substr($s,$width,0) = $reset;
2178 my $zenity = sprintf("%-${terminal_width}s",
2179 substr("# $eta sec $arg",
2180 0,$terminal_width));
2181 $s = "\r" . $zenity . "\r" . $pctcomplete*100 . # Prefix with zenity header
2182 "\r" . $rev . $s . $reset;
2188 my ($columns,$last_column_time);
2190 sub terminal_columns
{
2191 # Get the number of columns of the display
2193 # number of columns of the screen
2194 if(not $columns or $last_column_time < time) {
2195 $last_column_time = time;
2196 $columns = $ENV{'COLUMNS'};
2198 my $resize = qx{ resize 2>/dev/null };
2199 $resize =~ /COLUMNS=(\d+);/ and do { $columns = $1; };
2207 sub get_job_with_sshlogin
{
2209 # next job object for $sshlogin if any available
2210 my $sshlogin = shift;
2213 if ($opt::hostgroups
) {
2214 my @other_hostgroup_jobs = ();
2216 while($job = $Global::JobQueue-
>get()) {
2217 if($sshlogin->in_hostgroups($job->hostgroups())) {
2218 # Found a job for this hostgroup
2221 # This job was not in the hostgroups of $sshlogin
2222 push @other_hostgroup_jobs, $job;
2225 $Global::JobQueue-
>unget(@other_hostgroup_jobs);
2226 if(not defined $job) {
2231 $job = $Global::JobQueue-
>get();
2232 if(not defined $job) {
2234 ::debug
("start", "No more jobs: JobQueue empty\n");
2239 my $clean_command = $job->replaced();
2240 if($clean_command =~ /^\s*$/) {
2241 # Do not run empty lines
2242 if(not $Global::JobQueue-
>empty()) {
2243 return get_job_with_sshlogin
($sshlogin);
2248 $job->set_sshlogin($sshlogin);
2249 if($opt::retries
and $clean_command and
2250 $job->failed_here()) {
2251 # This command with these args failed for this sshlogin
2252 my ($no_of_failed_sshlogins,$min_failures) = $job->min_failed();
2253 # Only look at the Global::host that have > 0 jobslots
2254 if($no_of_failed_sshlogins == grep { $_->max_jobs_running() > 0 } values %Global::host
2255 and $job->failed_here() == $min_failures) {
2256 # It failed the same or more times on another host:
2257 # run it on this host
2259 # If it failed fewer times on another host:
2260 # Find another job to run
2262 if(not $Global::JobQueue-
>empty()) {
2263 # This can potentially recurse for all args
2264 no warnings
'recursion';
2265 $nextjob = get_job_with_sshlogin
($sshlogin);
2267 # Push the command back on the queue
2268 $Global::JobQueue-
>unget($job);
# Empty sub (body is just {}): it does nothing when called and appears to
# serve only as a section heading for the sshlogin-related subs that follow.
2275 sub __REMOTE_SSH__
{}
2277 sub read_sshloginfiles
{
2280 read_sshloginfile
(expand_slf_shorthand
($s));
2284 sub expand_slf_shorthand
{
2288 } elsif($file eq "..") {
2289 $file = $ENV{'HOME'}."/.parallel/sshloginfile";
2290 } elsif($file eq ".") {
2291 $file = "/etc/parallel/sshloginfile";
2292 } elsif(not -r
$file) {
2293 if(not -r
$ENV{'HOME'}."/.parallel/".$file) {
2294 # Try prepending ~/.parallel
2295 ::error
("Cannot open $file.\n");
2296 ::wait_and_exit
(255);
2298 $file = $ENV{'HOME'}."/.parallel/".$file;
2304 sub read_sshloginfile
{
2309 ::debug
("init","--slf ",$file);
2314 if(not open($in_fh, "<", $file)) {
2316 ::error
("Cannot open $file.\n");
2317 ::wait_and_exit
(255);
2324 push @Global::sshlogin
, $_;
2331 sub parse_sshlogin
{
2334 if(not @Global::sshlogin
) { @Global::sshlogin
= (":"); }
2335 for my $sshlogin (@Global::sshlogin
) {
2336 # Split up -S sshlogin,sshlogin
2337 for my $s (split /,/, $sshlogin) {
2338 if ($s eq ".." or $s eq "-") {
2339 # This may add to @Global::sshlogin - possibly bug
2340 read_sshloginfile
(expand_slf_shorthand
($s));
2346 $Global::minimal_command_line_length
= 8_000_000;
2347 my @allowed_hostgroups;
2348 for my $ncpu_sshlogin_string (::uniq
(@login)) {
2349 my $sshlogin = SSHLogin-
>new($ncpu_sshlogin_string);
2350 my $sshlogin_string = $sshlogin->string();
2351 if($sshlogin_string eq "") {
2352 # This is an ssh group: -S @webservers
2353 push @allowed_hostgroups, $sshlogin->hostgroups();
2356 if($Global::host
{$sshlogin_string}) {
2357 # This sshlogin has already been added:
2358 # It is probably a host that has come back
2359 # Set the max_jobs_running back to the original
2360 debug
("run","Already seen $sshlogin_string\n");
2361 if($sshlogin->{'ncpus'}) {
2362 # If ncpus set by '#/' of the sshlogin, overwrite it:
2363 $Global::host
{$sshlogin_string}->set_ncpus($sshlogin->ncpus());
2365 $Global::host
{$sshlogin_string}->set_max_jobs_running(undef);
2368 if($sshlogin_string eq ":") {
2369 $sshlogin->set_maxlength(Limits
::Command
::max_length
());
2371 # If all chars needs to be quoted, every other character will be \
2372 $sshlogin->set_maxlength(int(Limits
::Command
::max_length
()/2));
2374 $Global::minimal_command_line_length
=
2375 ::min
($Global::minimal_command_line_length
, $sshlogin->maxlength());
2376 $Global::host
{$sshlogin_string} = $sshlogin;
2378 if(@allowed_hostgroups) {
2379 # Remove hosts that are not in these groups
2380 while (my ($string, $sshlogin) = each %Global::host
) {
2381 if(not $sshlogin->in_hostgroups(@allowed_hostgroups)) {
2382 delete $Global::host
{$string};
2387 # debug("start", "sshlogin: ", my_dump(%Global::host),"\n");
2388 if($opt::transfer
or @opt::return or $opt::cleanup
or @opt::basefile
) {
2389 if(not remote_hosts
()) {
2390 # There are no remote hosts
2392 ::warning
("--trc ignored as there are no remote --sshlogin.\n");
2393 } elsif (defined $opt::transfer
) {
2394 ::warning
("--transfer ignored as there are no remote --sshlogin.\n");
2395 } elsif (@opt::return) {
2396 ::warning
("--return ignored as there are no remote --sshlogin.\n");
2397 } elsif (defined $opt::cleanup
) {
2398 ::warning
("--cleanup ignored as there are no remote --sshlogin.\n");
2399 } elsif (@opt::basefile
) {
2400 ::warning
("--basefile ignored as there are no remote --sshlogin.\n");
2407 # Return sshlogins that are not ':'
2409 # list of sshlogins with ':' removed
2410 return grep !/^:$/, keys %Global::host
;
2413 sub setup_basefile
{
2414 # Transfer basefiles to each $sshlogin
2415 # This needs to be done before first jobs on $sshlogin is run
2420 for my $sshlogin (values %Global::host
) {
2421 if($sshlogin->string() eq ":") { next }
2422 for my $file (@opt::basefile
) {
2423 if($file !~ m
:^/: and $opt::workdir
eq "...") {
2424 ::error
("Work dir '...' will not work with relative basefiles\n");
2425 ::wait_and_exit
(255);
2427 $workdir ||= Job-
>new("")->workdir();
2428 $cmd .= $sshlogin->rsync_transfer_cmd($file,$workdir) . "&";
2432 debug
("init", "basesetup: $cmd\n");
2436 sub cleanup_basefile {
2437 # Remove the basefiles transferred
2440 my $workdir = Job->new("")->workdir();
2441 for my $sshlogin (values %Global::host) {
2442 if($sshlogin->string() eq ":") { next }
2443 for my $file (@opt::basefile) {
2444 $cmd .= $sshlogin->cleanup_cmd($file,$workdir)."&";
2448 debug("init", "basecleanup: $cmd\n");
2453 my(@cores, @cpus, @maxline, @echo);
2454 my $envvar = ::shell_quote_scalar
($Global::envvar
);
2455 while (my ($host, $sshlogin) = each %Global::host
) {
2456 if($host eq ":") { next }
2457 # The 'true' is used to get the $host out later
2458 my $sshcmd = "true $host;" . $sshlogin->sshcommand()." ".$sshlogin->serverlogin();
2459 push(@cores, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cores\n\0");
2460 push(@cpus, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cpus\n\0");
2461 push(@maxline, $host."\t".$sshcmd." ".$envvar." parallel --max-line-length-allowed\n\0");
2462 # 'echo' is used to get the best possible value for an ssh login time
2463 push(@echo, $host."\t".$sshcmd." echo\n\0");
2465 my ($fh, $tmpfile) = ::tmpfile
(SUFFIX
=> ".ssh");
2466 print $fh @cores, @cpus, @maxline, @echo;
2468 # --timeout 5: Setting up an SSH connection and running a simple
2469 # command should never take > 5 sec.
2470 # --delay 0.1: If multiple sshlogins use the same proxy the delay
2471 # will make it less likely to overload the ssh daemon.
2472 # --retries 3: If the ssh daemon it overloaded, try 3 times
2473 # -s 16000: Half of the max line on UnixWare
2474 my $cmd = "cat $tmpfile | $0 -j0 --timeout 5 -s 16000 --joblog - --plain --delay 0.1 --retries 3 --tag --tagstring {1} -0 --colsep '\t' -k eval {2} 2>/dev/null";
2475 ::debug
("init", $cmd, "\n");
2476 open(my $host_fh, "-|", $cmd) || ::die_bug
("parallel host check: $cmd");
2477 my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts);
2481 # if last char = ' then append next line
2482 # This may be due to quoting of $Global::envvar
2489 my @col = split /\t/, $_;
2490 if(defined $col[6]) {
2491 # This is a line from --joblog
2492 # seq host time spent sent received exit signal command
2493 # 2 : 1372607672.654 0.675 0 0 0 0 eval true\ m\;ssh\ m\ parallel\ --number-of-cores
2494 if($col[0] eq "Seq" and $col[1] eq "Host" and
2495 $col[2] eq "Starttime") {
2499 # Get server from: eval true server\;
2500 $col[8] =~ /eval true..([^;]+).;/ or ::die_bug
("col8 does not contain host: $col[8]");
2503 $Global::host
{$host} or next;
2504 if($col[6] eq "255" or $col[7] eq "15") {
2505 # exit == 255 or signal == 15: ssh failed
2507 ::debug
("init", "--filtered $host\n");
2508 push(@down_hosts, $host);
2509 @down_hosts = uniq
(@down_hosts);
2510 } elsif($col[6] eq "127") {
2511 # signal == 127: parallel not installed remote
2512 # Set ncpus and ncores = 1
2513 ::warning
("Could not figure out ",
2514 "number of cpus on $host. Using 1.\n");
2517 $maxlen{$host} = Limits
::Command
::max_length
();
2518 } elsif($col[0] =~ /^\d+$/ and $Global::host
{$host}) {
2519 # Remember how log it took to log in
2520 # 2 : 1372607672.654 0.675 0 0 0 0 eval true\ m\;ssh\ m\ echo
2521 $time_to_login{$host} = ::min
($time_to_login{$host},$col[3]);
2523 ::die_bug
("host check unmatched long jobline: $_");
2525 } elsif($Global::host
{$col[0]}) {
2526 # This output from --number-of-cores, --number-of-cpus,
2527 # --max-line-length-allowed
2530 # maxlen: server 131071
2531 if(not $ncores{$col[0]}) {
2532 $ncores{$col[0]} = $col[1];
2533 } elsif(not $ncpus{$col[0]}) {
2534 $ncpus{$col[0]} = $col[1];
2535 } elsif(not $maxlen{$col[0]}) {
2536 $maxlen{$col[0]} = $col[1];
2537 } elsif(not $echo{$col[0]}) {
2538 $echo{$col[0]} = $col[1];
2539 } elsif(m/perl: warning:|LANGUAGE =|LC_ALL =|LANG =|are supported and installed/) {
2541 # perl: warning: Setting locale failed.
2542 # perl: warning: Please check that your locale settings:
2543 # LANGUAGE = (unset),
2545 # LANG = "en_US.UTF-8"
2546 # are supported and installed on your system.
2547 # perl: warning: Falling back to the standard locale ("C").
2549 ::die_bug
("host check too many col0: $_");
2552 ::die_bug
("host check unmatched short jobline ($col[0]): $_");
2556 $Global::debug
or unlink $tmpfile;
2557 delete @Global::host
{@down_hosts};
2558 @down_hosts and ::warning
("Removed @down_hosts\n");
2559 $Global::minimal_command_line_length
= 8_000_000;
2560 while (my ($sshlogin, $obj) = each %Global::host
) {
2561 if($sshlogin eq ":") { next }
2562 $ncpus{$sshlogin} or ::die_bug
("ncpus missing: ".$obj->serverlogin());
2563 $ncores{$sshlogin} or ::die_bug
("ncores missing: ".$obj->serverlogin());
2564 $time_to_login{$sshlogin} or ::die_bug
("time_to_login missing: ".$obj->serverlogin());
2565 $maxlen{$sshlogin} or ::die_bug
("maxlen missing: ".$obj->serverlogin());
2566 if($opt::use_cpus_instead_of_cores
) {
2567 $obj->set_ncpus($ncpus{$sshlogin});
2569 $obj->set_ncpus($ncores{$sshlogin});
2571 $obj->set_time_to_login($time_to_login{$sshlogin});
2572 $obj->set_maxlength($maxlen{$sshlogin});
2573 $Global::minimal_command_line_length
=
2574 ::min
($Global::minimal_command_line_length
,
2575 int($maxlen{$sshlogin}/2));
2576 ::debug
("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin},
2577 " ncores:", $ncores{$sshlogin},
2578 " time_to_login:", $time_to_login{$sshlogin},
2579 " maxlen:", $maxlen{$sshlogin},
2580 " min_max_len:", $Global::minimal_command_line_length
,"\n");
2587 if(not defined $joblog) {
2590 my ($fh, $tmpfile) = ::tmpfile
(SUFFIX
=> ".log");
2595 if($Global::quoting
) {
2596 @command = shell_quote_empty
(@command);
2599 # Copy all @fhlist into tempfiles
2601 for my $fh (@fhlist) {
2602 my ($outfh, $name) = ::tmpfile
(SUFFIX
=> ".all", UNLINK
=> 1);
2603 print $outfh (<$fh>);
2605 push @argfiles, $name;
2607 if(@opt::basefile
) { setup_basefile
(); }
2608 # for each sshlogin do:
2609 # parallel -S $sshlogin $command :::: @argfiles
2611 # Pass some of the options to the sub-parallels, not all of them as
2612 # -P should only go to the first, and -S should not be copied at all.
2615 ((defined $opt::jobs
) ?
"-P $opt::jobs" : ""),
2616 ((defined $opt::linebuffer
) ?
"--linebuffer" : ""),
2617 ((defined $opt::ungroup
) ?
"-u" : ""),
2618 ((defined $opt::group
) ?
"-g" : ""),
2619 ((defined $opt::keeporder
) ?
"--keeporder" : ""),
2620 ((defined $opt::D
) ?
"-D $opt::D" : ""),
2621 ((defined $opt::plain
) ?
"--plain" : ""),
2622 ((defined $opt::max_chars
) ?
"--max-chars ".$opt::max_chars
: ""),
2626 ((defined $opt::ungroup
) ?
"-u" : ""),
2627 ((defined $opt::linebuffer
) ?
"--linebuffer" : ""),
2628 ((defined $opt::group
) ?
"-g" : ""),
2629 ((defined $opt::files
) ?
"--files" : ""),
2630 ((defined $opt::keeporder
) ?
"--keeporder" : ""),
2631 ((defined $opt::colsep
) ?
"--colsep ".shell_quote
($opt::colsep
) : ""),
2632 ((@opt::v
) ?
"-vv" : ""),
2633 ((defined $opt::D
) ?
"-D $opt::D" : ""),
2634 ((defined $opt::timeout
) ?
"--timeout ".$opt::timeout
: ""),
2635 ((defined $opt::plain
) ?
"--plain" : ""),
2636 ((defined $opt::retries
) ?
"--retries ".$opt::retries
: ""),
2637 ((defined $opt::max_chars
) ?
"--max-chars ".$opt::max_chars
: ""),
2638 ((defined $opt::arg_sep
) ?
"--arg-sep ".$opt::arg_sep
: ""),
2639 ((defined $opt::arg_file_sep
) ?
"--arg-file-sep ".$opt::arg_file_sep
: ""),
2640 (@opt::env ?
map { "--env ".::shell_quote_scalar
($_) } @opt::env
: ""),
2642 ::debug
("init", "| $0 $options\n");
2643 open(my $parallel_fh, "|-", "$0 --no-notice -j0 $options") ||
2644 ::die_bug
("This does not run GNU Parallel: $0 $options");
2646 for my $host (sort keys %Global::host
) {
2647 my $sshlogin = $Global::host
{$host};
2648 my $joblog = tmp_joblog
($opt::joblog
);
2650 push @joblogs, $joblog;
2651 $joblog = "--joblog $joblog";
2653 my $quad = $opt::arg_file_sep
|| "::::";
2654 ::debug
("init", "$0 $suboptions -j1 $joblog ",
2655 ((defined $opt::tag
) ?
2656 "--tagstring ".shell_quote_scalar
($sshlogin->string()) : ""),
2657 " -S ", shell_quote_scalar
($sshlogin->string())," ",
2658 join(" ",shell_quote
(@command))," $quad @argfiles\n");
2659 print $parallel_fh "$0 $suboptions -j1 $joblog ",
2660 ((defined $opt::tag
) ?
2661 "--tagstring ".shell_quote_scalar
($sshlogin->string()) : ""),
2662 " -S ", shell_quote_scalar
($sshlogin->string())," ",
2663 join(" ",shell_quote
(@command))," $quad @argfiles\n";
2666 $Global::exitstatus
= $? >> 8;
2667 debug
("init", "--onall exitvalue ", $?);
2668 if(@opt::basefile
) { cleanup_basefile
(); }
2669 $Global::debug
or unlink(@argfiles);
2671 for my $joblog (@joblogs) {
2673 open(my $fh, "<", $joblog) || ::die_bug
("Cannot open tmp joblog $joblog");
2674 # Skip first line (header);
2676 print $Global::joblog
(<$fh>);
2682 sub __SIGNAL_HANDLING__
{}
2684 sub save_original_signal_handler
{
2685 # Remember the original signal handler
2687 $SIG{TERM
} ||= sub { exit 0; }; # $SIG{TERM} is not set on Mac OS X
2688 $SIG{INT
} = sub { if($opt::tmux
) { qx
{ tmux kill-session
-t p
$$ }; }
2689 unlink keys %Global::unlink; exit -1 };
2690 $SIG{TERM
} = sub { if($opt::tmux
) { qx
{ tmux kill-session
-t p
$$ }; }
2691 unlink keys %Global::unlink; exit -1 };
2692 %Global::original_sig
= %SIG;
2693 $SIG{TERM
} = sub {}; # Dummy until jobs really start
2696 sub list_running_jobs
{
2698 for my $v (values %Global::running
) {
2699 print $Global::original_stderr
"$Global::progname: ",$v->replaced(),"\n";
2703 sub start_no_new_jobs
{
2705 $SIG{TERM
} = $Global::original_sig
{TERM
};
2706 print $Global::original_stderr
2707 ("$Global::progname: SIGTERM received. No new jobs will be started.\n",
2708 "$Global::progname: Waiting for these ", scalar(keys %Global::running
),
2709 " jobs to finish. Send SIGTERM again to stop now.\n");
2710 list_running_jobs
();
2711 $Global::start_no_new_jobs
||= 1;
2720 my $children_reaped = 0;
2721 debug
("run", "Reaper ");
2722 while (($stiff = waitpid(-1, &WNOHANG
)) > 0) {
2724 if($Global::sshmaster
{$stiff}) {
2725 # This is one of the ssh -M: ignore
2728 my $job = $Global::running
{$stiff};
2729 # '-a <(seq 10)' will give us a pid not in %Global::running
2731 $job->set_exitstatus($? >> 8);
2732 $job->set_exitsignal($? & 127);
2733 debug
("run", "died (", $job->exitstatus(), "): ", $job->seq());
2734 $job->set_endtime(::now
());
2735 if($stiff == $Global::tty_taken
) {
2736 # The process that died had the tty => release it
2737 $Global::tty_taken
= 0;
2740 if(not $job->should_be_retried()) {
2743 push @Global::slots
, $job->slot();
2745 # Update average runtime for timeout
2746 $Global::timeoutq-
>update_delta_time($job->runtime());
2748 # Force printing now if the job failed and we are going to exit
2749 my $print_now = ($opt::halt_on_error
and $opt::halt_on_error
== 2
2750 and $job->exitstatus());
2751 if($opt::keeporder
and not $print_now) {
2752 print_earlier_jobs
($job);
2756 if($job->exitstatus()) {
2757 process_failed_job
($job);
2761 my $sshlogin = $job->sshlogin();
2762 $sshlogin->dec_jobs_running();
2763 $sshlogin->inc_jobs_completed();
2764 $Global::total_running--
;
2765 delete $Global::running
{$stiff};
2768 debug
("run", "done ");
2769 return $children_reaped;
2772 sub process_failed_job
{
2773 # The jobs had a exit status <> 0, so error
2776 $Global::exitstatus
++;
2777 $Global::total_failed
++;
2778 if($opt::halt_on_error
) {
2779 if($opt::halt_on_error
== 1
2781 ($opt::halt_on_error
< 1 and $Global::total_failed
> 3
2783 $Global::total_failed
/ $Global::total_started
> $opt::halt_on_error
)) {
2784 # If halt on error == 1 or --halt 10%
2785 # we should gracefully exit
2786 print $Global::original_stderr
2787 ("$Global::progname: Starting no more jobs. ",
2788 "Waiting for ", scalar(keys %Global::running
),
2789 " jobs to finish. This job failed:\n",
2790 $job->replaced(),"\n");
2791 $Global::start_no_new_jobs
||= 1;
2792 $Global::halt_on_error_exitstatus
= $job->exitstatus();
2793 } elsif($opt::halt_on_error
== 2) {
2794 # If halt on error == 2 we should exit immediately
2795 print $Global::original_stderr
2796 ("$Global::progname: This job failed:\n",
2797 $job->replaced(),"\n");
2798 exit ($job->exitstatus());
2804 my (%print_later,$job_end_sequence);
2806 sub print_earlier_jobs
{
2807 # Print jobs completed earlier
2810 $print_later{$job->seq()} = $job;
2811 $job_end_sequence ||= 1;
2812 debug
("run", "Looking for: $job_end_sequence ",
2813 "Current: ", $job->seq(), "\n");
2814 for(my $j = $print_later{$job_end_sequence};
2815 $j or vec($Global::job_already_run
,$job_end_sequence,1);
2816 $job_end_sequence++,
2817 $j = $print_later{$job_end_sequence}) {
2818 debug
("run", "Found job end $job_end_sequence");
2821 delete $print_later{$job_end_sequence};
2830 # If we do not wait, we sometimes get segfault
2834 # Kill all without printing
2835 for my $job (values %Global::running
) {
2840 for (keys %Global::unkilled_children
) {
2843 delete $Global::unkilled_children
{$_};
2861 "$Global::progname [options] [command [arguments]] < list_of_arguments",
2862 "$Global::progname [options] [command [arguments]] (::: arguments|:::: argfile(s))...",
2863 "cat ... | $Global::progname --pipe [options] [command [arguments]]",
2865 "-j n Run n jobs in parallel",
2866 "-k Keep same order",
2867 "-X Multiple arguments with context replace",
2868 "--colsep regexp Split input on regexp for positional replacements",
2869 "{} {.} {/} {/.} {#} {%} {= perl code =} Replacement strings",
2870 "{3} {3.} {3/} {3/.} {=3 perl code =} Positional replacement strings",
2871 "With --plus: {} = {+/}/{/} = {.}.{+.} = {+/}/{/.}.{+.} = {..}.{+..} =",
2872 " {+/}/{/..}.{+..} = {...}.{+...} = {+/}/{/...}.{+...}",
2874 "-S sshlogin Example: foo\@server.example.com",
2875 "--slf .. Use ~/.parallel/sshloginfile as the list of sshlogins",
2876 "--trc {}.bar Shorthand for --transfer --return {}.bar --cleanup",
2877 "--onall Run the given command with argument on all sshlogins",
2878 "--nonall Run the given command with no arguments on all sshlogins",
2880 "--pipe Split stdin (standard input) to multiple jobs.",
2881 "--recend str Record end separator for --pipe.",
2882 "--recstart str Record start separator for --pipe.",
2884 "See 'man $Global::progname' for details",
2886 "When using programs that use GNU Parallel to process data for publication please cite:",
2888 "O. Tange (2011): GNU Parallel - The Command-Line Power Tool,",
2889 ";login: The USENIX Magazine, February 2011:42-47.",
2891 "Or you can get GNU Parallel without this requirement by paying 10000 EUR.",
2896 sub citation_notice
{
2897 # if --no-notice or --plain: do nothing
2898 # if stderr redirected: do nothing
2899 # if ~/.parallel/will-cite: do nothing
2900 # else: print citation notice to stderr
2905 not -t
$Global::original_stderr
2907 -e
$ENV{'HOME'}."/.parallel/will-cite") {
2910 print $Global::original_stderr
2911 ("When using programs that use GNU Parallel to process data for publication please cite:\n",
2913 " O. Tange (2011): GNU Parallel - The Command-Line Power Tool,\n",
2914 " ;login: The USENIX Magazine, February 2011:42-47.\n",
2916 "This helps funding further development; and it won't cost you a cent.\n",
2917 "Or you can get GNU Parallel without this requirement by paying 10000 EUR.\n",
2919 "To silence this citation notice run 'parallel --bibtex' once or use '--no-notice'.\n\n",
2921 flush
$Global::original_stderr
;
2928 my $fh = $Global::original_stderr
|| *STDERR
;
2929 my $prog = $Global::progname
|| "parallel";
2930 print $fh $prog, ": Warning: ", @w;
2936 my $fh = $Global::original_stderr
|| *STDERR
;
2937 my $prog = $Global::progname
|| "parallel";
2938 print $fh $prog, ": Error: ", @w;
2945 ("$Global::progname: This should not happen. You have found a bug.\n",
2946 "Please contact <parallel\@gnu.org> and include:\n",
2947 "* The version number: $Global::version\n",
2948 "* The bugid: $bugid\n",
2949 "* The command line being run\n",
2950 "* The files being read (put the files on a webserver if they are big)\n",
2952 "If you get the error on smaller/fewer files, please include those instead.\n");
2953 ::wait_and_exit
(255);
2958 if($opt::tollef
and not $opt::gnu
) {
2959 print "WARNING: YOU ARE USING --tollef. IF THINGS ARE ACTING WEIRD USE --gnu.\n";
2962 "GNU $Global::progname $Global::version",
2963 "Copyright (C) 2007,2008,2009,2010,2011,2012,2013,2014 Ole Tange and Free Software Foundation, Inc.",
2964 "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>",
2965 "This is free software: you are free to change and redistribute it.",
2966 "GNU $Global::progname comes with no warranty.",
2968 "Web site: http://www.gnu.org/software/${Global::progname}\n",
2969 "When using programs that use GNU Parallel to process data for publication please cite:\n",
2970 "O. Tange (2011): GNU Parallel - The Command-Line Power Tool, ",
2971 ";login: The USENIX Magazine, February 2011:42-47.\n",
2972 "Or you can get GNU Parallel without this requirement by paying 10000 EUR.\n",
2978 if($opt::tollef
and not $opt::gnu
) {
2979 print "WARNING: YOU ARE USING --tollef. IF THINGS ARE ACTING WEIRD USE --gnu.\n";
2982 "When using programs that use GNU Parallel to process data for publication please cite:",
2984 "\@article{Tange2011a,",
2985 " title = {GNU Parallel - The Command-Line Power Tool},",
2986 " author = {O. Tange},",
2987 " address = {Frederiksberg, Denmark},",
2988 " journal = {;login: The USENIX Magazine},",
2992 " url = {http://www.gnu.org/s/parallel},",
2997 "(Feel free to use \\nocite{Tange2011a})",
2999 "This helps funding further development.",
3001 "Or you can get GNU Parallel without this requirement by paying 10000 EUR.",
3004 while(not -e
$ENV{'HOME'}."/.parallel/will-cite") {
3005 print "\nType: 'will cite' and press enter.\n> ";
3006 my $input = <STDIN
>;
3007 if($input =~ /will cite/i) {
3008 mkdir $ENV{'HOME'}."/.parallel";
3009 open (my $fh, ">", $ENV{'HOME'}."/.parallel/will-cite")
3010 || ::die_bug
("Cannot write: ".$ENV{'HOME'}."/.parallel/will-cite");
3012 print "\nThank you for your support. It is much appreciated. The citation\n",
3013 "notice is now silenced.\n";
3020 print("Maximal size of command: ",Limits
::Command
::real_max_length
(),"\n",
3021 "Maximal used size of command: ",Limits
::Command
::max_length
(),"\n",
3023 "Execution of will continue now, and it will try to read its input\n",
3024 "and run commands; if this is not what you wanted to happen, please\n",
3025 "press CTRL-D or CTRL-C\n");
3028 sub __GENERIC_COMMON_FUNCTION__
{}
3031 # Remove duplicates and return unique values
3032 return keys %{{ map { $_ => 1 } @_ }};
3037 # Minimum value of array
3042 defined $min or do { $min = $_; next; }; # Set $_ to the first non-undef
3043 $min = ($min < $_) ?
$min : $_;
3050 # Maximum value of array
3055 defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef
3056 $max = ($max > $_) ?
$max : $_;
3063 # Sum of values of array
3068 $_ and do { $sum += $_; }
3078 sub undef_as_empty
{
3080 return $a ?
$a : "";
3087 $hostname = `hostname`;
3089 $hostname ||= "nohostname";
3097 # @programs = programs to find the path to
3099 # @full_path = full paths to @programs. Nothing if not found
3102 push @which, map { $_."/".$prg } grep { -x
$_."/".$prg } split(":",$ENV{'PATH'});
3108 my ($regexp,%fakename);
3112 # $pid = pid to see if (grand)*parent is a shell
3114 # $shellpath = path to shell - undef if no shell found
3117 # All shells known to mankind
3119 # ash bash csh dash fdsh fish fizsh ksh ksh93 mksh pdksh
3120 # posh rbash rush rzsh sash sh static-sh tcsh yash zsh
3121 my @shells = qw(ash bash csh dash fdsh fish fizsh ksh
3122 ksh93 mksh pdksh posh rbash rush rzsh
3123 sash sh static-sh tcsh yash zsh -sh -csh);
3124 # Can be formatted as:
3125 # [sh] -sh sh busybox sh
3126 # /bin/sh /sbin/sh /opt/csw/sh
3127 # NOT: foo.sh sshd crash flush pdflush scosh fsflush ssh
3128 my $shell = "(?:".join("|",@shells).")";
3129 $regexp = '^((\[)('. $shell. ')(\])|(|\S+/|busybox )('. $shell. '))($| )';
3131 # csh and tcsh disguise themselves as -sh/-csh
3132 "-sh" => ["csh", "tcsh"],
3133 "-csh" => ["tcsh", "csh"],
3136 my ($children_of_ref, $parent_of_ref, $name_of_ref) = pid_table
();
3140 ::debug
("init", "shell? ". $name_of_ref->{$testpid}."\n");
3141 if($name_of_ref->{$testpid} =~ /$regexp/o) {
3142 ::debug
("init", "which ".($3||$6)." => ");
3143 $shellpath = (which
($3 || $6,@{$fakename{$3 || $6}}))[0];
3144 ::debug
("init", "shell path $shellpath\n");
3145 $shellpath and last;
3147 $testpid = $parent_of_ref->{$testpid};
3154 my %pid_parentpid_cmd;
3158 # %children_of = { pid -> children of pid }
3159 # %parent_of = { pid -> pid of parent }
3160 # %name_of = { pid -> commandname }
3162 if(not %pid_parentpid_cmd) {
3163 # Filter for SysV-style `ps`
3164 my $sysv = q( ps -ef | perl -ane '1..1 and /^(.*)CO?MM?A?N?D
/ and $s=length $1;).
3165 q(s/^.{$s}//; print "@F[1,2] $_"' );
3167 my $bsd = q(ps -o pid,ppid,command -ax);
3168 %pid_parentpid_cmd =
3175 'dragonfly' => $bsd,
3188 $pid_parentpid_cmd{$^O} or ::die_bug
("pid_parentpid_cmd for $^O missing");
3190 my (@pidtable,%parent_of,%children_of,%name_of);
3191 # Table with pid -> children of pid
3192 @pidtable = `$pid_parentpid_cmd{$^O}`;
3195 # must match: 24436 21224 busybox ash
3196 /(\S+)\s+(\S+)\s+(\S+.*)/ or ::die_bug
("pidtable format: $_");
3197 $parent_of{$1} = $2;
3198 push @{$children_of{$2}}, $1;
3201 return(\
%children_of, \
%parent_of, \
%name_of);
3206 # Reap dead children.
3207 # If no dead children: Sleep specified amount with exponential backoff
3209 # $ms = milliseconds to sleep
3211 # $ms/2+0.001 if children reaped
3212 # $ms*1.1 if no children reaped
3215 # Sleep exponentially shorter (1/2^n) if a job finished
3219 $Global::timeoutq-
>process_timeouts();
3222 Job
::exit_if_disk_full
();
3223 if($opt::linebuffer
) {
3224 for my $job (values %Global::running
) {
3228 # Sleep exponentially longer (1.1^n) if a job did not finish
3229 # though at most 1000 ms.
3230 return (($ms < 1000) ?
($ms * 1.1) : ($ms));
3235 # Sleep this many milliseconds.
3237 # $ms = milliseconds to sleep
3239 ::debug
(int($ms),"ms ");
3240 select(undef, undef, undef, $ms/1000);
3244 # Returns time since epoch as in seconds with 3 decimals
3248 # $time = time now with millisecond accuracy
3249 if(not $Global::use{"Time::HiRes"}) {
3250 if(eval "use Time::HiRes qw ( time );") {
3251 eval "sub TimeHiRestime { return Time::HiRes::time };";
3253 eval "sub TimeHiRestime { return time() };";
3255 $Global::use{"Time::HiRes"} = 1;
3258 return (int(TimeHiRestime
()*1000))/1000;
3261 sub multiply_binary_prefix
{
3262 # Evalualte numbers with binary prefix
3263 # Ki=2^10, Mi=2^20, Gi=2^30, Ti=2^40, Pi=2^50, Ei=2^70, Zi=2^80, Yi=2^80
3264 # ki=2^10, mi=2^20, gi=2^30, ti=2^40, pi=2^50, ei=2^70, zi=2^80, yi=2^80
3265 # K =2^10, M =2^20, G =2^30, T =2^40, P =2^50, E =2^70, Z =2^80, Y =2^80
3266 # k =10^3, m =10^6, g =10^9, t=10^12, p=10^15, e=10^18, z=10^21, y=10^24
3267 # 13G = 13*1024*1024*1024 = 13958643712
3269 # $s = string with prefixes
3271 # $value = int with prefixes multiplied
3273 $s =~ s/ki/*1024/gi;
3274 $s =~ s/mi/*1024*1024/gi;
3275 $s =~ s/gi/*1024*1024*1024/gi;
3276 $s =~ s/ti/*1024*1024*1024*1024/gi;
3277 $s =~ s/pi/*1024*1024*1024*1024*1024/gi;
3278 $s =~ s/ei/*1024*1024*1024*1024*1024*1024/gi;
3279 $s =~ s/zi/*1024*1024*1024*1024*1024*1024*1024/gi;
3280 $s =~ s/yi/*1024*1024*1024*1024*1024*1024*1024*1024/gi;
3281 $s =~ s/xi/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi;
3284 $s =~ s/M/*1024*1024/g;
3285 $s =~ s/G/*1024*1024*1024/g;
3286 $s =~ s/T/*1024*1024*1024*1024/g;
3287 $s =~ s/P/*1024*1024*1024*1024*1024/g;
3288 $s =~ s/E/*1024*1024*1024*1024*1024*1024/g;
3289 $s =~ s/Z/*1024*1024*1024*1024*1024*1024*1024/g;
3290 $s =~ s/Y/*1024*1024*1024*1024*1024*1024*1024*1024/g;
3291 $s =~ s/X/*1024*1024*1024*1024*1024*1024*1024*1024*1024/g;
3294 $s =~ s/m/*1000*1000/g;
3295 $s =~ s/g/*1000*1000*1000/g;
3296 $s =~ s/t/*1000*1000*1000*1000/g;
3297 $s =~ s/p/*1000*1000*1000*1000*1000/g;
3298 $s =~ s/e/*1000*1000*1000*1000*1000*1000/g;
3299 $s =~ s/z/*1000*1000*1000*1000*1000*1000*1000/g;
3300 $s =~ s/y/*1000*1000*1000*1000*1000*1000*1000*1000/g;
3301 $s =~ s/x/*1000*1000*1000*1000*1000*1000*1000*1000*1000/g;
3309 # Create tempfile as $TMPDIR/parXXXXX
3311 # $filename = file name created
3312 return ::tempfile
(DIR
=>$ENV{'TMPDIR'}, TEMPLATE
=> 'parXXXXX', @_);
3315 sub __DEBUGGING__
{}
3322 $Global::debug
or return;
3323 @_ = grep { defined $_ ?
$_ : "" } @_;
3324 if($Global::debug
eq "all" or $Global::debug
eq $_[0]) {
3325 if($Global::fd
{1}) {
3326 # Original stdout was saved
3327 my $stdout = $Global::fd
{1};
3328 print $stdout @_[1..$#_];
3335 sub my_memory_usage
{
3337 # memory usage if found
3343 if(-e
"/proc/$pid/stat") {
3344 my $fh = FileHandle-
>new("</proc/$pid/stat");
3350 my @procinfo = split(/\s+/,$data);
3352 return undef_as_zero
($procinfo[22]);
3360 # $size = size of object if Devel::Size is installed
3362 my @size_this = (@_);
3363 eval "use Devel::Size qw(size total_size)";
3367 return total_size(@_);
3373 # ascii expression of object if Data::Dump(er) is installed
3374 # error code otherwise
3375 my @dump_this = (@_);
3376 eval "use Data
::Dump
qw(dump);";
3378 # Data::Dump not installed
3379 eval "use Data
::Dumper
;";
3381 my $err = "Neither Data
::Dump nor Data
::Dumper
is installed
\n".
3382 "Not dumping output
\n";
3383 print $Global::original_stderr $err;
3386 return Dumper(@dump_this);
3389 # Create a dummy Data::Dump:dump as Hans Schou sometimes has
3391 eval "sub Data
::Dump
:dump {}";
3392 eval "use Data
::Dump
qw(dump);";
3393 return (Data::Dump::dump(@dump_this));
3409 sub __OBJECT_ORIENTED_PARTS__ {}
3415 my $sshlogin_string = shift;
3418 # SSHLogins can have these formats:
3419 # @grp+grp/ncpu//usr/bin/ssh user@server
3420 # ncpu//usr/bin/ssh user@server
3421 # /usr/bin/ssh user@server
3424 # @grp+grp/user@server
3425 if($sshlogin_string =~ s:^\@([^/]+)/?::) {
3426 # Look for SSHLogin hostgroups
3427 %hostgroups = map { $_ => 1 } split(/\+/, $1);
3429 if ($sshlogin_string =~ s:^(\d+)/::) {
3430 # Override default autodetected ncpus unless missing
3433 my $string = $sshlogin_string;
3434 # An SSHLogin is always in the hostgroup of its $string-name
3435 $hostgroups{$string} = 1;
3436 @Global::hostgroups{keys %hostgroups} = values %hostgroups;
3438 my $no_slash_string = $string;
3439 $no_slash_string =~ s/[^-a-z0-9:]/_/gi;
3441 'string' => $string,
3442 'jobs_running' => 0,
3443 'jobs_completed' => 0,
3444 'maxlength' => undef,
3445 'max_jobs_running' => undef,
3446 'orig_max_jobs_running' => undef,
3448 'hostgroups' => \%hostgroups,
3449 'sshcommand' => undef,
3450 'serverlogin' => undef,
3451 'control_path_dir' => undef,
3452 'control_path' => undef,
3453 'time_to_login' => undef,
3454 'last_login_at' => undef,
3455 'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp
/loadavg-
" .
3458 'last_loadavg_update' => 0,
3459 'swap_activity_file' => $ENV{'HOME'} . "/.parallel/tmp
/swap_activity-
" .
3461 'swap_activity' => undef,
3462 }, ref($class) || $class;
3467 # Remove temporary files if they are created.
3468 unlink $self->{'loadavg_file'};
3469 unlink $self->{'swap_activity_file'};
3474 return $self->{'string'};
3480 return ($self->{'jobs_running'} || "0");
3483 sub inc_jobs_running {
3485 $self->{'jobs_running'}++;
3488 sub dec_jobs_running {
3490 $self->{'jobs_running'}--;
3495 $self->{'maxlength'} = shift;
3500 return $self->{'maxlength'};
3503 sub jobs_completed {
3505 return $self->{'jobs_completed'};
3510 # @hostgroups = the hostgroups to look for
3512 # true if intersection of @hostgroups and the hostgroups of this
3513 # SSHLogin is non-empty
3515 return grep { defined $self->{'hostgroups'}{$_} } @_;
3520 return keys %{$self->{'hostgroups'}};
3523 sub inc_jobs_completed {
3525 $self->{'jobs_completed'}++;
3528 sub set_max_jobs_running {
3530 if(defined $self->{'max_jobs_running'}) {
3531 $Global::max_jobs_running -= $self->{'max_jobs_running'};
3533 $self->{'max_jobs_running'} = shift;
3534 if(defined $self->{'max_jobs_running'}) {
3535 # max_jobs_running could be resat if -j is a changed file
3536 $Global::max_jobs_running += $self->{'max_jobs_running'};
3538 # Initialize orig to the first non-zero value that comes around
3539 $self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'};
3544 my $swapping = $self->swap_activity();
3545 return (not defined $swapping or $swapping)
3549 # If the currently known swap activity is too old:
3550 # Recompute a new one in the background
3552 # last swap activity computed
3554 # Should we update the swap_activity file?
3555 my $update_swap_activity_file = 0;
3556 if(-r $self->{'swap_activity_file'}) {
3557 open(my $swap_fh, "<", $self->{'swap_activity_file'}) || ::die_bug("swap_activity_file-r
");
3558 my $swap_out = <$swap_fh>;
3560 if($swap_out =~ /^(\d+)$/) {
3561 $self->{'swap_activity'} = $1;
3562 ::debug("swap
", "New swap_activity
: ", $self->{'swap_activity'});
3564 ::debug("swap
", "Last update
: ", $self->{'last_swap_activity_update'});
3565 if(time - $self->{'last_swap_activity_update'} > 10) {
3566 # last swap activity update was started 10 seconds ago
3567 ::debug("swap
", "Older than
10 sec
: ", $self->{'swap_activity_file'});
3568 $update_swap_activity_file = 1;
3571 ::debug("swap
", "No swap_activity file
: ", $self->{'swap_activity_file'});
3572 $self->{'swap_activity'} = undef;
3573 $update_swap_activity_file = 1;
3575 if($update_swap_activity_file) {
3576 ::debug("swap
", "Updating swap_activity file
", $self->{'swap_activity_file'});
3577 $self->{'last_swap_activity_update'} = time;
3578 -e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel
";
3579 -e $ENV{'HOME'}."/.parallel/tmp
" or mkdir $ENV{'HOME'}."/.parallel/tmp
";
3581 $swap_activity = swapactivityscript();
3582 if($self->{'string'} ne ":") {
3583 $swap_activity = $self->sshcommand() . " " . $self->serverlogin() . " " .
3584 ::shell_quote_scalar($swap_activity);
3586 # Run swap_activity measuring.
3587 # As the command can take long to run if run remote
3588 # save it to a tmp file before moving it to the correct file
3589 my $file = $self->{'swap_activity_file'};
3590 my ($dummy_fh, $tmpfile) = ::tmpfile(SUFFIX => ".swp
");
3591 ::debug("swap
", "\n", $swap_activity, "\n");
3592 qx{ ($swap_activity > $tmpfile && mv $tmpfile $file || rm $tmpfile) & };
3594 return $self->{'swap_activity'};
3600 sub swapactivityscript
{
3602 # shellscript for detecting swap activity
3604 # arguments for vmstat are OS dependant
3605 # swap_in and swap_out are in different columns depending on OS
3611 # procs -----------memory---------- ---swap-- -----io---- -system-- ----cpu----
3612 # r b swpd free buff cache si so bi bo in cs us sy id wa
3613 # 5 0 51208 1701096 198012 18857888 0 0 37 153 28 19 56 11 33 1
3614 # 3 0 51208 1701288 198012 18857972 0 0 0 0 3638 10412 15 3 82 0
3615 'linux' => ['vmstat 1 2 | tail -n1', '$7*$8'],
3619 # kthr memory page disk faults cpu
3620 # r b w swap free si so pi po fr de sr s3 s4 -- -- in sy cs us sy id
3621 # 0 0 0 4628952 3208408 0 0 3 1 1 0 0 -0 2 0 0 263 613 246 1 2 97
3622 # 0 0 0 4552504 3166360 0 0 0 0 0 0 0 0 0 0 0 246 213 240 1 1 98
3623 'solaris' => ['vmstat -S 1 2 | tail -1', '$6*$7'],
3625 # darwin (macosx): $21*$22
3627 # Mach Virtual Memory Statistics: (page size of 4096 bytes)
3628 # free active specul inactive throttle wired prgable faults copy 0fill reactive purged file-backed anonymous cmprssed cmprssor dcomprs comprs pageins pageout swapins swapouts
3629 # 346306 829050 74871 606027 0 240231 90367 544858K 62343596 270837K 14178 415070 570102 939846 356 370 116 922 4019813 4 0 0
3630 # 345740 830383 74875 606031 0 239234 90369 2696 359 553 0 0 570110 941179 356 370 0 0 0 0 0 0
3631 'darwin' => ['vm_stat -c 2 1 | tail -n1', '$21*$22'],
3635 # procs faults cpu memory page disk
3636 # r b w in sy cs us sy id avm fre si so pi po fr de sr s0
3637 # 1 0 0 4 23 2 3 0 97 7743 217k 0 0 0 0 0 0 0 0
3638 # 1 0 0 6 40 8 0 1 99 7743 217k 0 0 3 0 0 0 0 0
3639 'ultrix' => ['vmstat -S 1 2 | tail -1', '$12*$13'],
3643 # System configuration: lcpu=1 mem=2048MB
3645 # kthr memory page faults cpu
3646 # ----- ----------- ------------------------ ------------ -----------
3647 # r b avm fre re pi po fr sr cy in sy cs us sy id wa
3648 # 0 0 333933 241803 0 0 0 0 0 0 10 143 90 0 0 99 0
3649 # 0 0 334125 241569 0 0 0 0 0 0 37 5368 184 0 9 86 5
3650 'aix' => ['vmstat 1 2 | tail -n1', '$6*$7'],
3654 # procs memory page disks faults cpu
3655 # r b w avm fre flt re pi po fr sr ad0 ad1 in sy cs us sy id
3656 # 1 0 0 596716 19560 32 0 0 0 33 8 0 0 11 220 277 0 0 99
3657 # 0 0 0 596716 19560 2 0 0 0 0 0 0 0 11 144 263 0 1 99
3658 'freebsd' => ['vmstat -H 1 2 | tail -n1', '$8*$9'],
3662 # procs memory page disks traps cpu
3663 # r b w avm fre flt re pi po fr sr wd0 cd0 int sys cs us sy id
3664 # 0 0 0 25776 164968 34 0 0 0 0 0 0 0 230 259 38 4 0 96
3665 # 0 0 0 25776 164968 24 0 0 0 0 0 0 0 237 275 37 0 0 100
3666 'mirbsd' => ['vmstat 1 2 | tail -n1', '$8*$9'],
3670 # procs memory page disks faults cpu
3671 # r b avm fre flt re pi po fr sr w0 w1 in sy cs us sy id
3672 # 0 0 138452 6012 54 0 0 0 1 2 3 0 4 100 23 0 0 100
3673 # 0 0 138456 6008 1 0 0 0 0 0 0 0 7 26 19 0 0 100
3674 'netbsd' => ['vmstat 1 2 | tail -n1', '$7*$8'],
3678 # procs memory page disks traps cpu
3679 # r b w avm fre flt re pi po fr sr wd0 wd1 int sys cs us sy id
3680 # 0 0 0 76596 109944 73 0 0 0 0 0 0 1 5 259 22 0 1 99
3681 # 0 0 0 76604 109936 24 0 0 0 0 0 0 0 7 114 20 0 1 99
3682 'openbsd' => ['vmstat 1 2 | tail -n1', '$8*$9'],
3686 # procs memory page faults cpu
3687 # r b w avm free re at pi po fr de sr in sy cs us sy id
3688 # 1 0 0 247211 216476 4 1 0 0 0 0 0 102 73005 54 6 11 83
3689 # 1 0 0 247211 216421 43 9 0 0 0 0 0 144 1675 96 25269512791222387000 25269512791222387000 105
3690 'hpux' => ['vmstat 1 2 | tail -n1', '$8*$9'],
3692 # dec_osf (tru64): $11*$12
3694 # Virtual Memory Statistics: (pagesize = 8192)
3695 # procs memory pages intr cpu
3696 # r w u act free wire fault cow zero react pin pout in sy cs us sy id
3697 # 3 181 36 51K 1895 8696 348M 59M 122M 259 79M 0 5 218 302 4 1 94
3698 # 3 181 36 51K 1893 8696 3 15 21 0 28 0 4 81 321 1 1 98
3699 'dec_osf' => ['vmstat 1 2 | tail -n1', '$11*$12'],
3703 # (pagesize: 4, size: 512288, swap size: 894972)
3704 # free actv inact wired zeroed react pgins pgouts pfaults cowpfs hrat caobj cache swfree
3705 # 371940 30844 89228 20276 298348 0 48192 19016 756105 99808 98% 876 20628 894972
3706 # 371940 30844 89228 20276 +0 +0 +0 +0 +42 +2 98% 876 20628 894972
3707 'gnu' => ['vmstat -k 1 2 | tail -n1', '$7*$8'],
3709 # -nto (qnx has no swap)
3713 my $perlscript = "";
3714 for my $os (keys %vmstat) {
3715 #q[ { vmstat 1 2 2> /dev/null || vmstat -c 1 2; } | ].
3716 # q[ awk 'NR!=4{next} NF==17||NF==16{print $7*$8} NF==22{print $21*$22} {exit}' ];
3717 $vmstat{$os}[1] =~ s/\$/\\\\\\\$/g; # $ => \\\$
3718 $perlscript .= 'if($^O eq "'.$os.'") { print `'.$vmstat{$os}[0].' | awk "{print ' .
3719 $vmstat{$os}[1] . '}"` }';
3721 $perlscript = "perl -e " . ::shell_quote_scalar
($perlscript);
3722 $script = $Global::envvar
. " " .$perlscript;
3728 sub too_fast_remote_login
{
3730 if($self->{'last_login_at'} and $self->{'time_to_login'}) {
3731 # sshd normally allows 10 simultaneous logins
3732 # A login takes time_to_login
3733 # So time_to_login/5 should be safe
3734 # If now <= last_login + time_to_login/5: Then it is too soon.
3735 my $too_fast = (::now
() <= $self->{'last_login_at'}
3736 + $self->{'time_to_login'}/5);
3737 ::debug
("run", "Too fast? $too_fast ");
3740 # No logins so far (or time_to_login not computed): it is not too fast
3747 return $self->{'last_login_at'};
3750 sub set_last_login_at
{
3752 $self->{'last_login_at'} = shift;
3755 sub loadavg_too_high
{
3757 my $loadavg = $self->loadavg();
3758 return (not defined $loadavg or
3759 $loadavg > $self->max_loadavg());
3763 # If the currently know loadavg is too old:
3764 # Recompute a new one in the background
3765 # The load average is computed as the number of processes waiting for disk
3766 # or CPU right now. So it is the server load this instant and not averaged over
3767 # several minutes. This is needed so GNU Parallel will at most start one job
3768 # that will push the load over the limit.
3771 # $last_loadavg = last load average computed (undef if none)
3773 # Should we update the loadavg file?
3774 my $update_loadavg_file = 0;
3775 if(open(my $load_fh, "<", $self->{'loadavg_file'})) {
3777 my $load_out = <$load_fh>;
3779 my $load =()= ($load_out=~/(^[DR]....[^\[])/gm);
3781 # load is overestimated by 1
3782 $self->{'loadavg'} = $load - 1;
3783 ::debug
("load", "New loadavg: ", $self->{'loadavg'});
3785 ::die_bug
("loadavg_invalid_content: $load_out");
3787 ::debug
("load", "Last update: ", $self->{'last_loadavg_update'});
3788 if(time - $self->{'last_loadavg_update'} > 10) {
3789 # last loadavg was started 10 seconds ago
3790 ::debug
("load", time - $self->{'last_loadavg_update'}, " secs old: ",
3791 $self->{'loadavg_file'});
3792 $update_loadavg_file = 1;
3795 ::debug
("load", "No loadavg file: ", $self->{'loadavg_file'});
3796 $self->{'loadavg'} = undef;
3797 $update_loadavg_file = 1;
3799 if($update_loadavg_file) {
3800 ::debug
("load", "Updating loadavg file", $self->{'loadavg_file'}, "\n");
3801 $self->{'last_loadavg_update'} = time;
3802 -e
$ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
3803 -e
$ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
3805 if($self->{'string'} ne ":") {
3806 $cmd = $self->sshcommand() . " " . $self->serverlogin() . " ";
3808 # TODO Is is called 'ps ax -o state,command' on other platforms?
3809 $cmd .= "ps ax -o state,command";
3810 # As the command can take long to run if run remote
3811 # save it to a tmp file before moving it to the correct file
3812 my $file = $self->{'loadavg_file'};
3813 my ($dummy_fh, $tmpfile) = ::tmpfile
(SUFFIX
=> ".loa");
3814 qx{ ($cmd > $tmpfile && mv $tmpfile $file || rm $tmpfile) & };
3816 return $self->{'loadavg'};
3821 # If --load is a file it might be changed
3822 if($Global::max_load_file
) {
3823 my $mtime = (stat($Global::max_load_file
))[9];
3824 if($mtime > $Global::max_load_file_last_mod
) {
3825 $Global::max_load_file_last_mod
= $mtime;
3826 for my $sshlogin (values %Global::host
) {
3827 $sshlogin->set_max_loadavg(undef);
3831 if(not defined $self->{'max_loadavg'}) {
3832 $self->{'max_loadavg'} =
3833 $self->compute_max_loadavg($opt::load
);
3835 ::debug
("load", "max_loadavg: ", $self->string(), " ", $self->{'max_loadavg'});
3836 return $self->{'max_loadavg'};
3839 sub set_max_loadavg
{
3841 $self->{'max_loadavg'} = shift;
3844 sub compute_max_loadavg
{
3845 # Parse the max loadaverage that the user asked for using --load
3849 my $loadspec = shift;
3851 if(defined $loadspec) {
3852 if($loadspec =~ /^\+(\d+)$/) {
3856 $self->ncpus() + $j;
3857 } elsif ($loadspec =~ /^-(\d+)$/) {
3861 $self->ncpus() - $j;
3862 } elsif ($loadspec =~ /^(\d+)\%$/) {
3865 $self->ncpus() * $j / 100;
3866 } elsif ($loadspec =~ /^(\d+(\.\d+)?)$/) {
3868 } elsif (-f
$loadspec) {
3869 $Global::max_load_file
= $loadspec;
3870 $Global::max_load_file_last_mod
= (stat($Global::max_load_file
))[9];
3871 if(open(my $in_fh, "<", $Global::max_load_file
)) {
3872 my $opt_load_file = join("",<$in_fh>);
3874 $load = $self->compute_max_loadavg($opt_load_file);
3876 print $Global::original_stderr
"Cannot open $loadspec\n";
3877 ::wait_and_exit
(255);
3880 print $Global::original_stderr
"Parsing of --load failed\n";
3892 return $self->{'time_to_login'};
3895 sub set_time_to_login
{
3897 $self->{'time_to_login'} = shift;
3900 sub max_jobs_running
{
3902 if(not defined $self->{'max_jobs_running'}) {
3903 my $nproc = $self->compute_number_of_processes($opt::jobs
);
3904 $self->set_max_jobs_running($nproc);
3906 return $self->{'max_jobs_running'};
3909 sub orig_max_jobs_running
{
3911 return $self->{'orig_max_jobs_running'};
3914 sub compute_number_of_processes
{
3915 # Number of processes wanted and limited by system resources
3917 # Number of processes
3920 my $wanted_processes = $self->user_requested_processes($opt_P);
3921 if(not defined $wanted_processes) {
3922 $wanted_processes = $Global::default_simultaneous_sshlogins
;
3924 ::debug
("load", "Wanted procs: $wanted_processes\n");
3926 $self->processes_available_by_system_limit($wanted_processes);
3927 ::debug
("load", "Limited to procs: $system_limit\n");
3928 return $system_limit;
3931 sub processes_available_by_system_limit
{
3932 # If the wanted number of processes is bigger than the system limits:
3933 # Limit them to the system limits
3934 # Limits are: File handles, number of input lines, processes,
3935 # and taking > 1 second to spawn 10 extra processes
3937 # Number of processes
3939 my $wanted_processes = shift;
3941 my $system_limit = 0;
3946 my $more_filehandles = 1;
3947 my $max_system_proc_reached = 0;
3948 my $slow_spawining_warning_printed = 0;
3953 # Reserve filehandles
3954 # perl uses 7 filehandles for something?
3955 # parallel uses 1 for memory_usage
3956 # parallel uses 4 for ?
3958 open($fh{"init-$i"}, "<", "/dev/null");
3962 # System process limit
3964 if($child = fork()) {
3965 push (@children,$child);
3966 $Global::unkilled_children
{$child} = 1;
3967 } elsif(defined $child) {
3968 # The child takes one process slot
3969 # It will be killed later
3970 $SIG{TERM
} = $Global::original_sig
{TERM
};
3974 $max_system_proc_reached = 1;
3977 my $count_jobs_already_read = $Global::JobQueue-
>next_seq();
3978 my $wait_time_for_getting_args = 0;
3979 my $start_time = time;
3981 $system_limit >= $wanted_processes and last;
3982 not $more_filehandles and last;
3983 $max_system_proc_reached and last;
3984 my $before_getting_arg = time;
3985 if($Global::semaphore
or $opt::pipe) {
3986 # Skip: No need to get args
3987 } elsif(defined $opt::retries
and $count_jobs_already_read) {
3988 # For retries we may need to run all jobs on this sshlogin
3989 # so include the already read jobs for this sshlogin
3990 $count_jobs_already_read--;
3992 if($opt::X
or $opt::m
) {
3993 # The arguments may have to be re-spread over several jobslots
3994 # So pessimistically only read one arg per jobslot
3995 # instead of a full commandline
3996 if($Global::JobQueue-
>{'commandlinequeue'}->{'arg_queue'}->empty()) {
3997 if($Global::JobQueue-
>empty()) {
4000 ($job) = $Global::JobQueue-
>get();
4004 ($arg) = $Global::JobQueue-
>{'commandlinequeue'}->{'arg_queue'}->get();
4008 # If there are no more command lines, then we have a process
4009 # per command line, so no need to go further
4010 $Global::JobQueue-
>empty() and last;
4011 ($job) = $Global::JobQueue-
>get();
4015 $wait_time_for_getting_args += time - $before_getting_arg;
4018 # Every simultaneous process uses 2 filehandles when grouping
4019 # Every simultaneous process uses 2 filehandles when compressing
4020 $more_filehandles = open($fh{$system_limit*10}, "<", "/dev/null")
4021 && open($fh{$system_limit*10+2}, "<", "/dev/null")
4022 && open($fh{$system_limit*10+3}, "<", "/dev/null")
4023 && open($fh{$system_limit*10+4}, "<", "/dev/null");
4025 # System process limit
4027 if($child = fork()) {
4028 push (@children,$child);
4029 $Global::unkilled_children
{$child} = 1;
4030 } elsif(defined $child) {
4031 # The child takes one process slot
4032 # It will be killed later
4033 $SIG{TERM
} = $Global::original_sig
{TERM
};
4037 $max_system_proc_reached = 1;
4039 my $forktime = time - $time - $wait_time_for_getting_args;
4040 ::debug
("run", "Time to fork $system_limit procs: $wait_time_for_getting_args ",
4042 " (processes so far: ", $system_limit,")\n");
4043 if($system_limit > 10 and
4045 $forktime > $system_limit * 0.01
4046 and not $slow_spawining_warning_printed) {
4047 # It took more than 0.01 second to fork a processes on avg.
4048 # Give the user a warning. He can press Ctrl-C if this
4050 print $Global::original_stderr
4051 ("parallel: Warning: Starting $system_limit processes took > $forktime sec.\n",
4052 "Consider adjusting -j. Press CTRL-C to stop.\n");
4053 $slow_spawining_warning_printed = 1;
4056 # Cleanup: Close the files
4057 for (values %fh) { close $_ }
4058 # Cleanup: Kill the children
4059 for my $pid (@children) {
4062 delete $Global::unkilled_children
{$pid};
4064 # Cleanup: Unget the command_lines or the @args
4065 $Global::JobQueue-
>{'commandlinequeue'}->{'arg_queue'}->unget(@args);
4066 $Global::JobQueue-
>unget(@jobs);
4067 if($system_limit < $wanted_processes) {
4068 # The system_limit is less than the wanted_processes
4069 if($system_limit < 1 and not $Global::JobQueue-
>empty()) {
4070 ::warning
("Cannot spawn any jobs. Raising ulimit -u or /etc/security/limits.conf\n",
4071 "or /proc/sys/kernel/pid_max may help.\n");
4072 ::wait_and_exit
(255);
4074 if(not $more_filehandles) {
4075 ::warning
("Only enough file handles to run ", $system_limit, " jobs in parallel.\n",
4076 "Running 'parallel -j0 -N", $system_limit, " --pipe parallel -j0' or ",
4077 "raising ulimit -n or /etc/security/limits.conf may help.\n");
4079 if($max_system_proc_reached) {
4080 ::warning
("Only enough available processes to run ", $system_limit,
4081 " jobs in parallel. Raising ulimit -u or /etc/security/limits.conf\n",
4082 "or /proc/sys/kernel/pid_max may help.\n");
4085 if($] == 5.008008 and $system_limit > 1000) {
4086 # https://savannah.gnu.org/bugs/?36942
4087 $system_limit = 1000;
4089 if($Global::JobQueue-
>empty()) {
4090 $system_limit ||= 1;
4092 if($self->string() ne ":" and
4093 $system_limit > $Global::default_simultaneous_sshlogins
) {
4095 $self->simultaneous_sshlogin_limit($system_limit);
4097 return $system_limit;
4100 sub simultaneous_sshlogin_limit
{
4101 # Test by logging in wanted number of times simultaneously
4103 # min($wanted_processes,$working_simultaneous_ssh_logins-1)
4105 my $wanted_processes = shift;
4106 if($self->{'time_to_login'}) {
4107 return $wanted_processes;
4110 # Try twice because it guesses wrong sometimes
4111 # Choose the minimal
4113 ::min
($self->simultaneous_sshlogin($wanted_processes),
4114 $self->simultaneous_sshlogin($wanted_processes));
4115 if($ssh_limit < $wanted_processes) {
4116 my $serverlogin = $self->serverlogin();
4117 ::warning
("ssh to $serverlogin only allows ",
4118 "for $ssh_limit simultaneous logins.\n",
4119 "You may raise this by changing ",
4120 "/etc/ssh/sshd_config:MaxStartups and MaxSessions on $serverlogin.\n",
4121 "Using only ",$ssh_limit-1," connections ",
4122 "to avoid race conditions.\n");
4124 # Race condition can cause problem if using all sshs.
4125 if($ssh_limit > 1) { $ssh_limit -= 1; }
4129 sub simultaneous_sshlogin
{
4130 # Using $sshlogin try to see if we can do $wanted_processes
4131 # simultaneous logins
4132 # (ssh host echo simultaneouslogin & ssh host echo simultaneouslogin & ...)|grep simul|wc -l
4134 # Number of succesful logins
4136 my $wanted_processes = shift;
4137 my $sshcmd = $self->sshcommand();
4138 my $serverlogin = $self->serverlogin();
4139 my $sshdelay = $opt::sshdelay ?
"sleep $opt::sshdelay;" : "";
4140 my $cmd = "$sshdelay$sshcmd $serverlogin echo simultaneouslogin </dev/null 2>&1 &"x
$wanted_processes;
4141 ::debug
("init", "Trying $wanted_processes logins at $serverlogin\n");
4142 open (my $simul_fh, "-|", "($cmd)|grep simultaneouslogin | wc -l") or
4143 ::die_bug
("simultaneouslogin");
4144 my $ssh_limit = <$simul_fh>;
4152 $self->{'ncpus'} = shift;
4155 sub user_requested_processes
{
4156 # Parse the number of processes that the user asked for using -j
4158 # the number of processes to run on this sshlogin
4162 if(defined $opt_P) {
4163 if($opt_P =~ /^\+(\d+)$/) {
4167 $self->ncpus() + $j;
4168 } elsif ($opt_P =~ /^-(\d+)$/) {
4172 $self->ncpus() - $j;
4173 } elsif ($opt_P =~ /^(\d+(\.\d+)?)\%$/) {
4177 $self->ncpus() * $j / 100;
4178 } elsif ($opt_P =~ /^(\d+)$/) {
4180 if($processes == 0) {
4181 # -P 0 = infinity (or at least close)
4182 $processes = $Global::infinity
;
4184 } elsif (-f
$opt_P) {
4185 $Global::max_procs_file
= $opt_P;
4186 $Global::max_procs_file_last_mod
= (stat($Global::max_procs_file
))[9];
4187 if(open(my $in_fh, "<", $Global::max_procs_file
)) {
4188 my $opt_P_file = join("",<$in_fh>);
4190 $processes = $self->user_requested_processes($opt_P_file);
4192 ::error
("Cannot open $opt_P.\n");
4193 ::wait_and_exit
(255);
4196 ::error
("Parsing of --jobs/-j/--max-procs/-P failed.\n");
4199 $processes = ::ceil
($processes);
4206 if(not defined $self->{'ncpus'}) {
4207 my $sshcmd = $self->sshcommand();
4208 my $serverlogin = $self->serverlogin();
4209 if($serverlogin eq ":") {
4210 if($opt::use_cpus_instead_of_cores
) {
4211 $self->{'ncpus'} = no_of_cpus
();
4213 $self->{'ncpus'} = no_of_cores
();
4217 my $sqe = ::shell_quote_scalar
($Global::envvar
);
4218 if($opt::use_cpus_instead_of_cores
) {
4219 $ncpu = qx(echo|$sshcmd $serverlogin $sqe parallel --number-of-cpus);
4221 ::debug
("init",qq(echo|$sshcmd $serverlogin $sqe parallel --number-of-cores\n));
4222 $ncpu = qx(echo|$sshcmd $serverlogin $sqe parallel --number-of-cores);
4225 if($ncpu =~ /^\s*[0-9]+\s*$/s) {
4226 $self->{'ncpus'} = $ncpu;
4228 ::warning
("Could not figure out ",
4229 "number of cpus on $serverlogin ($ncpu). Using 1.\n");
4230 $self->{'ncpus'} = 1;
4234 return $self->{'ncpus'};
4239 # Number of physical CPUs
4240 local $/="\n"; # If delimiter is set, then $/ will be wrong
4242 if ($^O eq 'linux') {
4243 $no_of_cpus = no_of_cpus_gnu_linux
() || no_of_cores_gnu_linux
();
4244 } elsif ($^O eq 'freebsd') {
4245 $no_of_cpus = no_of_cpus_freebsd
();
4246 } elsif ($^O eq 'netbsd') {
4247 $no_of_cpus = no_of_cpus_netbsd
();
4248 } elsif ($^O eq 'openbsd') {
4249 $no_of_cpus = no_of_cpus_openbsd
();
4250 } elsif ($^O eq 'gnu') {
4251 $no_of_cpus = no_of_cpus_hurd
();
4252 } elsif ($^O eq 'darwin') {
4253 $no_of_cpus = no_of_cpus_darwin
();
4254 } elsif ($^O eq 'solaris') {
4255 $no_of_cpus = no_of_cpus_solaris
();
4256 } elsif ($^O eq 'aix') {
4257 $no_of_cpus = no_of_cpus_aix
();
4258 } elsif ($^O eq 'hpux') {
4259 $no_of_cpus = no_of_cpus_hpux
();
4260 } elsif ($^O eq 'nto') {
4261 $no_of_cpus = no_of_cpus_qnx
();
4262 } elsif ($^O eq 'svr5') {
4263 $no_of_cpus = no_of_cpus_openserver
();
4264 } elsif ($^O eq 'irix') {
4265 $no_of_cpus = no_of_cpus_irix
();
4266 } elsif ($^O eq 'dec_osf') {
4267 $no_of_cpus = no_of_cpus_tru64
();
4269 $no_of_cpus = (no_of_cpus_gnu_linux
()
4270 || no_of_cpus_freebsd
()
4271 || no_of_cpus_netbsd
()
4272 || no_of_cpus_openbsd
()
4273 || no_of_cpus_hurd
()
4274 || no_of_cpus_darwin
()
4275 || no_of_cpus_solaris
()
4277 || no_of_cpus_hpux
()
4279 || no_of_cpus_openserver
()
4280 || no_of_cpus_irix
()
4281 || no_of_cpus_tru64
()
4282 # Number of cores is better than no guess for #CPUs
4290 ::warning
("Cannot figure out number of cpus. Using 1.\n");
4297 # Number of CPU cores
4298 local $/="\n"; # If delimiter is set, then $/ will be wrong
4300 if ($^O eq 'linux') {
4301 $no_of_cores = no_of_cores_gnu_linux
();
4302 } elsif ($^O eq 'freebsd') {
4303 $no_of_cores = no_of_cores_freebsd
();
4304 } elsif ($^O eq 'netbsd') {
4305 $no_of_cores = no_of_cores_netbsd
();
4306 } elsif ($^O eq 'openbsd') {
4307 $no_of_cores = no_of_cores_openbsd
();
4308 } elsif ($^O eq 'gnu') {
4309 $no_of_cores = no_of_cores_hurd
();
4310 } elsif ($^O eq 'darwin') {
4311 $no_of_cores = no_of_cores_darwin
();
4312 } elsif ($^O eq 'solaris') {
4313 $no_of_cores = no_of_cores_solaris
();
4314 } elsif ($^O eq 'aix') {
4315 $no_of_cores = no_of_cores_aix
();
4316 } elsif ($^O eq 'hpux') {
4317 $no_of_cores = no_of_cores_hpux
();
4318 } elsif ($^O eq 'nto') {
4319 $no_of_cores = no_of_cores_qnx
();
4320 } elsif ($^O eq 'svr5') {
4321 $no_of_cores = no_of_cores_openserver
();
4322 } elsif ($^O eq 'irix') {
4323 $no_of_cores = no_of_cores_irix
();
4324 } elsif ($^O eq 'dec_osf') {
4325 $no_of_cores = no_of_cores_tru64
();
4327 $no_of_cores = (no_of_cores_gnu_linux
()
4328 || no_of_cores_freebsd
()
4329 || no_of_cores_netbsd
()
4330 || no_of_cores_openbsd
()
4331 || no_of_cores_hurd
()
4332 || no_of_cores_darwin
()
4333 || no_of_cores_solaris
()
4334 || no_of_cores_aix
()
4335 || no_of_cores_hpux
()
4336 || no_of_cores_qnx
()
4337 || no_of_cores_openserver
()
4338 || no_of_cores_irix
()
4339 || no_of_cores_tru64
()
4345 return $no_of_cores;
4347 ::warning
("Cannot figure out number of CPU cores. Using 1.\n");
4354 # Number of cores using `nproc`
4355 my $no_of_cores = `nproc 2>/dev/null`;
4356 return $no_of_cores;
4359 sub no_of_cpus_gnu_linux
{
4361 # Number of physical CPUs on GNU/Linux
4362 # undef if not GNU/Linux
4365 if(-e
"/proc/cpuinfo") {
4369 open(my $in_fh, "<", "/proc/cpuinfo") || return undef;
4371 if(/^physical id.*[:](.*)/ and not $seen{$1}++) {
4374 /^processor.*[:]/i and $no_of_cores++;
4378 return ($no_of_cpus||$no_of_cores);
4381 sub no_of_cores_gnu_linux
{
4383 # Number of CPU cores on GNU/Linux
4384 # undef if not GNU/Linux
4386 if(-e
"/proc/cpuinfo") {
4388 open(my $in_fh, "<", "/proc/cpuinfo") || return undef;
4390 /^processor.*[:]/i and $no_of_cores++;
4394 return $no_of_cores;
4397 sub no_of_cpus_freebsd
{
4399 # Number of physical CPUs on FreeBSD
4400 # undef if not FreeBSD
4402 (`sysctl -a dev.cpu 2>/dev/null | grep \%parent | awk '{ print \$2 }' | uniq | wc -l | awk '{ print \$1 }'`
4404 `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`);
4409 sub no_of_cores_freebsd
{
4411 # Number of CPU cores on FreeBSD
4412 # undef if not FreeBSD
4414 (`sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`
4416 `sysctl -a hw 2>/dev/null | grep [^a-z]logicalcpu[^a-z] | awk '{ print \$2 }'`);
4418 return $no_of_cores;
4421 sub no_of_cpus_netbsd
{
4423 # Number of physical CPUs on NetBSD
4424 # undef if not NetBSD
4425 my $no_of_cpus = `sysctl -n hw.ncpu 2>/dev/null`;
4430 sub no_of_cores_netbsd
{
4432 # Number of CPU cores on NetBSD
4433 # undef if not NetBSD
4434 my $no_of_cores = `sysctl -n hw.ncpu 2>/dev/null`;
4436 return $no_of_cores;
4439 sub no_of_cpus_openbsd
{
4441 # Number of physical CPUs on OpenBSD
4442 # undef if not OpenBSD
4443 my $no_of_cpus = `sysctl -n hw.ncpu 2>/dev/null`;
4448 sub no_of_cores_openbsd
{
4450 # Number of CPU cores on OpenBSD
4451 # undef if not OpenBSD
4452 my $no_of_cores = `sysctl -n hw.ncpu 2>/dev/null`;
4454 return $no_of_cores;
4457 sub no_of_cpus_hurd
{
4459 # Number of physical CPUs on HURD
4461 my $no_of_cpus = `nproc`;
4466 sub no_of_cores_hurd
{
4468 # Number of physical CPUs on HURD
4470 my $no_of_cores = `nproc`;
4472 return $no_of_cores;
4475 sub no_of_cpus_darwin
{
4477 # Number of physical CPUs on Mac Darwin
4478 # undef if not Mac Darwin
4480 (`sysctl -n hw.physicalcpu 2>/dev/null`
4482 `sysctl -a hw 2>/dev/null | grep [^a-z]physicalcpu[^a-z] | awk '{ print \$2 }'`);
4486 sub no_of_cores_darwin
{
4488 # Number of CPU cores on Mac Darwin
4489 # undef if not Mac Darwin
4491 (`sysctl -n hw.logicalcpu 2>/dev/null`
4493 `sysctl -a hw 2>/dev/null | grep [^a-z]logicalcpu[^a-z] | awk '{ print \$2 }'`);
4494 return $no_of_cores;
4497 sub no_of_cpus_solaris
{
4499 # Number of physical CPUs on Solaris
4500 # undef if not Solaris
4501 if(-x
"/usr/sbin/psrinfo") {
4502 my @psrinfo = `/usr/sbin/psrinfo`;
4503 if($#psrinfo >= 0) {
4504 return $#psrinfo +1;
4507 if(-x
"/usr/sbin/prtconf") {
4508 my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`;
4509 if($#prtconf >= 0) {
4510 return $#prtconf +1;
4516 sub no_of_cores_solaris
{
4518 # Number of CPU cores on Solaris
4519 # undef if not Solaris
4520 if(-x
"/usr/sbin/psrinfo") {
4521 my @psrinfo = `/usr/sbin/psrinfo`;
4522 if($#psrinfo >= 0) {
4523 return $#psrinfo +1;
4526 if(-x
"/usr/sbin/prtconf") {
4527 my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`;
4528 if($#prtconf >= 0) {
4529 return $#prtconf +1;
4535 sub no_of_cpus_aix
{
4537 # Number of physical CPUs on AIX
4540 if(-x
"/usr/sbin/lscfg") {
4541 open(my $in_fh, "-|", "/usr/sbin/lscfg -vs |grep proc | wc -l|tr -d ' '")
4543 $no_of_cpus = <$in_fh>;
4544 chomp ($no_of_cpus);
4550 sub no_of_cores_aix
{
4552 # Number of CPU cores on AIX
4555 if(-x
"/usr/bin/vmstat") {
4556 open(my $in_fh, "-|", "/usr/bin/vmstat 1 1") || return undef;
4558 /lcpu=([0-9]*) / and $no_of_cores = $1;
4562 return $no_of_cores;
4565 sub no_of_cpus_hpux
{
4567 # Number of physical CPUs on HP-UX
4568 # undef if not HP-UX
4570 (`/usr/bin/mpsched -s 2>&1 | grep 'Locality Domain Count' | awk '{ print \$4 }'`);
4574 sub no_of_cores_hpux
{
4576 # Number of CPU cores on HP-UX
4577 # undef if not HP-UX
4579 (`/usr/bin/mpsched -s 2>&1 | grep 'Processor Count' | awk '{ print \$3 }'`);
4580 return $no_of_cores;
4583 sub no_of_cpus_qnx
{
4585 # Number of physical CPUs on QNX
4587 # BUG: It is now known how to calculate this.
4592 sub no_of_cores_qnx
{
4594 # Number of CPU cores on QNX
4596 # BUG: It is now known how to calculate this.
4597 my $no_of_cores = 0;
4598 return $no_of_cores;
4601 sub no_of_cpus_openserver
{
4603 # Number of physical CPUs on SCO OpenServer
4604 # undef if not SCO OpenServer
4606 if(-x
"/usr/sbin/psrinfo") {
4607 my @psrinfo = `/usr/sbin/psrinfo`;
4608 if($#psrinfo >= 0) {
4609 return $#psrinfo +1;
4615 sub no_of_cores_openserver
{
4617 # Number of CPU cores on SCO OpenServer
4618 # undef if not SCO OpenServer
4619 my $no_of_cores = 0;
4620 if(-x
"/usr/sbin/psrinfo") {
4621 my @psrinfo = `/usr/sbin/psrinfo`;
4622 if($#psrinfo >= 0) {
4623 return $#psrinfo +1;
4626 return $no_of_cores;
4629 sub no_of_cpus_irix
{
4631 # Number of physical CPUs on IRIX
4633 my $no_of_cpus = `hinv | grep HZ | grep Processor | awk '{print \$1}'`;
4637 sub no_of_cores_irix
{
4639 # Number of CPU cores on IRIX
4641 my $no_of_cores = `hinv | grep HZ | grep Processor | awk '{print \$1}'`;
4642 return $no_of_cores;
4645 sub no_of_cpus_tru64
{
4647 # Number of physical CPUs on Tru64
4648 # undef if not Tru64
4649 my $no_of_cpus = `sizer -pr`;
4653 sub no_of_cores_tru64
{
4655 # Number of CPU cores on Tru64
4656 # undef if not Tru64
4657 my $no_of_cores = `sizer -pr`;
4658 return $no_of_cores;
4663 if (not defined $self->{'sshcommand'}) {
4664 $self->sshcommand_of_sshlogin();
4666 return $self->{'sshcommand'};
4671 if (not defined $self->{'serverlogin'}) {
4672 $self->sshcommand_of_sshlogin();
4674 return $self->{'serverlogin'};
4677 sub sshcommand_of_sshlogin
{
4678 # 'server' -> ('ssh -S /tmp/parallel-ssh-RANDOM/host-','server')
4679 # 'user@server' -> ('ssh','user@server')
4680 # 'myssh user@server' -> ('myssh','user@server')
4681 # 'myssh -l user server' -> ('myssh -l user','server')
4682 # '/usr/bin/myssh -l user server' -> ('/usr/bin/myssh -l user','server')
4684 # sshcommand - defaults to 'ssh'
4687 my ($sshcmd, $serverlogin);
4688 if($self->{'string'} =~ /(.+) (\S+)$/) {
4690 $sshcmd = $1; $serverlogin = $2;
4693 if($opt::controlmaster
) {
4694 # Use control_path to make ssh faster
4695 my $control_path = $self->control_path_dir()."/ssh-%r@%h:%p";
4696 $sshcmd = "ssh -S ".$control_path;
4697 $serverlogin = $self->{'string'};
4698 if(not $self->{'control_path'}{$control_path}++) {
4699 # Master is not running for this control_path
4703 $Global::sshmaster
{$pid} ||= 1;
4705 $SIG{'TERM'} = undef;
4706 # Ignore the 'foo' being printed
4707 open(STDOUT
,">","/dev/null");
4708 # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
4709 # STDERR >/dev/null to ignore "process_mux_new_session: tcgetattr: Invalid argument"
4710 open(STDERR
,">","/dev/null");
4711 open(STDIN
,"<","/dev/null");
4712 # Run a sleep that outputs data, so it will discover if the ssh connection closes.
4713 my $sleep = ::shell_quote_scalar
('$|=1;while(1){sleep 1;print "foo\n"}');
4714 my @master = ("ssh", "-tt", "-MTS", $control_path, $serverlogin, "perl", "-e", $sleep);
4719 $sshcmd = "ssh"; $serverlogin = $self->{'string'};
4722 $self->{'sshcommand'} = $sshcmd;
4723 $self->{'serverlogin'} = $serverlogin;
4726 sub control_path_dir
{
4730 if(not defined $self->{'control_path_dir'}) {
4731 -e
$ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
4732 -e
$ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
4733 $self->{'control_path_dir'} =
4734 File
::Temp
::tempdir
($ENV{'HOME'}
4735 . "/.parallel/tmp/control_path_dir-XXXX",
4738 return $self->{'control_path_dir'};
4741 sub rsync_transfer_cmd
{
4742 # Command to run to transfer a file
4744 # $file = filename of file to transfer
4745 # $workdir = destination dir
4747 # $cmd = rsync command to run to transfer $file ("" if unreadable)
4750 my $workdir = shift;
4752 ::warning
($file, " is not readable and will not be transferred.\n");
4756 if($file =~ m
:^/:) {
4758 $rsync_destdir = "/";
4760 $rsync_destdir = ::shell_quote_file
($workdir);
4762 $file = ::shell_quote_file
($file);
4763 my $sshcmd = $self->sshcommand();
4764 my $rsync_opt = "-rlDzR -e" . ::shell_quote_scalar
($sshcmd);
4765 my $serverlogin = $self->serverlogin();
4766 # Make dir if it does not exist
4767 return "( $sshcmd $serverlogin mkdir -p $rsync_destdir;" .
4768 rsync
()." $rsync_opt $file $serverlogin:$rsync_destdir )";
4772 # Command to run to remove the remote file
4774 # $file = filename to remove
4775 # $workdir = destination dir
4777 # $cmd = ssh command to run to remove $file and empty parent dirs
4780 my $workdir = shift;
4783 # foo/bar/./baz/quux => workdir/baz/quux
4784 # /foo/bar/./baz/quux => workdir/baz/quux
4785 $f =~ s
:.*/\./:$workdir/:;
4786 } elsif($f =~ m
:^[^/]:) {
4787 # foo/bar => workdir/foo/bar
4788 $f = $workdir."/".$f;
4790 my @subdirs = split m
:/:, ::dirname
($f);
4795 unshift @rmdir, ::shell_quote_file
($dir);
4797 my $rmdir = @rmdir ?
"rmdir @rmdir 2>/dev/null;" : "";
4798 if(defined $opt::workdir
and $opt::workdir
eq "...") {
4799 $rmdir .= "rm -rf " . ::shell_quote_file
($workdir).';';
4802 $f = ::shell_quote_file
($f);
4803 my $sshcmd = $self->sshcommand();
4804 my $serverlogin = $self->serverlogin();
4805 return "$sshcmd $serverlogin ".::shell_quote_scalar
("(rm -f $f; $rmdir)");
4812 # rsync 3.1.x uses protocol 31 which is unsupported by 2.5.7.
4813 # If the version >= 3.1.0: downgrade to protocol 30
4815 my @out = `rsync --version`;
4817 if(/version (\d+.\d+)(.\d+)?/) {
4819 # Version 3.1.0 or later: Downgrade to protocol 30
4820 $rsync = "rsync --protocol 30";
4826 $rsync or ::die_bug
("Cannot figure out version of rsync: @out");
4837 my $commandref = shift;
4838 my $read_from = shift;
4839 my $context_replace = shift;
4840 my $max_number_of_args = shift;
4841 my $return_files = shift;
4842 my $commandlinequeue = CommandLineQueue-
>new
4843 ($commandref, $read_from, $context_replace, $max_number_of_args,
4848 'commandlinequeue' => $commandlinequeue,
4849 'total_jobs' => undef,
4850 }, ref($class) || $class;
4856 if(@{$self->{'unget'}}) {
4857 my $job = shift @{$self->{'unget'}};
4860 my $commandline = $self->{'commandlinequeue'}->get();
4861 if(defined $commandline) {
4862 my $job = Job-
>new($commandline);
4872 unshift @{$self->{'unget'}}, @_;
4877 my $empty = (not @{$self->{'unget'}})
4878 && $self->{'commandlinequeue'}->empty();
4879 ::debug
("run", "JobQueue->empty $empty ");
4885 if(not defined $self->{'total_jobs'}) {
4889 while($job = $self->get()) {
4890 if(time - $start > 10) {
4891 ::warning
("Reading all arguments takes longer than 10 seconds.\n");
4892 $opt::eta
&& ::warning
("Consider removing --eta.\n");
4893 $opt::bar
&& ::warning
("Consider removing --bar.\n");
4898 while($job = $self->get()) {
4902 $self->unget(@queue);
4903 $self->{'total_jobs'} = $#queue+1;
4905 return $self->{'total_jobs'};
4911 return $self->{'commandlinequeue'}->seq();
4916 return $self->{'commandlinequeue'}->quote_args();
4924 my $commandlineref = shift;
4926 'commandline' => $commandlineref, # CommandLine object
4927 'workdir' => undef, # --workdir
4928 'stdin' => undef, # filehandle for stdin (used for --pipe)
4929 # filename for writing stdout to (used for --files)
4930 'remaining' => "", # remaining data not sent to stdin (used for --pipe)
4931 'datawritten' => 0, # amount of data sent via stdin (used for --pipe)
4932 'transfersize' => 0, # size of files using --transfer
4933 'returnsize' => 0, # size of files using --return
4935 # hash of { SSHLogins => number of times the command failed there }
4937 'sshlogin' => undef,
4938 # The commandline wrapped with rsync and ssh
4939 'sshlogin_wrap' => undef,
4940 'exitstatus' => undef,
4941 'exitsignal' => undef,
4942 # Timestamp for timeout if any
4945 }, ref($class) || $class;
4950 $self->{'commandline'} or ::die_bug
("commandline empty");
4951 return $self->{'commandline'}->replaced();
4956 return $self->{'commandline'}->seq();
4961 return $self->{'commandline'}->slot();
4969 # $cattail = perl program for: cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink]
4972 # cat followed by tail.
4973 # If $writerpid dead: finish after this round
4978 my ($cmd, $writerpid, $read_file, $unlink_file) = @ARGV;
4980 open(IN,"<",$read_file) || die("cattail: Cannot open $read_file");
4986 fcntl(IN
, F_GETFL
, $flags) || die $!; # Get the current flags on the filehandle
4987 $flags |= O_NONBLOCK
; # Add non-blocking to the flags
4988 fcntl(IN
, F_SETFL
, $flags) || die $!; # Set the flags on the filehandle
4989 open(OUT
,"|-",$cmd) || die("cattail: Cannot run $cmd");
4994 my $writer_running = kill 0, $writerpid;
4995 $read = sysread(IN
,$buf,32768);
4997 # We can unlink the file now: The writer has written something
4998 -e
$unlink_file and unlink $unlink_file;
5001 my $bytes_written = syswrite(OUT
,$buf);
5002 # syswrite may be interrupted by SIGHUP
5003 substr($buf,0,$bytes_written) = "";
5005 # Something printed: Wait less next time
5008 if(eof(IN
) and not $writer_running) {
5009 # Writer dead: There will never be more to read => exit
5012 # TODO This could probably be done more efficiently using select(2)
5013 # Nothing read: Wait longer before next read
5014 # Up to 30 milliseconds
5015 $sleep = ($sleep < 30) ?
($sleep * 1.001 + 0.01) : ($sleep);
5021 # Sleep this many milliseconds.
5023 select(undef, undef, undef, $secs/1000);
5026 $cattail =~ s/#.*//mg;
5027 $cattail =~ s/\s+/ /g;
5033 sub openoutputfiles
{
5034 # Open files for STDOUT and STDERR
5035 # Set file handles in $self->fh
5037 my ($outfhw, $errfhw, $outname, $errname);
5039 my $args_as_dirname = $self->{'commandline'}->args_as_dirname();
5040 # Output in: prefix/name1/val1/name2/val2/stdout
5041 my $dir = $opt::results
."/".$args_as_dirname;
5042 if(eval{ File
::Path
::mkpath
($dir); }) {
5045 # mkpath failed: Argument probably too long.
5046 # Set $Global::max_file_length, which will keep the individual
5047 # dir names shorter than the max length
5048 max_file_name_length
($opt::results
);
5049 $args_as_dirname = $self->{'commandline'}->args_as_dirname();
5050 # prefix/name1/val1/name2/val2/
5051 $dir = $opt::results
."/".$args_as_dirname;
5052 File
::Path
::mkpath
($dir);
5054 # prefix/name1/val1/name2/val2/stdout
5055 $outname = "$dir/stdout";
5056 if(not open($outfhw, "+>", $outname)) {
5057 ::error
("Cannot write to `$outname'.\n");
5058 ::wait_and_exit
(255);
5060 # prefix/name1/val1/name2/val2/stderr
5061 $errname = "$dir/stderr";
5062 if(not open($errfhw, "+>", $errname)) {
5063 ::error
("Cannot write to `$errname'.\n");
5064 ::wait_and_exit
(255);
5066 $self->set_fh(1,"unlink","");
5067 $self->set_fh(2,"unlink","");
5068 } elsif(not $opt::ungroup
) {
5069 # To group we create temporary files for STDOUT and STDERR
5070 # To avoid the cleanup unlink the files immediately (but keep them open)
5071 if(@Global::tee_jobs
) {
5072 # files must be removed when the tee is done
5073 } elsif($opt::files
) {
5074 ($outfhw, $outname) = ::tmpfile
(SUFFIX
=> ".par");
5075 ($errfhw, $errname) = ::tmpfile
(SUFFIX
=> ".par");
5076 # --files => only remove stderr
5077 $self->set_fh(1,"unlink","");
5078 $self->set_fh(2,"unlink",$errname);
5080 ($outfhw, $outname) = ::tmpfile
(SUFFIX
=> ".par");
5081 ($errfhw, $errname) = ::tmpfile
(SUFFIX
=> ".par");
5082 $self->set_fh(1,"unlink",$outname);
5083 $self->set_fh(2,"unlink",$errname);
5087 open($outfhw,">&",$Global::fd
{1}) || die;
5088 open($errfhw,">&",$Global::fd
{2}) || die;
5089 # File name must be empty as it will otherwise be printed
5092 $self->set_fh(1,"unlink",$outname);
5093 $self->set_fh(2,"unlink",$errname);
5096 $self->set_fh(1,'w',$outfhw);
5097 $self->set_fh(2,'w',$errfhw);
5098 $self->set_fh(1,'name',$outname);
5099 $self->set_fh(2,'name',$errname);
5100 if($opt::compress
) {
5101 # Send stdout to stdin for $opt::compress_program(1)
5102 # Send stderr to stdin for $opt::compress_program(2)
5103 # cattail get pid: $pid = $self->fh($fdno,'rpid');
5104 my $cattail = cattail
();
5105 for my $fdno (1,2) {
5106 my $wpid = open(my $fdw,"|-","$opt::compress_program >>".
5107 $self->fh($fdno,'name')) || die $?;
5108 $self->set_fh($fdno,'w',$fdw);
5109 $self->set_fh($fdno,'wpid',$wpid);
5110 my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail,
5111 $opt::decompress_program
, $wpid,
5112 $self->fh($fdno,'name'),$self->fh($fdno,'unlink')) || die $?;
5113 $self->set_fh($fdno,'r',$fdr);
5114 $self->set_fh($fdno,'rpid',$rpid);
5116 } elsif(not $opt::ungroup
) {
5117 # Set reading FD if using --group (--ungroup does not need)
5118 for my $fdno (1,2) {
5119 # Re-open the file for reading
5120 # so fdw can be closed separately
5121 # and fdr can be seeked separately (for --line-buffer)
5122 open(my $fdr,"<", $self->fh($fdno,'name')) ||
5123 ::die_bug
("fdr: Cannot open ".$self->fh($fdno,'name'));
5124 $self->set_fh($fdno,'r',$fdr);
5125 # Unlink if required
5126 $Global::debug
or unlink $self->fh($fdno,"unlink");
5129 if($opt::linebuffer
) {
5130 # Set non-blocking when using --linebuffer
5131 $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
5132 for my $fdno (1,2) {
5133 my $fdr = $self->fh($fdno,'r');
5135 fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
5136 $flags |= &O_NONBLOCK; # Add non-blocking to the flags
5137 fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
5142 sub max_file_name_length {
5143 # Figure out the max length of a subdir
5144 # TODO and the max total length
5146 my $testdir = shift;
5148 my $upper = 8_000_000;
5152 rmdir($testdir."/".$dir);
5155 } while (mkdir $testdir."/".$dir);
5156 # Then search for the actual max length between $len/16 and $len
5159 while($max-$min > 5) {
5160 # If we are within 5 chars of the exact value:
5161 # it is not worth the extra time to find the exact value
5162 my $test = int(($min+$max)/2);
5164 if(mkdir $testdir."/".$dir) {
5165 rmdir($testdir."/".$dir);
5171 $Global::max_file_length = $min;
5177 my ($self, $fd_no, $key, $fh) = @_;
5178 $self->{'fd'}{$fd_no,$key} = $fh;
5183 my ($self, $fd_no, $key) = @_;
5184 return $self->{'fd'}{$fd_no,$key};
5189 my $remaining_ref = shift;
5190 my $stdin_fh = $self->fh(0,"w
");
5191 syswrite($stdin_fh,$$remaining_ref);
5194 sub set_stdin_buffer {
5195 # Copy stdin buffer from $block_ref up to $endpos
5196 # Prepend with $header_ref
5197 # Remove $recstart and $recend if needed
5199 # $header_ref = ref to $header to prepend
5200 # $block_ref = ref to $block to pass on
5201 # $endpos = length of $block to pass on
5202 # $recstart = --recstart regexp
5203 # $recend = --recend regexp
5207 my ($header_ref,$block_ref,$endpos,$recstart,$recend) = @_;
5208 $self->{'stdin_buffer'} = ($self->virgin() ? $$header_ref : "").substr($$block_ref,0,$endpos);
5209 if($opt::remove_rec_sep) {
5210 remove_rec_sep(\$self->{'stdin_buffer'},$recstart,$recend);
5212 $self->{'stdin_buffer_length'} = length $self->{'stdin_buffer'};
5213 $self->{'stdin_buffer_pos'} = 0;
5216 sub stdin_buffer_length {
5218 return $self->{'stdin_buffer_length'};
5221 sub remove_rec_sep {
5222 my ($block_ref,$recstart,$recend) = @_;
5223 # Remove record separator
5224 $$block_ref =~ s/$recend$recstart//gos;
5225 $$block_ref =~ s/^$recstart//os;
5226 $$block_ref =~ s/$recend$//os;
5229 sub non_block_write {
5231 my $something_written = 0;
5232 use POSIX qw(:errno_h);
5235 for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) {
5236 my $in = $self->fh(0,"w");
5237 # fcntl($in, F_GETFL, $flags)
5238 # or die "Couldn't get flags for HANDLE : $!\n";
5239 # $flags |= O_NONBLOCK;
5240 # fcntl($in, F_SETFL, $flags)
5241 # or die "Couldn't set flags for HANDLE: $!\n";
5242 my $rv = syswrite($in, $buf);
5243 if (!defined($rv) && $! == EAGAIN
) {
5245 $something_written = 0;
5246 } elsif ($self->{'stdin_buffer_pos'}+$rv != $self->{'stdin_buffer_length'}) {
5248 # Remove the written part
5249 $self->{'stdin_buffer_pos'} += $rv;
5250 $something_written = $rv;
5252 # successfully wrote everything
5254 $self->set_stdin_buffer(\
$a,\$a,"","");
5255 $something_written = $rv;
5259 ::debug
("pipe", "Non-block: ", $something_written);
5260 return $something_written;
5266 return $self->{'virgin'};
5271 $self->{'virgin'} = shift;
5276 return $self->{'pid'};
5281 $self->{'pid'} = shift;
5286 # UNIX-timestamp this job started
5288 return sprintf("%.3f",$self->{'starttime'});
5293 my $starttime = shift || ::now
();
5294 $self->{'starttime'} = $starttime;
5299 # Run time in seconds
5301 return sprintf("%.3f",int(($self->endtime() - $self->starttime())*1000)/1000);
5306 # UNIX-timestamp this job ended
5307 # 0 if not ended yet
5309 return ($self->{'endtime'} || 0);
5314 my $endtime = shift;
5315 $self->{'endtime'} = $endtime;
5319 # Is the job timedout?
5321 # $delta_time = time that the job may run
5325 my $delta_time = shift;
5326 return time > $self->{'starttime'} + $delta_time;
5331 # Send the signals to (grand)*children and pid.
5332 # If no signals: TERM TERM KILL
5333 # Wait 200 ms after each TERM.
5335 # @signals = signals to send
5338 my @family_pids = $self->family_pids();
5339 # Record this jobs as failed
5340 $self->set_exitstatus(-1);
5341 # Send two TERMs to give time to clean up
5342 ::debug
("run", "Kill seq ", $self->seq(), "\n");
5343 my @send_signals = @signals || ("TERM", "TERM", "KILL");
5344 for my $signal (@send_signals) {
5346 for my $pid (@family_pids) {
5348 # The job still running
5353 # If a signal was given as input, do not do the sleep below
5356 if($signal eq "TERM" and $alive) {
5357 # Wait up to 200 ms between TERMs - but only if any pids are alive
5359 for (my $sleepsum = 0; kill 0, $family_pids[0] and $sleepsum < 200;
5360 $sleepsum += $sleep) {
5361 $sleep = ::reap_usleep
($sleep);
5368 # Find the pids with this->pid as (grand)*parent
5370 # @pids = pids of (grand)*children
5372 my $pid = $self->pid();
5375 my ($children_of_ref, $parent_of_ref, $name_of_ref) = ::pid_table
();
5378 # While more (grand)*children
5382 for my $parent (@more) {
5383 if($children_of_ref->{$parent}) {
5384 # add the children of this parent
5385 push @m, @{$children_of_ref->{$parent}};
5394 # return number of times failed for this $sshlogin
5398 # Number of times failed for $sshlogin
5400 my $sshlogin = shift;
5401 return $self->{'failed'}{$sshlogin};
5405 # return number of times failed for the current $sshlogin
5407 # Number of times failed for this sshlogin
5409 return $self->{'failed'}{$self->sshlogin()};
5413 # increase the number of times failed for this $sshlogin
5415 my $sshlogin = shift;
5416 $self->{'failed'}{$sshlogin}++;
5419 sub add_failed_here
{
5420 # increase the number of times failed for the current $sshlogin
5422 $self->{'failed'}{$self->sshlogin()}++;
5426 # increase the number of times failed for this $sshlogin
5428 my $sshlogin = shift;
5429 delete $self->{'failed'}{$sshlogin};
5432 sub reset_failed_here
{
5433 # increase the number of times failed for this $sshlogin
5435 delete $self->{'failed'}{$self->sshlogin()};
5440 # the number of sshlogins this command has failed on
5441 # the minimal number of times this command has failed
5444 ::min
(map { $self->{'failed'}{$_} } keys %{$self->{'failed'}});
5445 my $number_of_sshlogins_failed_on = scalar keys %{$self->{'failed'}};
5446 return ($number_of_sshlogins_failed_on,$min_failures);
5451 # $total_failures = the number of times this command has failed
5453 my $total_failures = 0;
5454 for (values %{$self->{'failed'}}) {
5455 $total_failures += $_;
5457 return $total_failures;
5461 # Wrap command with:
5467 # * --pipepart (@Global::cat_partials)
5470 # The ordering of the wrapping is important:
5471 # * --nice/--cat/--fifo should be done on the remote machine
5472 # * --pipepart/--pipe should be done on the local machine inside --tmux
5480 # @Global::cat_partials
5484 # $self->{'wrapped'} = the command wrapped with the above
5486 if(not defined $self->{'wrapped'}) {
5487 my $command = $Global::envvar
.$self->replaced();
5488 if($opt::shellquote
) {
5491 $command = "echo " .
5492 ::shell_quote_scalar
(::shell_quote_scalar
($command));
5495 # Prepend \nice -n19 $SHELL -c
5497 # The '\' before nice is needed to avoid tcsh's built-in
5498 $command = '\nice'. " -n". $opt::nice
. " ".
5499 $Global::shell
. " -c ".
5500 ::shell_quote_scalar
($command);
5503 # Prepend 'cat > {};'
5504 # Append '_EXIT=$?;(rm {};exit $_EXIT)'
5506 $self->{'commandline'}->replace_placeholders(["cat > \257<\257>; "], 0, 0).
5508 $self->{'commandline'}->replace_placeholders(
5509 ["; _EXIT=\$?; rm \257<\257>; exit \$_EXIT"], 0, 0);
5510 } elsif($opt::fifo
) {
5511 # Prepend 'mkfifo {}; ('
5512 # Append ') & _PID=$!; cat > {}; wait $_PID; _EXIT=$?;(rm {};exit $_EXIT)'
5514 $self->{'commandline'}->replace_placeholders(["mkfifo \257<\257>; ("], 0, 0).
5516 $self->{'commandline'}->replace_placeholders([") & _PID=\$!; cat > \257<\257>; ",
5517 "wait \$_PID; _EXIT=\$?; ",
5518 "rm \257<\257>; exit \$_EXIT"],
5521 # Wrap with ssh + tranferring of files
5522 $command = $self->sshlogin_wrap($command);
5523 if(@Global::cat_partials
) {
5525 # < /tmp/foo perl -e 'while(@ARGV) { sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 32768 ? 32768 : $left))){ $left -= $read; syswrite(STDOUT,$buf); } }' 0 0 0 11 |
5526 $command = (shift @Global::cat_partials
). "|". "(". $command. ")";
5527 } elsif($opt::pipe) {
5528 # Prepend EOF-detector to avoid starting $command if EOF.
5529 # The $tmpfile might exist if run on a remote system - we accept that risk
5530 my ($dummy_fh, $tmpfile) = ::tmpfile
(SUFFIX
=> ".chr");
5531 # Unlink to avoid leaving files if --dry-run or --sshlogin
5535 # empty input = true
5536 # some input = exit val from command
5537 qq{ sh -c 'dd bs=1 count=1 of=$tmpfile 2>/dev/null'; }.
5538 qq{ test \! -s "$tmpfile" && rm -f "$tmpfile" && exec true; }.
5539 qq{ (cat $tmpfile; rm $tmpfile; cat - ) | }.
5543 # Wrap command with 'tmux'
5544 $command = $self->tmux_wrap($command);
5546 $self->{'wrapped'} = $command;
5548 return $self->{'wrapped'};
5553 my $sshlogin = shift;
5554 $self->{'sshlogin'} = $sshlogin;
5555 delete $self->{'sshlogin_wrap'}; # If sshlogin is changed the wrap is wrong
5556 delete $self->{'wrapped'};
5561 return $self->{'sshlogin'};
5565 # Wrap the command with the commands needed to run remotely
5567 # $self->{'sshlogin_wrap'} = command wrapped with ssh+transfer commands
5569 my $command = shift;
5570 if(not defined $self->{'sshlogin_wrap'}) {
5571 my $sshlogin = $self->sshlogin();
5572 my $sshcmd = $sshlogin->sshcommand();
5573 my $serverlogin = $sshlogin->serverlogin();
5574 my ($pre,$post,$cleanup)=("","","");
5576 if($serverlogin eq ":") {
5577 # No transfer neeeded
5578 $self->{'sshlogin_wrap'} = $command;
5581 $pre .= $self->sshtransfer();
5583 $post .= $self->sshreturn();
5585 $post .= $self->sshcleanup();
5587 # We need to save the exit status of the job
5588 $post = '_EXIT_status=$?; ' . $post . ' exit $_EXIT_status;';
5590 # If the remote login shell is (t)csh then use 'setenv'
5591 # otherwise use 'export'
5592 # We cannot use parse_env_var(), as PARALLEL_SEQ changes
5596 . q{ 'eval `echo $SHELL | grep "/t\\{0,1\\}csh
" > /dev/null }
5597 . q{ && echo setenv PARALLEL_SEQ '$PARALLEL_SEQ'\; }
5598 . q{ setenv PARALLEL_PID '$PARALLEL_PID' }
5599 . q{ || echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; }
5600 . q{ PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' });
5601 my $remote_pre = "";
5602 my $ssh_options = "";
5603 if(($opt::pipe or $opt::pipepart
) and $opt::ctrlc
5605 not ($opt::pipe or $opt::pipepart
) and not $opt::noctrlc
) {
5606 # TODO Determine if this is needed
5607 # Propagating CTRL-C to kill remote jobs requires
5608 # remote jobs to be run with a terminal.
5609 $ssh_options = "-tt -oLogLevel=quiet";
5610 # $ssh_options = "";
5611 # tty - check if we have a tty.
5613 # -onlcr - make output 8-bit clean
5614 # isig - pass CTRL-C as signal
5615 # -echo - do not echo input
5616 $remote_pre .= ::shell_quote_scalar
('tty >/dev/null && stty isig -onlcr -echo;');
5619 my $wd = ::shell_quote_file
($self->workdir());
5620 $remote_pre .= ::shell_quote_scalar
("mkdir -p ") . $wd .
5621 ::shell_quote_scalar
("; cd ") . $wd .
5622 # exit 255 (instead of exec false) would be the correct thing,
5623 # but that fails on tcsh
5624 ::shell_quote_scalar
(qq{ || exec false;});
5626 # This script is to solve the problem of
5627 # * not mixing STDERR and STDOUT
5628 # * terminating with ctrl-c
5629 # It works on Linux but not Solaris
5630 # Finishes on Solaris, but wrong exit code:
5631 # $SIG{CHLD} = sub {exit ($?&127 ? 128+($?&127) : 1+$?>>8)};
5632 # Hangs on Solaris, but correct exit code on Linux:
5633 # $SIG{CHLD} = sub { $done = 1 };
5635 my $signal_script = "perl -e '".
5638 $SIG{CHLD} = sub { $done = 1 };
5640 $p->mask(STDOUT
, POLLHUP
);
5641 $pid=fork; unless($pid) {setpgrp; exec $ENV{SHELL
}, "-c", @ARGV; die "exec: $!\n"}
5643 kill SIGHUP
, -${pid
} unless $done;
5644 wait; exit ($?&127 ?
128+($?&127) : 1+$?>>8)
5646 $signal_script =~ s/\s+/ /g;
5648 $self->{'sshlogin_wrap'} =
5650 . "$sshcmd $ssh_options $serverlogin $parallel_env "
5652 # . ::shell_quote_scalar($signal_script . ::shell_quote_scalar($command))
5653 . ::shell_quote_scalar
($command)
5658 return $self->{'sshlogin_wrap'};
5664 # @transfer - File names of files to transfer
5667 $self->{'transfersize'} = 0;
5668 if($opt::transfer
) {
5669 for my $record (@{$self->{'commandline'}{'arg_list'}}) {
5670 # Merge arguments from records into args
5671 for my $arg (@$record) {
5672 CORE
::push @transfer, $arg->orig();
5674 if(-e
$arg->orig()) {
5675 $self->{'transfersize'} += (stat($arg->orig()))[7];
5685 return $self->{'transfersize'};
5689 # Returns for each transfer file:
5690 # rsync $file remote:$workdir
5693 my $sshlogin = $self->sshlogin();
5694 my $workdir = $self->workdir();
5695 for my $file ($self->transfer()) {
5696 push @pre, $sshlogin->rsync_transfer_cmd($file,$workdir).";";
5698 return join("",@pre);
5703 # Non-quoted and with {...} substituted
5705 # @non_quoted_filenames
5707 return $self->{'commandline'}->
5708 replace_placeholders
($self->{'commandline'}{'return_files'},0,0);
5712 # This is called after the job has finished
5714 # $number_of_bytes transferred in return
5716 for my $file ($self->return()) {
5718 $self->{'returnsize'} += (stat($file))[7];
5721 return $self->{'returnsize'};
5725 # Returns for each return-file:
5726 # rsync remote:$workdir/$file .
5728 my $sshlogin = $self->sshlogin();
5729 my $sshcmd = $sshlogin->sshcommand();
5730 my $serverlogin = $sshlogin->serverlogin();
5731 my $rsync_opt = "-rlDzR -e".::shell_quote_scalar
($sshcmd);
5733 for my $file ($self->return()) {
5734 $file =~ s
:^\
./::g; # Remove ./ if any
5735 my $relpath = ($file !~ m
:^/:); # Is the path relative?
5739 # rsync -avR /foo/./bar/baz.c remote:/tmp/
5740 # == (on old systems)
5741 # rsync -avR --rsync-path="cd /foo; rsync" remote:bar/baz.c /tmp/
5742 $wd = ::shell_quote_file
($self->workdir()."/");
5744 # Only load File::Basename if actually needed
5745 $Global::use{"File::Basename"} ||= eval "use File::Basename; 1;";
5746 # dir/./file means relative to dir, so remove dir on remote
5747 $file =~ m
:(.*)/\./:;
5748 my $basedir = $1 ?
::shell_quote_file
($1."/") : "";
5749 my $nobasedir = $file;
5750 $nobasedir =~ s
:.*/\./::;
5751 $cd = ::shell_quote_file
(::dirname
($nobasedir));
5752 my $rsync_cd = '--rsync-path='.::shell_quote_scalar
("cd $wd$cd; rsync");
5753 my $basename = ::shell_quote_scalar
(::shell_quote_file
(basename
($file)));
5755 # mkdir -p /home/tange/dir/subdir/;
5756 # rsync (--protocol 30) -rlDzR --rsync-path="cd /home/tange/dir/subdir/; rsync"
5757 # server:file.gz /home/tange/dir/subdir/
5758 $pre .= "mkdir -p $basedir$cd; ".$sshlogin->rsync()." $rsync_cd $rsync_opt $serverlogin:".
5759 $basename . " ".$basedir.$cd.";";
5765 # Return the sshcommand needed to remove the file
5767 # ssh command needed to remove files from sshlogin
5769 my $sshlogin = $self->sshlogin();
5770 my $sshcmd = $sshlogin->sshcommand();
5771 my $serverlogin = $sshlogin->serverlogin();
5772 my $workdir = $self->workdir();
5775 for my $file ($self->cleanup()) {
5776 my @subworkdirs = parentdirs_of
($file);
5777 $cleancmd .= $sshlogin->cleanup_cmd($file,$workdir).";";
5779 if(defined $opt::workdir
and $opt::workdir
eq "...") {
5780 $cleancmd .= "$sshcmd $serverlogin rm -rf " . ::shell_quote_scalar
($workdir).';';
5787 # Files to remove at cleanup
5790 my @transfer = $self->transfer();
5791 my @return = $self->return();
5792 return (@transfer,@return);
5800 # the workdir on a remote machine
5802 if(not defined $self->{'workdir'}) {
5804 if(defined $opt::workdir
) {
5805 if($opt::workdir
eq ".") {
5806 # . means current dir
5807 my $home = $ENV{'HOME'};
5812 # If homedir exists: remove the homedir from
5813 # workdir if cwd starts with homedir
5814 # E.g. /home/foo/my/dir => my/dir
5815 # E.g. /tmp/my/dir => /tmp/my/dir
5816 my ($home_dev, $home_ino) = (stat($home))[0,1];
5818 my @dir_parts = split(m
:/:,$cwd);
5820 while(defined ($part = shift @dir_parts)) {
5821 $part eq "" and next;
5822 $parent .= "/".$part;
5823 my ($parent_dev, $parent_ino) = (stat($parent))[0,1];
5824 if($parent_dev == $home_dev and $parent_ino == $home_ino) {
5825 # dev and ino is the same: We found the homedir.
5826 $workdir = join("/",@dir_parts);
5831 if($workdir eq "") {
5834 } elsif($opt::workdir
eq "...") {
5835 $workdir = ".parallel/tmp/" . ::hostname
() . "-" . $$
5836 . "-" . $self->seq();
5838 $workdir = $opt::workdir
;
5839 # Rsync treats /./ special. We don't want that
5840 $workdir =~ s
:/\./:/:g; # Remove /./
5841 $workdir =~ s
:/+$::; # Remove ending / if any
5842 $workdir =~ s
:^\
./::g; # Remove starting ./ if any
5847 $self->{'workdir'} = ::shell_quote_scalar
($workdir);
5849 return $self->{'workdir'};
5854 # all parentdirs except . of this dir or file - sorted desc by length
5857 while($d =~ s
:/[^/]+$::) {
5866 # Setup STDOUT and STDERR for a job and start it.
5868 # job-object or undef if job not to run
5870 # Get the shell command to be executed (possibly with ssh infront).
5871 my $command = $job->wrapped();
5873 if($Global::interactive
or $Global::stderr_verbose
) {
5874 if($Global::interactive
) {
5875 print $Global::original_stderr
"$command ?...";
5876 open(my $tty_fh, "<", "/dev/tty") || ::die_bug
("interactive-tty");
5877 my $answer = <$tty_fh>;
5879 my $run_yes = ($answer =~ /^\s*y/i);
5881 $command = "true"; # Run the command 'true'
5884 print $Global::original_stderr
"$command\n";
5889 $job->openoutputfiles();
5890 my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w"));
5891 local (*IN
,*OUT
,*ERR
);
5892 open OUT
, '>&', $stdout_fh or ::die_bug
("Can't redirect STDOUT: $!");
5893 open ERR
, '>&', $stderr_fh or ::die_bug
("Can't dup STDOUT: $!");
5895 if(($opt::dryrun
or $Global::verbose
) and $opt::ungroup
) {
5896 if($Global::verbose
<= 1) {
5897 print $stdout_fh $job->replaced(),"\n";
5899 # Verbose level > 1: Print the rsync and stuff
5900 print $stdout_fh $command,"\n";
5906 $ENV{'PARALLEL_SEQ'} = $job->seq();
5907 $ENV{'PARALLEL_PID'} = $$;
5908 ::debug
("run", $Global::total_running
, " processes . Starting (",
5909 $job->seq(), "): $command\n");
5912 # The eval is needed to catch exception from open3
5914 $pid = ::open3
($stdin_fh, ">&OUT", ">&ERR", $Global::shell
, "-c", $command) ||
5915 ::die_bug
("open3-pipe");
5918 $job->set_fh(0,"w",$stdin_fh);
5919 } elsif(@opt::a
and not $Global::stdin_in_opt_a
and $job->seq() == 1
5920 and $job->sshlogin()->string() eq ":") {
5921 # Give STDIN to the first job if using -a (but only if running
5922 # locally - otherwise CTRL-C does not work for other jobs Bug#36585)
5924 # The eval is needed to catch exception from open3
5926 $pid = ::open3
("<&IN", ">&OUT", ">&ERR", $Global::shell
, "-c", $command) ||
5927 ::die_bug
("open3-a");
5930 # Re-open to avoid complaining
5931 open(STDIN
, "<&", $Global::original_stdin
)
5932 or ::die_bug
("dup-\$Global::original_stdin: $!");
5933 } elsif ($opt::tty
and not $Global::tty_taken
and -c
"/dev/tty" and
5934 open(my $devtty_fh, "<", "/dev/tty")) {
5935 # Give /dev/tty to the command if no one else is using it
5937 # The eval is needed to catch exception from open3
5939 $pid = ::open3
("<&IN", ">&OUT", ">&ERR", $Global::shell
, "-c", $command) ||
5940 ::die_bug
("open3-/dev/tty");
5941 $Global::tty_taken
= $pid;
5946 # The eval is needed to catch exception from open3
5948 $pid = ::open3
(::gensym
, ">&OUT", ">&ERR", $Global::shell
, "-c", $command) ||
5949 ::die_bug
("open3-gensym");
5955 $Global::total_running
++;
5956 $Global::total_started
++;
5957 $job->set_pid($pid);
5958 $job->set_starttime();
5959 $Global::running
{$job->pid()} = $job;
5961 $Global::timeoutq-
>insert($job);
5963 $Global::newest_job
= $job;
5964 $Global::newest_starttime
= ::now
();
5968 ::debug
("run", "Cannot spawn more jobs.\n");
5974 # Wrap command with tmux for session pPID
5976 # $actual_command = the actual command being run (incl ssh wrap)
5978 my $actual_command = shift;
5979 # Temporary file name. Used for fifo to communicate exit val
5980 my ($fh, $tmpfile) = ::tmpfile
(SUFFIX
=> ".tmx");
5981 $Global::unlink{$tmpfile}=1;
5984 my $visual_command = $self->replaced();
5985 my $title = $visual_command;
5987 # ascii 194-245 annoys tmux
5988 $title =~ tr/[\011-\016;\302-\365]//d;
5991 if($Global::total_running
== 0) {
5992 $tmux = "tmux new-session -s p$$ -d -n ".
5993 ::shell_quote_scalar
($title);
5994 print $Global::original_stderr
"See output with: tmux attach -t p$$\n";
5996 $tmux = "tmux new-window -t p$$ -n ".::shell_quote_scalar
($title);
5998 return "mkfifo $tmpfile; $tmux ".
6000 ::shell_quote_scalar
(
6001 "(".$actual_command.');(echo $?$status;echo 255) >'.$tmpfile."&".
6002 "echo ".::shell_quote_scalar
($visual_command).";".
6003 "echo \007Job finished at: `date`;sleep 10").
6005 # Read the first line from the fifo and use that as status code
6006 "; exit `perl -ne 'unlink \$ARGV; 1..1 and print' $tmpfile` ";
6009 sub is_already_in_results
{
6010 # Do we already have results for this job?
6012 # $job_already_run = bool whether there is output for this or not
6014 my $args_as_dirname = $job->{'commandline'}->args_as_dirname();
6015 # prefix/name1/val1/name2/val2/
6016 my $dir = $opt::results
."/".$args_as_dirname;
6017 ::debug
("run", "Test $dir/stdout", -e
"$dir/stdout", "\n");
6018 return -e
"$dir/stdout";
6021 sub is_already_in_joblog
{
6023 return vec($Global::job_already_run
,$job->seq(),1);
6026 sub set_job_in_joblog
{
6028 vec($Global::job_already_run
,$job->seq(),1) = 1;
6031 sub should_be_retried
{
6032 # Should this job be retried?
6035 # 1 - job queued for retry
6037 if (not $opt::retries
) {
6040 if(not $self->exitstatus()) {
6041 # Completed with success. If there is a recorded failure: forget it
6042 $self->reset_failed_here();
6045 # The job failed. Should it be retried?
6046 $self->add_failed_here();
6047 if($self->total_failed() == $opt::retries
) {
6048 # This has been retried enough
6051 # This command should be retried
6052 $self->set_endtime(undef);
6053 $Global::JobQueue-
>unget($self);
6054 ::debug
("run", "Retry ", $self->seq(), "\n");
6061 # Print the output of the jobs
6065 ::debug
("print", ">>joboutput ", $self->replaced(), "\n");
6067 # Nothing was printed to this job:
6068 # cleanup tmp files if --files was set
6069 unlink $self->fh(1,"name");
6071 if($opt::pipe and $self->virgin()) {
6072 # Skip --joblog, --dryrun, --verbose
6074 if($Global::joblog
and defined $self->{'exitstatus'}) {
6075 # Add to joblog when finished
6076 $self->print_joblog();
6079 # Printing is only relevant for grouped/--line-buffer output.
6080 $opt::ungroup
and return;
6081 # Check for disk full
6082 exit_if_disk_full
();
6084 if(($opt::dryrun
or $Global::verbose
)
6086 not $self->{'verbose_printed'}) {
6087 $self->{'verbose_printed'}++;
6088 if($Global::verbose
<= 1) {
6089 print STDOUT
$self->replaced(),"\n";
6091 # Verbose level > 1: Print the rsync and stuff
6092 print STDOUT
$self->wrapped(),"\n";
6094 # If STDOUT and STDERR are merged,
6095 # we want the command to be printed first
6096 # so flush to avoid STDOUT being buffered
6100 for my $fdno (sort { $a <=> $b } keys %Global::fd
) {
6101 # Sort by file descriptor numerically: 1,2,3,..,9,10,11
6102 $fdno == 0 and next;
6103 my $out_fd = $Global::fd
{$fdno};
6104 my $in_fh = $self->fh($fdno,"r");
6106 if(not $Job::file_descriptor_warning_printed
{$fdno}++) {
6107 # ::warning("File descriptor $fdno not defined\n");
6111 ::debug
("print", "File descriptor $fdno (", $self->fh($fdno,"name"), "):");
6113 # If --compress: $in_fh must be closed first.
6114 close $self->fh($fdno,"w");
6116 if($opt::pipe and $self->virgin()) {
6117 # Nothing was printed to this job:
6118 # cleanup unused tmp files if --files was set
6119 for my $fdno (1,2) {
6120 unlink $self->fh($fdno,"name");
6121 unlink $self->fh($fdno,"unlink");
6123 } elsif($fdno == 1 and $self->fh($fdno,"name")) {
6124 print $out_fd $self->fh($fdno,"name"),"\n";
6126 } elsif($opt::linebuffer
) {
6127 # Line buffered print out
6128 $self->linebuffer_print($fdno,$in_fh,$out_fd);
6131 close $self->fh($fdno,"w");
6133 # $in_fh is now ready for reading at position 0
6134 if($opt::tag
or defined $opt::tagstring
) {
6135 my $tag = $self->tag();
6137 # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
6138 # This is a crappy way of ignoring it.
6140 if(/^(client_process_control: )?tcgetattr: Invalid argument\n/) {
6143 print $out_fd $tag,$_;
6145 # At most run the loop once
6150 print $out_fd $tag,$_;
6155 # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
6156 # This is a crappy way of ignoring it.
6157 sysread($in_fh,$buf,1_000);
6158 $buf =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
6161 while(sysread($in_fh,$buf,32768)) {
6169 ::debug
("print", "<<joboutput @command\n");
6172 sub linebuffer_print
{
6174 my ($fdno,$in_fh,$out_fd) = @_;
6175 my $partial = \
$self->{'partial_line',$fdno};
6177 if(defined $self->{'exitstatus'}) {
6178 # If the job is dead: close printing fh. Needed for --compress
6179 close $self->fh($fdno,"w");
6180 if($opt::compress
) {
6181 # Blocked reading in final round
6182 $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
6183 for my $fdno (1,2) {
6184 my $fdr = $self->fh($fdno,'r');
6186 fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
6187 $flags &= ~&O_NONBLOCK; # Remove non-blocking to the flags
6188 fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
6192 # This seek will clear EOF
6193 seek $in_fh, tell($in_fh), 0;
6194 # The read is non-blocking: The $in_fh is set to non-blocking.
6195 # 32768 --tag = 5.1s
6196 # 327680 --tag = 4.4s
6197 # 1024000 --tag = 4.4s
6198 # 3276800 --tag = 4.3s
6199 # 32768000 --tag = 4.7s
6200 # 10240000 --tag = 4.3s
6201 while(read($in_fh,substr($$partial,length $$partial),3276800)) {
6202 # Append to $$partial
6204 my $i = rindex($$partial,"\n");
6206 # One or more complete lines were found
6207 if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) {
6208 # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
6209 # This is a crappy way of ignoring it.
6210 $$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
6211 # Length of partial line has changed: Find the last \n again
6212 $i = rindex($$partial,"\n");
6214 if($opt::tag or defined $opt::tagstring) {
6215 # Replace ^ with $tag within the full line
6216 my $tag = $self->tag();
6217 substr($$partial,0,$i+1) =~ s/^/$tag/gm;
6218 # Length of partial line has changed: Find the last \n again
6219 $i = rindex($$partial,"\n");
6221 # Print up to and including the last \n
6222 print $out_fd substr($$partial,0,$i+1);
6223 # Remove the printed part
6224 substr($$partial,0,$i+1)="";
6227 if(defined $self->{'exitstatus'}) {
6228 # If the job is dead: print the remaining partial line
6230 if($$partial and ($opt::tag or defined $opt::tagstring)) {
6231 my $tag = $self->tag();
6232 $$partial =~ s/^/$tag/gm;
6234 print $out_fd $$partial;
6235 # Release the memory
6237 if($self->fh($fdno,"rpid
") and CORE::kill 0, $self->fh($fdno,"rpid
")) {
6238 # decompress still running
6240 # decompress done: close fh
6249 if($Global::verbose <= 1) {
6250 $cmd = $self->replaced();
6252 # Verbose level > 1: Print the rsync and stuff
6255 print $Global::joblog
6256 join("\t", $self->seq(), $self->sshlogin()->string(),
6257 $self->starttime(), sprintf("%10.3f
",$self->runtime()),
6258 $self->transfersize(), $self->returnsize(),
6259 $self->exitstatus(), $self->exitsignal(), $cmd
6261 flush $Global::joblog;
6262 $self->set_job_in_joblog();
6267 if(not defined $self->{'tag'}) {
6268 $self->{'tag'} = $self->{'commandline'}->
6269 replace_placeholders([$opt::tagstring],0,0)."\t";
6271 return $self->{'tag'};
6276 if(not defined $self->{'hostgroups'}) {
6277 $self->{'hostgroups'} = $self->{'commandline'}->{'arg_list'}[0][0]->{'hostgroups'};
6279 return @{$self->{'hostgroups'}};
6284 return $self->{'exitstatus'};
6287 sub set_exitstatus {
6289 my $exitstatus = shift;
6291 # Overwrite status if non-zero
6292 $self->{'exitstatus'} = $exitstatus;
6294 # Set status but do not overwrite
6295 # Status may have been set by --timeout
6296 $self->{'exitstatus'} ||= $exitstatus;
6302 return $self->{'exitsignal'};
6305 sub set_exitsignal {
6307 my $exitsignal = shift;
6308 $self->{'exitsignal'} = $exitsignal;
6312 my ($disk_full_fh, $b8193, $name);
6313 sub exit_if_disk_full {
6314 # Checks if $TMPDIR is full by writing 8kb to a tmpfile
6315 # If the disk is full: Exit immediately.
6318 if(not $disk_full_fh) {
6319 ($disk_full_fh, $name) = ::tmpfile(SUFFIX => ".df
");
6323 # Linux does not discover if a disk is full if writing <= 8192
6325 # bfs btrfs cramfs ext2 ext3 ext4 ext4dev jffs2 jfs minix msdos
6326 # ntfs reiserfs tmpfs ubifs vfat xfs
6327 # TODO this should be tested on different OS similar to this:
6330 # sudo mount /dev/ram0 /mnt/loop; sudo chmod 1777 /mnt/loop
6331 # seq 100000 | parallel --tmpdir /mnt/loop/ true &
6332 # seq 6900000 > /mnt/loop/i && echo seq OK
6333 # seq 6980868 > /mnt/loop/i
6334 # seq 10000 > /mnt/loop/ii
6336 # sudo umount /mnt/loop/ || sudo umount -l /mnt/loop/
6339 print $disk_full_fh $b8193;
6340 if(not $disk_full_fh
6342 tell $disk_full_fh == 0) {
6343 ::error("Output
is incomplete
. Cannot append to buffer file
in $ENV{'TMPDIR'}. Is the disk full?
\n");
6344 ::error("Change \
$TMPDIR with
--tmpdir
or use --compress
.\n");
6345 ::wait_and_exit(255);
6347 truncate $disk_full_fh, 0;
6348 seek($disk_full_fh, 0, 0) || die;
6353 package CommandLine;
6358 my $commandref = shift;
6360 my $arg_queue = shift;
6361 my $context_replace = shift;
6362 my $max_number_of_args = shift; # for -N and normal (-n1)
6363 my $return_files = shift;
6364 my $replacecount_ref = shift;
6365 my $len_ref = shift;
6366 my %replacecount = %$replacecount_ref;
6367 my %len = %$len_ref;
6368 for (keys %$replacecount_ref) {
6369 # Total length of this replacement string {} replaced with all args
6373 'command' => $commandref,
6377 'arg_queue' => $arg_queue,
6378 'max_number_of_args' => $max_number_of_args,
6379 'replacecount' => \%replacecount,
6380 'context_replace' => $context_replace,
6381 'return_files' => $return_files,
6382 'replaced' => undef,
6383 }, ref($class) || $class;
6388 return $self->{'seq'};
6392 my $max_slot_number;
6395 # Find the number of a free job slot and return it
6399 # $jobslot = number of jobslot
6401 if(not $self->{'slot'}) {
6402 if(not @Global::slots) {
6403 # $Global::max_slot_number will typically be $Global::max_jobs_running
6404 push @Global::slots, ++$max_slot_number;
6406 $self->{'slot'} = shift @Global::slots;
6408 return $self->{'slot'};
6413 # Add arguments from arg_queue until the number of arguments or
6414 # max line length is reached
6416 # $Global::minimal_command_line_length
6422 # $CommandLine::already_spread
6423 # $Global::max_jobs_running
6427 my $max_len = $Global::minimal_command_line_length || Limits::Command::max_length();
6429 if($opt::cat or $opt::fifo) {
6430 # Generate a tempfile name that will be used as {}
6431 my($outfh,$name) = ::tmpfile(SUFFIX => ".pip
");
6433 # Unlink is needed if: ssh otheruser@localhost
6435 $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget([Arg->new($name)]);
6438 while (not $self->{'arg_queue'}->empty()) {
6439 $next_arg = $self->{'arg_queue'}->get();
6440 if(not defined $next_arg) {
6443 $self->push($next_arg);
6444 if($self->len() >= $max_len) {
6445 # Command length is now > max_length
6446 # If there are arguments: remove the last
6447 # If there are no arguments: Error
6448 # TODO stuff about -x opt_x
6449 if($self->number_of_args() > 1) {
6450 # There is something to work on
6451 $self->{'arg_queue'}->unget($self->pop());
6454 my $args = join(" ", map { $_->orig() } @$next_arg);
6455 ::error("Command line too long
(",
6456 $self->len(), " >= ",
6459 $self->{'arg_queue'}->arg_number(),
6461 (substr($args,0,50))."...\n");
6462 $self->{'arg_queue'}->unget($self->pop());
6463 ::wait_and_exit(255);
6467 if(defined $self->{'max_number_of_args'}) {
6468 if($self->number_of_args() >= $self->{'max_number_of_args'}) {
6473 if(($opt::m or $opt::X) and not $CommandLine::already_spread
6474 and $self->{'arg_queue'}->empty() and $Global::max_jobs_running) {
6475 # -m or -X and EOF => Spread the arguments over all jobslots
6476 # (unless they are already spread)
6477 $CommandLine::already_spread ||= 1;
6478 if($self->number_of_args() > 1) {
6479 $self->{'max_number_of_args'} =
6480 ::ceil($self->number_of_args()/$Global::max_jobs_running);
6481 $Global::JobQueue->{'commandlinequeue'}->{'max_number_of_args'} =
6482 $self->{'max_number_of_args'};
6483 $self->{'arg_queue'}->unget($self->pop_all());
6484 while($self->number_of_args() < $self->{'max_number_of_args'}) {
6485 $self->push($self->{'arg_queue'}->get());
6492 # Add one or more records as arguments
6496 push @{$self->{'arg_list'}}, $record;
6498 my $quote_arg = $Global::noquote ? 0 : not $Global::quoting;
6500 for my $arg (@$record) {
6502 for my $perlexpr (keys %{$self->{'replacecount'}}) {
6503 # 50% faster than below
6504 $self->{'len'}{$perlexpr} += length $arg->replace($perlexpr,$quote_arg,$self);
6505 # $rep = $arg->replace($perlexpr,$quote_arg,$self);
6506 # $self->{'len'}{$perlexpr} += length $rep;
6507 # ::debug("length", "Length
: ", length $rep,
6508 # "(", $perlexpr, "=>", $rep, ")\n");
6515 # Remove last argument
6519 my $record = pop @{$self->{'arg_list'}};
6520 my $quote_arg = $Global::noquote ? 0 : not $Global::quoting;
6521 for my $arg (@$record) {
6523 for my $perlexpr (keys %{$self->{'replacecount'}}) {
6524 $self->{'len'}{$perlexpr} -=
6525 length $arg->replace($perlexpr,$quote_arg,$self);
6533 # Remove all arguments and zeros the length of replacement strings
6537 my @popped = @{$self->{'arg_list'}};
6538 for my $replacement_string (keys %{$self->{'replacecount'}}) {
6539 $self->{'len'}{$replacement_string} = 0;
6541 $self->{'arg_list'} = [];
6545 sub number_of_args {
6546 # The number of records
6550 # Ftq rudef oaawuq ime dqxqmeqp az 2011-01-24 mzp ime iaz nk MQhmd
6551 # Mdzrvadp Nvmdymeaz az 2011-04-10. Ftue oaawuq dqxqmeqp az
6552 # 2013-08-18 ue m nuf tmdpqd me kag tmhq fa geq daf14. Bxqmeq
6553 # qymux oaawuq@fmzsq.pw itqz kag dqmp ftue.
6555 # U my ftq ymuzfmuzqd ar m buqoq ar rdqq earfimdq omxxqp SZG
6556 # Bmdmxxqx. Rdqq earfimdq sgmdmzfqqe kag mooqee fa ftq eagdoq
6557 # oapq, ngf U tmhq nqqz iazpqduzs tai ymzk mofgmxxk _dqmp_ ftq
6560 # Fa fqef ftue U bgf uz m oayyqzf fqxxuzs bqabxq fa qymux yq itqz
6561 # ftqk dqmp ftue. Ftq oayyqzf ime bgf uz m eqofuaz ar ftq oapq
6562 # ftmf za azq iagxp xaaw fa ruj ad uybdahq ftq earfimdq - ea ftq
6563 # eagdoq oapq qcguhmxqzf fa m pgefk oadzqd. Fa ymwq egdq ftq
6564 # oayyqzf iagxp zaf etai gb ur eayq azq vgef sdqbbqp ftdagst ftq
6565 # eagdoq oapq U daf13'qp ftq eagdoq oapq
6566 # tffb://qz.iuwubqpum.ads/iuwu/DAF13
6568 # 2.5 yazfte xmfqd U dqoquhqp mz qymux rday eayqazq ita zaf azxk
6569 # ymzmsqp fa ruzp ftq oayyqzf, ngf mxea ymzmsqp fa sgqee ftq oapq
6570 # tmp fa nq daf13'qp.
6572 # Ftue nduzse yq fa ftq oazoxgeuaz ftmf ftqdq _mdq_ bqabxq, ita
6573 # mdq zaf mrruxumfqp iuft ftq bdavqof, ftmf iuxx dqmp ftq eagdoq
6574 # oapq - ftagst uf ymk zaf tmbbqz hqdk arfqz.
6576 # This is really the number of records
6577 return $#{$self->{'arg_list'}}+1;
6580 sub number_of_recargs {
6581 # The number of args in records
6583 # number of args records
6586 my $nrec = scalar @{$self->{'arg_list'}};
6588 $sum = $nrec * (scalar @{$self->{'arg_list'}[0]});
6593 sub args_as_string {
6595 # all unmodified arguments joined with ' ' (similar to {})
6597 return (join " ", map { $_->orig() }
6598 map { @$_ } @{$self->{'arg_list'}});
6601 sub args_as_dirname {
6603 # all unmodified arguments joined with '/' (similar to {})
6604 # \t \0 \\ and / are quoted as: \t \0 \\ \_
6605 # If $Global::max_file_length: Keep subdirs < $Global::max_file_length
6609 for my $rec_ref (@{$self->{'arg_list'}}) {
6610 # If headers are used, sort by them.
6611 # Otherwise keep the order from the command line.
6612 my @header_indexes_sorted = header_indexes_sorted($#$rec_ref+1);
6613 for my $n (@header_indexes_sorted) {
6615 $Global::input_source_header{$n},
6617 # \t \0 \\ and / are quoted as: \t \0 \\ \_
6622 if($Global::max_file_length) {
6623 # Keep each subdir shorter than the longest
6625 $s = substr($s,0,$Global::max_file_length);
6628 $rec_ref->[$n-1]->orig());
6631 return join "/", @res;
6634 sub header_indexes_sorted {
6635 # Sort headers first by number then by name.
6636 # E.g.: 1a 1b 11a 11b
6638 # Indexes of %Global::input_source_header sorted
6639 my $max_col = shift;
6641 no warnings 'numeric';
6642 for my $col (1 .. $max_col) {
6643 # Make sure the header is defined. If it is not: use column number
6644 if(not defined $Global::input_source_header{$col}) {
6645 $Global::input_source_header{$col} = $col;
6648 my @header_indexes_sorted = sort {
6649 # Sort headers numerically then asciibetically
6650 $Global::input_source_header{$a} <=> $Global::input_source_header{$b}
6652 $Global::input_source_header{$a} cmp $Global::input_source_header{$b}
6654 return @header_indexes_sorted;
6660 # The length of the command line with args substituted
6663 # Add length of the original command with no args
6664 # Length of command w/ all replacement args removed
6665 $len += $self->{'len'}{'noncontext'} + @{$self->{'command'}} -1;
6666 ::debug("length", "noncontext
+ command
: $len\n");
6667 my $recargs = $self->number_of_recargs();
6668 if($self->{'context_replace'}) {
6669 # Context is duplicated for each arg
6670 $len += $recargs * $self->{'len'}{'context'};
6671 for my $replstring (keys %{$self->{'replacecount'}}) {
6672 # If the replacements string is more than once: mulitply its length
6673 $len += $self->{'len'}{$replstring} *
6674 $self->{'replacecount'}{$replstring};
6675 ::debug("length", $replstring, " ", $self->{'len'}{$replstring}, "*",
6676 $self->{'replacecount'}{$replstring}, "\n");
6678 # echo 11 22 33 44 55 66 77 88 99 1010
6679 # echo 1 2 3 4 5 6 7 8 9 10 1 2 3 4 5 6 7 8 9 10
6681 ::debug("length", "Ctxgrp
: ", $self->{'len'}{'contextgroups'},
6682 " Groups
: ", $self->{'len'}{'noncontextgroups'}, "\n");
6683 # Add space between context groups
6684 $len += ($recargs-1) * ($self->{'len'}{'contextgroups'});
6686 # Each replacement string may occur several times
6687 # Add the length for each time
6688 $len += 1*$self->{'len'}{'context'};
6689 ::debug("length", "context
+noncontext
+ command
: $len\n");
6690 for my $replstring (keys %{$self->{'replacecount'}}) {
6691 # (space between regargs + length of replacement)
6692 # * number this replacement is used
6693 $len += ($recargs -1 + $self->{'len'}{$replstring}) *
6694 $self->{'replacecount'}{$replstring};
6698 # Pessimistic length if --nice is set
6699 # Worse than worst case: every char needs to be quoted with \
6702 if($Global::quoting) {
6703 # Pessimistic length if -q is set
6704 # Worse than worst case: every char needs to be quoted with \
6707 if($opt::shellquote) {
6708 # Pessimistic length if --shellquote is set
6709 # Worse than worst case: every char needs to be quoted with \ twice
6712 # If we are using --env, add the prefix for that, too.
6713 $len += $Global::envvarlen;
6723 # $replaced = command with place holders replaced and prepended
6725 if(not defined $self->{'replaced'}) {
6726 # Don't quote arguments if the input is the full command line
6727 my $quote_arg = $Global::noquote ? 0 : not $Global::quoting;
6728 $self->{'replaced'} = $self->replace_placeholders($self->{'command'},$Global::quoting,$quote_arg);
6729 my $len = length $self->{'replaced'};
6730 if ($len != $self->len()) {
6731 ::debug("length", $len, " != ", $self->len(), " ", $self->{'replaced'}, "\n");
6733 ::debug("length", $len, " == ", $self->len(), " ", $self->{'replaced'}, "\n");
6736 return $self->{'replaced'};
6739 sub replace_placeholders {
6740 # Replace foo{}bar with fooargbar
6742 # $targetref = command as shell words
6743 # $quote = should everything be quoted?
6744 # $quote_arg = should replaced arguments be quoted?
6746 # @target with placeholders replaced
6748 my $targetref = shift;
6750 my $quote_arg = shift;
6751 my $context_replace = $self->{'context_replace'};
6752 my @target = @$targetref;
6753 ::debug("replace
", "Replace
@target\n");
6754 # -X = context replace
6755 # maybe multiple input sources
6758 # @target is empty: Return empty array
6761 # Fish out the words that have replacement strings in them
6765 ::debug("replace
", "Target
: $tt");
6767 # a{=1 $_=$_ =}b{= $_=$_ =}c{= $_=$_ =}d
6768 # a\257<1 $_=$_ \257>b\257< $_=$_ \257>c\257< $_=$_ \257>d
6769 # A B C => aAbA B CcA B Cd
6770 # -X A B C => aAbAcAd aAbBcBd aAbCcCd
6772 if($context_replace) {
6773 while($tt =~ s/([^\s\257]* # before {=
6776 [^\257]*? # The perl expression
6778 [^\s\257]* # after =}
6780 # $1 = pre \257 perlexpr \257 post
6784 while($tt =~ s/( (?: \257<([^\257]*?)\257>) )//x) {
6785 # $f = \257 perlexpr \257
6790 my @word = keys %word;
6794 for my $record (@{$self->{'arg_list'}}) {
6795 # $self->{'arg_list'} = [ [Arg11, Arg12], [Arg21, Arg22], [Arg31, Arg32] ]
6796 # Merge arg-objects from records into @arg for easy access
6797 CORE::push @arg, @$record;
6799 # Add one arg if empty to allow {#} and {%} to be computed only once
6800 if(not @arg) { @arg = (Arg->new("")); }
6801 # Number of arguments - used for positional arguments
6804 # This is actually a CommandLine-object,
6805 # but it looks nice to be able to say {= $job->slot() =}
6807 for my $word (@word) {
6808 # word = AB \257< perlexpr \257> CD \257< perlexpr \257> EF
6810 ::debug("replace
", "Replacing
in $w\n");
6812 # Replace positional arguments
6813 $w =~ s< ([^\s\257]*) # before {=
6815 (-?\d+) # Position (eg. -2 or 3)
6816 ([^\257]*?) # The perl expression
6818 ([^\s\257]*) # after =}
6820 { $1. # Context (pre)
6822 $arg[$2 > 0 ? $2-1 : $n+$2] ? # If defined: replace
6823 $arg[$2 > 0 ? $2-1 : $n+$2]->replace($3,$quote_arg,$self)
6825 .$4 }egx;# Context (post)
6826 ::debug("replace
", "Positional replaced
$word with
: $w\n");
6829 # No more replacement strings in $w: No need to do more
6831 CORE::push(@{$replace{::shell_quote($word)}}, $w);
6833 CORE::push(@{$replace{$word}}, $w);
6838 # compute replacement for each string
6839 # replace replacement strings with replacement in the word value
6840 # push to replace word value
6841 ::debug("replace
", "Positional done
: $w\n");
6842 for my $arg (@arg) {
6844 my $number_of_replacements = 0;
6845 for my $perlexpr (keys %{$self->{'replacecount'}}) {
6846 # Replace {= perl expr =} with value for each arg
6847 $number_of_replacements +=
6848 $val =~ s{\257<\Q$perlexpr\E\257>}
6849 {$arg ? $arg->replace($perlexpr,$quote_arg,$self) : ""}eg;
6853 $ww = ::shell_quote_scalar($word);
6854 $val = ::shell_quote_scalar($val);
6856 if($number_of_replacements) {
6857 CORE::push(@{$replace{$ww}}, $val);
6863 @target = ::shell_quote(@target);
6865 # ::debug("replace
", "%replace=",::my_dump(%replace),"\n");
6867 # Substitute the replace strings with the replacement values
6868 # Must be sorted by length if a short word is a substring of a long word
6869 my $regexp = join('|', map { my $s = $_; $s =~ s/(\W)/\\$1/g; $s }
6870 sort { length $b <=> length $a } keys %replace);
6872 s/($regexp)/join(" ",@{$replace{$1}})/ge;
6875 ::debug("replace
", "Return
@target\n");
6876 return wantarray ? @target : "@target";
6880 package CommandLineQueue;
6884 my $commandref = shift;
6885 my $read_from = shift;
6886 my $context_replace = shift;
6887 my $max_number_of_args = shift;
6888 my $return_files = shift;
6890 my ($count,%replacecount,$posrpl,$perlexpr,%len);
6891 my @command = @$commandref;
6892 # If the first command start with '-' it is probably an option
6893 if($command[0] =~ /^\s*(-\S+)/) {
6894 # Is this really a command in $PATH starting with '-'?
6896 if(not ::which($cmd)) {
6897 ::error("Command
($cmd) starts with
'-'. Is this a wrong option?
\n");
6898 ::wait_and_exit(255);
6901 # Replace replacement strings with {= perl expr =}
6902 # Protect matching inside {= perl expr =}
6903 # by replacing {= and =} with \257< and \257>
6906 ::error("Command cannot contain the character
\257. Use a function
for that
.\n");
6907 ::wait_and_exit(255);
6909 s/\Q$Global::parensleft\E(.*?)\Q$Global::parensright\E/\257<$1\257>/gx;
6911 for my $rpl (keys %Global::rpl) {
6912 # Replace the short hand string with the {= perl expr =} in $command and $opt::tagstring
6913 # Avoid replacing inside existing {= perl expr =}
6914 for(@command,@Global::ret_files) {
6915 while(s/((^|\257>)[^\257]*?) # Don't replace after \257 unless \257>
6916 \Q$rpl\E/$1\257<$Global::rpl{$rpl}\257>/xg) {
6919 if(defined $opt::tagstring) {
6920 for($opt::tagstring) {
6921 while(s/((^|\257>)[^\257]*?) # Don't replace after \257 unless \257>
6922 \Q$rpl\E/$1\257<$Global::rpl{$rpl}\257>/x) {}
6925 # Do the same for the positional replacement strings
6926 # A bit harder as we have to put in the position number
6928 if($posrpl =~ s/^\{//) {
6929 # Only do this if the shorthand start with {
6930 for(@command,@Global::ret_files) {
6931 s/\{(-?\d+)\Q$posrpl\E/\257<$1 $Global::rpl{$rpl}\257>/g;
6933 if(defined $opt::tagstring) {
6934 $opt::tagstring =~ s/\{(-?\d+)\Q$posrpl\E/\257<$1 $perlexpr\257>/g;
6940 # Count how many times each replacement string is used
6943 my $noncontextlen = 0;
6944 my $contextgroups = 0;
6946 while($c =~ s/ \257<([^\257]*?)\257> /\000/x) {
6947 # %replacecount = { "perlexpr
" => number of times seen }
6948 # e.g { "$_++" => 2 }
6949 $replacecount{$1} ++;
6952 # Measure the length of the context around the {= perl expr =}
6953 # Use that {=...=} has been replaced with \000 above
6954 # So there is no need to deal with \257<
6955 while($c =~ s/ (\S*\000\S*) //x) {
6957 $w =~ tr/\000//d; # Remove all \000's
6958 $contextlen += length($w);
6961 # All {= perl expr =} have been removed: The rest is non-context
6962 $noncontextlen += length $c;
6964 if($opt::tagstring) {
6965 my $t = $opt::tagstring;
6966 while($t =~ s/ \257<([^\257]*)\257> //x) {
6967 # %replacecount = { "perlexpr
" => number of times seen }
6968 # e.g { "$_++" => 2 }
6969 # But for tagstring we just need to mark it as seen
6970 $replacecount{$1}||=1;
6974 $len{'context'} = 0+$contextlen;
6975 $len{'noncontext'} = $noncontextlen;
6976 $len{'contextgroups'} = $contextgroups;
6977 $len{'noncontextgroups'} = @cmd-$contextgroups;
6978 ::debug("length", "@command Context
: ", $len{'context'},
6979 " Non
: ", $len{'noncontext'}, " Ctxgrp
: ", $len{'contextgroups'},
6980 " NonCtxGrp
: ", $len{'noncontextgroups'}, "\n");
6982 # Default command = {}
6983 # If not replacement string: append {}
6985 @command = ("\257<\257>");
6986 $Global::noquote = 1;
6987 } elsif(($opt::pipe or $opt::pipepart)
6988 and not $opt::fifo and not $opt::cat) {
6989 # With --pipe / --pipe-part you can have no replacement
6992 # Append {} to the command if there are no {...}'s and no {=...=}
6993 push @command, ("\257<\257>");
7000 'command' => \@command,
7001 'replacecount' => \%replacecount,
7002 'arg_queue' => RecordQueue->new($read_from,$opt::colsep),
7003 'context_replace' => $context_replace,
7005 'max_number_of_args' => $max_number_of_args,
7007 'return_files' => $return_files,
7009 }, ref($class) || $class;
7014 if(@{$self->{'unget'}}) {
7015 my $cmd_line = shift @{$self->{'unget'}};
7019 $cmd_line = CommandLine->new($self->seq(),
7021 $self->{'arg_queue'},
7022 $self->{'context_replace'},
7023 $self->{'max_number_of_args'},
7024 $self->{'return_files'},
7025 $self->{'replacecount'},
7028 $cmd_line->populate();
7029 ::debug("init
","cmd_line-
>number_of_args ",
7030 $cmd_line->number_of_args(), "\n");
7031 if($opt::pipe or $opt::pipepart) {
7032 if($cmd_line->replaced() eq "") {
7033 # Empty command - pipe requires a command
7034 ::error("--pipe must have a command to
pipe into
(e
.g
. 'cat').\n");
7035 ::wait_and_exit(255);
7038 if($cmd_line->number_of_args() == 0) {
7039 # We did not get more args - maybe at EOF string?
7041 } elsif($cmd_line->replaced() eq "") {
7042 # Empty command - get the next instead
7043 return $self->get();
7046 $self->set_seq($self->seq()+1);
7053 unshift @{$self->{'unget'}}, @_;
7058 my $empty = (not @{$self->{'unget'}}) && $self->{'arg_queue'}->empty();
7059 ::debug("run
", "CommandLineQueue-
>empty $empty");
7065 return $self->{'seq'};
7070 $self->{'seq'} = shift;
7075 # If there is not command emulate |bash
7076 return $self->{'command'};
7081 if(not $self->{'size'}) {
7083 while(not $self->{'arg_queue'}->empty()) {
7084 push @all_lines, CommandLine->new($self->{'command'},
7085 $self->{'arg_queue'},
7086 $self->{'context_replace'},
7087 $self->{'max_number_of_args'});
7089 $self->{'size'} = @all_lines;
7090 $self->unget(@all_lines);
7092 return $self->{'size'};
7096 package Limits::Command;
7098 # Maximal command line length (for -m and -X)
7100 # Find the max_length of a command line and cache it
7102 # number of chars on the longest command line allowed
7103 if(not $Limits::Command::line_max_len) {
7104 # Disk cache of max command line length
7105 my $len_cache = $ENV{'HOME'} . "/.parallel/tmp
/linelen-
" . ::hostname();
7108 open(my $fh, "<", $len_cache) || ::die_bug("Cannot
read $len_cache");
7109 $cached_limit = <$fh>;
7112 $cached_limit = real_max_length();
7113 # If $HOME is write protected: Do not fail
7114 mkdir($ENV{'HOME'} . "/.parallel
");
7115 mkdir($ENV{'HOME'} . "/.parallel/tmp
");
7116 open(my $fh, ">", $len_cache);
7117 print $fh $cached_limit;
7120 $Limits::Command::line_max_len = $cached_limit;
7121 if($opt::max_chars) {
7122 if($opt::max_chars <= $cached_limit) {
7123 $Limits::Command::line_max_len = $opt::max_chars;
7125 ::warning("Value
for -s option
",
7126 "should be
< $cached_limit.\n");
7130 return $Limits::Command::line_max_len;
7133 sub real_max_length {
7134 # Find the max_length of a command line
7136 # The maximal command line length
7137 # Use an upper bound of 8 MB if the shell allows for for infinite long lengths
7138 my $upper = 8_000_000;
7141 if($len > $upper) { return $len };
7143 } while (is_acceptable_command_line_length($len));
7144 # Then search for the actual max length between 0 and upper bound
7145 return binary_find_max_length(int($len/16),$len);
7148 sub binary_find_max_length {
7149 # Given a lower and upper bound find the max_length of a command line
7151 # number of chars on the longest command line allowed
7152 my ($lower, $upper) = (@_);
7153 if($lower == $upper or $lower == $upper-1) { return $lower; }
7154 my $middle = int (($upper-$lower)/2 + $lower);
7155 ::debug("init
", "Maxlen
: $lower,$upper,$middle : ");
7156 if (is_acceptable_command_line_length($middle)) {
7157 return binary_find_max_length($middle,$upper);
7159 return binary_find_max_length($lower,$middle);
7163 sub is_acceptable_command_line_length {
7164 # Test if a command line of this length can run
7166 # 0 if the command line length is too long
7171 open (STDERR, ">", "/dev/null
");
7172 system "true
"."x
"x$len;
7174 ::debug("init
", "$len=$? ");
7179 package RecordQueue;
7188 # Open one file with colsep
7189 $arg_sub_queue = RecordColQueue->new($fhs);
7191 # Open one or more files if multiple -a
7192 $arg_sub_queue = MultifileQueue->new($fhs);
7197 'arg_sub_queue' => $arg_sub_queue,
7198 }, ref($class) || $class;
7203 # reference to array of Arg-objects
7205 if(@{$self->{'unget'}}) {
7206 $self->{'arg_number'}++;
7207 return shift @{$self->{'unget'}};
7209 my $ret = $self->{'arg_sub_queue'}->get();
7210 if(defined $Global::max_number_of_args
7211 and $Global::max_number_of_args == 0) {
7212 ::debug("run
", "Read
1 but
return 0 args
\n");
7213 return [Arg->new("")];
7221 ::debug("run
", "RecordQueue-unget
'@_'\n");
7222 $self->{'arg_number'} -= @_;
7223 unshift @{$self->{'unget'}}, @_;
7228 my $empty = not @{$self->{'unget'}};
7229 $empty &&= $self->{'arg_sub_queue'}->empty();
7230 ::debug("run
", "RecordQueue-
>empty $empty");
7236 return $self->{'arg_number'};
7240 package RecordColQueue;
7246 my $arg_sub_queue = MultifileQueue->new($fhs);
7249 'arg_sub_queue' => $arg_sub_queue,
7250 }, ref($class) || $class;
7255 # reference to array of Arg-objects
7257 if(@{$self->{'unget'}}) {
7258 return shift @{$self->{'unget'}};
7260 my $unget_ref=$self->{'unget'};
7261 if($self->{'arg_sub_queue'}->empty()) {
7264 my $in_record = $self->{'arg_sub_queue'}->get();
7265 if(defined $in_record) {
7266 my @out_record = ();
7267 for my $arg (@$in_record) {
7268 ::debug("run
", "RecordColQueue
::arg
$arg\n");
7269 my $line = $arg->orig();
7270 ::debug("run
", "line
='$line'\n");
7272 for my $s (split /$opt::colsep/o, $line, -1) {
7273 push @out_record, Arg->new($s);
7276 push @out_record, Arg->new("");
7279 return \@out_record;
7287 ::debug("run
", "RecordColQueue-unget
'@_'\n");
7288 unshift @{$self->{'unget'}}, @_;
7293 my $empty = (not @{$self->{'unget'}} and $self->{'arg_sub_queue'}->empty());
7294 ::debug("run
", "RecordColQueue-
>empty $empty");
7299 package MultifileQueue;
7301 @Global::unget_argv=();
7306 for my $fh (@$fhs) {
7308 ::warning("Input
is read from the terminal
. ".
7309 "Only experts
do this on purpose
. ".
7310 "Press CTRL-D to
exit.\n");
7314 'unget' => \@Global::unget_argv,
7316 'arg_matrix' => undef,
7317 }, ref($class) || $class;
7323 return $self->xapply_get();
7325 return $self->nest_get();
7331 ::debug("run
", "MultifileQueue-unget
'@_'\n");
7332 unshift @{$self->{'unget'}}, @_;
7337 my $empty = (not @Global::unget_argv
7338 and not @{$self->{'unget'}});
7339 for my $fh (@{$self->{'fhs'}}) {
7340 $empty &&= eof($fh);
7342 ::debug("run
", "MultifileQueue-
>empty $empty ");
7348 if(@{$self->{'unget'}}) {
7349 return shift @{$self->{'unget'}};
7352 my $prepend = undef;
7354 for my $fh (@{$self->{'fhs'}}) {
7355 my $arg = read_arg_from_fh($fh);
7357 # Record $arg for recycling at end of file
7358 push @{$self->{'arg_matrix'}{$fh}}, $arg;
7362 ::debug("run
", "EOA
");
7363 # End of file: Recycle arguments
7364 push @{$self->{'arg_matrix'}{$fh}}, shift @{$self->{'arg_matrix'}{$fh}};
7365 # return last @{$args->{'args'}{$fh}};
7366 push @record, @{$self->{'arg_matrix'}{$fh}}[-1];
7378 if(@{$self->{'unget'}}) {
7379 return shift @{$self->{'unget'}};
7382 my $prepend = undef;
7384 my $no_of_inputsources = $#{$self->{'fhs'}} + 1;
7385 if(not $self->{'arg_matrix'}) {
7386 # Initialize @arg_matrix with one arg from each file
7387 # read one line from each file
7390 for (my $fhno = 0; $fhno < $no_of_inputsources ; $fhno++) {
7391 my $arg = read_arg_from_fh($self->{'fhs'}[$fhno]);
7395 $self->{'arg_matrix'}[$fhno][0] = $arg || Arg->new("");
7396 push @first_arg_set, $self->{'arg_matrix'}[$fhno][0];
7399 # All filehandles were at eof or eof-string
7402 return [@first_arg_set];
7405 # Treat the case with one input source special. For multiple
7406 # input sources we need to remember all previously read values to
7407 # generate all combinations. But for one input source we can
7408 # forget the value after first use.
7409 if($no_of_inputsources == 1) {
7410 my $arg = read_arg_from_fh($self->{'fhs'}[0]);
7416 for (my $fhno = $no_of_inputsources - 1; $fhno >= 0; $fhno--) {
7417 if(eof($self->{'fhs'}[$fhno])) {
7421 my $arg = read_arg_from_fh($self->{'fhs'}[$fhno]);
7422 defined($arg) || next; # If we just read an EOF string: Treat this as EOF
7423 my $len = $#{$self->{'arg_matrix'}[$fhno]} + 1;
7424 $self->{'arg_matrix'}[$fhno][$len] = $arg;
7425 # make all new combinations
7427 for (my $fhn = 0; $fhn < $no_of_inputsources; $fhn++) {
7428 push @combarg, [0, $#{$self->{'arg_matrix'}[$fhn]}];
7430 $combarg[$fhno] = [$len,$len]; # Find only combinations with this new entry
7432 # [ 1, 3, 7 ], [ 2, 4, 1 ]
7434 # [ m[0][1], m[1][3], m[3][7] ], [ m[0][2], m[1][4], m[2][1] ]
7436 for my $c (expand_combinations(@combarg)) {
7438 for my $n (0 .. $no_of_inputsources - 1 ) {
7439 push @a, $self->{'arg_matrix'}[$n][$$c[$n]];
7443 # append the mapped to the ungotten arguments
7444 push @{$self->{'unget'}}, @mapped;
7446 return shift @{$self->{'unget'}};
7449 # all are eof or at EOF string; return from the unget queue
7450 return shift @{$self->{'unget'}};
7453 sub read_arg_from_fh {
7454 # Read one Arg from filehandle
7456 # Arg-object with one read line
7457 # undef if end of file
7459 my $prepend = undef;
7462 # This makes 10% faster
7463 if(not ($arg = <$fh>)) {
7464 if(defined $prepend) {
7465 return Arg->new($prepend);
7470 # ::debug("run
", "read $arg\n");
7473 if($Global::end_of_file_string and
7474 $arg eq $Global::end_of_file_string) {
7475 # Ignore the rest of input file
7477 ::debug("run
", "EOF-string
($arg) met
\n");
7478 if(defined $prepend) {
7479 return Arg->new($prepend);
7484 if(defined $prepend) {
7485 $arg = $prepend.$arg; # For line continuation
7486 $prepend = undef; #undef;
7488 if($Global::ignore_empty) {
7489 if($arg =~ /^\s*$/) {
7490 redo; # Try the next line
7493 if($Global::max_lines) {
7495 # Trailing space => continued on next line
7500 }} while (1 == 0); # Dummy loop {{}} for redo
7502 return Arg->new($arg);
7504 ::die_bug("multiread arg undefined
");
7508 sub expand_combinations {
7510 # ([xmin,xmax], [ymin,ymax], ...)
7511 # Returns: ([x,y,...],[x,y,...])
7512 # where xmin <= x <= xmax and ymin <= y <= ymax
7513 my $minmax_ref = shift;
7514 my $xmin = $$minmax_ref[0];
7515 my $xmax = $$minmax_ref[1];
7518 # If there are more columns: Compute those recursively
7519 my @rest = expand_combinations(@_);
7520 for(my $x = $xmin; $x <= $xmax; $x++) {
7521 push @p, map { [$x, @$_] } @rest;
7524 for(my $x = $xmin; $x <= $xmax; $x++) {
7538 if($opt::hostgroups) {
7539 if($orig =~ s:@(.+)::) {
7540 # We found hostgroups on the arg
7541 @hostgroups = split(/\+/, $1);
7542 if(not grep { defined $Global::hostgroups{$_} } @hostgroups) {
7543 ::warning("No such hostgroup
(@hostgroups)\n");
7544 @hostgroups = (keys %Global::hostgroups);
7547 @hostgroups = (keys %Global::hostgroups);
7552 'hostgroups' => \@hostgroups,
7553 }, ref($class) || $class;
7557 # Calculates the corresponding value for a given perl expression
7559 # The calculated string (quoted if asked for)
7561 my $perlexpr = shift; # E.g. $_=$_ or s/.gz//
7562 my $quote = (shift) ? 1 : 0; # should the string be quoted?
7563 # This is actually a CommandLine-object,
7564 # but it looks nice to be able to say {= $job->slot() =}
7566 $perlexpr =~ s/^-?\d+ //; # Positional replace treated as normal replace
7567 if(not defined $self->{"rpl
",0,$perlexpr}) {
7569 if($Global::trim eq "n
") {
7570 $_ = $self->{'orig'};
7572 $_ = trim_of($self->{'orig'});
7574 ::debug("replace
", "eval ", $perlexpr, " ", $_, "\n");
7575 if(not $Global::perleval{$perlexpr}) {
7576 # Make an anonymous function of the $perlexpr
7577 # And more importantly: Compile it only once
7578 if($Global::perleval{$perlexpr} =
7579 eval('sub { no strict; no warnings; my $job = shift; '.
7583 # The eval failed. Maybe $perlexpr is invalid perl?
7584 ::error("Cannot
use $perlexpr: $@\n");
7585 ::wait_and_exit(255);
7588 # Execute the function
7589 $Global::perleval{$perlexpr}->($job);
7590 $self->{"rpl
",0,$perlexpr} = $_;
7592 if(not defined $self->{"rpl
",$quote,$perlexpr}) {
7593 $self->{"rpl
",1,$perlexpr} =
7594 ::shell_quote_scalar($self->{"rpl
",0,$perlexpr});
7596 return $self->{"rpl
",$quote,$perlexpr};
7601 return $self->{'orig'};
7605 # Removes white space as specifed by --trim:
7611 # string with white space removed as needed
7612 my @strings = map { defined $_ ? $_ : "" } (@_);
7614 if($Global::trim eq "n
") {
7616 } elsif($Global::trim eq "l
") {
7617 for my $arg (@strings) { $arg =~ s/^\s+//; }
7618 } elsif($Global::trim eq "r
") {
7619 for my $arg (@strings) { $arg =~ s/\s+$//; }
7620 } elsif($Global::trim eq "rl
" or $Global::trim eq "lr
") {
7621 for my $arg (@strings) { $arg =~ s/^\s+//; $arg =~ s/\s+$//; }
7623 ::error("--trim must be one of
: r l rl lr
.\n");
7624 ::wait_and_exit(255);
7626 return wantarray ? @strings : "@strings";
7630 package TimeoutQueue;
7634 my $delta_time = shift;
7636 if($delta_time =~ /(\d+(\.\d+)?)%/) {
7637 # Timeout in percent
7639 $delta_time = 1_000_000;
7643 'delta_time' => $delta_time,
7645 'remedian_idx' => 0,
7646 'remedian_arr' => [],
7647 'remedian' => undef,
7648 }, ref($class) || $class;
7653 return $self->{'delta_time'};
7656 sub set_delta_time {
7658 $self->{'delta_time'} = shift;
7663 return $self->{'remedian'};
7667 # Set median of the last 999^3 (=997002999) values using Remedian
7669 # Rousseeuw, Peter J., and Gilbert W. Bassett Jr. "The remedian
: A
7670 # robust averaging method for large data sets." Journal of the
7671 # American Statistical Association 85.409 (1990): 97-104.
7674 my $i = $self->{'remedian_idx'}++;
7675 my $rref = $self->{'remedian_arr'};
7676 $rref->[0][$i%999] = $val;
7677 $rref->[1][$i/999%999] = (sort @{$rref->[0]})[$#{$rref->[0]}/2];
7678 $rref->[2][$i/999/999%999] = (sort @{$rref->[1]})[$#{$rref->[1]}/2];
7679 $self->{'remedian'} = (sort @{$rref->[2]})[$#{$rref->[2]}/2];
7682 sub update_delta_time
{
7683 # Update delta_time based on runtime of finished job if timeout is
7686 my $runtime = shift;
7687 if($self->{'pct'}) {
7688 $self->set_remedian($runtime);
7689 $self->{'delta_time'} = $self->{'pct'} * $self->remedian();
7690 ::debug
("run", "Timeout: $self->{'delta_time'}s ");
7694 sub process_timeouts
{
7695 # Check if there was a timeout
7697 # $self->{'queue'} is sorted by start time
7698 while (@{$self->{'queue'}}) {
7699 my $job = $self->{'queue'}[0];
7700 if($job->endtime()) {
7701 # Job already finished. No need to timeout the job
7702 # This could be because of --keep-order
7703 shift @{$self->{'queue'}};
7704 } elsif($job->timedout($self->{'delta_time'})) {
7705 # Need to shift off queue before kill
7706 # because kill calls usleep that calls process_timeouts
7707 shift @{$self->{'queue'}};
7710 # Because they are sorted by start time the rest are later
7719 push @{$self->{'queue'}}, $in;
7725 # This package provides a counting semaphore
7727 # If a process dies without releasing the semaphore the next process
7728 # that needs that entry will clean up dead semaphores
7730 # The semaphores are stored in ~/.parallel/semaphores/id-<name> Each
7731 # file in ~/.parallel/semaphores/id-<name>/ is the process ID of the
7732 # process holding the entry. If the process dies, the entry can be
7733 # taken by another process.
7739 $id=~s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex
7740 $id="id-".$id; # To distinguish it from a process id
7741 my $parallel_dir = $ENV{'HOME'}."/.parallel";
7742 -d
$parallel_dir or mkdir_or_die
($parallel_dir);
7743 my $parallel_locks = $parallel_dir."/semaphores";
7744 -d
$parallel_locks or mkdir_or_die
($parallel_locks);
7745 my $lockdir = "$parallel_locks/$id";
7746 my $lockfile = $lockdir.".lock";
7747 if($count < 1) { ::die_bug
("semaphore-count: $count"); }
7749 'lockfile' => $lockfile,
7750 'lockfh' => Symbol
::gensym
(),
7751 'lockdir' => $lockdir,
7753 'idfile' => $lockdir."/".$id,
7755 'pidfile' => $lockdir."/".$$.'@'.::hostname
(),
7756 'count' => $count + 1 # nlinks returns a link for the 'id-' as well
7757 }, ref($class) || $class;
7762 my $sleep = 1; # 1 ms
7763 my $start_time = time;
7765 $self->atomic_link_if_count_less_than() and last;
7766 ::debug
("sem", "Remove dead locks");
7767 my $lockdir = $self->{'lockdir'};
7768 for my $d (glob "$lockdir/*") {
7769 ::debug
("sem", "Lock $d $lockdir\n");
7770 $d =~ m
:$lockdir/([0-9]+)\
@([-\
._a-z0-9
]+)$:o
or next;
7771 my ($pid, $host) = ($1, $2);
7772 if($host eq ::hostname
()) {
7773 if(not kill 0, $1) {
7774 ::debug
("sem", "Dead: $d");
7777 ::debug
("sem", "Alive: $d");
7782 $self->atomic_link_if_count_less_than() and last;
7783 # Retry slower and slower up to 1 second
7784 $sleep = ($sleep < 1000) ?
($sleep * 1.1) : ($sleep);
7785 # Random to avoid every sleeping job waking up at the same time
7786 ::usleep
(rand()*$sleep);
7787 if(defined($opt::timeout
) and
7788 $start_time + $opt::timeout
> time) {
7789 # Acquire the lock anyway
7790 if(not -e
$self->{'idfile'}) {
7791 open (my $fh, ">", $self->{'idfile'}) or
7792 ::die_bug
("timeout_write_idfile: $self->{'idfile'}");
7795 link $self->{'idfile'}, $self->{'pidfile'};
7799 ::debug
("sem", "acquired $self->{'pid'}\n");
7804 unlink $self->{'pidfile'};
7805 if($self->nlinks() == 1) {
7806 # This is the last link, so atomic cleanup
7808 if($self->nlinks() == 1) {
7809 unlink $self->{'idfile'};
7810 rmdir $self->{'lockdir'};
7814 ::debug
("run", "released $self->{'pid'}\n");
7820 unlink $self->{'pidfile'};
7822 my $nlinks = $self->nlinks();
7823 ::debug
("sem", $nlinks, "<", $self->{'count'});
7825 unlink $self->{'idfile'};
7826 open (my $fh, ">", $self->{'idfile'}) or
7827 ::die_bug
("write_idfile: $self->{'idfile'}");
7828 print $fh "#"x
$nlinks;
7831 unlink $self->{'idfile'};
7832 rmdir $self->{'lockdir'};
7835 ::debug
("sem", "released $self->{'pid'}\n");
7838 sub atomic_link_if_count_less_than
{
7839 # Link $file1 to $file2 if nlinks to $file1 < $count
7843 ::debug
($self->nlinks(), "<", $self->{'count'});
7844 if($self->nlinks() < $self->{'count'}) {
7845 -d
$self->{'lockdir'} or mkdir_or_die
($self->{'lockdir'});
7846 if(not -e
$self->{'idfile'}) {
7847 open (my $fh, ">", $self->{'idfile'}) or
7848 ::die_bug
("write_idfile: $self->{'idfile'}");
7851 $retval = link $self->{'idfile'}, $self->{'pidfile'};
7854 ::debug
("run", "atomic $retval");
7858 sub _atomic_link_if_count_less_than
{
7859 # Link $file1 to $file2 if nlinks to $file1 < $count
7863 my $nlinks = $self->nlinks();
7864 ::debug
("sem", $nlinks, "<", $self->{'count'});
7865 if($nlinks++ < $self->{'count'}) {
7866 -d
$self->{'lockdir'} or mkdir_or_die
($self->{'lockdir'});
7867 if(not -e
$self->{'idfile'}) {
7868 open (my $fh, ">", $self->{'idfile'}) or
7869 ::die_bug
("write_idfile: $self->{'idfile'}");
7872 open (my $fh, ">", $self->{'idfile'}) or
7873 ::die_bug
("write_idfile: $self->{'idfile'}");
7874 print $fh "#"x
$nlinks;
7876 $retval = link $self->{'idfile'}, $self->{'pidfile'};
7879 ::debug
("sem", "atomic $retval");
7885 if(-e
$self->{'idfile'}) {
7886 ::debug
("sem", "nlinks", (stat(_
))[3], "size", (stat(_
))[7], "\n");
7887 return (stat(_
))[3];
7895 my $sleep = 100; # 100 ms
7896 my $total_sleep = 0;
7897 $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
7899 while(not $locked) {
7900 if(tell($self->{'lockfh'}) == -1) {
7902 open($self->{'lockfh'}, ">", $self->{'lockfile'})
7903 or ::debug("run
", "Cannot
open $self->{'lockfile'}");
7905 if($self->{'lockfh'}) {
7907 chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
7908 if(flock($self->{'lockfh'}, LOCK_EX()|LOCK_NB())) {
7909 # The file is locked: No need to retry
7913 if ($! =~ m/Function not implemented/) {
7914 ::warning("flock: $!");
7915 ::warning("Will
wait for a random
while\n");
7916 ::usleep(rand(5000));
7917 # File cannot be locked: No need to retry
7923 # Locking failed in first round
7924 # Sleep and try again
7925 $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
7926 # Random to avoid every sleeping job waking up at the same time
7927 ::usleep(rand()*$sleep);
7928 $total_sleep += $sleep;
7929 if($opt::semaphoretimeout) {
7930 if($total_sleep/1000 > $opt::semaphoretimeout) {
7932 ::warning("Semaphore timed out
. Ignoring timeout
.");
7937 if($total_sleep/1000 > 30) {
7938 ::warning("Semaphore stuck
for 30 seconds
. Consider using
--semaphoretimeout
.");
7942 ::debug("run
", "locked
$self->{'lockfile'}");
7947 unlink $self->{'lockfile'};
7948 close $self->{'lockfh'};
7949 ::debug("run
", "unlocked
\n");
7953 # If dir is not writable: die
7955 my @dir_parts = split(m:/:,$dir);
7957 while(defined ($part = shift @dir_parts)) {
7958 $part eq "" and next;
7964 ::error("Cannot
write to
$dir: $!\n");
7965 ::wait_and_exit(255);
7969 # Keep perl -w happy
7970 $opt::x = $Semaphore::timeout = $Semaphore::wait =
7971 $Job::file_descriptor_warning_printed = 0;