#!/usr/bin/perl -w
#
# Small TCP front end for one long-running R process.
#
# A client connects, sends a single newline-terminated request of the form
#     "<y values separated by spaces>x<x values separated by spaces>\n"
# where the x values are the rows of the predictor matrix laid out row by row.
# The server builds an lm() call, feeds it to R through a pty, reads the
# fitted values back, returns them to the client on one line and closes the
# connection.
use strict;
use warnings;
use IO::Select;
use IO::Socket::INET;
use IO::Pty::Easy;

$| = 1;

# port to bind to
my $port = 27113;

# how long to block for incoming connections on each main loop iteration
# also serves to sleep the main loop, increase this if this script is
# hammering the cpu to death
my $loop_wait = 0.001;

# how long a single read from the pty may block while waiting for R output;
# keeps the wait loop from spinning at 100% cpu
my $pty_poll = 0.05;

# create a new pty and spawn R
my $pty = IO::Pty::Easy->new;
$pty->spawn('/usr/local/lib/R/bin/R --vanilla --slave --no-readline');
sleep 2;    # hopefully R is ready after 2 seconds

# A client may disconnect while we are waiting for R; sending the result to
# it must not kill the whole server with SIGPIPE. This is set after the spawn
# so that R itself keeps the default signal disposition.
$SIG{PIPE} = 'IGNORE';

# bind port for incoming connections
my $sock = IO::Socket::INET->new(
    'LocalPort' => $port,
    'Proto'     => 'tcp',
    'Listen'    => SOMAXCONN,
    'Reuse'     => 1
);
if ($sock) {
    print "[INIT] Bound to port $port\n";
}
else {
    die "[DIE ] Cannot bind port $port\n";
}

# start the main loop
my $select = IO::Select->new;
$select->add($sock);

# Queue of open connections. Each entry is a hash ref with the keys
#   sock   - the client socket
#   select - an IO::Select object watching only that socket
#   buffer - data received so far
#   peer   - the client's address, cached for log messages
my @connq;

while ($pty->is_active) {
    # accept incoming connections and put them into the queue
    # we're going through the entire queue at once because SOMAXCONN sucks
    while ($select->can_read($loop_wait)) {
        # accept() can fail (for instance when the client has already gone
        # away); leave the accept loop and retry on the next iteration
        my $client = $sock->accept() or last;

        # peerhost is the remote address; sockhost would be our own side
        my $peer = $client->peerhost;
        $peer = 'unknown' unless defined $peer;

        my $watcher = IO::Select->new;
        $watcher->add($client);
        push @connq, {
            sock   => $client,
            select => $watcher,
            buffer => '',
            peer   => $peer,
        };
        print "[CONN] Opened from $peer\n";
    }

    # attempt to process connections in the queue, keeping only the ones
    # that are still open so the queue never accumulates dead entries
    my @still_open;
    for my $conn (@connq) {
        push @still_open, $conn unless service_connection($pty, $conn);
    }
    @connq = @still_open;
}

print "Something went wrong with the pty, exiting...\n";

# Log why a connection is being dropped and close its socket.
sub close_connection {
    my ($conn, $reason) = @_;
    print "[CONN] Closed from $conn->{peer} ($reason)\n";
    $conn->{sock}->close;
    return;
}

# Read whatever a client has sent so far. Once a chunk ending in a newline
# has arrived the request is handed to R and answered.
# Returns 1 when the connection has been closed (for any reason) and 0 when
# it is still open and waiting for more data.
sub service_connection {
    my ($pty, $conn) = @_;

    while ($conn->{select}->can_read(0)) {
        # try to read from the socket, or close it on read errors
        # IO::Socket->connected doesn't really seem to work right, so a
        # zero-length read is treated as a closed connection
        my $chunk;
        if (!defined $conn->{sock}->recv($chunk, 1024) || length($chunk) == 0) {
            close_connection($conn, 'Connection reset by peer');
            return 1;
        }

        # if we read something, write it to the buffer
        $conn->{buffer} .= $chunk;

        # keep reading until the client sends a newline
        next unless substr($chunk, -1) eq "\n";

        handle_request($pty, $conn);
        return 1;
    }

    return 0;
}

