#!/usr/bin/perl

# collectd - contrib/rrd_filter.px
# Copyright (C) 2007-2008  Florian octo Forster
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; only version 2 of the License is applicable.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
#
# Authors:
#   Florian octo Forster <octo at verplant.org>

use strict;
use warnings;

=head1 NAME

rrd_filter.px - Perform same advanced non-standard operations on an RRD file.

=head1 SYNOPSYS

  rrd_filter.px -i input.rrd -o output.rrd [options]

=head1 DEPENDENCIES

rrd_filter.px requires the RRDTool binary, Perl and the included
L<Getopt::Long> module.

=cut

use Getopt::Long ('GetOptions');

our $InFile;
our $InDS = [];
our $OutFile;
our $OutDS = [];

our $NewDSes = [];
our $NewRRAs = [];

our $Step = 0;

our $Scale = 1.0;
our $Shift = 0.0;

our $Debug = 0;

=head1 OPTIONS

The following options can be passed on the command line:

=over 4

=item B<--infile> I<file>

=item B<-i> I<file>

Reads from I<file>. If I<file> ends in C<.rrd>, then C<rrdtool dump> is invoked
to create an XML dump of the RRD file. Otherwise the XML dump is expected
directly. The special filename C<-> can be used to read from STDIN.

=item B<--outfile> I<file>

=item B<-o> I<file>

Writes output to I<file>. If I<file> ends in C<.rrd>, then C<rrdtool restore>
is invoked to create a binary RRD file. Otherwise an XML output is written. The
special filename C<-> can be used to write to STDOUT.

=item B<--map> I<in_ds>:I<out_ds>

=item B<-m> I<in_ds>:I<out_ds>

Writes the datasource I<in_ds> to the output and renames it to I<out_ds>. This
is useful to extract one DS from an RRD file.

=item B<--step> I<seconds>

=item B<-s> I<seconds>

Changes the step of the output RRD file to be I<seconds>. The new stepsize must
be a multiple of the old stepsize of the other way around. When increasing the
stepsize the number of PDPs in each RRA must be dividable by the factor by
which the stepsize is increased. The length of CDPs and the absolute length of
RRAs (and thus the data itself) is not altered.

Examples:

  step =  10, rra_steps = 12   =>   step = 60, rra_steps =  2
  step = 300, rra_steps =  1   =>   step = 10, rra_steps = 30

=item B<--rra> B<RRA>:I<CF>:I<XFF>:I<steps>:I<rows>

=item B<-a> B<RRA>:I<CF>:I<XFF>:I<steps>:I<rows>

Inserts a new RRA in the generated RRD file. This is done B<after> the step has
been adjusted, take that into account when specifying I<steps> and I<rows>. For
an explanation of the format please see L<rrdcreate(1)>.

=item B<--scale> I<factor>

Scales the values by the factor I<factor>, i.E<nbsp>e. all values are
multiplied by I<factor>.

=item B<--shift> I<offset>

Shifts all values by I<offset>, i.E<nbsp>e. I<offset> is added to all values.

=back

=cut

GetOptions ("infile|i=s" => \$InFile,
	"outfile|o=s" => \$OutFile,
	'map|m=s' => sub
	{
		my ($in_ds, $out_ds) = split (':', $_[1]);
		if (!defined ($in_ds) || !defined ($out_ds))
		{
			print STDERR "Argument for `map' incorrect! The format is `--map in_ds:out_ds'\n";
			exit (1);
		}
		push (@$InDS, $in_ds);
		push (@$OutDS, $out_ds);
	},
	'step|s=i' => \$Step,
	'ds|d=s' => sub
	{
		#DS:ds-name:GAUGE | COUNTER | DERIVE | ABSOLUTE:heartbeat:min:max
		my ($ds, $name, $type, $hb, $min, $max) = split (':', $_[1]);
		if (($ds ne 'DS') || !defined ($max))
		{
			print STDERR "Please use the standard RRDTool syntax when adding DSes. I. e. DS:<name>:<type>:<heartbeat>:<min>:<max>.\n";
			exit (1);
		}
		push (@$NewDSes, {name => $name, type => $type, heartbeat => $hb, min => $min, max => $max});
	},
	'rra|a=s' => sub
	{
		my ($rra, $cf, $xff, $steps, $rows) = split (':', $_[1]);
		if (($rra ne 'RRA') || !defined ($rows))
		{
			print STDERR "Please use the standard RRDTool syntax when adding RRAs. I. e. RRA:<cf><xff>:<steps>:<rows>.\n";
			exit (1);
		}
		push (@$NewRRAs, {cf => $cf, xff => $xff, steps => $steps, rows => $rows});
	},
	'scale=f' => \$Scale,
	'shift=f' => \$Shift
) or exit (1);

