Creating a Firewall Log Analysis Tool for a SonicWall DSL Router, Part 2

In the last entry, we "compressed" log lines by loading them into a database table. That code had a few bugs, which are fixed here, and the new script below adds features that keep us from losing some log data. Explanation after the code:

File: load

#! /usr/bin/perl
# vim:ts=4:sw=4:ai:

use Date::Parse;
use DBI;
use Socket;

$dsn = "DBI:mysql:database=firewall;host=localhost";
$dbh = DBI->connect( $dsn, 'firewall', '' ) or die "Cannot connect: $DBI::errstr\n";

$path = '/home/johnk/Sonicwall';

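## stop here if there is no log to rotate, rather than restarting rsyslog for nothing
rename "$path/log", "$path/log.tmp" or die "Cannot rename $path/log: $!\n";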
system('/etc/init.d/rsyslog restart');

my %messages = ();
my %protos = ();

## load app protos from the db.  the ids don't necessarily match the port numbers.
$sql = "SELECT id,name FROM appprotos";
$sth = $dbh->prepare($sql);
$sth->execute();
while( @row = $sth->fetchrow_array )
{
        $protos{$row[1]} = $row[0];
}

open LOG, '<', "$path/log.tmp" or die "Cannot open $path/log.tmp: $!\n";
while (my $line = <LOG>)
{
        chomp $line;

        my @parts = split /\s+(\w+)=/, $line;
        shift @parts;
        %hash = @parts;

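        ## 'N' unpacks an unsigned 32-bit value in network byte order, which
        ## matches the unsigned int columns.  ('l' is signed and native-endian.)
        my ($srcip, $srcport, $srcint) = split /:/,$hash{src};
        $hash{src} = unpack('N',inet_aton($srcip));

        my ($dstip, $dstport, $dstint) = split /:/,$hash{dst};
        $hash{dst} = unpack('N',inet_aton($dstip));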

        $hash{'time'} = str2time(substr($hash{'time'},1,-1));

        my ($netproto, $appproto) = split /\//,$hash{proto};

        ## Substitute proto with the numeric code.  If it doesn't exist, then
        ## add it to the list, and then do the substitution.
        if ($protos{$appproto})
        {
                $appproto = $protos{$appproto};
        }
        else
        {
                ## use a placeholder so odd characters in the name can't break the SQL
                $sql = "INSERT INTO appprotos (`name`) VALUES (?)";
                $sth = $dbh->prepare($sql);
                $result = $sth->execute($appproto);
                print "$sql [$appproto]\n" if (! $result);
                $id = $sth->{'mysql_insertid'};
                $protos{$appproto} = $id;
                $appproto = $id;
        }

        $sent = $hash{sent} + 0;
        $recd = $hash{recd} + 0;

        ## store the appprotos id in the appproto column (this was a literal 0)
        $sql = "INSERT INTO logs VALUES ($hash{time},$hash{pri},$hash{m},$hash{src},$srcport,'$srcint',$hash{dst},$dstport,'$dstint','$netproto',$appproto,$sent,$recd)";
        $sth = $dbh->prepare($sql);
        if (! $sth->execute() )
        {
                print $line;
                print "\n";
                print $sql;
                print "\n";
                exit;
        }

        ## make a memo about the message
        $messages{$hash{m}} = $hash{msg};
}
close LOG;

## Add all the gathered msg messages into the messages table.
foreach my $key (keys %messages)
{
        $msg = substr($messages{$key}, 1, -1);
        ## placeholders keep quotes inside a message from breaking the statement
        $sql = "INSERT IGNORE INTO messages (id,name) VALUES (?,?)";
        $sth = $dbh->prepare($sql);
        $sth->execute($key, $msg);
}

unlink "$path/log.tmp";

Table definitions for the above:

--
-- Table structure for table `appprotos`
--

CREATE TABLE IF NOT EXISTS `appprotos` (
  `id` tinyint(3) unsigned NOT NULL auto_increment,
  `name` varchar(255) collate ascii_bin NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=MyISAM  DEFAULT CHARSET=ascii COLLATE=ascii_bin AUTO_INCREMENT=12 ;

-- --------------------------------------------------------

--
-- Table structure for table `logs`
--

CREATE TABLE IF NOT EXISTS `logs` (
  `datetime` int(11) unsigned NOT NULL,
  `pri` smallint(2) unsigned NOT NULL,
  `m` smallint(3) unsigned NOT NULL,
  `src` int(11) unsigned NOT NULL,
  `srcport` mediumint(5) unsigned NOT NULL,
  `srcint` enum('','WAN','LAN','OPT') collate ascii_bin NOT NULL,
  `dst` int(11) unsigned NOT NULL,
  `dstport` mediumint(5) unsigned NOT NULL,
  `dstint` enum('','WAN','LAN','OPT') collate ascii_bin NOT NULL,
  `proto` enum('tcp','udp','icmp') collate ascii_bin NOT NULL,
  `appproto` smallint(3) unsigned default NULL,
  `sent` int(11) unsigned NOT NULL,
  `recd` int(11) unsigned NOT NULL
) ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;

-- --------------------------------------------------------

--
-- Table structure for table `messages`
--

CREATE TABLE IF NOT EXISTS `messages` (
  `id` smallint(3) NOT NULL,
  `name` varchar(255) collate ascii_bin NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;

This version of the script can be run as a cron job.

In the previous script, we lost some of the details of the protocol field. The protocol fields look like this: "tcp/http", "udp/dns" and so forth. The part before the slash is the network protocol, and the part after it is the application. Usually, the application is identified by the port number, but I think the firewall also does some packet inspection to identify it.
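As a small illustration of that split, here is a sketch using the two example values above. The helper name split_proto is made up for this example, and the empty-string fallback for a value with no slash is an assumption, not something the firewall documentation promises.

```perl
use strict;
use warnings;

# Split a proto value such as "tcp/http" into its network protocol
# and application protocol parts.  split_proto is a hypothetical helper.
sub split_proto {
    my ($proto) = @_;
    my ($netproto, $appproto) = split m{/}, $proto, 2;
    # Assumption: a value with no slash has no application part; use ''.
    $appproto = '' unless defined $appproto;
    return ($netproto, $appproto);
}

for my $proto ('tcp/http', 'udp/dns') {
    my ($net, $app) = split_proto($proto);
    print "$proto -> net=$net app=$app\n";
}
```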

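To save this data without consuming space, the appprotos table holds the names of the protocols, and in the logs table, we store a reference to appprotos. The key in appprotos is one byte (a tinyint). (The schema above declares logs.appproto as a smallint, which is two bytes; it could be narrowed to a tinyint to match.) Since most traffic is "http" or "https" or "smtp", a one-byte reference gives roughly 4:1 compression on this field.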

The other bit of data to preserve is the msg field. That's a human-readable version of field m, which we store as a smallint.

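For both of these fields, we create records in the appprotos and messages tables on an as-needed basis. When the script is executed, the appprotos table is loaded into a hash, and the log line values are encoded using that hash. As new protocol names are found, we insert them into the table and add the new id to the hash. Messages are gathered in a second hash as the log is read, then written at the end with INSERT IGNORE, so messages already in the table are skipped.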
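The look-up-or-create step can be sketched without a database. In this sketch the %protos hash stands in for the rows loaded from appprotos, and the counter $next_id stands in for the auto_increment column; the helper name proto_id and the starting values are made up for the example. In the real script the new id comes back from the INSERT via mysql_insertid.

```perl
use strict;
use warnings;

my %protos  = (http => 1, https => 2);  # stand-in for rows loaded from appprotos
my $next_id = 3;                         # stand-in for auto_increment

# Return the numeric id for a protocol name, creating one if needed.
sub proto_id {
    my ($name) = @_;
    # exists() rather than a truth test, so an id of 0 would still count as known
    return $protos{$name} if exists $protos{$name};
    my $id = $next_id++;   # the real script does an INSERT and reads mysql_insertid
    $protos{$name} = $id;
    return $id;
}

print join(',', map { proto_id($_) } qw(http smtp https smtp dns)), "\n";
# prints 1,3,2,3,4
```

Each name is inserted once, the first time it is seen; every later log line with the same name costs only a hash lookup.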

This has the nice side effect of keeping the tables small, so the key field can be kept as small as reasonably possible. (Also, I had a big bug in the previous entry -- the fields were signed values, but they should have been unsigned. Unsigned fields can store twice as many positive values.)
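Signedness matters for the address columns too. The src and dst columns are unsigned 32-bit ints, and any address from 128.0.0.0 upward turns negative if it is read as a signed value. A quick sketch, using 192.168.1.1 purely as a sample address:

```perl
use strict;
use warnings;
use Socket qw(inet_aton);

my $packed = inet_aton('192.168.1.1');   # 4 bytes in network (big-endian) order

my $unsigned = unpack('N',  $packed);    # unsigned 32-bit, big-endian
my $signed   = unpack('l>', $packed);    # signed 32-bit, big-endian (Perl 5.10+)

print "unsigned: $unsigned\n";   # 3232235777
print "signed:   $signed\n";     # -1062731519
```

The negative value will not go into an unsigned column unchanged, so the unsigned form is the one to store.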

There are still some fields missing, but, at this time, I'm not going to save that data. I'm not even sure what reports I want, yet. Additionally, what I need right now is domain name resolution, so we can map from IP addresses to domain names.
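As a rough sketch of where that is headed, the stored integers can be turned back into dotted quads and passed to Perl's built-in gethostbyaddr. This assumes addresses are stored as unsigned integers in network byte order (unpack 'N'); the helper names int_to_ip and reverse_lookup are made up for the example, and falling back to the dotted quad when nothing resolves is just one possible choice.

```perl
use strict;
use warnings;
use Socket qw(inet_ntoa AF_INET);

# Convert an unsigned 32-bit integer (as stored in logs.src / logs.dst)
# back to dotted-quad notation.
sub int_to_ip {
    my ($n) = @_;
    return inet_ntoa(pack('N', $n));
}

# Hypothetical helper: reverse-resolve a stored address.  Returns the
# dotted quad itself when no name is found.
sub reverse_lookup {
    my ($n) = @_;
    my $name = gethostbyaddr(pack('N', $n), AF_INET);
    return defined $name ? $name : int_to_ip($n);
}

print int_to_ip(3232235777), "\n";   # 192.168.1.1
```

Reverse lookups are slow compared to a hash lookup, so the results would probably want caching in a table of their own.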

Continued in the next entry, coming soon.