# True when a token must not be pasted into the R script.
# NOTE(review): the original pattern also rejects any value starting with
# "0" (so "0" and "0.5" are refused) and any sign or exponent; that rule is
# kept as is - confirm whether it is intended.
sub is_bad_number {
    my ($token) = @_;

    # empty tokens and a lone "." slip through the pattern below but would
    # produce a malformed R script
    return 1 if $token !~ /\d/;
    return 1 if $token =~ m/([^\.\d]|.*\..*\..*|^0)/;
    return 0;
}

# Parse the buffered request, run the regression in R and send the fitted
# values back. The connection is always closed before returning.
sub handle_request {
    my ($pty, $conn) = @_;

    # split incoming data into x and y values
    my $request = $conn->{buffer};
    $request =~ s/\n$//g;
    my $sep   = index($request, 'x');
    my @ydata = $sep < 0 ? () : split(/ /, substr($request, 0, $sep));
    my @xdata = $sep < 0 ? () : split(/ /, substr($request, $sep + 1));

    # A missing separator or an empty y list is rejected before dividing:
    # dividing by zero rows would otherwise kill the server.
    my $rows = @ydata;
    my $cols = $rows ? @xdata / $rows : 0;
    if ($rows == 0 || $cols < 1 || $rows <= $cols || $cols != int($cols)) {
        close_connection($conn, 'Short matrix');
        return;
    }

    if (grep { is_bad_number($_) } @ydata, @xdata) {
        close_connection($conn, 'Non-numeric data');
        return;
    }

    # now we generate the script for R: one formula term and one column
    # label per predictor, and the data matrix row by row as y,x0,x1,...
    my @terms  = map { "x$_" } 0 .. $cols - 1;
    my $labels = join(',', "'y'", map { "'$_'" } @terms);
    my @cells;
    for my $row (0 .. $rows - 1) {
        push @cells, $ydata[$row],
            @xdata[ $row * $cols .. ($row + 1) * $cols - 1 ];
    }
    my $script = 'cat(fitted(lm(y~' . join('+', @terms)
        . ',data=data.frame(matrix(c(' . join(',', @cells)
        . '),nrow=' . $rows . ',ncol=' . ($cols + 1)
        . ',byrow=TRUE,dimnames=list(seq(' . $rows
        . "),c($labels)))))));\n\n\n";

    # give the script to R and wait for results
    my $write = $pty->write($script);
    print "[PROC] Writing script to R for $conn->{peer} $write/"
        . length($script) . " bytes\n";
    print "[PROC] Waiting for results for $conn->{peer}\n";

    # NOTE(review): the loop stops as soon as enough space-separated tokens
    # have arrived, so the last value could be cut short if R's output comes
    # in several reads - needs an end-of-output marker to be fully reliable.
    my @results;
    my $ptybuf  = '';
    my $stime   = time;
    my $jiggled = 0;
    while (@results < $rows) {
        my $waited = time - $stime;
        if ($waited > 10) {
            print "[HANG] Hangcheck cycled for 5 seconds, breaking out...\n";
            last;
        }
        if ($waited > 5) {
            if (!$pty->is_active) {
                die "Looks like R died, exiting\n";
            }

            # nudge R once only; repeating this on every pass would flood
            # the pty with newlines
            if (!$jiggled) {
                print "[HANG] Hangcheck tripped, jiggling the handle...\n";
                $pty->write("\n\n");
                $jiggled = 1;
            }
        }

        my $ptyread = $pty->read($pty_poll);
        if (defined $ptyread) {
            $ptybuf .= $ptyread;
            @results = split / /, $ptybuf;
        }
    }

    # send the results to the client and close the socket
    $conn->{sock}->send("@results\n");
    close_connection($conn, 'Result returned');
    return;
}