if (!$InFile || !$OutFile)
{
	print STDERR "Usage: $0 -i <infile> -m <in_ds>:<out_ds> -s <step>\n";
	exit (1);
}
if ((1 + @$InDS) != (1 + @$OutDS))
{
	print STDERR "You need the same amount of in- and out-DSes\n";
	exit (1);
}
main ($InFile, $OutFile);
exit (0);

{
my $ds_index;
my $current_index;
# state 0 == searching for DS index
# state 1 == parse RRA header
# state 2 == parse values
my $state;
my $out_cache;
sub handle_line_dsmap
{
	my $line = shift;
	my $index = shift;
	my $ret = '';

	if ((@$InDS == 0) || (@$OutDS == 0))
	{
		post_line ($line, $index + 1);
		return;
	}

	if (!defined ($state))
	{
		$current_index = -1;
		$state = 0;
		$out_cache = [];

		# $ds_index->[new_index] = old_index
		$ds_index = [];
		for (my $i = 0; $i < @$InDS; $i++)
		{
			print STDOUT "DS map $i: $InDS->[$i] -> $OutDS->[$i]\n" if ($Debug);
			$ds_index->[$i] = -1;
		}
	}

	if ($state == 0)
	{
		if ($line =~ m/<ds>/)
		{
			$current_index++;
			$out_cache->[$current_index] = $line;
		}
		elsif ($line =~ m#<name>\s*([^<\s]+)\s*</name>#)
		{
			# old_index == $current_index
			# new_index == $i
			for (my $i = 0; $i < @$InDS; $i++)
			{
				next if ($ds_index->[$i] >= 0);

				if ($1 eq $InDS->[$i])
				{
					$line =~ s#<name>\s*([^<\s]+)\s*</name>#<name> $OutDS->[$i] </name>#;
					$ds_index->[$i] = $current_index;
					last;
				}
			}

			$out_cache->[$current_index] .= $line;
		}
		elsif ($line =~ m#<last_ds>\s*([^\s>]+)\s*</last_ds>#i)
		{
			$out_cache->[$current_index] .= "\t\t<last_ds> NaN </last_ds>\n";
		}
		elsif ($line =~ m#<value>\s*([^\s>]+)\s*</value>#i)
		{
			$out_cache->[$current_index] .= "\t\t<value> NaN </value>\n";
		}
		elsif ($line =~ m#</ds>#)
		{
			$out_cache->[$current_index] .= $line;
		}
		elsif ($line =~ m#<rra>#)
		{
			# Print out all the DS definitions we need
			for (my $new_index = 0; $new_index < @$InDS; $new_index++)
			{
				my $old_index = $ds_index->[$new_index];
				while ($out_cache->[$old_index] =~ m/^(.*)$/gm)
				{
					post_line ("$1\n", $index + 1);
				}
			}

			# Clear the cache - it's used in state1, too.
			for (my $i = 0; $i <= $current_index; $i++)
			{
				$out_cache->[$i] = '';
			}

			$ret .= $line;
			$current_index = -1;
			$state = 1;
		}
		elsif ($current_index == -1)
		{
			# Print all the lines before the first DS definition
			$ret .= $line;
		}
		else
		{
			# Something belonging to a DS-definition
			$out_cache->[$current_index] .= $line;
		}
	}
	elsif ($state == 1)
	{
		if ($line =~ m#<ds>#)
		{
			$current_index++;
			$out_cache->[$current_index] .= $line;
		}
		elsif ($line =~ m#<value>\s*([^\s>]+)\s*</value>#i)
		{
			$out_cache->[$current_index] .= "\t\t\t<value> NaN </value>\n";
		}
		elsif ($line =~ m#</cdp_prep>#)
		{
			# Print out all the DS definitions we need
			for (my $new_index = 0; $new_index < @$InDS; $new_index++)
			{
				my $old_index = $ds_index->[$new_index];
				while ($out_cache->[$old_index] =~ m/^(.*)$/gm)
				{
					post_line ("$1\n", $index + 1);
				}
			}

			# Clear the cache
			for (my $i = 0; $i <= $current_index; $i++)
			{
				$out_cache->[$i] = '';
			}

			$ret .= $line;
			$current_index = -1;
		}
		elsif ($line =~ m#<database>#)
		{
			$ret .= $line;
			$state = 2;
		}
		elsif ($current_index == -1)
		{
			# Print all the lines before the first DS definition
			# and after cdp_prep
			$ret .= $line;
		}
		else
		{
			# Something belonging to a DS-definition
			$out_cache->[$current_index] .= $line;
		}
	}
	elsif ($state == 2)
	{
		if ($line =~ m#</database>#)
		{
			$ret .= $line;
			$current_index = -1;
			$state = 1;
		}
		else
		{
			my @values = ();
			my $i;
			
			$ret .= "\t\t";

			if ($line =~ m#(<!-- .*? -->)#)
			{
				$ret .= "$1 ";
			}
			$ret .= "<row> ";

			$i = 0;
			while ($line =~ m#<v>\s*([^<\s]+)\s*</v>#g)
			{
				$values[$i] = $1;
				$i++;
			}

			for (my $new_index = 0; $new_index < @$InDS; $new_index++)
			{
				my $old_index = $ds_index->[$new_index];
				$ret .= '<v> ' . $values[$old_index] . ' </v> ';
			}
			$ret .= "</row>\n";
		}
	}
	else
	{
		die;
	}

	if ($ret)
	{
		post_line ($ret, $index + 1);
	}
}} # handle_line_dsmap

#
# The _step_ handler
#
{
my $step_factor_up;
my $step_factor_down;
sub handle_line_step
{
	my $line = shift;
	my $index = shift;

	if (!$Step)
	{
		post_line ($line, $index + 1);
		return;
	}

	if ($Debug && !defined ($step_factor_up))
	{
		print STDOUT "New step: $Step\n";
	}

	$step_factor_up ||= 0;
	$step_factor_down ||= 0;

	if (($step_factor_up == 0) && ($step_factor_down == 0))
	{
		if ($line =~ m#<step>\s*(\d+)\s*</step>#i)
		{
			my $old_step = 0 + $1;
			if ($Step < $old_step)
			{
				$step_factor_down = int ($old_step / $Step);
				if (($step_factor_down * $Step) != $old_step)
				{
					print STDERR "The old step ($old_step seconds) "
					. "is not a multiple of the new step "
					. "($Step seconds).\n";
					exit (1);
				}
				$line = "<step> $Step </step>\n";
			}
			elsif ($Step > $old_step)
			{
				$step_factor_up = int ($Step / $old_step);
				if (($step_factor_up * $old_step) != $Step)
				{
					print STDERR "The new step ($Step seconds) "
					. "is not a multiple of the old step "
					. "($old_step seconds).\n";
					exit (1);
				}
				$line = "<step> $Step </step>\n";
			}
			else
			{
				$Step = 0;
			}
		}
	}
	elsif ($line =~ m#<pdp_per_row>\s*(\d+)\s*</pdp_per_row>#i)
	{
		my $old_val = 0 + $1;
		my $new_val;
		if ($step_factor_up)
		{
			$new_val = int ($old_val / $step_factor_up);
			if (($new_val * $step_factor_up) != $old_val)
			{
				print STDERR "Can't divide number of PDPs per row ($old_val) by step-factor ($step_factor_up).\n";
				exit (1);
			}
		}
		else
		{
			$new_val = $step_factor_down * $old_val;
		}
		$line = "<pdp_per_row> $new_val </pdp_per_row>\n";
	}

	post_line ($line, $index + 1);
}} # handle_line_step

#
# The _add DS_ handler
#
{
my $add_ds_done;
sub handle_line_add_ds
{
  my $line = shift;
  my $index = shift;

  my $post = sub { for (@_) { post_line ($_, $index + 1); } };

  if (!@$NewDSes)
  {
    $post->($line);
    return;
  }

  if (!$add_ds_done && ($line =~ m#<rra>#i))
  {
    for (my $i = 0; $i < @$NewDSes; $i++)
    {
      my $ds = $NewDSes->[$i];
      my $temp;

      my $min;
      my $max;

      if ($Debug)
      {
      	print STDOUT "Adding DS: name = $ds->{'name'}, type = $ds->{'type'}, heartbeat = $ds->{'heartbeat'}, min = $ds->{'min'}, max = $ds->{'max'}\n";
      }

      $min = 'NaN';
      if (defined ($ds->{'min'}) && ($ds->{'min'} ne 'U'))
      {
	$min = sprintf ('%.10e', $ds->{'min'});
      }
      
      $max = 'NaN';
      if (defined ($ds->{'max'}) && ($ds->{'max'} ne 'U'))
      {
	$max = sprintf ('%.10e', $ds->{'max'});
      }
      

      $post->("\t<ds>\n",
      "\t\t<name> $ds->{'name'} </name>\n",
      "\t\t<type> $ds->{'type'} </type>\n",
      "\t\t<minimal_heartbeat> $ds->{'heartbeat'} </minimal_heartbeat>\n",
      "\t\t<min> $min </min>\n",
      "\t\t<max> $max </max>\n",
      "\n",
      "\t\t<!-- PDP Status -->\n",
      "\t\t<last_ds> UNKN </last_ds>\n",
      "\t\t<value> NaN </value>\n",
      "\t\t<unknown_sec> 0 </unknown_sec>\n",
      "\t</ds>\n",
      "\n");
    }

    $add_ds_done = 1;
  }
  elsif ($add_ds_done && ($line =~ m#</ds>#i)) # inside a cdp_prep block
  {
    $post->("\t\t\t</ds>\n",
	"\t\t\t<ds>\n",
	"\t\t\t<primary_value> NaN </primary_value>\n",
	"\t\t\t<secondary_value> NaN </secondary_value>\n",
	"\t\t\t<value> NaN </value>\n",
	"\t\t\t<unknown_datapoints> 0 </unknown_datapoints>\n");
  }
  elsif ($line =~ m#<row>#i)
  {
	  my $insert = '<v> NaN </v>' x (0 + @$NewDSes);
	  $line =~ s#</row>#$insert</row>#i;
  }

  $post->($line);
}} # handle_line_add_ds

#
# The _add RRA_ handler
#
{
my $add_rra_done;
my $num_ds;
sub handle_line_add_rra
{
  my $line = shift;
  my $index = shift;

  my $post = sub { for (@_) { post_line ($_, $index + 1); } };

  $num_ds ||= 0;

  if (!@$NewRRAs || $add_rra_done)
  {
    $post->($line);
    return;
  }

  if ($line =~ m#<ds>#i)
  {
    $num_ds++;
  }
  elsif ($line =~ m#<rra>#i)
  {
    for (my $i = 0; $i < @$NewRRAs; $i++)
    {
      my $rra = $NewRRAs->[$i];
      my $temp;

      if ($Debug)
      {
      	print STDOUT "Adding RRA: CF = $rra->{'cf'}, xff = $rra->{'xff'}, steps = $rra->{'steps'}, rows = $rra->{'rows'}, num_ds = $num_ds\n";
      }

      $post->("\t<rra>\n",
      "\t\t<cf> $rra->{'cf'} </cf>\n",
      "\t\t<pdp_per_row> $rra->{'steps'} </pdp_per_row>\n",
      "\t\t<params>\n",
      "\t\t\t<xff> $rra->{'xff'} </xff>\n",
      "\t\t</params>\n",
      "\t\t<cdp_prep>\n");

      for (my $j = 0; $j < $num_ds; $j++)
      {
	$post->("\t\t\t<ds>\n",
	"\t\t\t\t<primary_value> NaN </primary_value>\n",
	"\t\t\t\t<secondary_value> NaN </secondary_value>\n",
	"\t\t\t\t<value> NaN </value>\n",
	"\t\t\t\t<unknown_datapoints> 0 </unknown_datapoints>\n",
	"\t\t\t</ds>\n");
      }

      $post->("\t\t</cdp_prep>\n", "\t\t<database>\n");
      $temp = "\t\t\t<row>" . join ('', map { "<v> NaN </v>" } (1 .. $num_ds)) . "</row>\n";
      for (my $j = 0; $j < $rra->{'rows'}; $j++)
      {
	$post->($temp);
      }
      $post->("\t\t</database>\n", "\t</rra>\n");
    }

    $add_rra_done = 1;
  }

  $post->($line);
}} # handle_line_add_rra

#
# The _scale/shift_ handler
#
sub calculate_scale_shift 
{
  my $value = shift;
  my $tag = shift;
  my $scale = shift;
  my $shift = shift;

  if (lc ("$value") eq 'nan')
  {
    $value = 'NaN';
    return ("<$tag> NaN </$tag>");
  }

  $value = ($scale * (0.0 + $value)) + $shift;
  return (sprintf ("<%s> %1.10e </%s>", $tag, $value, $tag));
}

sub handle_line_scale_shift
{
  my $line = shift;
  my $index = shift;

  if (($Scale != 1.0) || ($Shift != 0.0))
  {
    $line =~ s#<(min|max|last_ds|value|primary_value|secondary_value|v)>\s*([^\s<]+)\s*</[^>]+>#calculate_scale_shift ($2, $1, $Scale, $Shift)#eg;
  }

  post_line ($line, $index + 1);
}

#
# The _output_ handler
#
# This filter is unfinished!
#
{
my $fh;
sub set_output
{
	$fh = shift;
}

{
my $previous_values;
my $previous_differences;
my $pdp_per_row;
sub handle_line_peak_detect
{
  my $line = shift;
  my $index = shift;

  if (!$previous_values)
  {
    $previous_values = [];
    $previous_differences = [];
  }

  if ($line =~ m#</database>#i)
  {
    $previous_values = [];
    $previous_differences = [];
    print STDERR "==============================================================================\n";
  }
  elsif ($line =~ m#<pdp_per_row>\s*([1-9][0-9]*)\s*</pdp_per_row>#)
  {
    $pdp_per_row = int ($1);
    print STDERR "pdp_per_row = $pdp_per_row;\n";
  }
  elsif ($line =~ m#<row>#)
  {
    my @values = ();
    while ($line =~ m#<v>\s*([^\s>]+)\s*</v>#ig)
    {
      if ($1 eq 'NaN')
      {
	push (@values, undef);
      }
      else
      {
	push (@values, 0.0 + $1);
      }
    }

    for (my $i = 0; $i < @values; $i++)
    {
      if (!defined ($values[$i]))
      {
	$previous_values->[$i] = undef;
      }
      elsif (!defined ($previous_values->[$i]))
      {
	$previous_values->[$i] = $values[$i];
      }
      elsif (!defined ($previous_differences->[$i]))
      {
      	$previous_differences->[$i] = abs ($previous_values->[$i] - $values[$i]);
      }
      else
      {
      	my $divisor = ($previous_differences->[$i] < 1.0) ? 1.0 : $previous_differences->[$i];
	my $difference = abs ($previous_values->[$i] - $values[$i]);
	my $change = $pdp_per_row * $difference / $divisor;
	if (($divisor > 10.0) &&  ($change > 10e5))
	{
	  print STDERR "i = $i; average difference = " . $previous_differences->[$i]. "; current difference = " . $difference. "; change = $change;\n";
	}
	$previous_values->[$i] = $values[$i];
	$previous_differences->[$i] = (0.95 * $previous_differences->[$i]) + (0.05 * $difference);
      }
    }
  }

  post_line ($line, $index + 1);
}} # handle_line_peak_detect

sub handle_line_output
{
	my $line = shift;
	my $index = shift;

	if (!defined ($fh))
	{
		post_line ($line, $index + 1);
		return;
	}
	
	print $fh $line;
}} # handle_line_output

#
# Dispatching logic
#
{
my @handlers = ();
sub add_handler
{
	my $handler = shift;

	die unless (ref ($handler) eq 'CODE');
	push (@handlers, $handler);
} # add_handler

sub post_line
{
	my $line = shift;
	my $index = shift;

	if (0)
	{
		my $copy = $line;
		chomp ($copy);
		print "DEBUG: post_line ($copy, $index);\n";
	}

	if ($index > $#handlers)
	{
		return;
	}
	$handlers[$index]->($line, $index);
}} # post_line

sub handle_fh
{
  my $in_fh = shift;
  my $out_fh = shift;

  set_output ($out_fh);

  if (@$InDS)
  {
    add_handler (\&handle_line_dsmap);
  }

  if ($Step)
  {
    add_handler (\&handle_line_step);
  }

  if (($Scale != 1.0) || ($Shift != 0.0))
  {
    add_handler (\&handle_line_scale_shift);
  }

  #add_handler (\&handle_line_peak_detect);

  if (@$NewDSes)
  {
    add_handler (\&handle_line_add_ds);
  }

  if (@$NewRRAs)
  {
    add_handler (\&handle_line_add_rra);
  }

  add_handler (\&handle_line_output);

  while (my $line = <$in_fh>)
  {
    post_line ($line, 0);
  }
} # handle_fh

sub main
{
	my $in_file = shift;
	my $out_file = shift;

	my $in_fh;
	my $out_fh;

	my $in_needs_close = 1;
	my $out_needs_close = 1;

	if ($in_file =~ m/\.rrd$/i)
	{
		open ($in_fh,  '-|', 'rrdtool', 'dump', $in_file) or die ("open (rrdtool): $!");
	}
	elsif ($in_file eq '-')
	{
		$in_fh = \*STDIN;
		$in_needs_close = 0;
	}
	else
	{
		open ($in_fh, '<', $in_file) or die ("open ($in_file): $!");
	}

	if ($out_file =~ m/\.rrd$/i)
	{
		open ($out_fh, '|-', 'rrdtool', 'restore', '-', $out_file) or die ("open (rrdtool): $!");
	}
	elsif ($out_file eq '-')
	{
		$out_fh = \*STDOUT;
		$out_needs_close = 0;
	}
	else
	{
		open ($out_fh, '>', $out_file) or die ("open ($out_file): $!");
	}

	handle_fh ($in_fh, $out_fh);

	if ($in_needs_close)
	{
		close ($in_fh);
	}
	if ($out_needs_close)
	{
		close ($out_fh);
	}
} # main

=head1 LICENSE

This script is licensed under the GNU general public license, versionE<nbsp>2
(GPLv2).

=head1 AUTHOR

Florian octo Forster E<lt>octo at verplant.orgE<gt>

