1<?php
2
3/**
4 * Observium
5 *
6 *   This file is part of Observium.
7 *
8 * @package    observium
9 * @subpackage influx
10 * @author     Bill Fenner <fenner@gmail.com>
11 * @copyright  (C) 2017 Observium Limited
12 *
13 */
14
/* This is very much a feature in testing. Please monitor servers closely after enabling. */
16
/**
 * Escapes the characters that are special in the InfluxDB Line Protocol.
 * https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/
 *
 * Every comma, equals sign and space is prefixed with a backslash.
 *
 * @param string $str Text to escape
 *
 * @return string Escaped text
 */
function influxdb_escape($str)
{
  // Replacements are applied in this order: comma, equals sign, space.
  $special = array(',', '=', ' ');
  $escaped = array('\,', '\=', '\ ');

  return str_replace($special, $escaped, $str);
}
30
/**
 * Formats an array as a comma-separated list of key=value pairs for the
 * InfluxDB Line Protocol.
 * https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/
 *
 * Both the key and the value of every entry are passed through
 * influxdb_escape() before being joined.
 *
 * @param array $data Name => value pairs
 *
 * @return string "key=value,key=value,..." (empty string for an empty array)
 */
function influxdb_format_data($data)
{
  $pairs = array();

  foreach ($data as $name => $content) {
    $lhs     = influxdb_escape($name);
    $rhs     = influxdb_escape($content);
    $pairs[] = $lhs . "=" . $rhs;
  }

  return implode(",", $pairs);
}
45
/**
 * Posts a single update to InfluxDB.
 *
 * Builds one Line Protocol line of the form "<database>[,<tags>] <data>" and
 * POSTs it to the /write endpoint of the configured server. Does nothing
 * unless $config['influxdb']['enabled'] is set. When
 * $config['influxdb']['debug'] is set, the line is appended to
 * /tmp/influx-updates.txt instead of being sent.
 *
 * @param string $database Name written at the start of the line (the Line Protocol measurement); not escaped here
 * @param array  $tags     Tag name => value pairs; may be empty
 * @param array  $data     Field name => value pairs
 *
 * @return void
 */
function influxdb_update_data($database, $tags, $data)
{
  global $config;

  // empty() also covers a missing 'influxdb' / 'enabled' key without raising a notice.
  if ( empty( $config['influxdb']['enabled'] ) ) {
    return;
  }

  $influx_update = $database;
  if ( $tags ) {
    $influx_update .= "," . influxdb_format_data( $tags );
  }
  $influx_update .= " " . influxdb_format_data( $data );

  // This code posts each update to influx individually.
  // Influx can accept multiple updates in one post, but
  // is very equivocal about how many -- the documentation
  // says you "may" have to split up your request if you
  // have more than 5,000 updates.  We should be able to
  // build up a queue of updates, and flush them to the
  // server if the queue gets too long, or at the end of the
  // poll, but until it becomes a real problem, we post the
  // updates one by one.
  if ( !empty( $config['influxdb']['debug'] ) ) {
    // file_put_contents() only emits a warning when the file cannot be opened,
    // whereas fwrite()/fclose() on a failed fopen() handle is a fatal TypeError on PHP 8.
    file_put_contents( '/tmp/influx-updates.txt', $influx_update . "\n", FILE_APPEND );
    return;
  }

  $url = 'http://' . $config['influxdb']['server'] . '/write?db=' . $config['influxdb']['db'];

  $c = curl_init();
  if ( $c === FALSE ) {
    print_debug('INFLUXDB POST: unable to initialise cURL');
    return;
  }

  curl_setopt_array( $c, array(
    CURLOPT_URL => $url,
    CURLOPT_CUSTOMREQUEST => 'POST',
    CURLOPT_POSTFIELDS => $influx_update,
    // Capture the response body in $response instead of letting cURL echo it to stdout.
    CURLOPT_RETURNTRANSFER => TRUE,
    ) );

  $response = curl_exec( $c );
  $httpcode = curl_getinfo($c, CURLINFO_HTTP_CODE);
  $error = 'cURL error num: '.curl_errno($c).' => '.curl_error($c);
  print_debug('INFLUXDB POST: ' . $httpcode . ' ' . $error);
  if (OBS_DEBUG > 1)
  {
    print_message('INFLUXDB REQUEST ' . print_r(curl_getinfo($c), TRUE), 'console');
    print_message('INFLUXDB RESPONSE ' . print_r($response, TRUE), 'console');
  }
  curl_close( $c );
}
100
/**
 * Pushes a set of data points that we want to add to an rrd file
 * to influxdb too.
 *
 * The name passed on to influxdb_update_data() is the rrd file name with
 * '.rrd' removed and escaped with influxdb_escape(). The time spent and the
 * number of calls are accumulated in $GLOBALS['influxdb_stats'].
 *
 * @param array|null   $device     Device entry; its 'hostname' becomes the 'host' tag. NULL omits the tag.
 * @param string       $filename   rrd file name; replaced by $definition['file'] when $index is given
 * @param array|string $ds         Name => value pairs, or a colon-separated update string whose first element is dropped
 * @param array|null   $definition Definition; its 'file' entry is used when $index is given
 * @param mixed        $index      Optional index; when not NULL it is sent as the 'index' tag
 *
 * @return void
 */
function influxdb_update($device, $filename, $ds, $definition = NULL, $index = NULL)
{
  $start = microtime(TRUE);

  // This happens when called from obsolete rrdtool_update
  // This code can go away when rrdtool_update is deleted.
  if (!is_array($ds))
  {
    $tmpds = explode( ':', $ds );
    // get rid of the timestamp (it's always "N")
    array_shift( $tmpds );
    $ds = array();
    if ( count( $tmpds ) == 1 ) {
      // A single value is sent under the plain name 'value'.
      $ds[ 'value' ] = $tmpds[ 0 ];
    } else {
      // Several values are numbered from 1: value1, value2, ...
      foreach ($tmpds as $idx => $value) {
        $ds[ 'value' . (string)($idx + 1) ] = $value;
      }
    }
  }

  $influx_tags = array();
  if ( !is_null( $device ) ) {
    $influx_tags[ 'host' ] = $device[ 'hostname' ];
  }
  if ( !is_null( $index ) ) {
    // If we have an index, then we have a definition too.
    $filename = $definition[ 'file' ];
    // The index travels as a tag, so strip its placeholder from the name.
    $filename = str_replace( '-%index%', '', $filename );
    $influx_tags[ 'index' ] = $index;
  }
  $filename = str_replace( '.rrd', '', $filename );
  $filename = influxdb_escape( $filename );

  influxdb_update_data($filename, $influx_tags, $ds);

  // Make sure the counters exist before accumulating, so the first call
  // does not raise undefined-index warnings.
  if ( !isset( $GLOBALS['influxdb_stats']['time'] ) ) {
    $GLOBALS['influxdb_stats']['time'] = 0;
  }
  if ( !isset( $GLOBALS['influxdb_stats']['count'] ) ) {
    $GLOBALS['influxdb_stats']['count'] = 0;
  }

  $runtime = microtime(TRUE) - $start;
  $GLOBALS['influxdb_stats']['time'] += $runtime;
  $GLOBALS['influxdb_stats']['count']++;
}
155
156// EOF
157