Ima::Human::Query::Optimizer

runrig on 2005-04-23T16:15:02

Since this one report was taking hours to generate the "normal" way, I thought I'd take a different approach: use the high performance SQL unloader to generate the driving data, then a series of perl processes to summarize, filter, and/or transform the results. I searched CPAN for a beast that might help and saw Pipeline, but I couldn't figure out how to use it in a reasonable amount of time (examples would be helpful!), so I came up with this. I use it with a pipe-delimited text stream, though if made a bit more generic it could probably handle any format:

package MyPipe;

use strict;

use IO::Handle;
use IO::Pipe;
use IO::File;
use DBI;

sub new {
  my $class = shift;
  bless { PIPELINE => [@_] }, $class;
}

sub process {
  my $p = shift;
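  # On the first call, figure out where this stage's output goes.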
  unless ($p->{WRITE}) {
    my $next = shift @{$p->{PIPELINE}};
    if (ref($next)) {
      if (UNIVERSAL::isa($next, "ARRAY")) {
        if ($next->[0] eq "FILE") {
          $p->{WRITE} = IO::File->new(">$next->[1]")
            or die "Could not open $next->[1]: $!";
        } else {
          die "Don't know how to handle $next->[0] in process\n";
        }
      } else {
        die "Can only handle ARRAY references in process pipeline\n";
      }
    } else {
      if ($next) {
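        # Fork the next stage: the parent keeps the write end of the pipe,
        # the child takes the read end and runs the next stage's method.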
        my $pipe = IO::Pipe->new;
        if (my $pid = fork) {
          $pipe->writer;
          $p->{WRITE} = $pipe;
        } else {
          die "Could not fork: $!" unless defined $pid;
          $pipe->reader;
          $p->{READ} = $pipe;
          $p->$next();
          exit;
        }
      } else {
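        # No more stages left: the final stage writes to STDOUT.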
        $p->{WRITE} = IO::Handle->new;
        unless ($p->{WRITE}->fdopen(fileno(STDOUT),"w")) {
          die "Could not open STDOUT: $!";
        }
      }
    }

  }
  print { $p->{WRITE} } @_ or die "Pipe error: $!";
}

Each "next" segment in the pipeline is a method in the package, where a typical transformer/filter might be (BTW, I know "group" is a reserved word in SQL and a bad name for a table, but don't blame me!):

# Group data
{
my @fields = qw(
  group_parent
  group_name
  group_oed
  next_renew_date
  enrol_territory
);
my $sql = <<'SQL';
select group_parent, group_name, group_oed, next_renew_date, enrol_territory
from group
where group_id = ?
SQL

sub fetch_group {
  my $p = shift;
  my $fh = $p->{READ};

  chomp(my $hdr = <$fh>);
  my @in_fields = split /\|/, $hdr;
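  # Seed %data with both the looked-up fields and the incoming fields,
  # so keys %data gives the full set of output columns.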
  my %data; undef @data{@fields, @in_fields};
  my @outfields = keys %data;
  $p->pipe_out(@outfields); # shortcut to 'process' sub

  my $dbh = $p->connect;
  my $sth = $dbh->prepare($sql);

  while (<$fh>) {
    chomp;
    @data{@in_fields} = split /\|/;
    $sth->execute($data{group_id});
    @data{@fields} = $sth->fetchrow_array or next;
    $sth->finish;
    $data{group_parent} ||= $data{group_id};
    $p->pipe_out(@data{@outfields});
  }
  $dbh->disconnect;
  $fh->close or die "Read error: $!";
  $p->{WRITE}->close or die "Write error: $!";
}
}
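
The fetch_group method relies on two helpers that aren't shown: pipe_out, the "shortcut to 'process'" mentioned in the comment, and connect, which returns a DBI handle. Here's a minimal sketch of what they might look like (the join-with-pipes behaviour, the DSN, and the credentials are placeholders, not the real thing):

# Hypothetical versions of the helpers used above.

# Join the fields with the pipe delimiter and hand the line to process().
sub pipe_out {
  my $p = shift;
  $p->process(join('|', @_), "\n");
}

# Return a DBI handle; the DSN and credentials here are placeholders.
sub connect {
  my $p = shift;
  DBI->connect('dbi:Driver:dbname', 'user', 'password', { RaiseError => 1 })
    or die "Could not connect: $DBI::errstr";
}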

Typical usage in the main program is:

my $p = MyPipe->new(qw(
  ...other stuff....
  fetch_group
  ...even more stuff...
), [FILE => $filename]);

while (defined(my $line = <>)) {
  $p->process( $line );
}

Every process gets its own db handle, does its work, then passes the data along to the next process. For processes that do their lookups from "small" tables, I either read the whole table into a hash up front and look up from that, or read from the database and cache the result. There's also a filter where, since I know only 1 out of 10 rows will be passed along, I read the 10_000 ids I want from the table into a lookup hash up front, so I don't have to hit the database 100_000 times to check whether each row is one of the 10_000 I want. Total report time: about 10 minutes.
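
As an illustration of that last trick, a filter stage along these lines preloads the interesting ids into a hash and passes through only the matching rows. This is just a sketch; the wanted_ids table and the filter_wanted name are made up for the example:

# Hypothetical filter stage: preload the ids we care about, then pass
# along only the rows whose group_id is in that set.
sub filter_wanted {
  my $p = shift;
  my $fh = $p->{READ};

  # One query up front instead of one lookup per input row.
  my $dbh = $p->connect;
  my $ids = $dbh->selectcol_arrayref("select group_id from wanted_ids");
  my %wanted;
  @wanted{@$ids} = ();
  $dbh->disconnect;

  # Pass the header through unchanged.
  chomp(my $hdr = <$fh>);
  my @in_fields = split /\|/, $hdr;
  $p->pipe_out(@in_fields);

  my %data;
  while (<$fh>) {
    chomp;
    @data{@in_fields} = split /\|/;
    next unless exists $wanted{ $data{group_id} };
    $p->pipe_out(@data{@in_fields});
  }
  $fh->close or die "Read error: $!";
  $p->{WRITE}->close or die "Write error: $!";
}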

If anyone thinks this is "worthy of CPAN" (and I know how meaningless that phrase is -- Pipeline::Simple, anyone?), they're free to utilize the above code and release it. If anyone is mildly impressed by this, the last day of my contract was yesterday (surprise! it was cut short by two months due to the cancellation of another unrelated but large project), so feel free to hire me :-)