#!/usr/bin/perl
# PERL_RL is the environment variable read by Term::ReadLine when it selects
# an implementation; the "o=0" part turns terminal ornaments off (see the
# Term::ReadLine documentation).
$ENV{PERL_RL}=" o=0";
# Ignore SIGPIPE: a write on a closed pipe then fails with an error instead of
# terminating the whole process.
$SIG{PIPE} = 'IGNORE';
# Compile-time settings shared by the other packages.
package settings;
# FD is the value passed as connector 'type' by arguments::not_root.
# NOTE(review): SOCKET and USE_SOCKETPAIR are not used in the part of the file
# visible here - confirm their meaning where they are consumed.
use constant FD => 0;
use constant SOCKET => 1;
use constant USE_SOCKETPAIR => 0;
# Forward declarations (with prototypes) of the subroutines of each package.
# Declaring them up front lets the rest of the file call them before their
# definition and without parentheses (e.g. "load_taktuk_code;").

# Command line / option parsing
package arguments;
sub get_config();
sub initialize_terminal();
sub fetch_arguments(@);
sub fetch_line($);
sub restore_last_argument();
sub restore_last_command();
sub get_next_command();
sub get_next_argument();
sub read_buffer_data($);
sub delete_buffer($);
sub get_data_buffer();
sub get_next_stuff($);
sub get_parameters();
sub skip_command_separator();
sub terminate_arguments_parsing();
sub register_option($$$$@);
sub args_file($);
sub machines_file($);
sub machine($);
sub localhost($);
sub not_root();
sub print_defaults();
sub print_package($);
sub print_version();
sub print_version_quit();
sub init();
sub parse_options();
sub check_separators_integrity();
sub set_option($$);
package command;
sub new (%);
sub run ($);
sub cleanup ();
sub my_shutdown($$);
sub close($);
sub output($$);
sub event_msg($);
package communicator;
sub init();
sub get_connections($);
sub get_root();
sub get_control();
sub get_outgoing();
sub process_messages($$);
sub process_message($$);
sub process_command_output($$);
sub process_pipe_output($$);
sub run ();
sub terminate ();
sub add_descriptors($);
sub remove_descriptor($$);
sub no_pending_connectors();
sub add_connector($$);
sub connector_initialized($);
sub remove_from_set($$);
sub remove_connector($);
sub remove_local_command($);
sub remove_pipe($);
sub post_write($$@);
sub post_close($);
sub post_termination($$$);
sub cleanup_pending_stuff($);
sub close_command_read($);
package connector;
sub new (%);
sub read_data ($);
sub send_parameters();
sub get_message ();
sub send_message($);
sub route_file_part($$);
sub send_file($$$);
sub pack();
sub unpack($);
sub cancel();
package diagnostic;
sub print_info($$$);
sub system ();
sub debug ($);
sub error ($);
sub warning ($);
sub special ($);
# General purpose helpers (file loading, host set decoding, help text)
package general;
sub init();
sub expand($%);
sub open_file($);
sub load_file($);
sub load_taktuk_code();
sub load_taktuk_package($);
sub print_help();
sub print_help_quit();
sub decode_host_set($);
sub first_in_host($);
sub increment_host_vector($$);
sub host_vector_to_value($$);
sub decode_exclusion_set($);
sub in_exclusion_set($$);
sub add_peer(@);
sub print($);
sub print_tree($$);
package handlers;
sub register_handler($$);
sub replace_handler($$);
sub get_handler($);
sub handler_blocked($$$);
sub arguments($$$);
sub broadcast($$$);
sub eof($$$);
sub execute_timeout($);
sub execute($$$);
sub file_finalize($);
sub file($$$);
sub forward_up($$$);
sub get($$$);
sub get_info($$$);
sub input($$$);
sub kill($$$);
sub message($$$);
sub numbered($$$);
sub output($$$);
sub option($$$);
sub options($$$);
sub pipe($$$);
sub position($$$);
sub put($$$);
sub quit($$$);
sub ready($$$);
sub recv_timeout($);
sub reduce_count($$$$);
sub reduce_tree($$$$);
sub reduce($$$);
sub reduce_status_analysis($);
sub reduce_result($$$);
sub check_ongoing_reduces($);
sub resign($$$);
sub send_to($$$);
sub spread($$$);
sub steal($$$);
sub synchronize($$$);
sub taktuk_code($$$);
sub taktuk_perl($$$);
sub update_failed($$$);
sub wait_message($$$);
sub work($$$);
sub init();
package main;
sub is_one_of($@);
sub get_attributes();
sub translate();
sub manage_children_termination();
sub fork_taktuk_interpreter();
sub handle_message($);
sub process_commands();
# Thin wrapper around the select() builtin working on sets of handles
package my_select;
sub new();
sub add($);
sub remove($);
sub select($$$$);
sub handles();
package taktuk;
sub no_flush($);
sub unpack($);
sub pack($);
sub decode($);
sub encode($$);
sub syswrite($$);
sub read_data($);
sub get_message($);
sub find_sequence($$);
sub flush_buffer($);
sub error_msg($);
sub send(%);
sub recv(%);
sub wait_message(@);
sub get($);
sub decode_set($);
sub encode_set(@);
# Version string and release number; both are printed by
# arguments::print_version. The release is the number extracted from the
# revision keyword string.
our $VERSION = "3.6.1";
our $RELEASE = sprintf "%d", q$Rev: 453 $ =~ /(\d+)/g;
# Chunk sizes in bytes; $read_size is the length given to sysread by the file
# loading routines. NOTE(review): $write_size is not used in the visible part
# of the file - presumably the matching size for writes.
our $read_size = 32768;
our $write_size = 32768;
# NOTE(review): presumably holds the last error of the taktuk package routines
# - confirm against their definitions.
our $error = undef;
# One-character codes, one per name. NOTE(review): the names match the
# subroutines declared in package handlers, so these presumably are the message
# type identifiers exchanged between TakTuk instances - confirm against the
# encode/decode routines.
our $action="A";
our $eof="D";
our $taktuk_perl="E";
our $get="G";
our $invalid="H";
our $info="I";
our $option="N";
our $timeout="O";
our $put="P";
our $reduce_result="Q";
our $reduce="R";
our $spread="S";
our $taktuk_code="T";
our $update_failed="U";
our $options="V";
our $wait_message="W";
our $resign="X";
our $arguments="a";
our $broadcast="b";
our $down="d";
our $execute="e";
our $file="f";
our $get_info="g";
our $input="i";
our $kill="k";
our $message="m";
our $numbered="n";
our $output="o";
our $position="p";
our $quit="q";
our $ready="r";
our $steal="s";
our $send_to="t";
our $forward_up="u";
our $work="w";
our $synchronize="x";
our $pipe="z";
# NOTE(review): 't' is also the value of $send_to above; these two codes
# presumably live in a separate namespace (kind of reduce) - confirm.
our $reduce_count = 'c';
our $reduce_tree = 't';
# Forward declarations for the remaining packages.
package scheduler;
sub deploy_connector($);
sub add_connector($);
sub connector_initialized($);
sub connector_failed($);
sub schedule();
sub is_idle();
sub send_work();
sub dispatch_work($);
sub theft_handler($$);
package stats_buffer;
sub new($$);
sub is_empty($);
sub add($$);
sub average($);
sub min($);
sub max($);
package synchronizer;
sub check_ready_state();
sub set_not_ready();
sub initialization_complete($);
sub initialization_failed($);
sub initialization_timeout($);
sub set_not_numbered();
sub block_until_event($@);
sub dispatch_event($);
sub add_pending_message($$$);
sub get_state($);
sub setup_synchronization();
# Numeric identifiers of events. NOTE(review): presumably the values handled
# by block_until_event / dispatch_event declared above - confirm against their
# definitions.
use constant TAKTUK_READY => 0;
use constant TAKTUK_NUMBERED => 1;
use constant TAKTUK_TERMINATED => 2;
use constant CONNECTION_FAILED => 3;
use constant CONNECTION_INITIALIZED => 4;
use constant CONNECTION_LOST => 5;
use constant COMMAND_STARTED => 6;
use constant COMMAND_FAILED => 7;
use constant COMMAND_TERMINATED => 8;
use constant UPDATE_FAILED => 9;
use constant PIPE_STARTED => 10;
use constant PIPE_FAILED => 11;
use constant PIPE_TERMINATED => 12;
use constant FILE_RECEPTION_STARTED =>13;
use constant FILE_RECEPTION_FAILED =>14;
use constant FILE_RECEPTION_TERMINATED =>15;
package timer;
sub current_time();
sub register($$);
sub check_timeouts();
sub unregister();
sub gettime($);
sub print($);
# Package-level state of package general.
package general;
use strict; use bytes;
use Sys::Hostname;
# Settings handed to every connector created by add_peer ('timeout', 'files',
# 'login'). NOTE(review): $connector_command is not read in the visible part
# of the file.
our $connector_command;
our $connector_timeout;
our $connector_send_files;
our $login_name;
# Name of the local host; exported as TAKTUK_HOSTNAME by init and replaced by
# arguments::localhost.
our $host=hostname;
chomp($host);
# True until arguments::not_root clears it.
our $root=1;
# Command executed on peers when self propagation is off (see add_peer).
our $taktuk_command;
# When true, add_peer builds a command that runs a perl reading its program
# from standard input instead of $taktuk_command.
our $self_propagate;
# Caches filled by load_taktuk_code and load_taktuk_package.
our $taktuk_code = undef;
our $taktuk_package = undef;
# NOTE(review): $template and $redirect are not used in the visible part of
# the file; presumably the output template / redirection settings.
our $template;
our $redirect;
# $current_position counts the peers created by add_peer; $position, when
# set, is used as a dotted prefix of the position given to each new connector.
our $current_position = 0;
our $position = 0;
# NOTE(review): numbering bookkeeping, not used in the visible part of the
# file.
our $child_min = -1;
our $child_max = -1;
our $numbering_update = 0;
# rank/count of this node; -1 means unknown (add_peer only adds "-n" to the
# remote command when rank is greater than -1).
our %taktuk = (rank=>-1, count=>-1);
# Publishes the local host name to the environment as TAKTUK_HOSTNAME.
sub init()
{
    $ENV{'TAKTUK_HOSTNAME'} = $host;
}
# Expands shell constructs contained in a string.
#
# Arguments: the string to expand, followed by key/value pairs that are made
# available to the shell as variables during the expansion.
# Returns the string with its trailing newline removed. The shell is only
# invoked when the string contains a '$' or a backquote; otherwise the string
# is returned as is (still chomped).
sub expand($%)
{
    my ($string, %values) = @_;
    # One "key=value &&" shell prefix per variable
    my $variables = join "", map { "$_=$values{$_} &&" } keys(%values);

    diagnostic::debug("Name $string");
    if ($string =~ m/[\$\`]/)
    {
        $string = `$variables echo $string`;
    }
    chomp($string);
    diagnostic::debug("Expanded to $string");
    return $string;
}
# Opens a file for reading in binary mode.
#
# Argument: a file name, which first goes through expand(). An empty name or
# "-" designates standard input, which is only allowed on the root node and
# outside interactive mode.
# Returns the opened handle (or a reference to STDIN), or undef after having
# reported the error through the diagnostic package.
sub open_file($)
{
    my $filename = shift;
    my $fd = undef;

    $filename = expand($filename);
    if ($filename and ($filename ne "-"))
    {
        # Three-argument open: the name is always taken as a plain file to
        # read, whatever characters it contains. The two-argument form would
        # interpret a leading '>' or a '|' in the name as a mode or a command.
        if (not open $fd, '<', $filename)
        {
            diagnostic::system();
            diagnostic::error("Trying to open $filename");
            $fd = undef;
        }
        else
        {
            binmode($fd) or diagnostic::system();
        }
    }
    else
    {
        if ($main::interactive)
        {
            diagnostic::error("Cannot load STDIN in interactive mode");
        }
        elsif (not $general::root)
        {
            diagnostic::error("Cannot load STDIN on a non root node");
        }
        else
        {
            $fd = \*STDIN;
        }
    }
    return $fd;
}
# Reads the whole content of a file.
#
# Argument: a file name as accepted by open_file ("-" or an empty name means
# standard input).
# Returns the data read, or "" when the file could not be opened. A read
# error is reported through diagnostic::system and ends the read; the data
# gathered so far is returned.
sub load_file($)
{
    my ($filename) = @_;
    my $fd = open_file($filename);
    return "" if (not defined($fd));

    my $stuff = "";
    for (;;)
    {
        my $chunk;
        my $count = sysread($fd, $chunk, $taktuk::read_size);
        if (not defined($count))
        {
            diagnostic::system();
            last;
        }
        # Zero bytes read means end of file
        last if not ($count > 0);
        $stuff .= $chunk;
    }
    # Standard input is left open for other users
    if ($fd != \*STDIN)
    {
        close $fd or diagnostic::system();
    }
    return $stuff;
}
# Loads the source of the running program into $taktuk_code.
#
# Nothing is done when $taktuk_code is already defined. The program file is
# $0 when it is readable, otherwise the path printed by "which $0".
# Returns "" after having reported an error when the file cannot be found or
# opened.
sub load_taktuk_code()
{
    if (not defined($taktuk_code))
    {
        my $filename = (-r $0) ? $0 : `which $0`;
        if (not $filename)
        {
            diagnostic::error("Cannot find own taktuk executable");
            return "";
        }
        my $fd = open_file($filename);
        if (not $fd)
        {
            diagnostic::error("Trying to open $filename");
            return "";
        }

        $taktuk_code = "";
        for (;;)
        {
            my $chunk;
            my $count = sysread($fd, $chunk, $taktuk::read_size);
            if (not defined($count))
            {
                diagnostic::system();
                last;
            }
            # Zero bytes read means end of file
            last if not ($count > 0);
            $taktuk_code .= $chunk;
        }
        close $fd or diagnostic::system();
    }
}
# Extracts the source of one package from the program code into
# $taktuk_package.
#
# Argument: the name of the package to extract.
# The program source is cut at every "package NAME;" statement found at the
# end of a line; all the sections whose name is the requested one are
# concatenated and terminated by "1;". Nothing is done when $taktuk_package
# is already defined, whatever package it was built for.
sub load_taktuk_package($)
{
    my ($package_name) = @_;
    if (not defined($taktuk_package))
    {
        load_taktuk_code;
        $taktuk_package = "";

        my $section_start = 0;
        my $wanted = 0;
        # The lookahead matches without consuming anything, so pos() is the
        # offset at which the package statement begins
        while ($taktuk_code =~ /(?=package\s+([^;\s]*)\s*;\s*$)/gmi)
        {
            my $section_end = pos($taktuk_code);
            if ($wanted)
            {
                $taktuk_package .= substr($taktuk_code, $section_start,
                                          $section_end - $section_start);
            }
            $wanted = ($1 eq $package_name) ? 1 : 0;
            $section_start = $section_end;
        }
        # The last section extends to the end of the code
        $taktuk_package .= substr($taktuk_code, $section_start) if $wanted;
        $taktuk_package .= "1;\n";
    }
}
# Sends the usage text (commands, options and environment variables) to the
# 'info' output stream of the object returned by communicator::get_root.
sub print_help()
{
communicator::get_root->output('info', <<END_HELP);
Usage :
taktuk [ options ] [ commands ]
If one or more commands are given, TakTuk will execute them sequentially.
Otherwise, TakTuk enter interactive mode, waiting for commands.
Commands are (braces can be replaced by any delimiter):
<set> command : sends the command to all the peers of the set
(see TakTuk manual for set specification syntax)
broadcast command : broadcasts TakTuk command to all the remote peers
not including the node initiating the broadcast
downcast command : spreads TakTuk command to all the children of the
node initiating the command (not including itself)
exec <param> [ command ] : executes the given shell command on the local node
using optional execution parameters
get [ src ] [ dest ] : copies some file from destination node(s)
help : prints this help
input [ data ] : sends input to all commands being executed locally
input data [ data ] : same as input [ data ]
input line [ data ] : same as input but adds a newline at the end of data
input file [ filename ] : sends the content of a file as input
input pipe [ filename ] : sends the content of a pipe-like file as input
input close : closes stdin of all commands being executed locally
kill <signal> : sends a signal to local commands processes groups
(signal default to 15 if not given)
network : prints the TakTuk deployment tree
network state : same as network
network cancel : cancels all ongoing connections
network renumber : recomputes TakTuk logical numbering
network update : updates TakTuk logical numbering if possible
option name [ value ] : changes some TakTuk option on destination node(s)
option [ line ] : gives TakTuk options line to destination node(s)
put [ src ] [ dest ] : copies some local file to destination node(s)
quit : quit TakTuk and shutdown its logical network
synchronize command : forces the following command to wait for deployment,
numbering and preceding commands before executing
taktuk_perl [ args ] : forks a perl interpreter on the local node aware of
the taktuk package and communication routines (see
manual for a description of the taktuk package which
contains point-to-point communication routines).
Similar in principle to 'exec perl args'.
WARNING: due to limited parser, you have to give
args (even if empty) and to use '--' to explicitely
terminate switches if any
version : prints TakTuk version
Deployment options:
-c, --connector command : defines the connector commands used for following
remote machines
-d, --dynamic limit : turns dynamic mode on (work stealing) for all the
following remote machines specifications.
Uses limit as a maximal arity (0 = no limit).
A negative value for limit turns dynamic mode off.
Warning, currently it is a bad idea to use several
-d options on the same command line
-f, --machines-file name : name is the name of a file that contains remote
machines names (equivalent to several -m opions)
-l, --login name : sets the login name for the following hosts
-m, --machine name -[ -] : name is the name of a remote host to be deployed
with its optional arguments between -[ and -]
-s, --self-propagate : propagate the TakTuk executable through connectors
(eliminates the need for a TakTuk installation on
remote machines)
-z, --dont-self-propagate: turns self propagation off
-F, --args-file name : name is the name of a file that contains options
-L, --localhost name : sets the name of localhost as viewed by TakTuk
-S, --send-files files : send files at connection for further connections
-T, --taktuk-command com : com is the name of the TakTuk command that should
be used for the following hosts
Command line parsing options:
-C, --command-separator : changes the set of characters considered as command
separators set separators
-E, --escape-character : defines an escape character that protects any
character following character from TakTuk interpretation
-O, --option-separator : changes the set of characters considered as option
separators set separators
I/O options:
-o, --output-template : set an output template specification for one of the
name=specification output streams (see man for details about
templates). When giving only a name (without
specification) this disables the stream
-R, --output-redirect : sets an output redirection for one of the output
name=number streams (see man for details about output streams)
Performance tuning options:
-g, --time-granularity n : sets to n the maximal interval between timeouts
checks (usually checks are made more often)
-n, --no-numbering : disable taktuk nodes numbering, speed up deployment
-t, --timeout time : sets the timeout for connectors (0 = no timeout)
-w, --window number : sets initial window size to number (pipeline width)
-W, --window-adaptation : sets the windows adaptation scheme to number
number (0: no adaptation, 1: experimental algorithm)
Miscellaneous options:
-M, --my : makes the next option local (not inherited)
-h, --help : prints this help
-i, --interactive : forces interactive mode afer command line parsing
-v, --version : prints TakTuk version and exits
-P, --print-defaults : prints default settings
Environment variables defined by TakTuk for commands execution (see man for
details about variables that change TakTuk settings):
TAKTUK_COUNT : total number of successfully deployed nodes
TAKTUK_HOSTNAME : hostname of the local node as given to TakTuk
TAKTUK_PIDS : pids list of commands executed by the local TakTuk
TAKTUK_POSITION : host position on the command line
TAKTUK_RANK : logical number of local node (in [1..TAKTUK_COUNT])
END_HELP
}
# Prints the usage text, then sets the $main::terminate flag.
sub print_help_quit()
{
    print_help();
    $main::terminate = 1;
}
# Decodes a host set specification.
#
# Argument: a comma separated list of host entries (undef is taken as an
# empty list). An entry may contain bracketed lists of ranges, for instance
# "node[1-3,7]-eth[0-1]"; commas inside brackets do not separate entries.
# Returns a reference to an array holding one array reference per entry:
#   [ prefix, [ min, max, min, max, ... ], prefix, [ ... ], ..., suffix ]
# that is, literal text at even indices and range bounds at odd indices (a
# single value is stored as min == max).
# A malformed specification is reported with diagnostic::error and terminates
# the program with exit status 1.
sub decode_host_set($)
{
    my $host_set = shift;
    my $full_set = [];

    $host_set = "" if not defined($host_set);
    while (length($host_set))
    {
        # The result of the match must be checked: after a failed match $1
        # and $2 would still hold the captures of the previous iteration and
        # the loop would never consume the remaining text
        if ($host_set !~ m/^((?:[^[,]*(?:\[[^]]*\])?)*)(?:,(.*))?$/o)
        {
            diagnostic::error("Error in set spec");
            exit 1;
        }
        my $host_entry = $1;
        $host_set = defined($2)?$2:"";
        if (length($host_entry))
        {
            my $set = [];

            diagnostic::debug("Host entry $host_entry");
            # Consume the entry from the left, one "prefix[ranges]" at a time
            while ($host_entry =~ s/^([^[]*)\[([^]]*)\](.*)$/$3/o)
            {
                my ($prefix, $set_spec) = ($1,$2);
                my $values = [];

                push @$set, $prefix;
                foreach my $range (split /,/, $set_spec)
                {
                    my ($min, $max);
                    if ($range =~ m/^([a-zA-Z]*\d*)(?:-([a-zA-Z]*\d*))?$/o)
                    {
                        ($min, $max) = ($1,$2);
                        $max = $min if not defined($max);
                    }
                    # The emptiness test uses length so that the bound "0"
                    # is accepted
                    if (not defined($min) or not length($min) or
                        (length($min) > length($max)) or
                        ((length($min) == length($max)) and ($min gt $max)))
                    {
                        diagnostic::error("Error in set spec");
                        exit 1;
                    }
                    push @$values, $min, $max;
                }
                push @$set, $values;
            }
            # What remains after the last bracket is the suffix
            push @$set, $host_entry;
            push @$full_set, $set if scalar(@$set);
        }
    }
    return $full_set;
}
# Builds the vector designating the first host of a decoded host entry.
#
# Argument: a host entry as produced by decode_host_set.
# Returns a reference to an array holding, for each range list of the entry,
# the pair (0, first lower bound): the index of the current interval inside
# the range list and the current value.
sub first_in_host($)
{
    my ($host_spec) = @_;
    my $result = [];

    # Range lists are stored at the odd indices of the entry
    for (my $index = 1; $index <= $#$host_spec; $index += 2)
    {
        push @$result, 0, $host_spec->[$index]->[0];
    }
    return $result;
}
# Moves a host vector to the next host of its entry, in place.
#
# Arguments: the vector (pairs of interval index and current value, as built
# by first_in_host) and the host entry it belongs to.
# Works like an odometer starting with the last range list: a value that has
# not reached the upper bound of its interval is incremented (string
# increment, so that leading zeros are kept); otherwise the next interval is
# entered, or the position wraps to its first value and the carry propagates
# to the previous range list.
# Returns 0 when a next host exists, 1 when the vector wrapped around.
sub increment_host_vector($$)
{
    my ($vector, $host) = @_;
    my $slot = $#$vector - 1;
    my $spec = $#$host - 1;

    while ($slot >= 0)
    {
        my $ranges = $host->[$spec];
        my $interval = $vector->[$slot];
        my $current = $vector->[$slot+1];

        if ($current ne $ranges->[$interval+1])
        {
            # Inside an interval: step to the following value
            my $next = "".$current;
            $next++;
            $vector->[$slot+1] = $next;
            return 0;
        }
        $interval += 2;
        if ($interval < $#$ranges)
        {
            # Upper bound reached: enter the next interval of the list
            $vector->[$slot] = $interval;
            $vector->[$slot+1] = $ranges->[$interval];
            return 0;
        }
        # Last interval exhausted: wrap and carry to the previous range list
        $vector->[$slot] = 0;
        $vector->[$slot+1] = $ranges->[0];
        $slot -= 2;
        $spec -= 2;
    }
    return 1;
}
# Builds the host name designated by a vector.
#
# Arguments: the vector (pairs of interval index and current value) and the
# host entry it belongs to.
# Returns the concatenation of the literal parts of the entry with the
# current value of each range list.
sub host_vector_to_value($$)
{
    my ($vector, $host) = @_;

    # Literal text sits at even indices of the entry. A range list at the odd
    # index i has its current value at the same index i of the vector.
    return join "", map { ($_ % 2) ? $vector->[$_] : $host->[$_] }
                    0 .. $#$host;
}
# Expands a host set specification into a lookup table.
#
# Argument: a specification as accepted by decode_host_set.
# Returns a reference to a hash whose keys are all the host names designated
# by the specification.
sub decode_exclusion_set($)
{
    my $entries = decode_host_set(shift);
    my %excluded;

    foreach my $host_spec (@$entries)
    {
        my $vector = first_in_host($host_spec);
        # Every entry designates at least one host, so the name is recorded
        # before trying to move to the next one
        for (;;)
        {
            $excluded{host_vector_to_value($vector, $host_spec)} = 1;
            last if increment_host_vector($vector, $host_spec);
        }
    }
    return \%excluded;
}
# Tells whether a host name is a key of an exclusion set.
#
# Arguments: the host name and a hash reference as returned by
# decode_exclusion_set.
sub in_exclusion_set($$)
{
    my ($value, $set) = @_;
    return exists($set->{$value});
}
# Creates one connector per host designated by a peer specification and hands
# each of them to scheduler::add_connector.
#
# Arguments: the peer specification followed by the arguments specific to
# this peer. The specification goes through expand() and has the form
# "set" or "set/exclusion", both parts using the syntax of decode_host_set;
# hosts belonging to the exclusion part are skipped.
sub add_peer(@)
{
my $actual_command;
my $peer_name = shift;
# Current options, as a command line fragment and as a packed message
my ($command_args, $message_args) = arguments::get_config;
if ($self_propagate)
{
# The remote side prints a marker string and then runs a perl interpreter
# that reads its program from standard input
$actual_command = "echo $connector::connector_functional_string\\;".
"exec perl -- - -r ";
}
else
{
$actual_command = "exec $taktuk_command -r ";
}
# "-n" is only added once the local rank is known
$actual_command .= "-n " if $taktuk{rank} > -1;
$actual_command = $actual_command.$command_args." ";
# Peer specific arguments are appended to the packed options
foreach my $element (@_)
{
$message_args = $message_args.taktuk::pack($element);
}
$peer_name = expand($peer_name);
my ($set, $exclusion) = split '/', $peer_name;
$set = decode_host_set($set);
$exclusion = decode_exclusion_set($exclusion);
foreach my $host_spec (@$set)
{
# Enumerate every host of the entry, starting with the first one
my $index = first_in_host($host_spec);
my $carry;
do
{
my $value = host_vector_to_value($index, $host_spec);
if (not in_exclusion_set($value, $exclusion))
{
# The position given to the connector is the local position, if any,
# followed by the rank of the peer among those created so far
$current_position++;
my $position_prefix = $position?$position.".":"";
my $connector=connector::new(
login =>
defined($login_name)?$login_name:"",
peer => $value,
taktuk => $actual_command,
arguments => $message_args,
position => $position_prefix.
$current_position,
propagate => $self_propagate,
files => $connector_send_files,
timeout => $connector_timeout);
diagnostic::debug("New external connector command to $value:".
" $connector->{line}");
scheduler::add_connector($connector);
}
$carry = increment_host_vector($index, $host_spec);
}
while (not $carry);
}
}
# Writes a message on standard output with taktuk::syswrite; a failure is
# reported through diagnostic::system.
sub print($)
{
    my ($message) = @_;
    taktuk::syswrite(\*STDOUT, $message) or diagnostic::system;
}
# Prints a tree on the 'info' stream, one node per line.
#
# Arguments: the prefix printed before the node label, and the tree itself as
# an array reference holding the label of the node followed by one array
# reference per subtree. Subtrees are printed with the prefix extended by one
# space. The label is removed from the array given as argument.
sub print_tree($$)
{
    my ($prefix, $data) = @_;
    my $label = shift @$data;

    communicator::get_root->output('info',$prefix.$label."\n");
    foreach my $subtree (@$data)
    {
        print_tree($prefix." ",$subtree);
    }
}
package my_select;
use strict; use bytes;
# Creates an empty set of handles: 'vector' is the bit vector given to
# select() and 'handles' lists the handles whose bit is set.
sub new ()
{
    return bless { vector => '', handles => [] }, __PACKAGE__;
}
# Adds a handle to the set (method: the set, then the handle).
# A warning is issued when the descriptor of the handle already belongs to
# the set, which is then left unchanged.
sub add ($)
{
    my ($self, $handle) = @_;
    my $descriptor = fileno($handle);

    if (vec($self->{vector}, $descriptor, 1) != 0)
    {
        diagnostic::warning("Descriptor already in set");
    }
    else
    {
        vec($self->{vector}, $descriptor, 1) = 1;
        push @{$self->{handles}}, $handle;
    }
}
# Removes a handle from the set (method: the set, then the handle).
# Returns 1 when the handle has been removed. Returns 0, after a warning,
# when the descriptor of the handle does not belong to the set or when the
# handle is not found in the list of handles.
sub remove ($)
{
    my ($self, $handle) = @_;
    my $handles_list = $self->{handles};
    my $descriptor = fileno($handle);

    if (vec($self->{vector}, $descriptor, 1) != 1)
    {
        diagnostic::warning("Descriptor not in set");
        # Explicit failure value, as in the other failure case below
        return 0;
    }
    vec($self->{vector}, $descriptor, 1) = 0;
    # Handles are compared by reference address
    foreach my $i (0 .. $#$handles_list)
    {
        if ($handles_list->[$i] == $handle)
        {
            splice @$handles_list, $i, 1;
            return 1;
        }
    }
    diagnostic::warning("Didn't found descriptor to remove");
    return 0;
}
# Waits for activity on sets of handles, using the select() builtin.
#
# Arguments: the read, write and exception sets (my_select objects, or undef
# for a set that is not watched) and the timeout given to select().
# Returns an empty list when select() fails, otherwise three array references
# holding the handles of each set that were reported by select() (three empty
# arrays when nothing was reported).
sub select ($$$$)
{
my $read_select = shift;
my $write_select = shift;
my $except_select = shift;
my $timeout = shift;
my $rin = defined($read_select)?$read_select->{vector}:undef;
my $win = defined($write_select)?$write_select->{vector}:undef;
my $ein = defined($except_select)?$except_select->{vector}:undef;
my $rout;
my $wout;
my $eout;
# select() modifies its arguments, so it works on copies of the vectors
my ($nfound, $timeleft) =
CORE::select($rout=$rin, $wout=$win, $eout=$ein, $timeout);
if ($nfound == -1)
{
return ();
}
elsif ($nfound == 0)
{
return ([],[],[]);
}
else
{
my $read_set = [];
my $write_set = [];
my $except_set = [];
my $handle;
# Translate the resulting bit vectors back to handles. For a set given as
# undef, the loop autovivifies an empty list of handles in the local
# variable and iterates over nothing.
foreach $handle (@{$read_select->{handles}})
{
push(@$read_set, $handle) if (vec($rout,fileno($handle),1) == 1);
}
foreach $handle (@{$write_select->{handles}})
{
push(@$write_set, $handle) if (vec($wout,fileno($handle),1) == 1);
}
foreach $handle (@{$except_select->{handles}})
{
push(@$except_set, $handle) if (vec($eout,fileno($handle),1) == 1);
}
return ($read_set, $write_set, $except_set);
}
}
# Returns the list of the handles that belong to the set (method).
sub handles ()
{
    my ($self) = @_;
    return @{ $self->{handles} };
}
# Package-level state of the command line parser.
package arguments;
use strict; use bytes;
# True when Term::ReadLine could be loaded
our $has_readline = eval("use Term::ReadLine;1")?1:0;
# Term::ReadLine object, created by initialize_terminal
our $terminal = undef;
# Sets of characters interpolated into the character classes of the parser:
# command separators, option separators and the optional escape character
our $command_separator;
our $option_separator;
our $escape;
# NOTE(review): not used in the visible part of the file; presumably set by
# the -M option
our $local_option=0;
# Transmission modes of an option (see get_config)
use constant COMMAND_LINE => 0;
use constant MESSAGE => 1;
# Kinds of data returned by get_data_buffer
use constant NONE => 0;
use constant BUFFER => 1;
use constant LINE => 2;
use constant ARGUMENT => 3;
# Option tables filled by register_option. %handler, %long_name, %type and
# %transmission_mode are indexed by short option name, %short_name by long
# option name.
our %handler;
our %long_name;
our %short_name;
our %type;
our %transmission_mode;
# Current value of the options and cached result of get_config
our %current_value;
our @current_config;
our $current_config_outdated = 1;
# Set when the parser has run out of input
our $arguments_ended = 1;
# Set when the last token must be returned again by get_next_stuff
our $again=0;
# Pending input: plain strings (command line arguments), scalar references
# (lines) and file handles
our @arguments = ();
# Last argument, last command and separators that followed the command
our $current_argument = undef;
our $current_command = undef;
our $current_command_separators = undef;
# Last token, separators found after it and separators found before it
our $current_stuff = undef;
our $current_separators = undef;
our $current_separators_prefix = undef;
# Set when the last token is a whole command line argument
our $is_first_argument = 0;
# Last token before removal of the escape characters
our $unescaped_stuff = undef;
# NOTE(review): reset by fetch_arguments and fetch_line; presumably set by
# the options parser once it has met the end of the options
our $options_ended=0;
# Returns the current options in the two forms used to transmit them.
#
# Returns the list (command_args, message_args): command_args is a command
# line fragment holding the options whose transmission mode is COMMAND_LINE,
# message_args is the concatenation of the taktuk::pack'ed options whose mode
# is MESSAGE. Options without transmission mode are not included.
# The result is cached in @current_config and only rebuilt when
# $current_config_outdated is set.
sub get_config()
{
if ($current_config_outdated)
{
my $command_args = "";
my $message_args = "";
foreach my $key (keys(%current_value))
{
if (exists($transmission_mode{$key}))
{
if ($transmission_mode{$key} == COMMAND_LINE)
{
# A hash valued option is given once per field, as "-keyfield" or
# "-keyfield=value"
if (UNIVERSAL::isa($current_value{$key}, 'HASH'))
{
foreach my $field (keys(%{$current_value{$key}}))
{
$command_args.=" -$key$field";
$command_args.="=$current_value{$key}->{$field}"
if defined($current_value{$key}->{$field});
}
}
else
{
# The value is only appended for options that have a type
$command_args.=" -$key";
$command_args.="$current_value{$key}" if ($type{$key});
}
}
elsif ($transmission_mode{$key} == MESSAGE)
{
# Same layout as above, each word being packed separately
if (UNIVERSAL::isa($current_value{$key}, 'HASH'))
{
foreach my $field (keys(%{$current_value{$key}}))
{
$message_args.=taktuk::pack("-$key");
$message_args.=taktuk::pack("$field".
(defined($current_value{$key}->{$field})?
"=$current_value{$key}->{$field}":""));
}
}
else
{
$message_args.=taktuk::pack("-$key");
$message_args.=taktuk::pack($current_value{$key})
if ($type{$key});
}
}
else
{
diagnostic::warning("Internal bug in get_config");
}
}
}
diagnostic::debug("Config : $command_args | ".$message_args."\n");
@current_config = ($command_args, $message_args);
$current_config_outdated = 0;
}
return @current_config;
}
# Creates the Term::ReadLine object used to read standard input, when the
# module is available.
sub initialize_terminal()
{
    if ($has_readline)
    {
        my @streams = (\*STDIN, \*STDOUT);
        $terminal = Term::ReadLine->new("TakTuk", @streams);
    }
}
# Puts new input in front of the pending input of the parser.
#
# Arguments: plain strings or file handles, in the order in which they have
# to be parsed. Nothing changes when the list is empty; otherwise the parser
# is marked as having input and options to parse again.
sub fetch_arguments(@)
{
    my @new_input = @_;
    if (scalar(@new_input))
    {
        unshift @arguments, @new_input;
        $arguments_ended = $options_ended = 0;
    }
}
# Puts a line of text in front of the pending input of the parser.
# The line is stored as a reference to a private copy and the parser is
# marked as having input and options to parse again.
sub fetch_line($)
{
    my ($line) = @_;
    unshift @arguments, \$line;
    $arguments_ended = $options_ended = 0;
}
# Makes the next call to get_next_stuff return the last token again.
# Only one token can be given back: a warning is issued when one is already
# waiting.
sub restore_last_argument()
{
    if (not $again)
    {
        $again = 1;
    }
    else
    {
        diagnostic::warning("Cannot restore an ungeted argument");
    }
}
# Gives the last command back to the parser.
# The command and the separators that followed it are put in front of the
# current token when there is one; otherwise the command becomes the current
# token and its separators are put in front of the current separators.
sub restore_last_command()
{
    if (not $current_stuff)
    {
        $current_stuff = $current_command;
        $current_separators = $current_command_separators.$current_separators;
    }
    else
    {
        my $restored = $current_command.$current_command_separators;
        $current_stuff = $restored.$current_stuff;
    }
    restore_last_argument() if not $again;
}
# Reads the next command name from the input.
#
# The next token is fetched with both option and command separators as
# delimiters. When the token still contains separators (a whole command line
# argument is returned unsplit by get_next_stuff) it is cut at the first run
# of separators and the remainder is given back to the parser.
# Returns the command name, also stored in $current_command.
sub get_next_command()
{
    get_next_stuff($option_separator.$command_separator);
    # The class is made of the two sets of separators only. A '.' between
    # the two variables would be a literal dot inside the brackets and would
    # cut commands at every dot.
    if ($current_stuff =~
        m/^(.*?)([$option_separator$command_separator]+)(.*)$/g)
    {
        $current_command = $1;
        $current_command_separators = $2;
        $current_stuff = $3;
        restore_last_argument if length($current_stuff);
    }
    else
    {
        $current_command = $current_stuff;
        $current_command_separators = $current_separators;
        $current_stuff = "";
        $current_separators = "";
    }
    # A command separator following the command is given back to the parser
    # so that it is seen when the end of the command is looked for
    if ($current_command_separators =~ m/[$command_separator]/)
    {
        $current_stuff = $current_command_separators.$current_stuff;
        $current_command_separators = "";
        restore_last_argument if not $again;
    }
    diagnostic::debug("Command found : [$current_command]");
    return $current_command;
}
# Reads the next token delimited by option separators.
# Returns the token, which is also stored in $current_argument.
sub get_next_argument()
{
    get_next_stuff($option_separator);
    $current_argument = $current_stuff;
    diagnostic::debug("Argument found : [$current_argument]");
    return $current_stuff;
}
# Data read from each input handle and not parsed yet, and end of file marks.
# Both hashes are indexed by handle.
our %buffer;
our %end;
# Reads more data from an input handle into its buffer.
#
# Argument: the handle to read from.
# Standard input is read line by line through Term::ReadLine when a terminal
# has been initialized; any other handle is read with sysread.
# Returns the number of bytes added to the buffer; a false value means end of
# file or error, in which case the handle is marked as ended and closed
# (standard input is never closed).
sub read_buffer_data($)
{
my $argument = shift;
my $old_pos = undef;
my $new_data;
my $result;
if (defined($terminal) and ($argument == \*STDIN))
{
$new_data = $terminal->readline("");
if (defined($new_data))
{
# The newline is added back to the line returned by readline
$new_data .= "\n";
$result = length($new_data);
}
else
{
$result = 0;
}
}
else
{
$result = sysread($argument,$new_data,$taktuk::read_size);
}
# Appending to the buffer resets its match position, which the parser may be
# using: it is saved here and restored after the append
$old_pos = pos($buffer{$argument});
(diagnostic::system and $result = 0) if not defined($result);
if ($result)
{
$buffer{$argument} .= $new_data if $result;
pos($buffer{$argument}) = $old_pos;
}
else
{
$end{$argument} = 1;
CORE::close $argument if ($argument != \*STDIN);
}
return $result;
}
# Forgets the buffer and the end of file mark of an input handle.
# A warning is issued when the handle has not reached its end.
sub delete_buffer($)
{
    my ($argument) = @_;
    exists($end{$argument}) or diagnostic::warning("Bug");
    delete $buffer{$argument};
    delete $end{$argument};
}
# Finds the next piece of pending input that holds data.
#
# Returns the list (data, type): data is a reference to the text to parse and
# type is BUFFER (data read from a handle), LINE (a line given to fetch_line)
# or ARGUMENT (a plain command line argument). Returns (undef, NONE) when no
# input is left. Exhausted handles and empty lines are removed from the
# pending input on the way.
sub get_data_buffer()
{
my $data = undef;
my $data_type = NONE;
my $args_remaining = 1;
$args_remaining = 0 if not scalar(@arguments);
while (not defined($data) and $args_remaining)
{
my $argument = $arguments[0];
if (ref($argument) eq 'GLOB')
{
if ($end{$argument})
{
# End of this handle: go on with the next pending input
delete_buffer($argument);
shift @arguments;
$args_remaining = 0 if not scalar(@arguments);
}
else
{
# Read from the handle only when its buffer is empty
$buffer{$argument} = "" if not exists($buffer{$argument});
read_buffer_data($argument) if not length($buffer{$argument});
if (length($buffer{$argument}))
{
$data = \$buffer{$argument};
$data_type = BUFFER;
}
}
}
elsif (ref($argument) eq 'SCALAR')
{
if (length($$argument))
{
$data = $argument;
$data_type = LINE;
}
else
{
shift @arguments;
$args_remaining = 0 if not scalar(@arguments);
}
}
else
{
# A plain argument is returned even when empty; the caller removes it from
# the pending input
$data = \$arguments[0];
$data_type = ARGUMENT;
}
}
return ($data, $data_type);
}
# Reads the next token from the pending input.
#
# Argument: the set of characters that delimit tokens (interpolated into a
# character class).
# Results are left in package variables:
#   $current_stuff             the token, escape characters removed
#   $unescaped_stuff           the token as found in the input
#   $current_separators        the separators found right after the token
#   $current_separators_prefix the separators skipped before the token
#   $is_first_argument         set when the token is a whole argument
#   $arguments_ended           set when the token is empty
# When a token has been given back with restore_last_argument, the call only
# clears that state and leaves the variables untouched.
sub get_next_stuff($)
{
my $separator = shift;
my $done = 0;
my $data = 1;
my $data_type;
if ($again)
{
$again = 0;
}
else
{
$current_stuff = "";
$current_separators = "";
$current_separators_prefix = "";
# Escape character found at the very end of a buffer, to be put in front of
# the next one
my $data_prefix = "";
while (not $done and defined($data))
{
($data, $data_type) = get_data_buffer;
if (defined($data))
{
if ($data_type == ARGUMENT)
{
# A command line argument is a token by itself
diagnostic::debug("Got argument");
$current_stuff = $$data;
shift @arguments;
$is_first_argument = 1;
$done = 1;
}
else
{
diagnostic::debug("Got buffer");
$$data = $data_prefix.$$data if $data_prefix;
$data_prefix = "";
my $regexp;
my $escape_found = 1;
# Look for the next separator, or escape character when one is defined
if (defined($escape))
{
$regexp = qr/([$separator$escape])/;
}
else
{
$regexp = qr/([$separator])/;
}
# The search goes on after each escaped character
while ($escape_found)
{
$escape_found = 0;
if ($$data =~ m/$regexp/g)
{
my $character = $1;
if (defined($escape) and ($character eq $escape))
{
if (pos($$data) >= length($$data))
{
# The escaped character is not available yet: keep the escape character
# for the next buffer and take everything before it
$data_prefix =
substr $$data,length($$data)-1,1;
$current_stuff .=
substr $$data,0,length($$data)-1;
$$data = "";
}
else
{
# Skip the escaped character
$escape_found = 1;
pos($$data) = pos($$data) + 1;
}
}
else
{
# Separator found: gather the whole run of separators
my $position = pos($$data);
$current_separators = $character;
if ($$data =~ m/\G([$separator]+)/g)
{
$current_separators .= $1;
$position = defined(pos($$data))?
pos($$data):length($$data);
}
else
{
pos($$data) = $position;
}
# The token is what precedes the separators, the rest stays in the buffer
$current_stuff .= substr $$data, 0,
$position-length($current_separators);
$$data = substr $$data, $position;
if (length($current_stuff))
{
$is_first_argument = 0;
$done = 1;
}
else
{
# Nothing before the separators: they are leading separators
$current_separators_prefix .=
$current_separators;
}
}
}
else
{
# No separator: everything belongs to the token, which may go on in the
# next buffer unless the data was a line
$current_stuff .= $$data;
$$data = "";
$done = 1 if $data_type == LINE;
}
}
}
}
}
$unescaped_stuff = $current_stuff;
if (defined($escape))
{
$current_stuff =~ s/$escape(.)/$1/g;
}
$arguments_ended = 1 if not length($current_stuff);
}
}
# Reads the parameters of a command, given between braces.
#
# The opening brace is a token of exactly one character; the closing brace is
# the matching character for '(', '{' and '[', and the same character
# otherwise. Tokens are gathered until the closing brace is met as a token of
# its own.
# Returns the text found between the braces, or undef after an error message
# when a brace is missing.
sub get_parameters()
{
my $left_brace = undef;
my $right_brace;
my $parameters = "";
my $end_expression;
get_next_stuff($option_separator);
if (length($current_stuff) != 1)
{
if (length($current_stuff))
{
diagnostic::error("TakTuk has changed: now commands parameters ".
"should be separated from braces in $current_stuff");
}
else
{
diagnostic::error("Missing brace in parameters");
}
return undef;
}
$left_brace = $current_stuff;
$right_brace = $left_brace;
if ($left_brace =~ m/[({[]/)
{
$right_brace =~ tr/({[/)}]/;
}
# Inside the braces both kinds of separators delimit tokens
my $separator = $option_separator.$command_separator;
my $last_separators = "";
my $done = 0;
get_next_stuff($separator);
# Separators that come before the first command separator are dropped
$current_separators_prefix =~ s/^[^$command_separator]+//;
while (not $done)
{
if ($arguments_ended)
{
diagnostic::error("Missing closing brace or separator before ".
"closing brace in $parameters");
return undef;
}
elsif (length($parameters) and ($unescaped_stuff eq $right_brace))
{
# Closing brace: the separators found before it are kept up to the last
# command separator
my $remaining = $last_separators.$current_separators_prefix;
$remaining =~ s/[^$command_separator]+$//;
$parameters .= $remaining;
# A command separator following the brace is given back to the parser
if ($current_separators =~ /[$command_separator]/)
{
$current_stuff = $current_separators;
$current_separators = "";
restore_last_argument;
}
$done = 1;
}
elsif (length($parameters) and $is_first_argument and
($unescaped_stuff =~ s/^$right_brace([$separator].*)$/$1/))
{
# Command line argument made of the closing brace followed by separators
# and more text: what follows the brace is given back to the parser
$current_stuff = substr $current_stuff,1;
restore_last_argument if ($current_stuff =~ /[^$option_separator]/);
$done = 1;
}
else
{
# Ordinary token: it is appended with the separators found before it
$parameters .= $last_separators.$current_separators_prefix.
$current_stuff;
$last_separators = $current_separators;
# Tokens that came without separator are joined by one space
$last_separators = " " if not length($last_separators);
get_next_stuff($separator);
}
}
diagnostic::debug("Parameters found : [$parameters]");
return $parameters;
}
# Consumes the separators that end a command.
#
# Removes a run of command separators, together with the option separators
# around it, from the front of the input (the token given back to the parser,
# if any, otherwise the next piece of pending input). Pieces of input made of
# option separators only are skipped. A warning is issued when something else
# than a command separator is found.
sub skip_command_separator()
{
my $done = 0;
my $data;
my $data_type = NONE;
if ($again)
{
# A token has been given back: the separators are looked for in it
$data = \$current_stuff;
}
else
{
($data, $data_type) = get_data_buffer;
}
while (not $done and not $arguments_ended)
{
if (defined($data))
{
if ($$data =~ s/^[$option_separator]*[$command_separator]+[$option_separator]*//)
{
$done = 1;
# An argument that is entirely consumed is removed from the pending input
shift @arguments if (not length($$data) and
($data_type == ARGUMENT));
}
else
{
if ($$data =~ m/^[$option_separator]*$/)
{
# Only option separators: go on with the next piece of input
$$data = "";
shift @arguments if ($data_type == ARGUMENT);
($data, $data_type) = get_data_buffer;
}
else
{
diagnostic::warning("Missing command separator");
$done = 1;
}
}
}
else
{
$arguments_ended = 1;
}
}
# Nothing is left to be returned again when the token has been emptied
$again = 0 if not length($current_stuff);
}
# Discards all the pending input, closing the file handles found in it, and
# marks the input as ended.
sub terminate_arguments_parsing()
{
    # splice empties the list and returns its former content
    foreach my $argument (splice @arguments)
    {
        close $argument if UNIVERSAL::isa($argument,'GLOB');
    }
    $arguments_ended = 1;
}
# Declares an option.
#
# Arguments: short name, long name, type, handler, then optionally a default
# value and a transmission mode (COMMAND_LINE or MESSAGE).
# The handler is either a code reference, called with the value of the
# option, or a reference to the variable that receives the value. When a
# default value is given it is applied immediately through the handler.
sub register_option($$$$@)
{
    my ($short, $long, $opt_type, $opt_handler, @extra) = @_;

    $long_name{$short} = $long;
    $short_name{$long} = $short;
    $handler{$short} = $opt_handler;
    $type{$short} = $opt_type;
    if (scalar(@extra))
    {
        my $default = shift @extra;
        if (UNIVERSAL::isa($opt_handler, 'CODE'))
        {
            $opt_handler->($default);
        }
        else
        {
            $$opt_handler = $default;
        }
        $transmission_mode{$short} = shift @extra if scalar(@extra);
    }
}
# Option handler: reads further arguments from a file.
# The opened file is put in front of the pending input; $main::terminate is
# set when the file cannot be opened.
sub args_file($)
{
    my ($file) = @_;
    my $file_handle = general::open_file($file);

    if ($file_handle)
    {
        fetch_arguments($file_handle);
    }
    else
    {
        $main::terminate = 1;
    }
}
# Option handler: turns self propagation off.
# When it was on, the flag is cleared, the value recorded for option "s" is
# removed and the cached configuration is invalidated.
sub dont_self_propagate()
{
    if ($general::self_propagate)
    {
        delete $current_value{"s"};
        $general::self_propagate = 0;
        $current_config_outdated = 1;
    }
}
# Option handler: adds the peers listed in a file.
# The first whitespace delimited field of each line is taken as a peer
# specification, the rest of the line is ignored. Lines whose first field is
# empty (blank lines, lines that begin with whitespace) are skipped.
# $main::terminate is set when the file cannot be opened.
sub machines_file($)
{
    my ($file) = @_;
    my $file_handle = general::open_file($file);

    if ($file_handle)
    {
        while (my $line = <$file_handle>)
        {
            my ($peer_name) = split /\s+/,$line,2;
            general::add_peer($peer_name) if $peer_name;
        }
        close $file_handle if $file_handle != \*STDIN;
    }
    else
    {
        $main::terminate = 1;
    }
}
# Option handler: adds a peer given on the command line.
#
# Argument: the peer specification. When the next argument is "-[", the
# arguments up to the matching "-]" are taken as arguments specific to this
# peer; "-[" and "-]" may be nested. Unbalanced delimiters are reported and
# terminate the program with exit status 1.
sub machine($)
{
    my ($peer) = @_;
    my @peer_arguments=();

    get_next_argument;
    if ($current_argument ne "-[")
    {
        # No peer specific arguments: the argument belongs to someone else
        restore_last_argument;
    }
    else
    {
        my $depth = 1;
        until (not $depth or $arguments_ended)
        {
            get_next_argument;
            if ($current_argument eq "-]")
            {
                $depth--;
            }
            elsif ($current_argument eq "-[")
            {
                $depth++;
            }
            push @peer_arguments, $current_argument;
        }
        if ($depth)
        {
            diagnostic::error("Invalid -[ and -] imbrication");
            exit(1);
        }
        # Drop the closing "-]"
        pop @peer_arguments;
    }
    general::add_peer($peer,@peer_arguments);
}
# Option handler: record the local host name, both in $general::host and in
# the TAKTUK_HOSTNAME environment variable.
sub localhost($)
{
    my ($hostname) = @_;
    $ENV{TAKTUK_HOSTNAME} = $general::host = $hostname;
}
# Option handler: if this instance is still flagged as root, attach a
# connector built on the standard streams to the 'sources' set and clear the
# root flag.
sub not_root()
{
    return if not $general::root;
    communicator::add_connector(
        connector::new('write' =>\*STDIN,
                       'read'  =>\*STDOUT,
                       'type'  =>settings::FD),
        'sources');
    $general::root = 0;
}
# Option handler: output, as 'info' lines of the form TAKTUK_NAME=value, the
# current value of every option whose handler is a scalar reference (one line
# per key for hash-typed options), then request termination.
sub print_defaults()
{
    foreach my $key (keys(%long_name))
    {
        my $target = $handler{$key};
        # options handled by code have no stored value to show
        next if UNIVERSAL::isa($target, 'CODE');
        my $base = "TAKTUK_".$long_name{$key};
        if ($type{$key} =~ m/^h.$/)
        {
            foreach my $field (keys(%{$$target}))
            {
                my $line = uc($base."_".$field);
                my $value = $$target->{$field};
                $value = "" if not defined($value);
                $line =~ tr/-/_/;
                communicator::get_root->output('info',
                                               $line."=".$value."\n");
            }
        }
        else
        {
            my $line = uc($base);
            my $value = defined($$target) ? $$target : "";
            $line =~ tr/-/_/;
            communicator::get_root->output('info', $line."=".$value."\n");
        }
    }
    $main::terminate = 1;
}
# Option handler: load the named package with general::load_taktuk_package,
# output $general::taktuk_package as 'info' and request termination.
sub print_package($)
{
    my ($package_name) = @_;
    general::load_taktuk_package($package_name);
    my $root = communicator::get_root;
    $root->output('info', $general::taktuk_package);
    $main::terminate = 1;
}
# Output the version and release numbers as an 'info' line.
sub print_version()
{
    my $banner = "TakTuk version $taktuk::VERSION release $taktuk::RELEASE\n";
    communicator::get_root->output('info', $banner);
}
# Option handler: output the version banner, then request termination.
sub print_version_quit()
{
    print_version();
    $main::terminate = 1;
}
# Option handler (registered as "cook"): decode the embedded text below and
# output it as 'info', then request termination.
# The text is stored obfuscated: each character listed in $dest is mapped back
# to the character at the same position in $source by a tr/// built at run
# time (tr does not interpolate variables, hence the string eval).
sub print_k()
{
my $source="A-Za-z0-9 ,.;:?!";
my $dest="M-Za-z0-9 ,.;:?!A-L";
# Obfuscated payload: must be kept byte for byte
my $line = <<END;
XmFo0zru563qFp'03mzsq4Fp6FP3Fe530zsFJ
Uzs3qpuqz54FJFDwsFp'03mzsq4GF:wsFpqF46o3qFqzF106p3q
Ym5q3uqxFJF6zqFnm44uzqFqzFo6u73qFym44urF(oqxxqFpqF7053qFs3mzp-yq3q)
Yqxmzsq,FA..sFpqF46o3qFmF6zF1q6Fp'qm6Fotm6pqFpqFymzuq3qFmFxqFpu4406p3q
505mxqyqz5HFRmu5q4Fom3myqxu4q3FxqF4u301F3q46x5mz5FmFrq6F7urHFgzqFr0u4FxmFn0zzq
5quz5qF0n5qz6qGF45011q,FxmFom3myqxu4m5u0zFqzF Fmv065mz5F6zF1q6Fp'qm6
n06uxxmz5qHFb3qxq7q,Fxq4F,q45q4FpqF;GBwsFpq4F03mzsq4Fq5F3q4q37q,Fxq4HFbqxq,F( 
o0y13u4FxmF1m35uqFnxmzotq)F5065q4Fxq4F03mzsq4GFo061q,Fxq4FqzF1q5u54Fy03oqm69Fq5
mv065q,Fxq4Fm6Fom3myqxHFO0y1xq5q,FxqFyqxmzsqFm7qoFxqF3q45qFp6F46o3qFq5Fxq4
,q45q4Fruzqyqz5Fqyuzoq4HFb035q,FmFqn6xxu5u0zF16u4Frmu5q4Fyuv05q3FBFmFDtFmFrq6
53q4Fp069GFv6426'mF0n5qzu3F6zqFo0z4u45mzoqF4u361q64qHFUxFzqF3q45qF1x64F26'm
qy105q3F(m55qz5u0zFmFxmFo0z4u45mzoqFJFxqFyqxmzsqFq1mu44u5FqzF3qr30upu44mz5)H
END
eval "\$line =~ tr/$dest/$source/";
communicator::get_root->output('info', $line);
$main::terminate = 1;
}
# Register every command line option with its default value, then apply the
# settings found in the environment.
#
# An environment variable TAKTUK_<NAME> sets the option whose long name is
# <NAME> lowercased with "_" turned into "-". TAKTUK_MY_<NAME> does the same
# with $local_option set while the option is applied. For hash-typed options,
# TAKTUK_<NAME>_<KEY> sets the given key of the option.
# A variable matching no option produces an "Unknown setting" warning.
sub init()
{
    register_option("B","worksteal-behavior","hs", \$scheduler::worksteal,
        {
            initial => 1,
            growth => '$last_given * 2',
            limit => '$available / 2'
        }, MESSAGE);
    register_option("C","command-separator","s",
        \$arguments::command_separator,',;\n',MESSAGE);
    register_option("D","debug","hi",\$diagnostic::package_level,
        { default=>2 },COMMAND_LINE);
    register_option("E","escape-character","s",\$arguments::escape,undef,
        MESSAGE);
    register_option("F","args-file","s",\&args_file);
    register_option("L","localhost","s",\&localhost);
    register_option("M","my","",\$arguments::local_option);
    register_option("O","option-separator","s",
        \$arguments::option_separator,'\s',MESSAGE);
    register_option("P","print-defaults","",\&print_defaults);
    register_option("R","output-redirect","hi",\$general::redirect,
        { "default" => 1, "taktuk" => 2 }, MESSAGE);
    register_option("S","send-files","s",\$general::connector_send_files,
        "", MESSAGE);
    register_option("T","taktuk-command","s",\$general::taktuk_command,
        $0,MESSAGE);
    register_option("W","window-adaptation","i",\$scheduler::window_adaptation,
        0,MESSAGE);
    register_option("c","connector","s",\$general::connector_command,
        "ssh -o StrictHostKeyChecking=no -o BatchMode=yes", MESSAGE);
    register_option("d","dynamic","i",\$scheduler::dynamic_limit, 0, MESSAGE);
    register_option("f","machines-file","s",\&machines_file);
    register_option("g","time-granularity","f",\$communicator::select_timeout,
        1, MESSAGE);
    register_option("h","help","",\&general::print_help_quit);
    register_option("i","interactive","",\$main::forced_interactive);
    register_option("k","cook","",\&arguments::print_k);
    register_option("l","login","s",\$general::login_name, undef, MESSAGE);
    register_option("m","machine","s",\&machine);
    register_option("n","no-numbering","",\$main::no_numbering,0,COMMAND_LINE);
    # The 'status' and 'taktuk' templates are single strings spanning two
    # lines: their continuation lines stay at column 0 so that the stored
    # value is unchanged.
    register_option("o","output-template","hs",\$general::template,
        {
            default => '"$host-$rank: $command ($pid): $type > $line\n"',
            connector => '"$host: $peer ($pid): $type > $line\n"',
            info => '"$line$eol"',
            state => '($line == synchronizer::CONNECTION_FAILED)?'.
                     '"$host: $peer ($pid): $type > ".'.
                     'event_msg($line)."\n":undef',
            status => '"$host-$rank: $command ($pid): $type > ".
"Exited with status $line\n"',
            taktuk => '"[ TAKTUK $level_name ] $host (PID $pid) Line ".
"$line_number ($package) Release $release\n$line\n"'
        }, MESSAGE);
    register_option("p","print-package","s",\&print_package);
    register_option("r","not-root","",\&not_root);
    register_option("s","self-propagate","",\$general::self_propagate,
        0,MESSAGE);
    register_option("t","timeout","f",\$general::connector_timeout,0,MESSAGE);
    register_option("v","version","",\&print_version_quit);
    register_option("w","window","i",\$scheduler::window, 10, MESSAGE);
    register_option("z","dont-self-propagate","",\&dont_self_propagate);

    foreach my $full_name (keys(%ENV))
    {
        my $variable;
        if ($full_name =~ m/^TAKTUK_(MY_)?(.+)$/)
        {
            $local_option = $1?1:0;
            $variable = $2;
        }
        if (defined($variable))
        {
            my $prefix;
            my $qualifier;
            $variable = lc($variable);
            # split at the last "_": everything before it is the candidate
            # option name, the last word is the candidate hash key
            $variable =~ m/^(.*?)(?:_([^_]+))?$/;
            ($prefix, $qualifier) = ($1, $2);
            $prefix =~ tr/_/-/;
            if (defined($qualifier))
            {
                # Only hash-typed options accept a key. For any other option
                # the last word is part of the option name itself (eg.
                # TAKTUK_WINDOW_ADAPTATION is option "window-adaptation", not
                # key "adaptation" of option "window").
                if (exists($short_name{$prefix}) and
                    ($type{$short_name{$prefix}} =~ m/^h.$/))
                {
                    my $short = $short_name{$prefix};
                    set_option($short, "$qualifier=$ENV{$full_name}");
                }
                else
                {
                    $prefix .= "-$qualifier";
                    $qualifier = undef;
                }
            }
            if (not defined($qualifier))
            {
                if (exists($short_name{$prefix}))
                {
                    my $short = $short_name{$prefix};
                    set_option($short, $ENV{$full_name});
                }
                else
                {
                    diagnostic::warning("Unknown setting $full_name");
                }
            }
        }
        $local_option = 0;
    }
}
# Consume the leading options of the argument stream and apply each of them
# with set_option. Parsing stops (sets $options_ended) at "--" or at the first
# non blank argument that does not start with "-"; that argument is pushed
# back with restore_last_argument. The separators are checked at the end.
sub parse_options()
{
get_next_argument();
while (not $options_ended)
{
# Strip the leading dash: what remains is a cluster of short options or,
# if a second dash follows, a long option name
if ($current_argument =~ s/^-//o)
{
my @names;
if ($current_argument =~ s/^-(.*)$//o)
{
my $name = $1;
if ($name)
{
# Long option: translate it to its short name, handled below
my $short = $short_name{$name};
if (not defined($short))
{
diagnostic::warning("Unknown long option $name");
general::print_help_quit if ($general::root);
}
else
{
$current_argument = $short;
}
}
else
{
# Bare "--": end of options
$options_ended = 1;
}
}
# Process the short options one character at a time
while ($current_argument)
{
$current_argument =~ s/^(.)//o;
my $name = $1;
if (exists($type{$name}))
{
if ($type{$name})
{
# Typed option: its value is the rest of this argument or, if nothing
# is left, the next argument
get_next_argument if not length $current_argument;
if ($arguments_ended)
{
diagnostic::error("Missing argument")
}
else
{
set_option($name, $current_argument);
}
$current_argument = "";
}
else
{
# Option with an empty type: a flag, set to 1
set_option($name, 1);
}
}
else
{
diagnostic::warning("Unknown short option $name");
general::print_help_quit if ($general::root);
}
} 
get_next_argument;
}
else
{
# Not an option: skip blank arguments while some remain, otherwise stop and
# give back a non blank argument to the next parsing stage
if (($current_argument =~ m/^\s*$/) and not $arguments_ended)
{
get_next_argument;
}
else
{
$options_ended=1;
restore_last_argument if ($current_argument !~ m/^\s*$/);
}
}
}
check_separators_integrity();
}
# Warn when the option and command separator sets overlap, or when either of
# them matches a dash, a slash, a digit or a letter.
sub check_separators_integrity()
{
    if (($option_separator =~ m/[$command_separator]/) or
        ($command_separator =~ m/[$option_separator]/))
    {
        diagnostic::warning("Options and command separators intersect");
    }
    my $reserved = join("", "-/", 0 .. 9, "a" .. "z", "A" .. "Z");
    if (($reserved =~ m/[$command_separator]/) or
        ($reserved =~ m/[$option_separator]/))
    {
        diagnostic::warning("Options or command separators contain one of :".
                            "-/1-9a-zA-Z");
    }
}
# Apply a value to a registered option.
#   $name  : short name of the option
#   $value : raw value. For hash-typed options (type "h" followed by the
#            element type) it must look like "key" or "key=value".
# Values of element type "i" or "f" are validated; an invalid value only
# produces a warning and the option is left untouched.
# Unless $local_option is set (in which case it is reset), the value of an
# option having a transmission mode is also recorded in %current_value and the
# current configuration is marked outdated.
# The value is finally passed to the handler if it is a code reference, or
# stored through it otherwise.
sub set_option($$)
{
    my $name = shift;
    my $value = shift;
    my $key = undef;
    my $current_type;
    if ($type{$name} =~ m/^h(.)$/)
    {
        $current_type = $1;
        if ($value =~ m/^([^=]+)(?:=(.+))?$/)
        {
            $key = $1;
            $value = $2;
        }
        else
        {
            diagnostic::warning("Option $name needs a well formed key=value ".
                                "pair ($value doesn't match)");
            # There is no key to store under: going on would write a plain
            # scalar over the hash reference held by the option
            return;
        }
    }
    else
    {
        $current_type = $type{$name};
    }
    if (($current_type eq "i") and defined($value) and ($value !~ /^-?\d+$/o))
    {
        diagnostic::warning("Option $name needs a numeric argument ".
                            "($value doesn't match)");
    }
    elsif (($current_type eq "f") and defined($value) and ($value !~
           /^(?:\d+(?:\.\d*)?|\.\d+)$/o))
    {
        diagnostic::warning("Option $name needs a floating point argument ".
                            "($value doesn't match)");
    }
    else
    {
        my $destination = $handler{$name};
        if (not $local_option)
        {
            if (exists($transmission_mode{$name}))
            {
                # defined() rather than truth, as for the handler below, so
                # that a key such as "0" is recorded as a hash entry too
                if (defined($key))
                {
                    $current_value{$name} = {}
                        if not exists $current_value{$name};
                    $current_value{$name}->{$key} = $value;
                }
                else
                {
                    $current_value{$name}=$value;
                }
                $current_config_outdated = 1;
            }
        }
        else
        {
            $local_option = 0;
        }
        if (defined($key))
        {
            if (UNIVERSAL::isa($destination,'CODE'))
            {
                diagnostic::warning("NOT EXPECTED: Found option $name with ".
                                    "code handler and hash value (key=$key)");
                &$destination($key, $value);
            }
            else
            {
                diagnostic::debug("Found option $name with scalar ".
                                  "handler and hash value (key=$key)");
                $$destination->{$key} = $value;
            }
        }
        else
        {
            if (UNIVERSAL::isa($destination,'CODE'))
            {
                diagnostic::debug("Found option $name with code handler and ".
                                  "scalar value ". $value);
                &$destination($value);
            }
            else
            {
                diagnostic::debug("Found option $name with scalar ".
                                  "handler and scalar value ". $value);
                $$destination = $value;
            }
        }
    }
}
# Package diagnostic: leveled messages (debug, warning, error...) sent to the
# root connector with the 'taktuk' output type.
package diagnostic;
use strict; use bytes;
# Hash reference giving, per package name, the minimal level of the messages
# that are printed; the 'default' entry applies to packages without one
our $package_level;
# Number of print_info calls in progress (guards against runaway recursion)
our $depth = 0;
# Description of the message being printed, set by print_info
our $level="";
our $level_name="";
our $package="";
our $filename="";
our $line_number="";
# When false, calls to special() print nothing
our $special_enabled=0;
# Print a diagnostic message.
#   1: numeric level, 2: level name, 3: message text
# Records the level and the location of the caller of the calling wrapper in
# the package variables, then outputs the message through the root connector
# if the level reaches the threshold of the originating package (or the
# default one) and fewer than three print_info calls are in progress.
sub print_info($$$)
{
    my $message;
    ($level, $level_name, $message) = @_;
    ($package, $filename, $line_number) = caller(1);
    my $threshold = defined($package_level->{$package}) ?
        $package_level->{$package} : $package_level->{default};
    my $level_ok = ($threshold <= $level);
    $depth++;
    communicator::get_root->output('taktuk', $message)
        if ($level_ok and ($depth < 3));
    $depth--;
}
# Report the current system error ($!) at level 3.
sub system ()
{
    my $report = "Error $!";
    print_info(3, "ERROR : SYSTEM", $report);
}
# Print a message at level 1 (DEBUG).
sub debug ($)
{
    my ($message) = @_;
    print_info(1, "DEBUG", $message);
}
# Print a message at level 3 (ERROR).
sub error ($)
{
    my ($message) = @_;
    print_info(3, "ERROR", $message);
}
# Print a message at level 2 (WARNING).
sub warning ($)
{
    my ($message) = @_;
    print_info(2, "WARNING", $message);
}
# Print a message at level 3 (SPECIAL), only when $special_enabled is set.
sub special ($)
{
    my ($message) = @_;
    print_info(3, "SPECIAL", $message) if $special_enabled;
}
# Package taktuk: message framing, buffered reads and writes, and the
# send/recv/get functions that talk over the control channel.
package taktuk;
use strict; use bytes;
# Data read but not consumed yet, keyed by descriptor
our %buffer;
# Put a handle in binary mode and turn autoflush on for it, leaving the
# currently selected output handle unchanged.
sub no_flush($)
{
    my ($handle) = @_;
    binmode($handle);
    my $previous = select($handle);
    $| = 1;
    select($previous);
}
# Extract one length-prefixed field from the front of a string.
# The field is a 4-byte big-endian length ("N") followed by that many bytes.
# Returns (field, remainder) or, when the string does not hold a complete
# field yet, (undef, unchanged string).
sub unpack($)
{
    my ($data) = @_;
    # not even a complete length header
    return (undef, $data) if (length($data) < 4);
    my ($size) = CORE::unpack("N", $data);
    # header present but payload incomplete
    return (undef, $data) if (length($data) < $size+4);
    return (substr($data, 4, $size), substr($data, $size+4));
}
# Prefix a string with its length as a 4-byte big-endian integer.
sub pack($)
{
    my ($payload) = @_;
    return CORE::pack("N", length($payload)).$payload;
}
# Split a message into its one-character code and its body.
sub decode($)
{
    my ($message) = @_;
    return (substr($message, 0, 1), substr($message, 1));
}
# Build a message by prepending a message code to a body.
sub encode($$)
{
    my ($code, $body) = @_;
    return $code.$body;
}
# Write a whole string to a descriptor, by chunks of at most $write_size
# bytes, looping until everything has been written.
#   1: descriptor, 2: data
# Returns 1 on success, undef after an unrecoverable error (which is also
# reported on STDERR).
sub syswrite ($$)
{
my $unrecoverable = 0;
my $write_fd = shift;
my $full_message = shift;
my $result;
my $total_expected = length($full_message);
my $call_expected = $write_size;
my $offset = 0;
while ($total_expected and not $unrecoverable)
{
# never ask for more than what remains to be written
$call_expected = $total_expected if $call_expected > $total_expected;
$result =
CORE::syswrite($write_fd, $full_message, $call_expected, $offset);
if ($result)
{
$total_expected -= $result;
$offset += $result;
}
else
{
if ($!{EAGAIN})
{
# nothing written for now: the loop simply tries again
}
else
{
print STDERR "Unrecoverable write error\n";
$unrecoverable = 1;
}
}
}
if ($unrecoverable)
{
return undef;
}
else
{
return 1;
}
}
# Read at most $read_size bytes from a descriptor into its buffer.
# Returns what sysread returned: undef on error (the buffer is then left
# untouched), otherwise the number of bytes read.
sub read_data ($)
{
my $descriptor = shift;
my $new_data;
my $result = sysread($descriptor, $new_data, $read_size);
return undef if not defined($result);
if ($result and exists($buffer{$descriptor}))
{
$buffer{$descriptor} .= $new_data;
}
else
{
# NOTE(review): this branch is also taken when sysread returned 0 while a
# buffer already exists, in which case the buffered data is replaced by
# $new_data - confirm that dropping pending data at end of file is intended
$buffer{$descriptor} = $new_data;
}
return $result;
}
# Take the next complete message out of the buffer of a descriptor.
# Returns the message, or "" when the descriptor has no buffer or its buffer
# does not hold a complete message yet.
sub get_message ($)
{
    my ($descriptor) = @_;
    return "" unless exists($buffer{$descriptor});

    my ($message, $rest) = taktuk::unpack($buffer{$descriptor});
    if (defined($rest))
    {
        $buffer{$descriptor} = $rest;
    }
    else
    {
        delete($buffer{$descriptor});
    }
    return defined($message) ? $message : "";
}
# Search the buffered data of a descriptor, one complete line at a time, for
# a regular expression.
#   1: descriptor, 2: regular expression (as a string)
# Each complete line examined is removed from the buffer, the matching one
# included; data following the last newline stays in the buffer.
# Returns the text matched, or "" if no complete line matches.
sub find_sequence($$)
{
my $descriptor = shift;
my $sequence = shift;
my $found = undef;
if (exists($buffer{$descriptor}))
{
my $position;
$position = index($buffer{$descriptor},"\n");
while (($position >= 0) and not defined($found))
{
my $string;
# detach the first line (without its newline) from the buffer
$string = substr($buffer{$descriptor}, 0, $position);
$buffer{$descriptor} = substr($buffer{$descriptor}, $position+1);
if ($string =~ m/($sequence)/)
{
$found = $1;
}
else
{
$position = index($buffer{$descriptor},"\n");
}
}
}
return defined($found)?$found:"";
}
# Remove and return everything buffered for a descriptor ("" if it has no
# buffer).
sub flush_buffer($)
{
    my ($descriptor) = @_;
    return "" unless exists($buffer{$descriptor});
    return delete($buffer{$descriptor});
}
# Handles of the control channel, opened at load time from the descriptor
# numbers found in the TAKTUK_CONTROL_READ / TAKTUK_CONTROL_WRITE environment
# variables (when they are set).
our $control_channel_read;
our $control_channel_write;
if ($ENV{TAKTUK_CONTROL_READ})
{
open($control_channel_read, "<&=", $ENV{TAKTUK_CONTROL_READ})
or print("Error opening taktuk control channel : $!\n");
binmode($control_channel_read);
}
if ($ENV{TAKTUK_CONTROL_WRITE})
{
open($control_channel_write, ">&=", $ENV{TAKTUK_CONTROL_WRITE})
or print("Error opening taktuk control channel : $!\n");
no_flush($control_channel_write);
}
# Error codes stored in $error by the functions below
use constant ESWRIT=>1;
use constant EFCLSD=>2;
use constant ESREAD=>3;
use constant EARGTO=>4;
use constant EARGBD=>5;
use constant ETMOUT=>6;
use constant EINVST=>7;
use constant EINVAL=>8;
use constant ENOERR=>9;
# Message of each error code, at index code-1. Each entry is a piece of Perl
# source (a quoted string) evaluated by error_msg, so that $! is interpolated
# when the message is requested.
our @taktuk_errors = (
'"taktuk::syswrite failed, system message : $!"',
'"TakTuk engine closed the communication channel"',
'"sysread error, system message : $!"',
'"field \"to\" not defined"',
'"field \"body\" not defined"',
'"timeouted"',
'"invalid destination set specification"',
'"invalid field required in get"',
'"no error"');
# Return the text of an error code, taken from @taktuk_errors at index
# code-1 and evaluated, or "Unknown error" for a code beyond the table.
sub error_msg($)
{
    my $index = shift;
    $index--;
    return "Unknown error" if ($index > $#taktuk_errors);
    return eval($taktuk_errors[$index]);
}
# Send a message to a set of destinations through the control channel.
# Named arguments:
#   to   : destination set specification (see decode_set)
#   body : message body
# The sender is taken from the TAKTUK_RANK environment variable.
# Returns a true value on success. On failure returns undef and sets $error
# to EARGTO (no "to"), EARGBD (no "body"), EINVST (destination set that
# decodes to nothing) or ESWRIT (write failure).
sub send(%)
{
my %argument = @_;
my $from = $ENV{TAKTUK_RANK};
if (not exists($argument{to}))
{
$error=EARGTO;
return undef;
}
my $to = $argument{to};
if (not exists($argument{body}))
{
$error=EARGBD;
return undef;
}
my $body = $argument{body};
my @send_set = taktuk::decode_set($to);
if (scalar(@send_set))
{
# re-encode the set so that it is transmitted in normalized form
$to = taktuk::encode_set(@send_set);
# a send_to message wrapping the message itself, which carries the
# destination set, the sender and the body
my $full_message = taktuk::encode($taktuk::send_to,
taktuk::pack($to).
taktuk::encode($taktuk::message,
taktuk::pack($to).
taktuk::pack($from).
$body));
my $result = taktuk::syswrite($control_channel_write,
taktuk::pack($full_message));
$error=ESWRIT if not $result;
return $result?$result:undef;
}
else
{
$error=EINVST;
return undef;
}
}
# Wait for a message through the control channel.
# Named argument:
#   timeout : optional value appended to the wait request
# Returns (to, from, body) on success. On failure returns the empty list with
# $error set: ESWRIT when the request could not be written, ETMOUT when a
# timeout message came back, or whatever wait_message set.
sub recv(%)
{
my %argument = @_;
my $result;
my $message;
if (exists($argument{timeout}))
{
$message = taktuk::encode($taktuk::wait_message, $argument{timeout});
}
else
{
$message = $taktuk::wait_message;
}
$result = taktuk::syswrite($control_channel_write,taktuk::pack($message));
if (not $result)
{
$error=ESWRIT;
return ();
}
my $message_code;
($message_code,$message) = wait_message($taktuk::timeout,$taktuk::message);
if (defined($message_code))
{
my ($to, $from);
if ($message_code eq $taktuk::timeout)
{
$error=ETMOUT;
return ();
}
# the body starts with two packed fields: destination set and sender
($to, $message) = taktuk::unpack($message);
($from, $message) = taktuk::unpack($message);
return ($to, $from, $message);
}
else
{
return ();
}
}
# Messages received while waiting for another kind of message, stored as a
# flat list of (code, body) pairs
our @messages;
# Wait for a message whose code is one of the given codes.
# Messages queued in @messages are searched first; then messages are read
# from the control channel, those with another code being queued.
# Returns (code, body). When reading stops, returns the empty list with $error
# set to EFCLSD if read_data returned a defined (false) value, ESREAD if it
# returned undef.
sub wait_message(@)
{
my @codes = @_;
my ($code, $body);
my $result = 1;
my $message;
for (my $i=0; $i<$#messages; $i+=2)
{
foreach my $message_code (@codes)
{
if ($messages[$i] eq $message_code)
{
($code, $body) = ($messages[$i], $messages[$i+1]);
splice @messages, $i, 2;
return ($code, $body);
}
}
}
while ($result)
{
# first use up the complete messages already buffered
$message = get_message($control_channel_read);
while ($message)
{
($code, $body) = taktuk::decode($message);
foreach my $message_code (@codes)
{
return ($code, $body) if ($message_code eq $code);
}
push @messages, $code, $body;
$message = get_message($control_channel_read);
}
$result = read_data($control_channel_read);
}
if (defined($result))
{
$error=EFCLSD;
}
else
{
$error=ESREAD;
}
return ();
}
# Ask for a piece of information through the control channel.
#   1: name of the requested field
# Returns the answer and sets $error to ENOERR. On failure returns -1, with
# $error set to ESWRIT when the request could not be written, to EINVAL when
# the request was answered as invalid, or to whatever wait_message set.
sub get($)
{
    my $request = taktuk::encode($taktuk::get_info, shift);
    if (not taktuk::syswrite($control_channel_write, taktuk::pack($request)))
    {
        $error=ESWRIT;
        return -1;
    }
    my ($reply_code, $reply) =
        wait_message($taktuk::info, $taktuk::invalid);
    return -1 if not defined($reply_code);
    if ($reply_code eq $taktuk::invalid)
    {
        $error=EINVAL;
        return -1;
    }
    $error=ENOERR;
    return $reply;
}
# Parse a set specification made of "/"-separated items, each item being
# either a number or an interval "min-max".
# Returns a flat list of interval bounds (min1, max1, min2, max2, ...), or the
# empty list when an item is not made of decimal numbers or has min > max.
sub decode_set($)
{
my $string = shift;
my @result = ();
my $interval;
my $i;
($interval,$string) = split /\//,$string,2;
while (length($interval))
{
my ($min, $max) = split /-/,$interval,2;
my @replacement = ();
my $state = 0;
my $length = 0;
$i = $#result;
# a single number stands for the interval made of that number alone
$max = $min if not $max;
return () if ($min !~ /^\d+$/o) or ($max !~ /^\d+$/o) or
($min > $max);
# NOTE(review): the two scans below appear to walk @result backwards as a
# sorted list of bounds, $state telling whether the position is inside an
# existing interval, so that the new interval is merged with the ones it
# overlaps - confirm before changing anything here
while (($i > -1) and ($max < $result[$i]))
{
$i--;
$state = 1 - $state;
}
unshift(@replacement, $max) if not $state;
while (($i > -1) and ($min <= $result[$i]))
{
$i--;
$state = 1 - $state;
$length++;
}
unshift(@replacement, $min) if not $state;
$i++;
splice @result, $i, $length, @replacement;
if (defined($string))
{
($interval,$string) = split /\//,$string,2;
}
else
{
$interval = "";
}
}
# join intervals that touch each other (max of one + 1 == min of the next)
for ($i=1; $i<$#result; $i+=2)
{
splice @result, $i, 2
while (($i<$#result) and ($result[$i]+1 == $result[$i+1]));
}
return @result;
}
# Turn a flat list of interval bounds (min1, max1, min2, max2, ...) into a
# set specification: intervals are written "min-max" (or just "min" when both
# bounds are equal) and separated by "/".
# Returns "" for an empty list.
sub encode_set(@)
{
    my @set = @_;
    my @intervals = ();
    while (scalar(@set))
    {
        my $min = shift @set;
        my $max = shift @set;
        push @intervals, ($min == $max) ? "$min" : "$min-$max";
    }
    # join() places the separator between items whatever their text is; a
    # truth test on the text built so far fails when that text is "0"
    return join("/", @intervals);
}
# Package command: a forked command with its input, output and error
# descriptors.
package command;
use strict; use bytes;
use Fcntl;
# Build a command object from a list of field => value pairs.
# An error is reported when both a command line ('line') and descriptor
# related fields ('read', 'write', 'error', 'type') are given; the object is
# returned nonetheless.
sub new (%)
{
    my $self = { @_ };
    my $has_descriptor_field =
        grep { exists($self->{$_}) } ('read', 'write', 'error', 'type');
    diagnostic::error("Command built from both a command line and fds")
        if (exists($self->{line}) and $has_descriptor_field);
    return bless($self);
}
# Fork and execute the command line of the object ($self->{line}), with its
# standard streams connected to the calling process.
#   1: when true, the descriptors of the control connector are kept open
#      across the exec
# In the parent, returns the object with 'read' (to the input of the child),
# 'write' (from its output) and 'error' (from its error stream) set, along
# with 'pid', 'type' and 'start_date'. Returns 0 when a pipe, socketpair or
# fork fails.
sub run ($)
{
my $self=shift;
my $command=shift;
my ($father_read,$father_write,$father_error);
my ($child_read,$child_write,$child_error);
undef($father_read);
undef($father_write);
undef($child_read);
undef($child_write);
if (settings::USE_SOCKETPAIR)
{
# one bidirectional socket: each side reads and writes the same handle
if (not socketpair($father_read, $child_read, AF_UNIX, SOCK_STREAM,
PF_UNSPEC))
{
diagnostic::system;
return 0;
}
$father_write = $father_read;
$child_write = $child_read;
$self->{type} = settings::SOCKET;
}
else
{
# two pipes: the "read" names refer to the input of the child (written by
# the father), the "write" names to its output (read by the father)
if (not pipe($child_read, $father_read))
{
diagnostic::system;
return 0;
}
if (not pipe($father_write, $child_write))
{
diagnostic::system;
return 0;
}
$self->{type} = settings::FD;
}
undef($father_error);
undef($child_error);
if (not pipe($father_error,$child_error))
{
diagnostic::system;
return 0;
}
$self->{start_date} = timer::current_time;
my $pid;
$pid = fork;
if (not defined($pid))
{
diagnostic::system;
return 0;
}
$self->{pid} = $pid;
if ($pid)
{
# Parent: close the ends that belong to the child and keep the others
diagnostic::debug("New child with pid $pid created".
" to execute $self->{line}");
CORE::close($child_read) or diagnostic::system;
(CORE::close($child_write) or diagnostic::system)
if not settings::USE_SOCKETPAIR;
CORE::close($child_error) or diagnostic::system;
$self->{read} = $father_read;
$self->{write} = $father_write;
$self->{error} = $father_error;
return $self;
}
else
{
# Child: become leader of a new process group
setpgrp($$,$$) or diagnostic::system;
if ($command)
{
# clear the descriptor flags of the control channel (F_SETFD, 0) so that it
# stays open in the executed program
my $control = communicator::get_control;
fcntl($control->{read}, F_SETFD, 0) or die $!;
fcntl($control->{write}, F_SETFD, 0) or die $!;
}
CORE::close($father_read) or die $!;
(CORE::close($father_write) or die $!)
if not settings::USE_SOCKETPAIR;
CORE::close($father_error) or die $!;
# rebuild the standard streams on top of the child ends
open(STDIN,"<&",$child_read) or die $!;
open(STDOUT,">&",$child_write) or die $!;
open(STDERR,">&",$child_error) or die $!;
CORE::close($child_read) or die $!;
CORE::close($child_error) or die $!;
exec($self->{line}) or die "Exec failed : $!";
}
}
# Release a command: close the parts that are still defined then, for a
# forked command that is not a connector, wait for the process if its status
# is not known yet and output its 'status' and the COMMAND_TERMINATED 'state'.
sub cleanup ()
{
    my $self = shift;
    for my $part (qw(read write error))
    {
        $self->close($part) if defined($self->{$part});
    }
    if (exists($self->{pid}) and not $self->isa('connector')
        and exists($self->{line}))
    {
        unless (exists($self->{status}))
        {
            diagnostic::debug(
                "Cleaning up $self by waiting pid $self->{pid}");
            waitpid($self->{pid},0) or diagnostic::system;
            $self->{status} = $?;
        }
        $self->output('status', $self->{status});
        $self->output('state', synchronizer::COMMAND_TERMINATED);
    }
}
# Shut a socket down, reporting any failure other than "not connected".
#   1: socket, 2: 'how' argument of shutdown
sub my_shutdown($$)
{
    my ($fd, $how) = @_;
    unless (shutdown($fd,$how))
    {
        diagnostic::system unless $!{ENOTCONN};
    }
}
# Close one part of a command.
#   1: 'read', 'write' or 'error'
# The error part is always a plain descriptor. The read and write parts are
# closed according to the type of the command: a plain descriptor is closed,
# a socket is only shut down in the matching direction. The part is then
# removed from the object. Data buffered for the write or error part is
# dropped.
sub close($)
{
my $self=shift;
my $part=shift;
if ($part eq 'error')
{
if (exists($self->{error}))
{
taktuk::flush_buffer($self->{error});
CORE::close($self->{error}) or diagnostic::system;
delete $self->{error};
}
else
{
diagnostic::debug("Cannot close error : does not exists");
}
}
else
{
if (exists($self->{type}) and exists($self->{$part}))
{
if ($self->{type} == settings::FD)
{
taktuk::flush_buffer($self->{write}) if $part eq 'write';
CORE::close($self->{$part}) or diagnostic::system;
delete $self->{$part};
}
elsif ($self->{type} == settings::SOCKET)
{
if ($part eq 'read')
{
# 'read' is shut down with how=1
my_shutdown($self->{read},1);
delete $self->{read};
}
elsif ($part eq 'write')
{
# 'write' is shut down with how=0
taktuk::flush_buffer($self->{write});
my_shutdown($self->{write},0);
delete $self->{write};
}
else
{
diagnostic::warning("Invalid part for socket close");
}
}
else
{
diagnostic::warning("Unknown command type : BUG");
}
}
else
{
# closing an already closed 'read' part of a typed command is silently
# accepted
diagnostic::warning("Closing a non existant command type or part")
unless (exists($self->{type}) and ($part eq 'read'));
} 
}
}
our $user_scalar=undef;
# Format a message with the template of its output type and forward the
# result upward as an output message.
#   1: output type (key of $general::template and $general::redirect, the
#      'default' entry being used for an unknown type)
#   2: message, processed line by line
# The template is a Perl expression evaluated once per line; lines for which
# it yields undef or an empty string produce nothing.
sub output ($$)
{
my $self = shift;
my $type = shift;
my $message = shift;
my $template;
my $fd;
if (exists($general::template->{$type}))
{
$template = $general::template->{$type};
}
else
{
$template = $general::template->{default};
}
if (exists($general::redirect->{$type}))
{
$fd = $general::redirect->{$type};
}
else
{
$fd = $general::redirect->{default};
}
if (defined($template))
{
# The variables declared below are not all used directly: they are the
# names made available to the template evaluated by the eval further down,
# so none of them may be renamed or removed
my $command = exists($self->{line})?$self->{line}:"";
my $count = $general::taktuk{count};
my $filename = $diagnostic::filename;
my $host = $general::host;
my $level = $diagnostic::level;
my $level_name = $diagnostic::level_name;
my $line_number = $diagnostic::line_number;
my $package = $diagnostic::package;
my $peer = exists($self->{peer})?$self->{peer}:"";
my $peer_position = exists($self->{position})?$self->{position}:"";
my $pid = exists($self->{pid})?$self->{pid}:$$;
my $position = $general::position;
my $rank = $general::taktuk{rank};
my $release = $taktuk::RELEASE;
my $start_date = exists($self->{start_date})?$self->{start_date}:"";
my $stop_date = exists($self->{stop_date})?$self->{stop_date}:"";
my $reply_date = exists($self->{reply_date})?$self->{reply_date}:"";
my $init_date = exists($self->{init_date})?$self->{init_date}:"";
my $line;
my $eol;
my $to_be_sent = "";
pos($message) = 0;
# split the message into lines, $eol holding the newline when there is one
while ($message =~ /\G([^\n]*)(\n|$)/go)
{
$line = $1;
$eol = $2;
if (length($line) or length($eol))
{
my $result = eval("no strict 'vars'; $template");
$to_be_sent .= $result if defined($result) and length($result);
}
}
if (length($to_be_sent))
{
# forward_up message wrapping an output message: packed redirection
# followed by the formatted text
communicator::process_message($self,
taktuk::encode($taktuk::forward_up,
taktuk::encode($taktuk::output,
taktuk::pack($fd).$to_be_sent)));
}
}
}
# Return the human readable description of a synchronizer event code, or
# "Unknown event code" for a code that is not listed here.
sub event_msg($)
{
my $event = shift;
if ($event == synchronizer::CONNECTION_FAILED)
{
return "Connection failed";
}
elsif ($event == synchronizer::CONNECTION_INITIALIZED)
{
return "Connection initialized";
}
elsif ($event == synchronizer::CONNECTION_LOST)
{
return "Connection lost";
}
elsif ($event == synchronizer::TAKTUK_READY)
{
return "TakTuk is ready";
}
elsif ($event == synchronizer::TAKTUK_NUMBERED)
{
return "TakTuk is numbered";
}
elsif ($event == synchronizer::TAKTUK_TERMINATED)
{
return "TakTuk is terminated";
}
elsif ($event == synchronizer::COMMAND_STARTED)
{
return "Command started";
}
elsif ($event == synchronizer::COMMAND_FAILED)
{
return "Command failed";
}
elsif ($event == synchronizer::COMMAND_TERMINATED)
{
return "Command terminated";
}
elsif ($event == synchronizer::UPDATE_FAILED)
{
return "Numbering update failed";
}
elsif ($event == synchronizer::PIPE_STARTED)
{
return "Pipe input started";
}
elsif ($event == synchronizer::PIPE_FAILED)
{
return "Pipe input failed";
}
elsif ($event == synchronizer::PIPE_TERMINATED)
{
return "Pipe input terminated";
}
elsif ($event == synchronizer::FILE_RECEPTION_STARTED)
{
return "File reception started";
}
elsif ($event == synchronizer::FILE_RECEPTION_FAILED)
{
return "File reception failed";
}
elsif ($event == synchronizer::FILE_RECEPTION_TERMINATED)
{
return "File reception terminated";
}
else
{
return "Unknown event code";
}
}
# Package connector: a command used as a communication link with another
# instance.
package connector;
use strict; use bytes;
use File::Basename;
our @ISA=qw(command);
# Strings searched for in the output of a connector while it initializes
our $init_string="Taktuk initialization, version ";
our $connector_functional_string="Taktuk connector functional";
our $ready_to_load_string="Taktuk ready to load";
# Connector states. read_data moves a connector to the next state by adding 1
# to the current one: $pre_load, then $simple_connector, then $initialized.
our $pre_load=1;
our $first_load=0;
our $simple_connector=2;
our $initialized=3;
# Build a connector from a list of field => value pairs.
# A connector given a 'peer' starts in state $pre_load (when 'propagate' is
# true) or $simple_connector, and gets the command line that reaches the
# peer; a connector without peer is immediately $initialized.
# Returns the connector, or 0 if the underlying command could not be built.
sub new (%)
{
    my $self = command::new(@_,
        data_handler   => \&communicator::process_messages,
        remove_handler => \&communicator::remove_connector);
    return 0 if not $self;
    @{$self}{'last_given', 'count'} = (0, 0);
    if (not exists($self->{peer}))
    {
        $self->{state} = $initialized;
    }
    else
    {
        $self->{state} = $self->{propagate} ? $pre_load : $simple_connector;
        my $command_line = $general::connector_command;
        $command_line .= " -l $self->{login}" if ($self->{login});
        $self->{line} =
            join(" ", $command_line, $self->{peer}, $self->{taktuk});
    }
    return bless($self);
}
# Read the data available on one descriptor of the connector.
#   1: descriptor, expected to be the 'write' or the 'error' part
# Data from 'write' drives the initialization of a connector that is not
# $initialized yet; data from 'error' is output with the 'connector' type.
# Returns the number of bytes read, 0 after a read error.
sub read_data ($)
{
my $self = shift;
my $descriptor = shift;
my $result = taktuk::read_data($descriptor);
# NOTE(review): $result is only reset to 0 if diagnostic::system returns a
# true value - confirm that it always does
(diagnostic::system and $result = 0) if not defined($result);
my $write = undef;
my $error = undef;
$write = $self->{write} if exists $self->{write};
$error = $self->{error} if exists $self->{error};
if ($write and ($descriptor == $write))
{
if (($result >0) and not $self->{timeouted})
{
if ($self->{state} < $initialized)
{
# $argument becomes positive once the string expected in the current state
# has been seen in the data received
my $argument = 0;
$self->{reply_date} = timer::current_time
unless exists($self->{reply_date});
if ($self->{state} == $pre_load)
{
if (length(taktuk::find_sequence($descriptor,
".*$connector_functional_string")))
{
$argument = 1;
}
}
elsif ($self->{state} == $simple_connector)
{
my $sequence = taktuk::find_sequence($descriptor,
".*$init_string"."[0-9]+[.0-9]*");
if (length($sequence))
{
# keep what follows the last space: the version announced
$argument = $sequence;
$argument =~ s/.* (.*)$/$1/o;
}
}
else
{
diagnostic::warning("Wrong connector state : ".
$self->{state});
}
if ($argument > 0)
{
my $connector_input = $self->{read};
if ($self->{state} < $simple_connector)
{
# the connector is functional: send the code over it
general::load_taktuk_code;
communicator::post_write($connector_input,
$general::taktuk_code) or diagnostic::system;
}
else
{
diagnostic::warning("Protocol versions do not match")
if $argument != $taktuk::RELEASE;
}
$self->{state} += 1;
diagnostic::debug("Connector $self->{line} promoted to ".
"state ".$self->{state});
synchronizer::initialization_complete($self)
if ($self->{state} == $initialized);
}
}
}
}
elsif ($error and ($descriptor == $error))
{
if ($result >0)
{
my $new_data = taktuk::flush_buffer($descriptor);
$self->output('connector', $new_data);
} 
}
else
{
diagnostic::warning("Unknown descriptor");
}
return $result;
}
# Send to the connected instance, in this order: its host name (as option
# "L"), the code when the connector propagates, its position, the files
# listed in 'files' and finally its arguments.
sub send_parameters()
{
my $self = shift;
$self->send_message(taktuk::encode($taktuk::option, "L".$self->{peer}));
if ($self->{propagate})
{
general::load_taktuk_code;
$self->send_message(taktuk::encode($taktuk::taktuk_code,
$general::taktuk_code));
}
$self->send_message(taktuk::encode($taktuk::position,"$self->{position}"));
if (length($self->{files}))
{
# 'files' is a comma separated list of source:destination pairs
foreach my $transfer (split /,/,$self->{files})
{
my ($source, $destination) = split /:/, $transfer;
$self->send_file("", $source, $destination);
}
} 
my $arguments = $self->{arguments};
diagnostic::debug("Arguments : ".$arguments);
$self->send_message(taktuk::encode($taktuk::arguments, $arguments));
}
# Return the next complete message received from the connector, or "" when
# the connector is not initialized yet, has no 'write' part, or has no
# complete message pending.
sub get_message ()
{
    my $self = shift;
    return ""
        if ($self->{state} < $initialized) or not exists($self->{write});
    return taktuk::get_message($self->{write});
}
# Frame a message and queue it for writing on the 'read' part of the
# connector, reporting an error when it cannot be posted.
sub send_message($)
{
    my ($self, $message) = @_;
    my $full_message = taktuk::pack($message);
    communicator::post_write($self->{read}, $full_message)
        or diagnostic::error(
               "Error in send message [$full_message], error : $!");
}
# Deliver one message of a file transfer.
#   1: prefix. When true, the message is wrapped with it and handed to
#      communicator::process_message; otherwise the message is sent over
#      this connector.
#   2: message
sub route_file_part($$)
{
    my ($self, $prefix, $message) = @_;
    if (not $prefix)
    {
        $self->send_message($message);
    }
    else
    {
        communicator::process_message($self,
                                      taktuk::encode($prefix, $message));
    }
}
# Send a file, or a directory as a tar stream, as a sequence of file
# messages: a header, the content by chunks, then an empty terminating part.
#   1: routing prefix, passed to route_file_part for every message
#   2: source path, expanded with general::expand
#   3: destination, transmitted as is in the header
# A system diagnostic is emitted when the source cannot be opened or read.
sub send_file($$$)
{
    my $self = shift;
    my $prefix = shift;
    my $source = shift;
    my $destination = shift;
    my $permissions;
    my $file = undef;
    my $result;
    my $type;
    $source = general::expand($source, position=>$general::position,
                                       host=>$general::host,
                                       rank=>$general::taktuk{rank});
    if (-d $source)
    {
        $type = 1;
        # The path goes through a shell: single-quote it (escaping the
        # quotes it contains) so that spaces and metacharacters in the
        # directory name are taken literally
        my $quoted = $source;
        $quoted =~ s/'/'\\''/g;
        $result = open($file, "-|", "cd '$quoted' && tar c .");
    }
    else
    {
        $type = 0;
        # three-argument open: the path is never interpreted as an open
        # mode and its leading or trailing whitespace is preserved
        $result = open($file, "<", $source);
    }
    if ($result)
    {
        my $read_result;
        my $buffer;
        my $message;
        binmode($file) or diagnostic::system;
        $permissions=(stat($source))[2] & 07777;
        # header: origin of the file, kind, permissions, base name and
        # destination
        $message = taktuk::pack($general::position).
                   taktuk::pack($general::host).
                   taktuk::pack($general::taktuk{rank}).
                   taktuk::pack($type).
                   taktuk::pack($permissions).
                   taktuk::pack(basename($source)).
                   $destination;
        $message = taktuk::encode($taktuk::file, $message);
        $self->route_file_part($prefix, $message);
        while ($read_result = sysread($file, $buffer, $taktuk::read_size/2))
        {
            $message = taktuk::encode($taktuk::file,
                            taktuk::pack($general::position).$buffer);
            $self->route_file_part($prefix, $message);
        }
        diagnostic::system if not defined($read_result);
        # a part without content ends the transfer
        $message = taktuk::encode($taktuk::file,
                        taktuk::pack($general::position));
        $self->route_file_part($prefix, $message);
        CORE::close($file) or diagnostic::system;
    }
    else
    {
        diagnostic::system;
    }
}
# Fields of a connector that are transmitted by pack and rebuilt by unpack,
# in transmission order
our @pack_fields = ('login', 'peer', 'taktuk', 'arguments', 'position',
                    'propagate', 'files', 'timeout');
# Serialize the connector: the concatenation of the fields listed in
# @pack_fields, each one framed by taktuk::pack.
sub pack()
{
    my ($self) = @_;
    return join("", map { taktuk::pack($self->{$_}) } @pack_fields);
}
# Rebuild a connector from data produced by pack.
# Returns the connector and the data left after its fields.
sub unpack($)
{
    my ($remaining) = @_;
    my %fields = ();
    for my $name (@pack_fields)
    {
        ($fields{$name}, $remaining) = taktuk::unpack($remaining);
    }
    my $rebuilt = connector::new(%fields);
    return ($rebuilt, $remaining);
}
# Abort a connector: stop watching its 'write' and 'error' descriptors and
# kill the process group of its command.
sub cancel()
{
    my $connector = shift;
    communicator::remove_descriptor($connector, $connector->{write})
        if (exists($connector->{write}));
    communicator::remove_descriptor($connector, $connector->{error})
        if (exists($connector->{error}));
    # Without a recorded pid the target of kill would be 0, which designates
    # the process group of the caller: only signal when a child pid is known
    if ($connector->{pid})
    {
        CORE::kill 9, -$connector->{pid};
    }
}
# Package communicator: the connections of this instance and the pending
# writes.
package communicator;
use strict; use bytes;
use Errno;
use Fcntl;
our $sinks_number = 0;
our $initialized_sinks_number = 0;
# Select objects, created by init
our $select;
our $pending_writes_select;
# Set by the "time-granularity" option
our $select_timeout;
# Hash reference of connection sets (lists), keyed by set name; see init
our $connections;
# Connector built by init on the control side of the control channel
our $control_connector = 0;
our $end = 0;
our %c_from_fd;
our %pending_writes;
our $pending_writes_number = 0;
our %pending_termination_handler;
our %pending_termination_argument;
# Connector built on standard output
our $default_root = connector::new('read' =>\*STDOUT,
'type' =>settings::FD);
# Initialize the communicator: create the two select objects, the empty
# connection sets and the control channel.
# The control channel is a socketpair when settings::USE_SOCKETPAIR is true
# and a pair of pipes otherwise. One side is registered in the 'control' set,
# the other side is kept in $control_connector and its descriptor numbers are
# exported through TAKTUK_CONTROL_READ / TAKTUK_CONTROL_WRITE.
sub init()
{
$select = my_select::new() or diagnostic::error("Select creation");
$pending_writes_select =
my_select::new() or diagnostic::error("Select creation");
$connections = {};
$connections->{sources} = [];
$connections->{sinks} = [];
$connections->{control} = [];
$connections->{local_commands} = [];
$connections->{pipe} = [];
my $communicator_read = undef;
my $communicator_write = undef;
my $control_read = undef;
my $control_write = undef;
my $type = undef;
if (settings::USE_SOCKETPAIR)
{
# A single bidirectional socket per side: read and write ends are the same
# handle.
if (not socketpair($communicator_read, $control_read,
AF_UNIX, SOCK_STREAM, PF_UNSPEC))
{
diagnostic::system;
diagnostic::error("Degradated mode (no control channel)");
}
$communicator_write = $communicator_read;
$control_write = $control_read;
$type = settings::SOCKET;
}
else
{
# Two unidirectional pipes, one per direction.
if (not pipe($communicator_read, $control_write) or
not pipe($control_read, $communicator_write))
{
diagnostic::system;
diagnostic::error("Degradated mode (no control channel)");
}
$type = settings::FD;
}
# NOTE(review): taktuk::no_flush is defined elsewhere; presumably it
# configures buffering on the handle -- confirm.
taktuk::no_flush($communicator_write);
taktuk::no_flush($control_write);
# In a connector, 'read' is the handle this process writes to (see
# post_write callers) and 'write' is the handle it reads from (see
# add_descriptors).
my $communicator_connector = connector::new('read' =>$communicator_write,
'write' =>$communicator_read,
'type' =>$type);
$communicator_connector->{pending_messages} = 0;
add_connector($communicator_connector, 'control');
$ENV{TAKTUK_CONTROL_READ} = fileno($control_read);
$ENV{TAKTUK_CONTROL_WRITE} = fileno($control_write);
$control_connector = connector::new('read' =>$control_write,
'write' =>$control_read,
'type' =>$type);
}
# Return, as a list, the members of the connection set called $set_name
# ('sources', 'sinks', 'control', 'local_commands' or 'pipe').
sub get_connections($)
{
    my ($which) = @_;
    my $members = $connections->{$which};
    return @{$members};
}
# Return the first connector of the 'sources' set, or $default_root when the
# set is undefined or empty.
sub get_root()
{
    my $upstream = $connections->{sources};
    return $default_root
        unless (defined($upstream) and scalar(@$upstream));
    return $upstream->[0];
}
# Accessor for the control connector created by init().
sub get_control()
{
    my $control = $control_connector;
    return $control;
}
# Return the first connector of the 'control' set, or 0 when that set is
# empty.
sub get_outgoing()
{
    my $controls = $connections->{control};
    return scalar(@$controls) ? $controls->[0] : 0;
}
# Decode one encoded message and dispatch it.
# Arguments: the connector the message came from, the encoded message.
# The handler registered for the decoded message code is called with
# (code, connector, body); an unknown code only produces a warning.
sub process_message($$)
{
    my ($connector, $encoded) = @_;
    my ($message, $body) = taktuk::decode($encoded);
    my $function = handlers::get_handler($message);
    return diagnostic::warning("Unknown message : $message (body: $body)")
        unless defined($function);
    $function->($message, $connector, $body);
}
# Data handler for message-carrying connectors: read whatever is available on
# $descriptor into the connector, then process every complete message it can
# deliver. Returns the result of read_data.
sub process_messages($$)
{
    my ($connector, $descriptor) = @_;
    my $status = $connector->read_data($descriptor);
    while (my $message = $connector->get_message)
    {
        process_message($connector, $message);
    }
    return $status;
}
# Data handler for local commands: read up to $taktuk::read_size bytes from
# $descriptor and forward them through $command->output, labelled "output"
# when the descriptor is the command's 'write' handle and "error" otherwise.
# Returns the sysread result; run() removes the descriptor when it is false.
sub process_command_output($$)
{
my $command = shift;
my $descriptor = shift;
my $buffer;
my $read_result;
$read_result = sysread($descriptor, $buffer, $taktuk::read_size);
# On a read error, report it and treat it as end of file. The reset to 0
# only happens if diagnostic::system returns a true value.
(diagnostic::system and $read_result = 0) if not defined($read_result);
if ($read_result > 0)
{
my $type;
if (exists($command->{write}) and ($descriptor == $command->{write}))
{
$type = "output";
}
else
{
$type = "error";
}
$command->output($type, $buffer);
}
return $read_result;
}
# Data handler for pipes: each chunk read from $descriptor is wrapped into a
# message whose code is $command->{message} and processed as if it had been
# received from $command.
# Once $end is set nothing is read any more and 1 is returned, so the
# descriptor is kept registered by run().
sub process_pipe_output($$)
{
my $command = shift;
my $descriptor = shift;
my $buffer;
my $read_result;
if (not $end)
{
$read_result = sysread($descriptor, $buffer, $taktuk::read_size);
# On a read error, report it and treat it as end of file. The reset to 0
# only happens if diagnostic::system returns a true value.
(diagnostic::system and $read_result = 0) if not defined($read_result);
if ($read_result > 0)
{
process_message($command, taktuk::encode($command->{message},
$buffer));
}
return $read_result;
}
else
{
return 1;
}
}
# Main event loop of the communicator.
# Loops until terminate() has been called AND no sink, no local command and
# no queued write remains. Each iteration:
#   - waits on the read set and on the pending-writes set,
#   - calls the data handler of the owner of every readable descriptor and
#     removes the descriptor when the handler returns a false value,
#   - writes the next queued chunk of every writable descriptor,
#   - reports select failures,
#   - fires expired timers.
sub run()
{
    my @select_result;
    my $sinks = $connections->{sinks};
    my $local_commands = $connections->{local_commands};
    while ((not $end) or scalar(@$sinks) or scalar(@$local_commands) or
           $pending_writes_number)
    {
        @select_result = my_select::select(
            $select,$pending_writes_select,undef,$select_timeout);
        if (scalar(@select_result))
        {
            my ($read_set, $write_set, $exception_set) = @select_result;
            while (scalar(@$read_set))
            {
                my $descriptor = shift @$read_set;
                my $cobidule = $c_from_fd{$descriptor};
                if (exists($cobidule->{data_handler}))
                {
                    my $handler = $cobidule->{data_handler};
                    my $result = &$handler($cobidule, $descriptor);
                    # A false result means end of file (or read error).
                    remove_descriptor($cobidule, $descriptor) if (not $result);
                }
                else
                {
                    diagnostic::error("Bug : connector has no data handler");
                }
            }
            while (scalar(@$write_set))
            {
                my $descriptor = shift @$write_set;
                if (exists($pending_writes{$descriptor}))
                {
                    my $list = $pending_writes{$descriptor};
                    my $chunk = shift @$list;
                    my ($file, $data, $length, $offset) = @$chunk;
                    my $result;
                    $result = CORE::syswrite($file, $data, $length, $offset);
                    if ($result)
                    {
                        $length -= $result;
                        $offset += $result;
                    }
                    # Keep the chunk queued when it was only partially
                    # written, or when the write failed with EAGAIN. errno is
                    # only meaningful after a failure, so it is consulted
                    # only when syswrite returned undef: a stale EAGAIN must
                    # not requeue a chunk that has been completely written.
                    if ((defined($result) and $length) or
                        (not defined($result) and $!{EAGAIN}))
                    {
                        $chunk->[2] = $length;
                        $chunk->[3] = $offset;
                        unshift @$list, $chunk;
                    }
                    else
                    {
                        # Chunk done, or dropped after a real write error.
                        diagnostic::system unless defined($result);
                        $pending_writes_number--;
                    }
                    if (not scalar(@$list))
                    {
                        cleanup_pending_stuff($descriptor);
                    }
                }
            }
            if (scalar(@$exception_set))
            {
                diagnostic::warning("Unexpected exceptional fds : ".
                                    join(' ', @$exception_set));
            }
        }
        else
        {
            # Empty result: select failed or timed out.
            if ($!{EBADF})
            {
                my $error_msg="Selected an invalid descriptor ($!)... Very ".
                              "bad\nRegistered handles = ";
                foreach my $handle ($select->handles)
                {
                    $error_msg.="$handle (".fileno($handle).") ";
                }
                diagnostic::error($error_msg);
                exit 1;
            }
            elsif ($!{EINTR})
            {
                diagnostic::warning("Select exited because of a signal");
            }
            elsif ($!{EINVAL})
            {
                diagnostic::error("Invalid time limit for select");
            }
            else
            {
            }
        }
        timer::check_timeouts;
    }
}
# Ask the event loop to stop: run() leaves its loop once $end is set and no
# sink, local command or pending write remains.
sub terminate ()
{
    return $end = 1;
}
# Register the 'write' and 'error' handles of $command (those that exist):
# each one is added to the read select, mapped back to $command in
# %c_from_fd and switched to binary mode.
sub add_descriptors($)
{
    my $command = shift;
    diagnostic::debug("Adding $command to hash & select");
    foreach my $channel ('write', 'error')
    {
        next unless exists($command->{$channel});
        $select->add($command->{$channel});
        $c_from_fd{$command->{$channel}} = $command;
        binmode($command->{$channel}) or diagnostic::system;
    }
}
# Unregister one readable descriptor ('write' or 'error' handle) of
# $cobidule and close it. When the other readable handle of the same object
# is absent or already unregistered, the object is considered finished: its
# 'read' handle is closed (dropping any write still queued on it), its
# stop_date is recorded and its remove_handler, if any, is called.
sub remove_descriptor($$)
{
my $cobidule = shift;
my $descriptor = shift;
my $type;
my $other;
diagnostic::debug("Removing $cobidule / $descriptor");
my $read = undef;
my $write = undef;
my $error = undef;
$read = $cobidule->{read} if exists $cobidule->{read};
$write = $cobidule->{write} if exists $cobidule->{write};
$error = $cobidule->{error} if exists $cobidule->{error};
if (defined($descriptor))
{
# Find out which of the two readable handles is being removed; $other is
# the remaining one.
if (defined($write) and ($write == $descriptor))
{
$other = $error;
$type = 'write';
}
elsif (defined($error) and ($error == $descriptor))
{
$other = $write;
$type = 'error';
}
else
{
diagnostic::error("Invalid descriptor, serious BUG !");
}
if (exists $c_from_fd{$descriptor})
{
$select->remove($descriptor);
delete $c_from_fd{$descriptor};
$cobidule->close($type);
# Last readable handle gone: finalize the whole object.
if ((not defined($other)) or (not exists $c_from_fd{$other}))
{
if (exists($cobidule->{remove_handler}))
{
if (defined($read))
{
cleanup_pending_stuff($read)
			 if exists($pending_writes{$read});
		 $cobidule->close('read');
}
$cobidule->{stop_date} = timer::current_time;
my $remove_handler = $cobidule->{remove_handler};
&$remove_handler($cobidule);
}
}
}
else
{
diagnostic::warning("Descriptor not present in hash");
}
}
else
{
diagnostic::warning("Should not be called with undefined descriptor");
}
}
# Tell whether every sink has completed its initialization.
# Returns true when $sinks_number equals $initialized_sinks_number. A warning
# is emitted when more sinks are counted as initialized than exist.
sub no_pending_connectors()
{
    diagnostic::debug("Connectors situation, sinks_number : $sinks_number, ".
                      "initialized : $initialized_sinks_number");
    # The warning must go through the diagnostic package: a misspelled
    # package name here would die with "Undefined subroutine" exactly when
    # the inconsistency occurs.
    diagnostic::warning("More initialized than sinks")
        if ($sinks_number<$initialized_sinks_number);
    return $sinks_number == $initialized_sinks_number;
}
# Register $connector in the connection set called $set_name.
# Its 'read' handle, when present, is passed to taktuk::no_flush and made
# non-blocking; its readable handles are registered through add_descriptors.
# Adding to the 'sinks' set increments $sinks_number.
sub add_connector($$)
{
    my ($connector, $set_name) = @_;
    my $members = $connections->{$set_name};
    if (exists($connector->{read}))
    {
        my $input = $connector->{read};
        taktuk::no_flush($input);
        fcntl($input, F_SETFL, O_NONBLOCK) or diagnostic::system;
    }
    add_descriptors($connector);
    push(@$members, $connector);
    if ($set_name eq 'sinks')
    {
        $sinks_number++;
    }
    diagnostic::debug("Connector $connector->{line} added")
        if exists($connector->{line});
}
# Account for one more initialized sink. The connector argument is accepted
# but not used.
sub connector_initialized($)
{
    my ($connector) = @_;
    $initialized_sinks_number++;
}
# Remove the first occurrence of $cobidule from the connection set called
# $set_name. Returns 1 when it was found and removed, 0 otherwise.
sub remove_from_set($$)
{
    my ($cobidule, $set_name) = @_;
    my $members = $connections->{$set_name};
    for my $i (0 .. $#$members)
    {
        next if $members->[$i] != $cobidule;
        splice(@$members, $i, 1);
        diagnostic::debug("Cobidule $cobidule, ".
                          ($cobidule->{line}?$cobidule->{line}." ":"").
                          ($cobidule->{pid}?$cobidule->{pid}." ":"").
                          "deleted from set $set_name at position $i");
        return 1;
    }
    return 0;
}
# Remove handler for connectors: look the connector up in the 'sinks',
# 'sources' and 'control' sets (in that order), remove it from the first set
# that contains it, apply the set-specific consequences and finally call the
# connector's cleanup method.
sub remove_connector($)
{
my $connector = shift;
my @sets_names = ('sinks', 'sources', 'control');
my $current_set_name = shift @sets_names;
my $found = 0;
diagnostic::debug("In remove_connector");
while (not $found and defined($current_set_name))
{
$found = remove_from_set($connector, $current_set_name);
$current_set_name = shift @sets_names if not $found;
}
if ($found)
{
if ($current_set_name eq 'sources')
{
# Losing the last source terminates the process.
if (not scalar(@{$connections->{sources}}))
{
exit 1;
}
}
elsif ($current_set_name eq 'control')
{
# On the root, losing the control connector triggers the quit message
# unless termination is already under way.
if ($general::root)
{
process_message(get_root, $main::quit_message) unless $end;
}
else
{
diagnostic::warning("Lost my control connector");
}
}
elsif ($current_set_name eq 'sinks')
{
$sinks_number--;
# An initialized sink is reported as lost; a sink that never finished
# its initialization is reported as a failed initialization.
if ($connector->{state} >= $connector::initialized)
{
$connector->output('state', synchronizer::CONNECTION_LOST)
unless $communicator::end;
$initialized_sinks_number--;
}
else
{
synchronizer::initialization_failed($connector);
}
handlers::check_ongoing_reduces($connector);
}
$connector->cleanup;
}
else
{
diagnostic::warning("Connector to remove not found !");
}
} 
# Remove handler for local commands: take $command out of the
# 'local_commands' set, unregister those of its timers that have not elapsed
# yet and call its cleanup method. Warns when the command is not in the set.
sub remove_local_command($)
{
    my $command = shift;
    return diagnostic::warning("Command to remove not found !")
        unless remove_from_set($command, 'local_commands');
    if (exists($command->{timers}))
    {
        my $timers = $command->{timers};
        foreach my $timer (@$timers)
        {
            next if $timer->{elapsed};
            $timer->unregister;
        }
    }
    $command->cleanup;
}
# Remove handler for pipes: take $command out of the 'pipe' set, clean it up
# and report the PIPE_TERMINATED state through the root connector. Warns when
# the pipe is not in the set.
sub remove_pipe($)
{
    my $command = shift;
    return diagnostic::warning("Pipe to remove not found !")
        unless remove_from_set($command, 'pipe');
    $command->cleanup;
    communicator::get_root()->output('state', synchronizer::PIPE_TERMINATED);
}
# Write $data to $file without blocking, queueing whatever cannot be written
# immediately.
# Arguments: file handle, data, optional length (defaults to length of the
# data) and optional offset into the data (defaults to 0).
# When nothing is already queued for $file a direct syswrite is attempted;
# otherwise the data is appended to the queue to preserve ordering. Queued
# chunks are flushed by run().
# Returns 1 when a chunk has been queued, the syswrite result otherwise
# (number of bytes written, or undef after a write error that has been
# reported through diagnostic::system).
sub post_write($$@)
{
    my $file = shift;
    my $data = shift;
    my $length = scalar(@_)?shift:length($data);
    my $offset = scalar(@_)?shift:0;
    my $result = 1;
    (diagnostic::warning("Null length write") and return) if not $length;
    diagnostic::warning("Null file") if not $file;
    if (not exists($pending_writes{$file}))
    {
        $result = CORE::syswrite($file, $data, $length, $offset);
        if ($result)
        {
            $length -= $result;
            $offset += $result;
        }
    }
    # Queue the remainder when the write was partial or skipped, or when it
    # failed with EAGAIN. errno is only meaningful after a failure, so it is
    # consulted only when syswrite returned undef: a stale EAGAIN left by an
    # earlier call must not queue a zero-length chunk for data that has been
    # completely written.
    if ((defined($result) and $length) or
        (not defined($result) and $!{EAGAIN}))
    {
        if (not exists($pending_writes{$file}))
        {
            $pending_writes{$file} = [];
            $pending_writes_select->add($file);
        }
        my $list = $pending_writes{$file};
        push @$list, [ $file, $data, $length, $offset ];
        $pending_writes_number++;
        return 1;
    }
    else
    {
        diagnostic::system unless defined($result);
        return $result;
    }
}
# Schedule the closing of the 'read' handle of $command: it is closed
# immediately when no write is pending on it, and after the pending writes
# otherwise (see post_termination). Does nothing without a 'read' handle.
sub post_close($)
{
    my $command = shift;
    return unless exists($command->{read});
    post_termination($command->{read}, \&close_command_read, $command);
}
# Call $handler($handler_arg) once nothing remains to be written to
# $descriptor: right away when no write is pending, otherwise the pair is
# recorded and invoked later by cleanup_pending_stuff.
sub post_termination($$$)
{
    my ($descriptor, $handler, $handler_arg) = @_;
    unless (exists($pending_writes{$descriptor}))
    {
        return $handler->($handler_arg);
    }
    $pending_termination_handler{$descriptor} = $handler;
    $pending_termination_argument{$descriptor} = $handler_arg;
}
# Forget every write still queued for $descriptor: the global counter is
# decreased by the number of queued chunks, the descriptor leaves the
# pending-writes select and its queue is deleted. A termination handler
# recorded for the descriptor is then called and discarded.
# Does nothing when no queue exists for the descriptor.
sub cleanup_pending_stuff($)
{
    my $descriptor = shift;
    return unless exists($pending_writes{$descriptor});
    my $queued = $pending_writes{$descriptor};
    $pending_writes_number -= scalar(@$queued);
    $pending_writes_select->remove($descriptor);
    delete($pending_writes{$descriptor});
    return unless exists($pending_termination_handler{$descriptor});
    my $callback = $pending_termination_handler{$descriptor};
    my $callback_arg = $pending_termination_argument{$descriptor};
    # The handler runs before its own registration is removed.
    $callback->($callback_arg);
    delete($pending_termination_handler{$descriptor});
    delete($pending_termination_argument{$descriptor});
}
# Termination handler used by post_close: close the 'read' side of $command
# when it still has one.
sub close_command_read($)
{
    my ($command) = @_;
    $command->close('read') if exists($command->{read});
}
package stats_buffer;
use strict; use bytes;

# Bounded history of numeric samples with simple statistics.
# The object is a blessed hash: 'depth' is the bound given at creation and
# 'data' the array of samples, oldest first.

# Constructor (class method): stats_buffer->new($depth).
sub new($$)
{
    my ($class, $depth) = @_;
    return bless({ depth => $depth, data => [] }, $class);
}

# True when no sample has been recorded.
sub is_empty($)
{
    my ($self) = @_;
    return !@{ $self->{data} };
}

# Append $value. The oldest sample is dropped as soon as the last index
# exceeds 'depth', so at most depth+1 samples are retained.
sub add($$)
{
    my ($self, $value) = @_;
    my $samples = $self->{data};
    push(@$samples, $value);
    shift(@$samples) if scalar(@$samples) > $self->{depth} + 1;
}

# Arithmetic mean of the recorded samples, undef when there is none.
sub average($)
{
    my ($self) = @_;
    my $samples = $self->{data};
    return undef unless scalar(@$samples);
    my $total = 0;
    $total += $_ for @$samples;
    return $total / scalar(@$samples);
}

# Smallest recorded sample, undef when there is none.
sub min($)
{
    my ($self) = @_;
    my $samples = $self->{data};
    return undef unless scalar(@$samples);
    my $lowest = $samples->[0];
    for my $index (1 .. $#$samples)
    {
        my $candidate = $samples->[$index];
        $lowest = $candidate if $candidate < $lowest;
    }
    return $lowest;
}

# Largest recorded sample, undef when there is none.
sub max($)
{
    my ($self) = @_;
    my $samples = $self->{data};
    return undef unless scalar(@$samples);
    my $highest = $samples->[0];
    for my $index (1 .. $#$samples)
    {
        my $candidate = $samples->[$index];
        $highest = $candidate if $candidate > $highest;
    }
    return $highest;
}
package scheduler;
use strict; use bytes;
# Bound on $arity used by add_connector and schedule: negative values route
# postponed connectors to the static queue, 0 disables the arity check in
# schedule.
our $dynamic_limit;
# Maximum number of connectors being initialized at the same time.
our $window;
# When true, connector_initialized adapts $window from measured timings.
our $window_adaptation;
# Connectors waiting to be deployed by this node only.
our @static_connectors = ();
# Connectors waiting to be deployed that may be given away to thieves.
our @dynamic_connectors = ();
# Connectors whose steal request has not been satisfied yet.
our @waiting_thieves = ();
# Number of connectors deployed and not yet initialized or failed.
our $current_window = 0;
# Number of connectors deployed from this node (decremented on failure).
our $arity = 0;
# True while a steal request sent upward is unanswered.
our $steal_request_sent = 0;
# Hash reference holding the 'limit', 'growth' and 'initial' expressions
# evaluated by send_work.
our $worksteal;
# Recent connection durations and recent delays between initializations.
our %stats = (connections=>stats_buffer->new(5), shifts=>stats_buffer->new(5));
# Date of the previous connector initialization.
our $last_time = undef;
# Thresholds and growth factor used by the window adaptation.
our $threshold_low = 0.1;
our $threshold_high = 0.2;
our $max_increase = 1.5;
# Start $connector. On success an initialization timer is attached to it, it
# joins the 'sinks' set, the arity and current window grow by one and the
# node leaves the ready state. On failure only a warning is emitted.
sub deploy_connector($)
{
    my $connector = shift;
    unless ($connector->run(0))
    {
        return diagnostic::warning(
            "Giving up connection to $connector->{peer}");
    }
    my $timer = timer->register($connector->{timeout},
                                \&synchronizer::initialization_timeout);
    # Timer and connector reference each other.
    $connector->{timer} = $timer;
    $timer->{connector} = $connector;
    communicator::add_connector($connector, 'sinks');
    diagnostic::debug("Connector added : ".$connector->{line});
    $arity++;
    $current_window++;
    synchronizer::set_not_ready();
}
# Submit a connector to the scheduler. It is deployed at once when the
# window has room and the arity bound allows it (a bound <= 0 always does);
# otherwise it is queued: in the static queue when $dynamic_limit is
# negative, in the dynamic queue otherwise.
sub add_connector($)
{
    my $connector = shift;
    my $window_has_room = ($current_window < $window);
    my $arity_allows = (($dynamic_limit <= 0) or ($arity < $dynamic_limit));
    if ($window_has_room and $arity_allows)
    {
        deploy_connector($connector);
    }
    elsif ($dynamic_limit < 0)
    {
        push @static_connectors, $connector;
    }
    else
    {
        push @dynamic_connectors, $connector;
    }
}
# Called when a connector has completed its initialization: optionally adapt
# the window size, free one slot of the current window and run the scheduler.
# The adaptation compares the duration of this connection with the smallest
# recent duration: close to the minimum, the window may grow (bounded by
# $max_increase); far above it, the window shrinks. The window never goes
# below 1.
sub connector_initialized($)
{
my $connector = shift;
my $time = $connector->{init_date} - $connector->{start_date};
if ($window_adaptation)
{
my $threshold;
my $current_time;
my $shift = 0;
$stats{connections}->add($time);
# Relative excess of this duration over the recent minimum (0 when the
# minimum is false).
$threshold = $stats{connections}->min?
$time/$stats{connections}->min - 1:0;
$current_time = timer::current_time;
if ($last_time)
{
# Delay since the previous initialization.
$shift = $current_time - $last_time;
$stats{shifts}->add($shift);
}
$last_time = $current_time;
if ($threshold)
{
# NOTE(review): $adapt is never read.
my $adapt = 0;
my $old_window = $window;
if (($threshold < $threshold_low) and $stats{shifts}->average)
{
my $max_window;
# NOTE(review): floor is not defined in the visible part of the file;
# presumably imported from POSIX elsewhere -- confirm.
$max_window = floor($window * $max_increase);
$window = floor($stats{connections}->min / 
$stats{shifts}->average);
$window = $max_window if $window > $max_window;
}
elsif ($threshold > $threshold_high)
{
$window -= floor($threshold*$window);
}
$window = 1 if $window < 1;
diagnostic::debug("Window gone from $old_window to $window");
}
}
$current_window--;
schedule;
}
# Called when a deployed connector failed to initialize: give back its
# window slot and its arity unit, then run the scheduler. The connector
# argument is accepted but not used.
sub connector_failed($)
{
    my ($connector) = @_;
    $arity--;
    $current_window--;
    schedule();
}
# Deploy as many queued connectors as the window (and, for dynamic ones, the
# arity bound) allows: static connectors first, then dynamic ones. If
# capacity remains afterwards on a non-root node and no steal request is
# outstanding, a steal request is sent to the root connector.
sub schedule()
{
while (scalar(@static_connectors) and ($current_window < $window))
{
my $connector = shift @static_connectors;
deploy_connector($connector);
}
while (scalar(@dynamic_connectors) and ($current_window < $window)
and (($dynamic_limit == 0) or ($arity < $dynamic_limit)))
{
my $connector = shift @dynamic_connectors;
deploy_connector($connector);
}
if (not $general::root and ($current_window < $window)
and (($dynamic_limit == 0) or ($arity < $dynamic_limit)) and
not $steal_request_sent)
{
communicator::get_root->send_message(
taktuk::encode($taktuk::steal, ""));
diagnostic::debug("Steal request sent");
$steal_request_sent = 1;
}
else
{
# This branch only means that no steal request was sent; connectors may
# still have been deployed by the loops above.
diagnostic::debug("Nothing done in scheduler, static work : "
.scalar(@static_connectors).", dynamic work : "
.scalar(@dynamic_connectors));
}
}
# True when neither the static nor the dynamic queue holds a connector.
sub is_idle()
{
    return not (@static_connectors or @dynamic_connectors);
}
# Hand dynamic connectors over to waiting thieves for as long as both queues
# are non-empty. Each thief receives one work message made of packed
# connectors. Returns the value computed before the loop: false when there
# was nothing to give or nobody to give it to.
sub send_work()
{
my $work_to_give = scalar(@dynamic_connectors) && scalar(@waiting_thieves);
my $work_given = $work_to_give;
diagnostic::debug("Theft, dynamic connectors : ".scalar(@dynamic_connectors)
.", waiting thieves : ".scalar(@waiting_thieves)
.", work to give : $work_to_give"); 
while ($work_to_give)
{
my $thief = shift @waiting_thieves;
# NOTE(review): $available and $last_given are not used directly below;
# presumably the strings evaluated from $worksteal refer to them -- confirm.
my $available = scalar(@dynamic_connectors);
my $last_given = $thief->{last_given};
my $limit = eval($worksteal->{limit});
my $to_give = 0;
my $given=0;
my $workpack="";
# Amount to give: 'growth' expression for a thief already served,
# 'initial' expression otherwise, capped by 'limit'.
if ($last_given)
{
$to_give = eval($worksteal->{growth});
}
else
{
$to_give = eval($worksteal->{initial});
}
$to_give = $limit if $to_give > $limit;
# NOTE(review): nothing here checks that $to_give does not exceed the
# number of queued connectors; this relies on the 'limit' expression.
for ($given=0; $given < $to_give; $given++)
{
my $connector_to_deploy = shift @dynamic_connectors;
$workpack = $workpack.$connector_to_deploy->pack();
}
$thief->send_message(taktuk::encode($taktuk::work,
$workpack));
diagnostic::debug("Work sent : ".$workpack);
$work_to_give = scalar(@dynamic_connectors) && scalar(@waiting_thieves);
}
return $work_given;
} 
# Handle received work: unpack every connector contained in $workpack into
# the dynamic queue, clear the outstanding steal request flag, then schedule
# and try to serve waiting thieves.
sub dispatch_work($)
{
    my $workpack = shift;
    while ($workpack)
    {
        my ($connector, $rest) = connector::unpack($workpack);
        $workpack = $rest;
        diagnostic::debug("Unpacked connector: $connector");
        push(@dynamic_connectors, $connector);
    }
    $steal_request_sent = 0;
    schedule();
    send_work();
}
# Handle a steal request coming from $connector: queue the thief and try to
# serve it. If thieves are still waiting afterwards, a non-root node with no
# outstanding request forwards the steal request (with $parameters) to its
# root connector; otherwise the reason for not forwarding is only logged.
sub theft_handler($$)
{
    my ($connector, $parameters) = @_;
    push(@waiting_thieves, $connector);
    send_work();
    return unless scalar(@waiting_thieves);
    if ($general::root)
    {
        diagnostic::debug("Cannot satisfy steal request : no more dynamic ".
                          "work");
    }
    elsif ($steal_request_sent)
    {
        diagnostic::debug("Request already sent");
    }
    else
    {
        communicator::get_root()->send_message(
            taktuk::encode($taktuk::steal, $parameters));
        diagnostic::debug("Forwarded steal request, parameters : $parameters");
        $steal_request_sent = 1;
    }
}
package timer;
use strict; use bytes;
# 1 when Time::HiRes could be loaded, 0 otherwise (see current_time).
our $has_timehires = eval("use Time::HiRes;1")?1:0;
# Timers with a non-zero timeout, kept sorted by increasing expiry date.
our @registered_timers;
# Current date in seconds: with microsecond resolution when Time::HiRes is
# available, whole seconds from time() otherwise.
sub current_time()
{
    return time() unless $has_timehires;
    my ($seconds, $micro) = Time::HiRes::gettimeofday();
    return $seconds + $micro/1000000;
}
# Create a timer (class method, called as timer->register($timeout,
# $handler), so the class name is the first argument).
# A true $timeout is a delay from now: the timer is inserted in
# @registered_timers, which stays sorted by increasing expiry date. A false
# $timeout creates a timer that is never registered and so never fires.
# Returns the timer object.
sub register($$)
{
my $type = shift;
my $timeout = shift;
my $handler = shift;
my $current = current_time;
my $data = { 'handler'=>$handler,
'birth'=>$current,
'elapsed'=>0 };
bless($data, $type);
if ($timeout)
{
$data->{timeout} = $current+$timeout;
# Walk back from the end to the last timer expiring no later than this
# one, and insert right after it.
my $i=$#registered_timers;
$i-- while (($i >= 0) and
($registered_timers[$i]->{timeout} > $data->{timeout}));
splice @registered_timers, $i+1, 0, $data;
}
else
{
$data->{timeout} = 0;
}
diagnostic::debug("Registered new timer $data, ".$data->print.
", list = @registered_timers");
return $data;
}
# Fire every registered timer whose expiry date has been reached: the timer
# is removed from the list, flagged as elapsed and its handler is called
# with the timer as only argument.
sub check_timeouts()
{
my $current = current_time;
# The list is sorted, so only its head needs to be examined.
while (scalar(@registered_timers) and
($registered_timers[0]->{timeout} <= $current))
{
my $timer = shift @registered_timers;
$timer->{elapsed} = 1;
my $handler = $timer->{handler};
&$handler($timer);
diagnostic::debug("Timeout handled for $timer, ".$timer->print);
}
}
# Remove a timer from the registered list (method, called as
# $timer->unregister). Timers created without a timeout were never
# registered and are ignored; a registered timer that cannot be found
# produces a warning.
sub unregister()
{
my $timer = shift;
if ($timer->{timeout})
{
# Linear search by reference identity.
my $i=0;
$i++ while (($i <= $#registered_timers) and
($registered_timers[$i] != $timer));
if ($i <= $#registered_timers)
{
splice @registered_timers, $i, 1;
diagnostic::debug("Unregistered timer $timer, remaining ".
"@registered_timers equals ".
scalar(@registered_timers));
}
else
{
diagnostic::warning("Unregistering didn't found timer $timer");
}
}
}
# Time elapsed, in seconds, since the timer was created.
sub gettime($)
{
    my $now = current_time();
    my ($timer) = @_;
    my $age = $now - $timer->{birth};
    return $age;
}
# Human-readable description of a timer: creation date, expiry date and
# current date. Returns the string; nothing is printed.
sub print($)
{
    my $current = current_time();
    my ($timer) = @_;
    my $description = "Timer created at $timer->{birth} timeouting at ";
    $description .= "$timer->{timeout} current is $current";
    return $description;
}
package synchronizer;
use strict; use bytes;
# State flags indexed by event constant (TAKTUK_READY, TAKTUK_NUMBERED);
# 1 means the event has been reached.
our @states = (1, 1);
# Read by check_ready_state as one of the conditions for becoming ready.
our $father_is_ready=0;
our @ready_handlers = ();
our @pending_messages = ();
# event => { handlers => saved handlers by message, pending => flat list of
# (message, connector, body) triples postponed until the event }.
our %blocked;
# message => event that currently blocks it.
our %event_waited;
# If the node is not ready yet, check whether it has become ready: the
# father must be ready, the scheduler idle and no sink still initializing.
# When so, the ready message is sent to every sink and the TAKTUK_READY
# event is dispatched; otherwise the current situation is only logged.
sub check_ready_state()
{
if (!$states[TAKTUK_READY])
{
if ($father_is_ready && scheduler::is_idle &&
communicator::no_pending_connectors)
{
foreach my $connector (communicator::get_connections('sinks'))
{
$connector->send_message($taktuk::ready);
}
dispatch_event(TAKTUK_READY);
diagnostic::debug("I'm ready");
}
else
{
diagnostic::debug("I'm not ready : father_is_ready = ".
"$father_is_ready, is_idle = ".
scheduler::is_idle.", no_pending_connector = ".
communicator::no_pending_connectors);
}
}
}
# Leave the ready state (no effect when already not ready): the state flag
# is cleared and the broadcast and spread messages are blocked until the
# TAKTUK_READY event is dispatched.
sub set_not_ready()
{
    return unless $states[TAKTUK_READY];
    $states[TAKTUK_READY] = 0;
    block_until_event(TAKTUK_READY, $taktuk::broadcast, $taktuk::spread);
}
# Called when $connector has completed its initialization: record the date,
# drop its initialization timer, notify communicator and scheduler, send the
# connector its parameters, report the CONNECTION_INITIALIZED state and
# re-evaluate the ready state.
sub initialization_complete($)
{
my $connector = shift;
diagnostic::debug("Connector $connector->{line} initialized");
# init_date is read by scheduler::connector_initialized below.
$connector->{init_date} = timer::current_time;
$connector->{timer}->unregister;
delete $connector->{timer};
communicator::connector_initialized($connector);
scheduler::connector_initialized($connector);
$connector->send_parameters;
$connector->output('state', CONNECTION_INITIALIZED);
check_ready_state;
}
# Called when $connector is lost before completing its initialization: drop
# its timer if it still has one, report the CONNECTION_FAILED state, notify
# the scheduler and re-evaluate the ready state.
sub initialization_failed($)
{
my $connector = shift;
# The timer is already gone when the failure follows a timeout (see
# initialization_timeout).
if (exists($connector->{timer}))
{
$connector->{timer}->unregister;
delete $connector->{timer};
}
$connector->output('state', CONNECTION_FAILED);
scheduler::connector_failed($connector);
check_ready_state;
}
# Timer handler fired when a connector did not initialize in time. The
# connector is flagged as timeouted; unless it turns out to be initialized
# (reported as a bug), the writes pending on its 'read' handle are dropped,
# the event is reported and the connector is cancelled.
sub initialization_timeout($)
{
my $timer = shift;
my $connector = $timer->{connector};
# The timer has already been removed from the list by check_timeouts.
delete $connector->{timer};
$connector->{timeouted} = 1;
if ($connector->{state} >= $connector::initialized)
{
diagnostic::warning("Bug, timeouted an initialized connector");
}
else
{
if (exists($connector->{read}))
{
communicator::cleanup_pending_stuff($connector->{read});
}
$connector->output('connector', "timeouted");
$connector->cancel;
}
}
# Leave the numbered state (no effect when already not numbered): the state
# flag is cleared and the messages listed below are blocked until the
# TAKTUK_NUMBERED event is dispatched.
sub set_not_numbered()
{
if ($states[TAKTUK_NUMBERED])
{
$states[TAKTUK_NUMBERED] = 0;
block_until_event(TAKTUK_NUMBERED, $taktuk::execute,
$taktuk::eof,
$taktuk::get,
$taktuk::input,
$taktuk::kill,
$taktuk::message,
$taktuk::put,
$taktuk::send_to,
$taktuk::synchronize,
$taktuk::quit,
$taktuk::taktuk_perl);
}
}
# Postpone the processing of the given messages until $event is dispatched.
# Arguments: the event, followed by the message codes to block.
# For each message the current handler is saved and replaced by
# handlers::handler_blocked, which queues incoming occurrences. A message
# that is already blocked by some event is left alone, with a warning.
sub block_until_event($@)
{
my $event = shift;
my $handlers;
my $pending;
# Reuse the bookkeeping of an event that already blocks messages.
if (exists($blocked{$event}))
{
$handlers = $blocked{$event}->{handlers};
$pending = $blocked{$event}->{pending};
}
else
{
$handlers = {};
$pending = [];
$blocked{$event} = { handlers=>$handlers, pending=>$pending };
}
foreach my $message (@_)
{
if (exists($event_waited{$message}))
{
diagnostic::warning("Multiple blocking for $message");
}
else
{
$handlers->{$message} = handlers::get_handler($message);
handlers::replace_handler($message, \&handlers::handler_blocked);
$event_waited{$message} = $event;
}
}
}
# Signal that $event has occurred. Only valid while the event's state flag
# is cleared: the event is reported through the root connector, the flag is
# set, the handlers saved by block_until_event are restored and the messages
# postponed meanwhile are processed in arrival order.
sub dispatch_event($)
{
my $event = shift;
my $handlers = $blocked{$event}->{handlers};
my $pending_list = $blocked{$event}->{pending};
if (not $states[$event])
{
communicator::get_root->output('state', $event);
$states[$event] = 1;
foreach my $message (keys(%$handlers))
{
handlers::replace_handler($message, $handlers->{$message});
delete $event_waited{$message};
}
# The pending list is flat: three consecutive entries per message.
while (scalar(@$pending_list))
{
my $message = shift @$pending_list;
my $connector = shift @$pending_list;
my $body = shift @$pending_list;
my $function = handlers::get_handler($message);
&$function($message, $connector, $body);
diagnostic::debug("Processing $message with body $body");
}
delete $blocked{$event};
}
else
{
diagnostic::warning("Not in proper state to dispatch $event event");
}
}
# Queue a blocked message: the triple (message, connector, body) is appended
# to the pending list of the event that currently blocks this message.
sub add_pending_message($$$)
{
    my ($message, $connector, $body) = @_;
    my $blocking_event = $event_waited{$message};
    my $queue = $blocked{$blocking_event}->{pending};
    push(@$queue, $message, $connector, $body);
}
# Return the state flag of the given event.
sub get_state($)
{
    my $event = shift;
    return $states[$event];
}
# Prepare a synchronization. With numbering enabled the node leaves the
# numbered state. With numbering disabled the quit and synchronize messages
# are blocked until the node is ready, which is impossible (and only warned
# about) when the node is already ready.
sub setup_synchronization()
{
    if (not $main::no_numbering)
    {
        set_not_numbered();
    }
    elsif (get_state(TAKTUK_READY))
    {
        diagnostic::warning("Cannot synchronize an unnumbered ready ".
                            "TakTuk");
    }
    else
    {
        block_until_event(TAKTUK_READY, $taktuk::quit,
                          $taktuk::synchronize);
    }
}
package handlers;
use strict; use bytes;
use Fcntl;
# Dispatch table: message code => code reference called by
# communicator::process_message with (message, connector, body).
our %handlers;
# Install the handler of a message code that has none yet. Registering a
# second handler for the same code is reported as an error and changes
# nothing.
sub register_handler($$)
{
    my ($message, $handler) = @_;
    return diagnostic::error("Handler already defined for $message")
        if defined($handlers{$message});
    $handlers{$message} = $handler;
}
# Substitute the handler of a message code that already has one. Replacing
# the handler of an unknown code is reported as an error and changes
# nothing.
sub replace_handler($$)
{
    my ($message, $handler) = @_;
    return diagnostic::error("Handler not defined for $message")
        unless defined($handlers{$message});
    $handlers{$message} = $handler;
}
# Return the handler registered for a message code (undef when none).
sub get_handler($)
{
    my ($message) = @_;
    my $handler = $handlers{$message};
    return $handler;
}
# Stand-in handler installed while a message is blocked: the message is
# queued by the synchronizer instead of being processed.
sub handler_blocked($$$)
{
    my ($message, $connector, $body) = @_;
    return synchronizer::add_pending_message($message, $connector, $body);
}
# Handler for the arguments message: the body is a sequence of packed
# command-line arguments. They are unpacked, handed to the arguments
# package, parsed as options, then commands are processed and the scheduler
# is run.
sub arguments($$$)
{
    my ($message, $connector, $body) = @_;
    my @arguments;
    while ($body)
    {
        (my $element, $body) = taktuk::unpack($body);
        push(@arguments, $element);
    }
    arguments::fetch_arguments(@arguments);
    arguments::parse_options();
    main::process_commands();
    scheduler::schedule();
}
# Handler for the broadcast message: the body is re-sent, encoded as a
# spread message, to every source and sink except the connector the
# broadcast came from.
sub broadcast($$$)
{
    my ($message, $connector, $body) = @_;
    my @neighbours = (communicator::get_connections('sources'),
                      communicator::get_connections('sinks'));
    foreach my $other_connector (@neighbours)
    {
        next if $other_connector == $connector;
        diagnostic::debug("Spreading message to $other_connector");
        $other_connector->send_message(
            taktuk::encode($taktuk::spread, $body));
    }
}
# Handler for the down message: the body is itself an encoded message, which
# is processed as if it had been received from the root connector.
sub down($$$)
{
    my ($message, $connector, $body) = @_;
    my $origin = communicator::get_root();
    communicator::process_message($origin, $body);
}
# Handler for the eof message: schedule the closing of the input of every
# local command (done once its pending writes are flushed).
sub eof($$$)
{
    my ($message, $connector, $body) = @_;
    my @running = communicator::get_connections('local_commands');
    foreach my $command (@running)
    {
        communicator::post_close($command);
        diagnostic::debug("Posted close inputs for command $command");
    }
}
# Handler for the update_failed message: a node whose rank is negative
# reports the UPDATE_FAILED state through the root connector; in every case
# the TAKTUK_NUMBERED event is then dispatched.
sub update_failed($$$)
{
    my ($message, $connector, $body) = @_;
    if ($general::taktuk{rank} < 0)
    {
        communicator::get_root()->output('state',
                                         synchronizer::UPDATE_FAILED);
    }
    synchronizer::dispatch_event(synchronizer::TAKTUK_NUMBERED);
}
# Timer handler fired when a timeout attached to a local command expires.
# $timer->{messages} is a packed list of actions. When it is empty, signal
# 15 is sent to the process group of the command. Otherwise each action is
# either a message to process as coming from the control connector, or a
# signal to send to the process group of the command.
sub execute_timeout($)
{
my $timer = shift;
my $message_list = $timer->{messages};
diagnostic::debug("Timeout triggered for $timer->{command}->{line}");
$ENV{TAKTUK_PID} = $timer->{command}->{pid};
if (not length($message_list))
{
CORE::kill 15, -$timer->{command}->{pid};
}
while (length($message_list))
{
my ($message, $action);
($message, $message_list) = taktuk::unpack($message_list);
# After decoding, $message holds the body of the action.
($action, $message) = taktuk::decode($message);
if ($action eq $taktuk::action)
{
communicator::process_message(communicator::get_control, $message);
}
elsif ($action eq $taktuk::kill)
{
CORE::kill $message, -$timer->{command}->{pid};
}
else
{
diagnostic::error("Internal error : unknown timeout action");
}
}
}
# Handler for the execute message: start a local command.
# The body holds packed attributes followed by the command line. Each
# attribute describes a timeout: a delay followed by the packed actions to
# perform when it expires (see execute_timeout).
# Returns 0 without doing anything when $general::root is set, otherwise the
# result of the command's run method.
sub execute($$$)
{
my $message = shift;
my $connector = shift;
my $body = shift;
if ($general::root)
{
return 0;
}
else
{
my ($attributes, $line) = taktuk::unpack($body);
my $timers = [];
my $command = command::new(line=>$line,
data_handler=>\&communicator::process_command_output,
remove_handler=>\&communicator::remove_local_command,
timers=>$timers);
my $result;
while (length($attributes))
{
my $timeout_pack;
my $timeout;
($timeout_pack, $attributes) = taktuk::unpack($attributes);
diagnostic::error("Internal error") if not defined $timeout_pack;
($timeout, $timeout_pack) = taktuk::unpack($timeout_pack);
diagnostic::error("Internal error") if not defined $timeout;
# The timers are registered before the command is started.
my $timer = timer->register($timeout, \&execute_timeout);
$timer->{command} = $command;
$timer->{connector} = $connector;
$timer->{messages} = $timeout_pack;
push @$timers, $timer;
}
# Export the pids of the local commands that are already registered.
$ENV{TAKTUK_PIDS} = join(' ', map("$_->{pid}",
communicator::get_connections('local_commands')));
if ($result = $command->run(1))
{
communicator::add_connector($command, 'local_commands');
$command->output('state', synchronizer::COMMAND_STARTED);
}
else
{
diagnostic::warning("Giving up command $line");
$command->output('state', synchronizer::COMMAND_FAILED);
}
return $result;
}
}
# State of the file receptions in progress, keyed by the position of the
# sender: destination path, permissions to apply at the end, and the handle
# being written (undef when the destination could not be opened).
our %filename;
our %file_perms;
our %file;
# Finish the reception recorded for $position: close the destination handle
# and apply the recorded permissions to the destination path. Does nothing
# when no handle is recorded for that position.
sub file_finalize($)
{
    my ($position) = @_;
    return unless exists($file{$position});
    CORE::close($file{$position}) or diagnostic::system;
    chmod($file_perms{$position}, $filename{$position})
        or diagnostic::system;
}
# Handler for the file message: receive a file or a directory.
# Every message starts with the packed position of the sender, which
# identifies the transfer. Three kinds of messages are received in sequence:
#   - the first one carries host, rank, type (true for a directory),
#     permissions, source base name and the destination: the destination is
#     opened (a plain file, or a pipe to tar for a directory);
#   - messages with a non-empty remainder carry data, written without
#     blocking through communicator::post_write;
#   - a message with an empty remainder ends the transfer.
sub file($$$)
{
    my $message = shift;
    my $connector = shift;
    my $body = shift;
    my $position;
    ($position, $body) = taktuk::unpack($body);
    if (not exists($file{$position}))
    {
        # First message of a transfer.
        my ($host, $rank, $type, $permissions, $source, $destination);
        ($host, $body) = taktuk::unpack($body);
        ($rank, $body) = taktuk::unpack($body);
        ($type, $body) = taktuk::unpack($body);
        ($permissions, $body) = taktuk::unpack($body);
        ($source, $body) = taktuk::unpack($body);
        $destination = $body;
        my $result = undef;
        $destination = general::expand($destination,
                                       position=>$position,
                                       host=>$host,
                                       rank=>$rank);
        if ($type)
        {
            # Directory: extract into destination/source when the
            # destination exists, into a newly created destination otherwise.
            if (-d $destination)
            {
                if (-d "$destination/$source" or mkdir "$destination/$source")
                {
                    $filename{$position} = "$destination/$source";
                }
                else
                {
                    diagnostic::system;
                }
            }
            elsif (mkdir $destination)
            {
                $filename{$position} = $destination;
            }
            else
            {
                diagnostic::system;
            }
            if (exists($filename{$position}))
            {
                # NOTE(review): the path is interpolated unquoted into a
                # shell command line; a destination containing shell
                # metacharacters would be interpreted by the shell.
                $result = open($file{$position},
                               "| cd $filename{$position} && tar x");
            }
        }
        else
        {
            # Plain file: written inside the destination when it is a
            # directory, to the destination path itself otherwise.
            if (-d $destination)
            {
                $filename{$position} = "$destination/$source";
            }
            else
            {
                $filename{$position} = $destination;
            }
            $result = open($file{$position}, ">", $filename{$position});
        }
        if (defined($result))
        {
            binmode($file{$position}) or diagnostic::system;
            $file_perms{$position} = $permissions;
            fcntl($file{$position}, F_SETFL, O_NONBLOCK) or diagnostic::system;
            $connector->output('state',synchronizer::FILE_RECEPTION_STARTED);
        }
        else
        {
            diagnostic::system;
            # Keep an undefined entry so that the following messages of this
            # transfer are recognized and ignored.
            $file{$position} = undef;
            $connector->output('state',synchronizer::FILE_RECEPTION_FAILED);
        }
    }
    else
    {
        if (not length($body))
        {
            # End of transfer.
            if (defined($file{$position}))
            {
                # The per-position state is deleted just below, but the
                # finalization may run later, once the pending writes have
                # been flushed. The handle, permissions and path are
                # therefore captured here: looking them up by position at
                # that time would find nothing, and the handle would never
                # be closed explicitly nor the permissions applied.
                my $handle = $file{$position};
                my $permissions = $file_perms{$position};
                my $path = $filename{$position};
                my $finalize = sub
                {
                    CORE::close($handle) or diagnostic::system;
                    chmod $permissions, $path
                        or diagnostic::system;
                };
                communicator::post_termination($handle, $finalize,
                                               $position);
                $connector->output('state',
                                   synchronizer::FILE_RECEPTION_TERMINATED);
            }
            else
            {
                diagnostic::warning("End file message but no descritor");
            }
            delete($file{$position}) if exists($file{$position});
            delete($file_perms{$position}) if exists($file_perms{$position});
            delete($filename{$position}) if exists($filename{$position});
        }
        else
        {
            # Data message.
            if (defined($file{$position}))
            {
                communicator::post_write($file{$position},$body) or
                    diagnostic::system;
            }
            else
            {
                diagnostic::warning("Undefined file descriptor for data");
            }
        }
    }
}
# Handler for the forward_up message: on the root the body, itself an
# encoded message, is processed locally; elsewhere it is forwarded unchanged
# to every source.
sub forward_up($$$)
{
    my ($message, $connector, $body) = @_;
    return communicator::process_message($connector, $body)
        if $general::root;
    foreach my $other_connector (communicator::get_connections('sources'))
    {
        my $forwarded = taktuk::encode($taktuk::forward_up, $body);
        $other_connector->send_message($forwarded);
    }
}
# Handler for the get message: turn it into a put request. The body starts
# with a packed prefix, used as the outer code of the generated message; the
# rest is wrapped in a put message preceded by a packed send_to prefix that
# carries the rank of this node.
sub get($$$)
{
    my ($message, $connector, $body) = @_;
    (my $get_prefix, $body) = taktuk::unpack($body);
    my $put_prefix = taktuk::encode($taktuk::send_to,
                                    taktuk::pack($general::taktuk{rank}));
    my $put_body = taktuk::pack($put_prefix).$body;
    communicator::process_message($connector,
        taktuk::encode($get_prefix,
                       taktuk::encode($taktuk::put, $put_body)));
}
# Handler for the get_info message: the body names an entry of
# %general::taktuk. Its value is sent back in an info message, or the
# invalid message when there is no such entry.
sub get_info($$$)
{
    my ($message, $connector, $body) = @_;
    unless (exists($general::taktuk{$body}))
    {
        return $connector->send_message($taktuk::invalid);
    }
    $connector->send_message(
        taktuk::encode($taktuk::info, $general::taktuk{$body}));
}
# Handler for the input message: the body is written to the input of every
# local command that still has one. A failed write is logged as a broken
# pipe when errno is EPIPE and reported as a system error otherwise.
sub input($$$)
{
    my ($message, $connector, $body) = @_;
    foreach my $command (communicator::get_connections('local_commands'))
    {
        my $result = 1;
        $result = communicator::post_write($command->{read}, $body)
            if exists($command->{read});
        next if $result;
        if ($!{EPIPE})
        {
            diagnostic::debug("Broken pipe when sending input");
        }
        else
        {
            diagnostic::system;
        }
    }
}
# Handler for kill messages: $body is the signal to deliver. The signal is
# sent to the negated pid of every local command, i.e. to its process group.
sub kill($$$)
{
    my ($message, $connector, $body) = @_;
    my @targets = communicator::get_connections('local_commands');
    for my $target (@targets)
    {
        CORE::kill($body, -$target->{pid});
    }
}
# Handler for message messages: forwards the message (re-encoded with its
# original code) on the outgoing connector and counts it as pending there.
# When the pending counter reaches exactly zero, a registered timer on that
# connector is unregistered and dropped.
sub message($$$)
{
    my ($message, $connector, $body) = @_;
    my $outgoing = communicator::get_outgoing;
    $outgoing->send_message(taktuk::encode($message, $body));
    $outgoing->{pending_messages} += 1;
    if (($outgoing->{pending_messages} == 0) and exists($outgoing->{timer}))
    {
        my $timer = $outgoing->{timer};
        $timer->unregister;
        delete $outgoing->{timer};
    }
}
# Handler for numbered messages.
# $body is "<rank> <count> <update>" (single-space separated): the rank given
# to this node, the total count, and a flag telling whether this is an update
# of an existing numbering.
# Stores rank/count in %general::taktuk and the environment, then hands out
# consecutive rank intervals to the sinks, starting at rank+1, each interval
# being as wide as the 'count' field stored on the sink connector.
sub numbered($$$)
{
my $message = shift;
my $connector = shift;
my $body = shift;
my ($rank, $count, $update) = split / /,$body;
diagnostic::debug("I'm $rank among $count");
$general::taktuk{rank} = $rank;
$ENV{TAKTUK_RANK} = $rank;
$general::taktuk{count} = $count;
$ENV{TAKTUK_COUNT} = $count;
my @connections = communicator::get_connections('sinks');
# $current is the first rank not yet assigned to a child
my $current = $rank+1;
$general::child_min = $current;
for (my $i=0; $i<=$#connections; $i++)
{
my $other_connector = $connections[$i];
if ($other_connector != $connector)
{
my $min = $current;
my $max = $current + $other_connector->{count} - 1;
# In update mode a previously numbered child must keep its interval: a
# different min is always a failure, a different max is tolerated only for
# the last connection of the list.
if ($update and exists($other_connector->{min})
and (($min != $other_connector->{min})
or (($max != $other_connector->{max})
and ($i != $#connections))))
{
communicator::get_root->output('state',
synchronizer::UPDATE_FAILED);
# keep the old interval and tell the subtree that the update failed
$min = $other_connector->{min};
$max = $other_connector->{max};
$other_connector->send_message(taktuk::encode($taktuk::spread,
$taktuk::update_failed));
}
else
{
# propagate the numbering: the child takes $min as its own rank
$other_connector->send_message(
taktuk::encode($taktuk::numbered,"$min $count $update"));
$other_connector->{min} = $min;
$other_connector->{max} = $max;
}
$current = $max+1;
}
else
{
diagnostic::warning("Bug : numbered coming from a sink");
}
}
# highest rank assigned to the children of this node
$general::child_max = $current-1;
synchronizer::dispatch_event(synchronizer::TAKTUK_NUMBERED);
}
# Cache of the filehandles opened on raw file descriptors, keyed by
# descriptor number. Only successfully opened handles are stored.
our %filehandles;

# Handler for output messages (meaningful on the root node only).
# $body is a packed file descriptor number followed by the data to write.
# The descriptor is opened on first use (fdopen-like ">&=" mode) and, except
# for descriptors 0..2, switched to non blocking mode.
# If the descriptor cannot be opened the error is reported once for this
# message and nothing is cached, so that no write is attempted on an unusable
# handle and a later message can retry the open.
sub output($$$)
{
    my ($message, $connector, $body) = @_;
    if (not $general::root)
    {
        diagnostic::warning("Output message received on non root node");
        return;
    }
    my ($fd, $remaining) = taktuk::unpack($body);
    if (not exists($filehandles{$fd}))
    {
        my $handle;
        if (not open($handle, ">&=", $fd))
        {
            diagnostic::system;
            return;
        }
        # standard descriptors are left in their original blocking mode
        (fcntl($handle, F_SETFL, O_NONBLOCK) or diagnostic::system)
            unless ($fd <= 2);
        $filehandles{$fd} = $handle;
    }
    communicator::post_write($filehandles{$fd}, $remaining)
        or diagnostic::system;
}
# Handler for option messages: the first character of $body is the option
# name, everything after it is the value; the pair is handed to
# arguments::set_option.
sub option($$$)
{
    my ($message, $connector, $body) = @_;
    my @parts = split //, $body, 2;
    my $name = $parts[0];
    my $value = $parts[1];
    diagnostic::debug("Setting $name to $value in $general::taktuk{rank}");
    arguments::set_option($name, $value);
}
# Handler for options messages: feeds $body to the arguments parser as a
# line of options and parses it.
sub options($$$)
{
    my ($message, $connector, $options_line) = @_;
    diagnostic::debug("Fetching [ $options_line ] as options line");
    arguments::fetch_line($options_line);
    arguments::parse_options();
}
# Handler for pipe messages: $body is a packed filename followed by a
# message prefix. The file is opened and wrapped in a command object
# registered as a 'pipe' connector; the outcome (PIPE_STARTED or PIPE_FAILED)
# is reported as a state on the root connector.
sub pipe($$$)
{
    my ($message, $connector, $body) = @_;
    my ($filename, $prefix) = taktuk::unpack($body);
    my $fd = general::open_file($filename);
    if (not $fd)
    {
        diagnostic::system;
        communicator::get_root->output('state', synchronizer::PIPE_FAILED);
    }
    else
    {
        my $reader = command::new(write=>$fd, type=>settings::FD,
            data_handler=>\&communicator::process_pipe_output,
            remove_handler=>\&communicator::remove_pipe,
            message=>$prefix);
        communicator::add_connector($reader, 'pipe');
        communicator::get_root->output('state', synchronizer::PIPE_STARTED);
    }
}
# Handler for position messages: records $body as the position of this node,
# both in $general::position and in the TAKTUK_POSITION environment variable.
sub position($$$)
{
    my ($message, $connector, $new_position) = @_;
    $general::position = $new_position;
    $ENV{TAKTUK_POSITION} = $new_position;
}
# Handler for put messages: $body is a packed prefix, a packed source name
# and finally the destination name. Starts the file transfer through the
# send_file method of the connector the request came from.
sub put($$$)
{
    my ($message, $connector, $body) = @_;
    my ($prefix, $after_prefix) = taktuk::unpack($body);
    my ($source, $destination) = taktuk::unpack($after_prefix);
    diagnostic::debug("Calling send_file with $prefix, $source, ".
        "$destination from $general::taktuk{rank}");
    $connector->send_file($prefix, $source, $destination);
}
# Handler for quit messages. Unless the communicator is already ending, it
# runs the eof handler with an empty body, asks the communicator to
# terminate and forwards the quit message to every sink.
sub quit($$$)
{
    my ($message, $connector, $body) = @_;
    unless ($communicator::end)
    {
        handlers::eof($taktuk::eof, $connector, "");
        communicator::terminate;
        my @children = communicator::get_connections('sinks');
        foreach my $other_connector (@children)
        {
            diagnostic::debug("Sending quit message to $other_connector");
            $other_connector->send_message($taktuk::quit);
        }
    }
}
# Handler for ready messages: remembers that the father is ready and lets
# the synchronizer re-evaluate the ready state.
sub ready($$$)
{
    my ($message, $connector, $body) = @_;
    diagnostic::debug("Ready received");
    $synchronizer::father_is_ready = 1;
    synchronizer::check_ready_state;
}
# Timer callback installed by wait_message: sends the timeout message on the
# connector attached to the timer and counts it as a pending message.
sub recv_timeout($)
{
    my ($timer) = @_;
    my $waiting = $timer->{connector};
    $waiting->send_message($taktuk::timeout);
    $waiting->{pending_messages}++;
}
# State of the reduce operations, the three hashes being keyed by reduce type.
# per type: array ref of values, slot 0 holding the local initial value
our %reduce_result;
# per type: number of answers still awaited
our %reduce_remaining;
# per type: array ref of connectors, slot 0 being the one that asked
our %reduce_connector;
# reduce requests ([message, connector, body]) received while a reduce of the
# same type was already running
our @reduce_pending;
# Pseudo ranks passed to the reduce handlers to mark the first and last call
use constant REDUCE_INITIALIZATION => -1;
use constant REDUCE_FINALIZATION => -2;
# Reduce type => handler implementing it
our %reduce_handler = ($taktuk::reduce_count => \&reduce_count,
$taktuk::reduce_tree => \&reduce_tree);
# Reduce handler used to count the nodes of the deployment.
#   $connector : connector the value is associated with
#   $rank      : slot of the value, or one of the REDUCE_* pseudo ranks
#   $new_value : value to merge (the update flag at initialization)
#   $data      : value accumulated so far
# Returns the new accumulated value; at finalization on the root it starts
# the numbering instead and returns undef.
sub reduce_count ($$$$)
{
    my ($connector, $rank, $new_value, $data) = @_;
    if ($rank == REDUCE_INITIALIZATION)
    {
        $general::numbering_update = $new_value?1:0;
        synchronizer::set_not_numbered;
        return 1;
    }
    if ($rank == REDUCE_FINALIZATION)
    {
        return $data unless $general::root;
        # the total is decremented by one before numbering starts at rank 0
        $data--;
        numbered($taktuk::numbered, communicator::get_root,
            "0 $data $general::numbering_update");
        return undef;
    }
    # regular step: remember the size of this subtree and add it to the sum
    $connector->{count} = $new_value;
    return $data + $new_value;
}
# Reduce handler building a textual description of the deployment tree.
# Same calling convention as the other reduce handlers:
# (connector, rank or REDUCE_* pseudo rank, new value, accumulated data).
# The accumulated data is the text of a Perl array literal: opened at
# initialization, extended with ",<child>" at each step and closed at
# finalization.
sub reduce_tree($$$$)
{
my $source_connector = shift;
my $rank = shift;
my $new_value = shift;
my $data = shift;
my $result;
if ($rank == REDUCE_INITIALIZATION)
{
# first element: quoted description of the local node
$result = "['$general::host ($general::taktuk{rank}, ".
"$synchronizer::states[synchronizer::TAKTUK_READY])'";
my @sinks = communicator::get_connections('sinks');
foreach my $connector (@sinks)
{
if ($connector->{state} < $connector::initialized)
{
# Not yet initialized connector: its answer is filled in right away with a
# 'connecting' placeholder and it is no longer awaited.
my $number = $connector->{$taktuk::reduce_tree};
$reduce_result{$taktuk::reduce_tree}->[$number] =
", ['connecting $connector->{peer}']";
delete $connector->{$taktuk::reduce_tree};
$reduce_remaining{$taktuk::reduce_tree}--;
}
else
{
$connector->send_message(taktuk::encode($taktuk::reduce,
$taktuk::reduce_tree));
}
}
}
elsif ($rank == REDUCE_FINALIZATION)
{
if ($general::root)
{
# NOTE(review): string eval of text assembled from values received from
# other nodes and from host/peer names; a quote in one of them would break
# or alter the evaluated code - confirm these inputs are trusted.
eval('$new_value = '.$data."]");
general::print_tree("",$new_value);
$result = undef;
}
else
{
$result = $data."]";
}
}
else
{
$result = $data.",".$new_value; 
}
return $result;
}
# Handler for reduce messages: starts a reduce operation of the type found
# in $body (encoded as type + initial value).
# If no reduce of this type is running, every source and sink connector
# other than the requester gets a slot number (stored on the connector under
# the type key), the number of awaited answers is recorded and the type
# handler is called for initialization. Otherwise the request is queued in
# @reduce_pending.
# A request for a type without a registered handler is reported and dropped:
# going on would call an undefined code reference and die.
sub reduce($$$)
{
    my ($message, $connector, $body) = @_;
    my @connectors = (communicator::get_connections('sources'),
        communicator::get_connections('sinks'));
    diagnostic::debug("Reduce in $general::taktuk{rank}");
    my ($type, $value) = taktuk::decode($body);
    if (not exists($reduce_connector{$type}))
    {
        if (not exists($reduce_handler{$type}))
        {
            diagnostic::error("Handler not defined for reduce $type");
            return;
        }
        # slot 0 is the requester, which will receive the final result
        $reduce_connector{$type} = [ $connector ];
        my $i = 0;
        foreach my $target (@connectors)
        {
            if ($target != $connector)
            {
                $i++;
                $target->{$type} = $i;
                $reduce_connector{$type}->[$i] = $target;
            }
        }
        $reduce_remaining{$type} = $i;
        my $handler = $reduce_handler{$type};
        $reduce_result{$type} = [];
        $reduce_result{$type}->[0] =
            &$handler($connector, REDUCE_INITIALIZATION, $value, undef);
    }
    else
    {
        push @reduce_pending, [ $message, $connector, $body ];
    }
    # completes immediately when nothing is awaited
    reduce_status_analysis($type);
}
# Completes the reduce of the given type once no answer is awaited anymore:
# folds the collected values with the type handler, calls it a last time for
# finalization, sends a defined final value back to the requester (slot 0),
# clears the state of this type and replays one queued reduce request.
sub reduce_status_analysis($)
{
my $type = shift;
if ($reduce_remaining{$type} == 0)
{
my $data = $reduce_result{$type};
my $handler = $reduce_handler{$type};
my $connector = $reduce_connector{$type};
# slot 0 holds the value produced at initialization
my $result = $data->[0];
for (my $i=1; $i<=$#$data; $i++)
{
$result = &$handler($connector->[$i], $i, $data->[$i], $result);
}
$result = &$handler($connector->[0], REDUCE_FINALIZATION,
undef, $result);
# an undefined result means that the handler consumed it itself
if (defined($result))
{
my $reply_connector = $connector->[0];
$reply_connector->send_message(taktuk::encode(
$taktuk::reduce_result, taktuk::encode($type, $result)));
}
delete $reduce_connector{$type};
delete $reduce_remaining{$type};
delete $reduce_result{$type};
if (scalar(@reduce_pending))
{
# NOTE(review): pop replays the most recently queued request first (LIFO)
# and whatever its type - confirm this ordering is intended.
my $next = pop @reduce_pending;
reduce($next->[0], $next->[1], $next->[2]);
}
}
}
# Handler for reduce_result messages: stores the value sent by a connector
# in the slot previously assigned to it for this reduce type, marks the
# connector as answered and checks whether the reduce can be completed.
sub reduce_result($$$)
{
    my ($message, $connector, $body) = @_;
    diagnostic::debug("Reduce result from $general::taktuk{rank}");
    my ($type, $value) = taktuk::decode($body);
    my $slot = $connector->{$type};
    $reduce_result{$type}->[$slot] = $value;
    $reduce_remaining{$type} -= 1;
    delete $connector->{$type};
    reduce_status_analysis($type);
}
# For every running reduce in which the given connector still holds a slot,
# stops waiting for its answer and checks whether the reduce is complete.
sub check_ongoing_reduces($)
{
    my ($connector) = @_;
    my @running = keys(%reduce_connector);
    foreach my $type (@running)
    {
        next unless exists($connector->{$type});
        $reduce_remaining{$type}--;
        reduce_status_analysis($type);
    }
}
# Handler for resign messages: empties the scheduler connector lists, then
# handles every source and sink other than the sender. Connectors that are
# not yet initialized are reported as canceled and canceled; the others
# receive the resign message in turn.
sub resign($$$)
{
    my ($message, $connector, $body) = @_;
    @scheduler::static_connectors = ();
    @scheduler::dynamic_connectors = ();
    my @peers = (communicator::get_connections('sources'),
        communicator::get_connections('sinks'));
    foreach my $peer (@peers)
    {
        next if $peer == $connector;
        if ($peer->{state} < $connector::initialized)
        {
            $peer->output('connector',"canceled");
            $peer->cancel;
        }
        else
        {
            $peer->send_message($taktuk::resign);
        }
    }
}
# Handler for send_to messages: routes a message to a set of ranks.
# $body is the packed destination set followed by the message to deliver.
# The decoded set is handled as a flat list of (min, max) pairs, consumed in
# order and split into three parts:
#  - ranks below the local rank or above child_max: sent upward,
#  - the local rank: delivered here,
#  - ranks covered by the sinks: sent downward, one message per sink.
sub send_to($$$)
{
my $message = shift;
my $connector = shift;
my $body = shift;
my ($to, $remaining) = taktuk::unpack($body);
diagnostic::debug("Message to $to and I'm $general::taktuk{rank}");
my $new_message;
my @destination_list = taktuk::decode_set($to);
my @upward = ();
my @downward = ();
my $delivery = 0;
my $min = shift @destination_list;
my $max = shift @destination_list;
# Intervals starting below the local rank go upward; an interval reaching
# the local rank is cut and its upper part is pushed back on the list.
while (defined($min) and ($min < $general::taktuk{rank}))
{
if ($max >= $general::taktuk{rank})
{
push @upward, $min, $general::taktuk{rank}-1;
unshift @destination_list, $general::taktuk{rank}, $max;
}
else
{
push @upward, $min, $max;
}
$min = shift @destination_list;
$max = shift @destination_list;
}
# Interval starting exactly at the local rank: local delivery, the rest of
# the interval (if any) is kept for the children.
if (defined($min) and ($min == $general::taktuk{rank}))
{
$delivery = 1;
if ($max == $general::taktuk{rank})
{
$min = shift @destination_list;
$max = shift @destination_list;
} 
else
{
$min = $general::taktuk{rank}+1;
}
}
# $i is the index of the sink currently being filled; the loop relies on
# the min/max fields of the sinks being in increasing order.
my $i=0;
my @sinks = communicator::get_connections('sinks');
while (defined($min) and ($min <= $general::child_max))
{
# part above the ranks of the children: goes upward
if ($max > $general::child_max)
{
push @upward, $general::child_max+1, $max;
$max = $general::child_max;
}
# skip the sinks that end before $min, flushing what was gathered for them
while (($i <= $#sinks) and ($min > $sinks[$i]->{max}))
{
if (scalar(@downward))
{
$new_message = taktuk::encode($message,
taktuk::pack(taktuk::encode_set(@downward)).$remaining);
$sinks[$i]->send_message($new_message);
@downward = ();
}
$i++;
}
# interval starting in a hole before the current sink: split it at the
# beginning of the sink
if (($i <= $#sinks) and ($min < $sinks[$i]->{min}) and
($max >= $sinks[$i]->{min}))
{
unshift @destination_list, $sinks[$i]->{min}, $max;
$max = $sinks[$i]->{min}-1;
}
# ranks covered by no sink cannot be reached
if (($i > $#sinks) or ($min < $sinks[$i]->{min}))
{
diagnostic::warning("Send problem, ".
(($min != $max)?"$min-$max":"$min")." not available anymore");
}
else
{
# interval going past the end of the current sink: keep the excess for the
# next iteration
if ($max > $sinks[$i]->{max})
{
unshift @destination_list, $sinks[$i]->{max}+1, $max;
$max = $sinks[$i]->{max};
}
push @downward, $min, $max;
}
$min = shift @destination_list;
$max = shift @destination_list;
}
# flush the ranks gathered for the last sink used
if (scalar(@downward))
{
$new_message = taktuk::encode($message,
taktuk::pack(taktuk::encode_set(@downward)).$remaining);
$sinks[$i]->send_message($new_message);
}
# everything left is above child_max
while (defined($min))
{
push @upward, $min, $max;
$min = shift @destination_list;
$max = shift @destination_list;
}
if (scalar(@upward))
{
# the root has nowhere to send these ranks
if ($general::root)
{
diagnostic::warning("Send problem, ".taktuk::encode_set(@upward).
" is(are) invalid destination(s)");
}
else
{
$new_message = taktuk::encode($message,
taktuk::pack(taktuk::encode_set(@upward)).$remaining);
communicator::get_root->send_message($new_message);
}
}
if ($delivery)
{
diagnostic::debug("Delivered message");
communicator::process_message(communicator::get_control, $remaining);
}
}
# Handler for spread messages: passes the message to the broadcast handler,
# then interprets its body on the local node as well.
sub spread($$$)
{
    my ($message, $connector, $payload) = @_;
    broadcast($message, $connector, $payload);
    diagnostic::debug("Handling message locally");
    communicator::process_message($connector, $payload);
}
# Handler for steal messages: hands the request to the scheduler theft
# handler and lets the synchronizer re-evaluate the ready state.
sub steal($$$)
{
    my ($message, $connector, $request) = @_;
    diagnostic::debug("Steal request : $request, connector : $connector");
    scheduler::theft_handler($connector, $request);
    synchronizer::check_ready_state;
}
# Handler for synchronize messages: the body is itself a message, which is
# processed as if it came from the same connector.
sub synchronize($$$)
{
    my ($message, $connector, $inner_message) = @_;
    communicator::process_message($connector, $inner_message);
}
# Handler for taktuk_code messages: keeps the received body in
# $general::taktuk_code.
sub taktuk_code($$$)
{
    my ($message, $connector, $code) = @_;
    diagnostic::debug("Taktuk code received");
    $general::taktuk_code = $code;
}
# Handler for taktuk_perl messages: runs a perl interpreter fed with the
# taktuk package followed by a user script.
# $body is the packed execution attributes followed by
# "[perl options] [--] [filename] [arguments]".
sub taktuk_perl($$$)
{
my $message = shift;
my $connector = shift;
my $body = shift;
my ($options, $arguments, $filename, $attributes);
($attributes, $body) = taktuk::unpack($body);
# a standalone "--" separates the interpreter options from the rest;
# without it everything is taken as filename and arguments
if ($body =~ /(?:^|\s)--(?:$|\s)/)
{
($options, $arguments) = split /(?:^|\s)--(?:$|\s)/,$body,2;
$arguments = "" if not defined($arguments);
}
else
{
($options, $arguments) = ("", $body);
}
# first word: script name, remainder: script arguments
($filename, $arguments) = split /\s/, $arguments, 2;
$filename = "" if not defined($filename);
$arguments = "" if not defined($arguments);
diagnostic::debug("Taktuk perl execution, options [$options], filename ".
"[$filename], arguments [$arguments]");
# the interpreter reads its program from standard input ("-")
my $command = execute($taktuk::execute, $connector,
taktuk::pack($attributes)."perl $options -- - $arguments");
if ($command)
{
$command->{line} = "taktuk_perl";
$command->{line} .= " $body" if $body;
general::load_taktuk_package('taktuk');
communicator::post_write($command->{read}, $general::taktuk_package)
or diagnostic::system;
communicator::post_write($command->{read}, "\npackage main;\n")
or diagnostic::system;
# an empty filename or "-" leaves the input of the command open, presumably
# so that the script can be sent afterwards as input - TODO confirm
if ($filename and ($filename ne "-"))
{
# NOTE(review): the filename is expanded by a shell through echo; a name
# holding quotes, backquotes or $(...) would run arbitrary shell code.
# Confirm that the filename always comes from a trusted source.
$filename = qx{echo "$filename"};
chomp($filename);
communicator::post_write($command->{read},
general::load_file($filename)) or diagnostic::system;
communicator::post_close($command);
}
}
}
# Handler for wait_message messages: records one more awaited message on
# the connector (the pending counter is decremented). $body is an optional
# timeout: when it is positive and the counter became negative, a timer
# calling recv_timeout is registered and tied to the connector.
sub wait_message($$$)
{
    my ($message, $connector, $body) = @_;
    $connector->{pending_messages}--;
    if ($body and ($connector->{pending_messages} < 0) and ($body > 0))
    {
        my $timer = timer->register($body,\&recv_timeout);
        $timer->{connector} = $connector;
        $connector->{timer} = $timer;
    }
}
# Handler for work messages: gives the received work to the scheduler and
# lets the synchronizer re-evaluate the ready state.
sub work($$$)
{
    my ($message, $connector, $received_work) = @_;
    diagnostic::debug("Work received : $received_work");
    scheduler::dispatch_work($received_work);
    synchronizer::check_ready_state;
}
# Registers the handler of every message code known to this package.
# The table is walked in order, so registrations happen in the listed order.
sub init()
{
    my @registrations = (
        [ $taktuk::arguments,     \&arguments ],
        [ $taktuk::broadcast,     \&broadcast ],
        [ $taktuk::down,          \&down ],
        [ $taktuk::eof,           \&eof ],
        [ $taktuk::execute,       \&execute ],
        [ $taktuk::file,          \&file ],
        [ $taktuk::forward_up,    \&forward_up ],
        [ $taktuk::get,           \&get ],
        [ $taktuk::get_info,      \&get_info ],
        [ $taktuk::input,         \&input ],
        [ $taktuk::kill,          \&kill ],
        [ $taktuk::message,       \&message ],
        [ $taktuk::output,        \&output ],
        [ $taktuk::option,        \&option ],
        [ $taktuk::options,       \&options ],
        [ $taktuk::pipe,          \&pipe ],
        [ $taktuk::position,      \&position ],
        [ $taktuk::quit,          \&quit ],
        [ $taktuk::ready,         \&ready ],
        [ $taktuk::reduce,        \&reduce ],
        [ $taktuk::reduce_result, \&reduce_result ],
        [ $taktuk::resign,        \&resign ],
        [ $taktuk::numbered,      \&numbered ],
        [ $taktuk::put,           \&put ],
        [ $taktuk::send_to,       \&send_to ],
        [ $taktuk::spread,        \&spread ],
        [ $taktuk::steal,         \&steal ],
        [ $taktuk::synchronize,   \&synchronize ],
        [ $taktuk::taktuk_code,   \&taktuk_code ],
        [ $taktuk::taktuk_perl,   \&taktuk_perl ],
        [ $taktuk::update_failed, \&update_failed ],
        [ $taktuk::wait_message,  \&wait_message ],
        [ $taktuk::work,          \&work ],
    );
    foreach my $entry (@registrations)
    {
        my ($code, $handler) = @$entry;
        handlers::register_handler($code, $handler);
    }
}
# Main package: translation of the user commands into messages and startup
# of the engine.
package main;
use strict; use bytes;
# pid of the forked process running the engine (undef as long as no fork
# has been done)
our $taktuk_interpreter = undef;
# set when commands have to be read from standard input once the command
# line is exhausted
our $interactive = 0;
# NOTE(review): only read in this part of the file, presumably set by an
# option handler - confirm
our $forced_interactive = 0;
# when set, no numbering of the nodes is started
our $no_numbering = 0;
# set by translate when a command lacks its parameters, stops the
# processing of commands
our $terminate = 0;
# message sent for the quit command
our $quit_message = $taktuk::quit;
# Completes an abbreviated keyword.
#   $prefix : text typed by the user
#   @_      : candidate full names
# Returns the only candidate starting with $prefix when there is exactly
# one, otherwise $prefix itself, unchanged (no match or ambiguous prefix).
# The prefix is matched literally: it comes from user input and must not be
# interpreted as a regular expression (a '.' would match any character and
# an unbalanced '(' would make the match die).
sub is_one_of($@)
{
    my $prefix = shift;
    my @matches = grep { m/^\Q$prefix\E/ } @_;
    return $matches[0] if scalar(@matches) == 1;
    return $prefix;
}
# Parses the attributes that may follow an exec or taktuk_perl command:
#   timeout <value>, action <command>, kill <signal number>
# (keywords may be abbreviated, see is_one_of).
# Each timeout opens a new group holding the packed timeout value followed
# by its packed action / kill specifications; the groups are themselves
# packed and concatenated into the returned string.
sub get_attributes()
{
my $attributes = "";
# group of the timeout currently being parsed
my $timeout_pack = "";
my $argument = arguments::get_next_argument;
# any argument containing a lowercase letter is taken as an attribute
while ($argument =~ /[a-z]/o)
{
$argument = is_one_of($argument, qw(timeout action kill));
if ($argument eq "timeout")
{
my $value = arguments::get_next_argument;
# a new timeout closes the group of the previous one
if (length($timeout_pack))
{
$attributes .= taktuk::pack($timeout_pack);
$timeout_pack = "";
}
$timeout_pack = taktuk::pack($value);
}
elsif ($argument eq "action")
{
# the action is a regular command translated into a message; commands
# giving a non empty type (file, pipe, put, get) are refused
my ($message, $type, $data) = translate;
if (length($timeout_pack))
{
if (not $type)
{
$timeout_pack .= taktuk::pack(
taktuk::encode($taktuk::action, $message))
if defined($message);
}
else
{
diagnostic::warning("Invalid attribute action");
}
}
else
{
diagnostic::warning("Action requires timeout specification");
}
}
elsif ($argument eq "kill")
{
my $value = arguments::get_next_argument;
if ($value =~ m/^\d+$/)
{
if (length($timeout_pack))
{
$timeout_pack .= taktuk::pack(taktuk::encode($taktuk::kill,
$value));
}
else
{
diagnostic::warning("Kill requires timeout specification");
}
}
else
{
diagnostic::error("Invalid signal number for kill");
}
}
else
{
diagnostic::warning("Unknown option $argument for exec");
}
$argument = arguments::get_next_argument;
} 
# close the last group
$attributes .= taktuk::pack($timeout_pack) if length($timeout_pack);
# the argument that ended the loop is not an attribute: give it back
arguments::restore_last_argument;
diagnostic::debug("Exec attributes : $attributes");
return $attributes;
}
# Translates the next user command into a message.
# Returns ($message, $type, $data):
#   $message : encoded message, undef when there is nothing to send
#   $type    : "" for a plain message, or $taktuk::file, $taktuk::pipe,
#              $taktuk::put, $taktuk::get when the caller has more work to
#              do with $data
#   $data    : extra data for these types (file handle, pipe name or
#              [source, destination] pair)
# Prefix commands (set specification, broadcast, downcast, synchronize)
# call translate recursively and wrap the message they get.
# Sets $terminate when a command lacks its parameters.
sub translate()
{
my $message = undef;
my $type = "";
my $data = undef;
my @commands = qw(broadcast downcast exec get help input kill 
network option put synchronize taktuk_perl version quit);
my $found="";
my $number_found=0;
my $command;
$command = arguments::get_next_command;
# commands made of lowercase letters only may be abbreviated
if (not $arguments_ended and ($command =~ /^[a-z]+$/))
{
$command = is_one_of($command, @commands);
}
# nothing left to read, or empty command: nothing to translate
if ($arguments_ended)
{
}
elsif ($command =~ m/^\s*$/o)
{
}
# set specification: the following command is sent to these ranks
elsif ($command =~ /^\d/)
{
my @send_set = taktuk::decode_set($command);
if (scalar(@send_set))
{
($message, $type, $data) = translate();
$message = taktuk::encode($taktuk::send_to,
taktuk::pack(taktuk::encode_set(@send_set)).$message)
if (defined($message));
}
else
{
diagnostic::error("Invalid set specification : $command");
}
}
elsif ($command eq "broadcast")
{
($message, $type, $data) = translate();
$message = taktuk::encode($taktuk::broadcast, $message)
if (defined($message));
}
# downcast: a broadcast wrapped in a down message
elsif ($command eq "downcast")
{
($message, $type, $data) = translate();
$message = taktuk::encode($taktuk::down, taktuk::encode(
$taktuk::broadcast, $message)) if (defined($message));
}
elsif ($command eq "exec")
{
my $attributes = get_attributes;
my $parameters = arguments::get_parameters;
if (defined($parameters))
{
$message = taktuk::encode($taktuk::execute,
taktuk::pack($attributes).$parameters);
}
else
{
$terminate = 1;
}
}
# get: the message stays empty, the request is built by the caller from
# $data and from the prefix added by the enclosing commands
elsif ($command eq "get")
{
my $source = arguments::get_parameters;
my $destination = arguments::get_parameters;
if (defined($source) and defined($destination))
{
$message = "";
$type = $taktuk::get;
$data = [ $source, $destination ];
}
else
{
$terminate = 1;
}
}
elsif ($command eq "help")
{
general::print_help;
}
# input [close|data|file|line|pipe] : "data" when no kind is given
elsif ($command eq "input")
{
my $name = arguments::get_next_command;
if (defined($name) and ($name =~ m/^[a-z]+$/))
{
$name = is_one_of($name, qw(close data file line pipe));
}
else
{
arguments::restore_last_command;
$name = "data";
}
if ($name eq "close")
{
$message = $taktuk::eof;
}
else
{
my $parameters = arguments::get_parameters;
if (defined($parameters))
{
if ($name eq "data")
{
$message = taktuk::encode($taktuk::input, $parameters);
}
elsif ($name eq "line")
{
$message = taktuk::encode($taktuk::input,$parameters."\n");
}
elsif ($name eq "file")
{
# the caller reads the opened file and sends its content as input messages
$message = "";
$type = $taktuk::file;
$data = general::open_file($parameters);
$message = $taktuk::input if $data;
}
elsif ($name eq "pipe")
{
$message = "";
$type = $taktuk::pipe;
$data = $parameters;
$message = $taktuk::input;
}
else
{
diagnostic::warning("Unknown input type, ignoring command");
}
}
else
{
$terminate = 1;
}
}
}
# kill [signal] : signal 15 when none is given
elsif ($command eq "kill")
{
my $parameter = arguments::get_next_command;
if ($parameter)
{
$message = taktuk::encode($taktuk::kill, $parameter);
}
else
{
$message = taktuk::encode($taktuk::kill, 15);
}
}
# network [cancel|state|renumber|update] : "state" when nothing is given
elsif ($command eq "network")
{
my $subcommand = arguments::get_next_command;
$subcommand = is_one_of($subcommand, qw(cancel state renumber update));
if ($subcommand eq "cancel")
{
$message = $taktuk::resign;
}
elsif (($subcommand eq "state") or ($subcommand eq ""))
{
if ($general::root)
{
$message =
taktuk::encode($taktuk::reduce, $taktuk::reduce_tree);
}
else
{
diagnostic::warning("Cannot print tree from non root node");
}
}
elsif ($subcommand eq "renumber")
{
# NOTE(review): the warning below is also issued on the root when
# numbering is disabled
if (($general::root) and !$no_numbering)
{
$message = taktuk::encode($taktuk::spread,
taktuk::encode($taktuk::reduce, $taktuk::reduce_count));
}
else
{
diagnostic::warning("Cannot renumber from non root node");
}
}
elsif ($subcommand eq "update")
{
# same as renumber, the appended 1 being the update flag given to the
# count reduce
if (($general::root) and !$no_numbering)
{
$message = taktuk::encode($taktuk::spread,
taktuk::encode($taktuk::reduce, $taktuk::reduce_count.1));
}
else
{
diagnostic::warning("Cannot update from non root node");
}
}
else
{
diagnostic::warning("Unknwon network command $subcommand");
}
}
# option <name> <value> sets a single option, option <line> sends a whole
# line of options
elsif ($command eq "option")
{
my $name = arguments::get_next_argument;
if (defined($name) and ($name =~ m/^[-a-z]+$/))
{
my $parameters = arguments::get_parameters;
if (defined($parameters))
{
# long names are replaced by their short name, as the receiver takes the
# first character of the body as the option name
$name = $arguments::short_name{$name} if (length($name) > 1);
$message = taktuk::encode($taktuk::option, $name.$parameters);
}
else
{
$terminate = 1;
}
}
else
{
arguments::restore_last_argument;
my $parameters = arguments::get_parameters;
if (defined($parameters))
{
$message = taktuk::encode($taktuk::options, $parameters);
}
else
{
$terminate = 1;
}
}
}
# put: same scheme as get
elsif ($command eq "put")
{
my $source = arguments::get_parameters;
my $destination = arguments::get_parameters;
if (defined($source) and defined($destination))
{
$message = "";
$type = $taktuk::put;
$data = [ $source, $destination ];
}
else
{
$terminate = 1;
}
}
elsif ($command eq "quit")
{
$message = $quit_message;
}
elsif ($command eq "synchronize")
{
($message, $type, $data) = translate();
$message = taktuk::encode($taktuk::synchronize, $message)
if (defined($message));
}
elsif ($command eq "taktuk_perl")
{
my $attributes = get_attributes;
my $parameters = arguments::get_parameters;
if (defined($parameters))
{
$message = taktuk::encode($taktuk::taktuk_perl,
taktuk::pack($attributes).$parameters);
}
else
{
$terminate = 1;
}
}
elsif ($command eq "version")
{
arguments::print_version;
}
else
{
diagnostic::error("Unknown command : $command");
}
return ($message, $type, $data);
}
# SIGCHLD handler of the process translating the commands: reaps one child
# and exits as soon as the reaped child is the forked interpreter.
sub manage_children_termination()
{
    my $deceased;
    {
        # The handler interrupts arbitrary code: wait overwrites $? and may
        # change $!, which must not leak into an interrupted system call /
        # error report sequence.
        local ($!, $?);
        $deceased = wait;
    }
    # The exit is done once $! and $? have been restored, so that the
    # restoration cannot interfere with the requested exit status.
    if ($deceased == $taktuk_interpreter)
    {
        exit 0;
    }
}
# Splits the root into two processes. The parent translates the user
# commands into messages and never returns from this function (it exits
# when done); the child returns to the caller and goes on as the engine.
sub fork_taktuk_interpreter()
{
$taktuk_interpreter = fork();
if (not defined($taktuk_interpreter))
{
diagnostic::system;
diagnostic::error("FATAL : cannot continue without forked interpreter");
exit 1;
}
if ($taktuk_interpreter)
{
# parent: command translation side
if ($arguments::arguments_ended or $forced_interactive)
{
diagnostic::debug("Interactive mode");
# no command on the command line: read them from standard input right
# away, otherwise only once the command line is exhausted
if ($arguments::arguments_ended)
{
arguments::fetch_arguments(\*STDIN);
}
else
{
$interactive = 1;
}
}
# presumably releases the end of the channel used by the other process -
# TODO confirm against the cleanup method
communicator::get_outgoing->cleanup;
arguments::initialize_terminal;
# leave as soon as the interpreter dies while commands are processed
$SIG{CHLD} = \&manage_children_termination;
process_commands();
# all commands sent: wait for the end of the interpreter
$SIG{CHLD} = 'DEFAULT';
waitpid $taktuk_interpreter, 0;
exit 0;
}
else
{
# child: engine side
communicator::get_control->cleanup;
}
}
# Delivers a message produced from a user command: it is sent on the
# control connector when an interpreter has been forked, and processed
# directly in this process otherwise.
sub handle_message($)
{
    my ($message) = @_;
    my $control = communicator::get_control;
    if (not defined($taktuk_interpreter))
    {
        communicator::process_message($control, $message);
    }
    else
    {
        $control->send_message($message);
    }
}
# Main loop of the command side: translates the user commands one after the
# other and delivers the resulting messages until the commands are exhausted
# or $terminate is set.
sub process_commands()
{
my $message;
my $type;
my $data;
# on the root, the nodes are numbered first unless numbering is disabled
handle_message(taktuk::encode($taktuk::spread,
taktuk::encode($taktuk::reduce, $taktuk::reduce_count)))
if (($general::root) and !$no_numbering);
while (not $arguments::arguments_ended and not $terminate)
{
($message, $type, $data) = translate();
arguments::skip_command_separator unless $arguments::arguments_ended;
if (defined($message))
{
if ($type eq $taktuk::file)
{
# $data is an opened file handle: its content is sent by chunks of half
# the read size, each chunk wrapped in $message
my $result;
my $partial_message;
my $buffer;
$result = sysread($data, $buffer, $taktuk::read_size/2);
while ($result)
{
$partial_message = taktuk::encode($message, $buffer);
diagnostic::debug("Message to send : $partial_message");
handle_message($partial_message);
$result = sysread($data, $buffer, $taktuk::read_size/2);
}
# undef means read error, 0 end of file
diagnostic::system if not defined($result);
CORE::close($data) if ($data != \*STDIN);
}
elsif ($type eq $taktuk::pipe)
{
# $data is the name of the pipe, $message the prefix of the messages
# that will carry its content
my $new_message = taktuk::encode($taktuk::pipe,
taktuk::pack($data).$message);
handle_message($new_message);
}
elsif (($type eq $taktuk::put) or ($type eq $taktuk::get))
{
# $message only holds the prefix added by the enclosing commands (set
# specification, broadcast...): without it there is no destination
if ($message)
{
my $new_message = taktuk::encode($type, taktuk::pack(
$message).taktuk::pack($data->[0]).$data->[1]);
handle_message($new_message);
}
else
{
diagnostic::warning("get/put require a destination ".
"modifier");
}
}
else
{
diagnostic::debug("Message to send : $message");
handle_message($message);
}
}
# command line exhausted in interactive mode: go on with standard input
if ($arguments::arguments_ended and $interactive)
{
arguments::fetch_arguments(\*STDIN);
$interactive = 0;
}
}
diagnostic::debug("End of process commands");
communicator::get_control->close('read') if $general::root;
}
# Main program: initialization of the packages, parsing of the command
# line, then start of the engine.
taktuk::no_flush(\*STDOUT);
handlers::init;
arguments::init;
general::init;
communicator::init;
synchronizer::set_not_ready;
arguments::fetch_arguments(@ARGV);
arguments::parse_options;
synchronizer::setup_synchronization;
if ($general::root)
{
# the root has no father to wait for; the process is split here, only the
# engine side returns from the call
$synchronizer::father_is_ready = 1;
fork_taktuk_interpreter;
}
else
{
# non root node: print the init string followed by the release
general::print($connector::init_string.$taktuk::RELEASE."\n");
}
synchronizer::check_ready_state;
scheduler::schedule;
# main loop of the engine
communicator::run;
diagnostic::debug("End of the taktuk code");
communicator::get_root()->output('state', synchronizer::TAKTUK_TERMINATED);
exit(0);
__END__
