1#
2# collectd - bindings/buildperl/Collectd/Unixsock.pm
3# Copyright (C) 2007,2008  Florian octo Forster
4#
5# Permission is hereby granted, free of charge, to any person obtaining a
6# copy of this software and associated documentation files (the "Software"),
7# to deal in the Software without restriction, including without limitation
8# the rights to use, copy, modify, merge, publish, distribute, sublicense,
9# and/or sell copies of the Software, and to permit persons to whom the
10# Software is furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in
13# all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21# DEALINGS IN THE SOFTWARE.
22#
23# Authors:
24#   Florian Forster <octo at collectd.org>
25#
26
27package Collectd::Unixsock;
28
29=head1 NAME
30
Collectd::Unixsock - Abstraction layer for accessing the functionality provided
by collectd's unixsock plugin.
33
34=head1 SYNOPSIS
35
36  use Collectd::Unixsock;
37
38  my $sock = Collectd::Unixsock->new ($path);
39
40  my $value = $sock->getval (%identifier);
41  $sock->putval (%identifier,
42                 time => time (),
43		 values => [123, 234, 345]);
44
45  $sock->destroy ();
46
47=head1 DESCRIPTION
48
collectd's unixsock plugin allows external programs to access the values it has
collected or received and to submit their own values. This Perl module is a
thin abstraction layer over this interface that makes it even easier for
programmers to interact with the daemon.
53
54=cut
55
56use strict;
57use warnings;
58
59use Carp qw(cluck confess carp croak);
60use POSIX;
61use IO::Socket::UNIX;
62use Scalar::Util qw( looks_like_number );
63
64our $Debug = 0;
65
# Print all arguments, unmodified, with print() while the package variable
# $Debug is set to a true value; do nothing otherwise.
sub _debug
{
	if ($Debug)
	{
		print @_;
	}
}
70
# Connect to the UNIX stream socket found at $path.
# Returns the connected IO::Socket::UNIX object. When the connection cannot
# be established a warning with a stack trace is emitted (cluck) and nothing
# is returned.
sub _create_socket
{
	my ($path) = @_;

	my $handle = IO::Socket::UNIX->new (Peer => $path, Type => SOCK_STREAM);
	return $handle if $handle;

	cluck ("Cannot open UNIX-socket $path: $!");
	return;
} # _create_socket
82
83=head1 VALUE IDENTIFIERS
84
Values in collectd are identified using a five-tuple (host, plugin,
plugin-instance, type, type-instance) where only plugin instance and type
instance may be undef. Many functions expect an I<%identifier> hash that has at
least the members B<host>, B<plugin>, and B<type>, possibly completed by
B<plugin_instance> and B<type_instance>.
90
91Usually you can pass this hash as follows:
92
93  $self->method (host => $host, plugin => $plugin, type => $type, %other_args);
94
95=cut
96
# Build the string form "host/plugin[-plugin_instance]/type[-type_instance]"
# from an identifier hash reference.
#
# The members `host', `plugin' and `type' are mandatory and must be defined,
# non-empty strings (the string "0" is accepted). The instance members are
# optional and are only appended when defined and non-empty, so that an empty
# instance does not produce a dangling `-'.
#
# Returns the identifier string, or nothing after a cluck warning when a
# mandatory member is missing.
sub _create_identifier
{
	my $args = shift;

	for my $field (qw(host plugin type))
	{
		next if defined $args->{$field} && length $args->{$field};
		cluck ("Need `host', `plugin' and `type'");
		return;
	}

	my $plugin = $args->{plugin};
	$plugin .= '-' . $args->{plugin_instance}
		if defined $args->{plugin_instance} && length $args->{plugin_instance};

	my $type = $args->{type};
	$type .= '-' . $args->{type_instance}
		if defined $args->{type_instance} && length $args->{type_instance};

	return "$args->{host}/$plugin/$type";
} # _create_identifier
116
# Split an identifier string of the form
# "host/plugin[-plugin_instance]/type[-type_instance]" into a hash reference.
#
# The returned hash always has the keys `host', `plugin' and `type' (their
# values are undef when the corresponding part is missing from the string);
# `plugin_instance' and `type_instance' are only present when the string
# contains them.
sub _parse_identifier
{
	my $string = shift;
	$string = '' unless defined $string;

	# Limit the split to three fields so that a `/' occurring after the
	# second separator stays part of the type field instead of being
	# dropped silently.
	my ($host, $plugin, $type) = split /\//, $string, 3;

	# Only the first `-' separates a name from its instance; guard against
	# missing parts to avoid "uninitialized value" warnings.
	my ($plugin_instance, $type_instance);
	($plugin, $plugin_instance) = split /-/, $plugin, 2 if defined $plugin;
	($type, $type_instance) = split /-/, $type, 2 if defined $type;

	my $ident =
	{
		host => $host,
		plugin => $plugin,
		type => $type
	};
	$ident->{plugin_instance} = $plugin_instance if defined $plugin_instance;
	$ident->{type_instance} = $type_instance if defined $type_instance;

	return $ident;
} # _parse_identifier
138
# Prepare a single argument for transmission to the daemon.
#
# A string consisting solely of word characters is returned unchanged.
# Anything else (including the empty string and undef, which is treated as
# the empty string) is wrapped in double quotes, with embedded backslashes and
# double quotes escaped by a backslash.
sub _escape_argument
{
	my $arg = shift;
	$arg = '' unless defined $arg;

	# \A and \z anchor at the real string boundaries; `$' would also match
	# before a trailing newline and return such a string unquoted.
	return $arg if $arg =~ /\A\w+\z/;

	$arg =~ s#\\#\\\\#g;
	$arg =~ s#"#\\"#g;
	return "\"$arg\"";
}
149
# Handle an error that occurred while talking to the daemon socket.
#
# $where names the failed operation and is used in diagnostics only.
#
# If errno is EPIPE the current socket is closed and a new connection to
# $self->{path} is attempted. Returns false (empty list) when the connection
# was re-established, i.e. the caller may retry. Returns 1 when the caller has
# to give up; $self->{error} then describes the failure.
sub _socket_error {
	my ($self, $where) = @_;

	# Copy errno (numeric and string value) before doing anything else:
	# the print in _debug, the close in destroy and the warning emitted by
	# carp may each overwrite $!.
	my $errno = $!;

	if ($errno == EPIPE) {
		_debug ("^^ error on $where: $errno; reconnecting\n");
		$self->destroy;

		my $sock = _create_socket ($self->{path});
		if (!$sock) {
			# Leave the slot undefined and record why we gave up.
			$self->{sock} = undef;
			$self->{error} = "reconnect after error on $where ($errno) failed";
			return 1;
		}

		$self->{sock} = $sock;
		return;
	}

	carp ("error on $where: $errno; aborting action\n");
	$self->{error} = $errno;
	return 1;
}
167
# Send a command to the daemon and return the first line of its reply.
#
# $command is the command verb. When $args (an identifier hash reference) is
# given, the identifier built from it is escaped and appended as the single
# argument. Write and read failures are passed to _socket_error, which may
# reconnect; the operation is retried for as long as it reports success.
#
# Returns the reply line without its line ending, or nothing on failure.
sub _socket_command {
	my ($self, $command, $args) = @_;

	my $sock = $self->{sock} or confess ('object has no filehandle');

	if ($args) {
		my $identifier = _create_identifier ($args) or return;
		$command = join (' ', $command, _escape_argument ($identifier));
	}
	$command .= "\n";

	_debug ("-> $command");
	until ($sock->print ($command)) {
		return if $self->_socket_error ('print');
		# _socket_error installed a fresh socket; continue on that one.
		$sock = $self->{sock};
	}

	my $line;
	until (defined ($line = $sock->getline)) {
		return if $self->_socket_error ('getline');
		$sock = $self->{sock};
	}

	chomp $line;
	_debug ("<- $line\n");
	return $line;
}
196
# Read the result lines announced by a status line and feed them to a
# callback.
#
# $msg is a status line of the form "<count> <text>". When <count> is zero or
# negative, <text> is stored in $self->{error} and nothing is returned.
# Otherwise <count> lines are read from the socket and each one is passed,
# without its line ending, to $callback together with $cbdata.
#
# Returns $cbdata on success and nothing on failure.
sub _socket_chat
{
	my ($self, $msg, $callback, $cbdata) = @_;

	my $sock = $self->{sock} or confess ('object has no filehandle');

	my ($count, $text) = split / /, $msg, 2;
	if ($count <= 0)
	{
		$self->{error} = $text;
		return;
	}

	foreach my $n (1 .. $count)
	{
		my $line;
		until (defined ($line = $sock->getline)) {
			return if $self->_socket_error ('getline');
			# Continue on the socket _socket_error has installed.
			$sock = $self->{sock};
		}
		chomp $line;
		_debug ("<- $line\n");
		$callback->($line, $cbdata);
	}

	return $cbdata;
}
225
# Send a raw message on a socket and evaluate the one-line reply.
#
# A newline is appended to $msg when it does not end in one. A warning is
# issued for messages longer than 1024 characters, but they are sent anyway.
#
# Returns true upon success (the reply starts with the status "0") and false
# otherwise. On failure $self->{error} holds the text following the status,
# or a description of the malformed reply.
sub _send_message
{
	my ($self, $msg) = @_;

	my $fh = $self->{'sock'} or confess ('object has no filehandle');

	$msg .= "\n" unless $msg =~/\n$/;

	#1024 is default buffer size at unixsock.c us_handle_client()
	warn "Collectd::Unixsock->_send_message(\$msg): message is too long!" if length($msg) > 1024;

	_debug "-> $msg";
	while (not $fh->print($msg)) {
		return if $self->_socket_error ('print');
		$fh = $self->{sock};
	}

	while (not defined ($msg = <$fh>)) {
		return if $self->_socket_error ('readline');
		$fh = $self->{sock};
	}
	chomp ($msg);
	_debug "<- $msg\n";

	my ($status, $error) = split / /, $msg, 2;

	# Only an explicit integer status counts. An empty or garbled reply
	# would otherwise evaluate to `undef == 0' and be reported as success.
	if (defined $status && $status =~ /\A-?\d+\z/)
	{
		return 1 if $status == 0;

		$self->{error} = $error;
		return;
	}

	$self->{error} = "Malformed response from daemon: `$msg'";
	return;
}
258
259=head1 PUBLIC METHODS
260
261=over 4
262
263=item I<$self> = Collectd::Unixsock->B<new> ([I<$path>]);
264
265Creates a new connection to the daemon. The optional I<$path> argument gives
266the path to the UNIX socket of the C<unixsock plugin> and defaults to
267F</var/run/collectd-unixsock>. Returns the newly created object on success and
268false on error.
269
270=cut
271
# Constructor: connect to the socket at $path (default
# /var/run/collectd-unixsock) and wrap the connection in a new object.
# Returns nothing when the connection cannot be established.
sub new
{
	my ($class, $path) = @_;
	$path ||= '/var/run/collectd-unixsock';

	my $sock = _create_socket ($path);
	return unless $sock;

	my $self =
	{
		path => $path,
		sock => $sock,
		error => 'No error'
	};
	return bless $self, $class;
} # new
284
285=item I<$res> = I<$self>-E<gt>B<getval> (I<%identifier>);
286
Requests a value-list from the daemon. On success a hash-ref is returned with
the name of each data-source as the key and the corresponding value as the
value. On error false is returned.
290
291=cut
292
# Issue GETVAL for the given identifier and collect the reply into a hash.
#
# Each result line of the form "name=value" becomes one entry: the literal
# value "NaN" is stored as undef, any other value is stored as a number when
# it looks like one, and lines that match neither form are ignored.
#
# Returns the hash reference on success and nothing on error, including the
# case where the daemon reports a failure status. Previously that case
# returned an empty (but true) hash reference.
sub getval # {{{
{
	my $self = shift;
	my %args = @_;
	my $ret = {};

	my $msg = $self->_socket_command('GETVAL', \%args) or return;

	# _socket_chat returns its data argument on success and nothing on
	# failure, so propagate the failure instead of ignoring it.
	$self->_socket_chat($msg, sub {
			my ($line, $values) = @_;
			if ($line =~ /^(\w+)=NaN$/)
			{
				$values->{$1} = undef;
			}
			elsif ($line =~ /^(\w+)=(.*)$/ and looks_like_number($2))
			{
				$values->{$1} = 0 + $2;
			}
			return;
		}, $ret
	) or return;

	return $ret;
} # }}} sub getval
309
310=item I<$res> = I<$self>-E<gt>B<getthreshold> (I<%identifier>);
311
312Requests a threshold from the daemon. On success a hash-ref is returned with
313the threshold data. On error false is returned.
314
315=cut
316
# Issue GETTHRESHOLD for the given identifier and collect the reply into a
# hash.
#
# Each result line of the form "key: value" becomes one entry; whitespace
# around the key and in front of the value is dropped. Lines without a colon
# are ignored.
#
# Returns the hash reference on success and nothing on error, including the
# case where the daemon reports a failure status. Previously that case
# returned an empty (but true) hash reference.
sub getthreshold # {{{
{
	my $self = shift;
	my %args = @_;
	my $ret = {};

	my $msg = $self->_socket_command('GETTHRESHOLD', \%args) or return;

	# _socket_chat returns its data argument on success and nothing on
	# failure, so propagate the failure instead of ignoring it.
	$self->_socket_chat($msg, sub {
			my ($line, $threshold) = @_;
			if (my ($key, $val) = $line =~ /^\s*([^:]+):\s*(.*)/)
			{
				$key =~ s/\s*$//;
				$threshold->{$key} = $val;
			}
			return;
		}, $ret
	) or return;

	return $ret;
} # }}} sub getthreshold
336
337=item I<$self>-E<gt>B<putval> (I<%identifier>, B<time> =E<gt> I<$time>, B<values> =E<gt> [...]);
338
339Submits a value-list to the daemon. If the B<time> argument is omitted
340C<time()> is used. The required argument B<values> is a reference to an array
341of values that is to be submitted. The number of values must match the number
342of values expected for the given B<type> (see L<VALUE IDENTIFIERS>), though
343this is checked by the daemon, not the Perl module. Also, gauge data-sources
344(e.E<nbsp>g. system-load) may be C<undef>. Returns true upon success and false
345otherwise.
346
347=cut
348
# Assemble a PUTVAL command from the identifier members, the optional
# `interval' and the `values' argument, and send it to the daemon.
#
# `values' is either an array reference, which is joined with `:' behind the
# time stamp (`time' argument or the current time) with undef entries written
# as `U', or a plain scalar that is sent as given.
#
# Returns the result of _send_message, or nothing after a cluck warning when
# the arguments are unusable.
sub putval
{
	my ($self, %args) = @_;

	my $identifier = _create_identifier (\%args) or return;

	my $values = $args{values};
	if (!$values)
	{
		cluck ("Need argument `values'");
		return;
	}

	if (ref ($values))
	{
		if (ref ($values) ne "ARRAY")
		{
			cluck ("Invalid `values' argument (expected an array ref)");
			return;
		}

		if (!@$values)
		{
			cluck ("Empty `values' array");
			return;
		}

		my $time = $args{time} || time;
		$values = join (':', $time, map { defined $_ ? $_ : 'U' } @$values);
	}

	# Command layout: PUTVAL <identifier> [interval=<n>] <time:values>
	my @fields = ('PUTVAL', _escape_argument ($identifier));
	push (@fields, 'interval=' . _escape_argument ($args{interval}))
		if defined $args{interval};
	push (@fields, _escape_argument ($values));

	return $self->_send_message (join (' ', @fields) . "\n");
} # putval
397
398=item I<$res> = I<$self>-E<gt>B<listval_filter> ( C<%identifier> )
399
400Queries a list of values from the daemon while restricting the results to
401certain hosts, plugins etc. The argument may be anything that passes for an
402identifier (cf. L<VALUE IDENTIFIERS>), although all fields are optional.
403The returned data is in the same format as from C<listval>.
404
405=cut
406
# Issue LISTVAL and return only those entries whose identifier matches the
# given (all optional) identifier members.
#
# Returns a list of identifier hash references, each with an additional
# numeric `time' member, or nothing on error.
sub listval_filter
{
	my $self = shift;
	my %args = @_;
	my @ret;
	my $nresults;

	$self->{sock} or confess;

	# NOTE(review): the member values are interpolated into the pattern
	# unquoted, so regex metacharacters in them (e.g. the dots of a host
	# name) act as pattern syntax rather than as literal characters.
	my $pattern =
	(exists $args{host}              ? "$args{host}"             : '[^/]+') .
	(exists $args{plugin}            ? "/$args{plugin}"          : '/[^/-]+') .
	(exists $args{plugin_instance}   ? "-$args{plugin_instance}" : '(?:-[^/]+)?') .
	(exists $args{type}              ? "/$args{type}"            : '/[^/-]+') .
	(exists $args{type_instance}     ? "-$args{type_instance}"   : '(?:-[^/]+)?');
	$pattern = qr/^\d+(?:\.\d+)? $pattern$/;

	my $msg = $self->_socket_command('LISTVAL') or return;
	($nresults, $msg) = split / /, $msg, 2;

	# This could use _socket_chat() but doesn't for speed reasons
	if ($nresults < 0)
	{
		$self->{error} = $msg;
		return;
	}

	# Fetch the handle only now: _socket_command may have reconnected and
	# replaced $self->{sock}, in which case a handle taken earlier would
	# refer to the closed socket.
	my $fh = $self->{sock};

	for (1 .. $nresults)
	{
		while (not defined ($msg = <$fh>)) {
			return if $self->_socket_error ('readline');
			$fh = $self->{sock};
		}
		chomp $msg;
		_debug "<- $msg\n";
		next unless $msg =~ $pattern;
		my ($time, $ident) = split / /, $msg, 2;

		$ident = _parse_identifier ($ident);
		$ident->{time} = 0+$time;

		push (@ret, $ident);
	} # for (i = 0 .. $nresults)

	return @ret;
} # listval_filter
452
453=item I<$res> = I<$self>-E<gt>B<listval> ()
454
455Queries a list of values from the daemon. The list is returned as an array of
456hash references, where each hash reference is a valid identifier. The C<time>
457member of each hash holds the epoch value of the last update of that value.
458
459=cut
460
# Issue LISTVAL and return every entry the daemon reports.
#
# Returns a list of identifier hash references, each with an additional
# numeric `time' member, or nothing on error.
sub listval
{
	my $self = shift;
	my $nresults;
	my @ret;

	$self->{sock} or confess;

	my $msg = $self->_socket_command('LISTVAL') or return;
	($nresults, $msg) = split / /, $msg, 2;

	# This could use _socket_chat() but doesn't for speed reasons
	if ($nresults < 0)
	{
		$self->{error} = $msg;
		return;
	}

	# Fetch the handle only now: _socket_command may have reconnected and
	# replaced $self->{sock}, in which case a handle taken earlier would
	# refer to the closed socket.
	my $fh = $self->{sock};

	for (1 .. $nresults)
	{
		while (not defined ($msg = <$fh>)) {
			return if $self->_socket_error ('readline');
			$fh = $self->{sock};
		}
		chomp $msg;
		_debug "<- $msg\n";

		my ($time, $ident) = split / /, $msg, 2;

		$ident = _parse_identifier ($ident);
		$ident->{time} = 0+$time;

		push (@ret, $ident);
	} # for (i = 0 .. $nresults)

	return @ret;
} # listval
497
498=item I<$res> = I<$self>-E<gt>B<putnotif> (B<severity> =E<gt> I<$severity>, B<message> =E<gt> I<$message>, ...);
499
500Submits a notification to the daemon.
501
502Valid options are:
503
504=over 4
505
506=item B<severity>
507
508Sets the severity of the notification. The value must be one of the following
509strings: C<failure>, C<warning>, or C<okay>. Case does not matter. This option
510is mandatory.
511
512=item B<message>
513
514Sets the message of the notification. This option is mandatory.
515
516=item B<time>
517
518Sets the time. If omitted, C<time()> is used.
519
520=item I<Value identifier>
521
522All the other fields of the value identifiers, B<host>, B<plugin>,
523B<plugin_instance>, B<type>, and B<type_instance>, are optional. When given,
524the notification is associated with the performance data of that identifier.
525For more details, please see L<collectd-unixsock(5)>.
526
527=back
528
529=cut
530
# Assemble a PUTNOTIF command from the given options and send it.
#
# `message' and `severity' are mandatory and must be defined, non-empty
# strings. `severity' is lower-cased and must be one of `failure', `warning'
# or `okay'. `time' defaults to the current time. All defined options are
# sent as key=value pairs in sorted key order; options whose value is undef
# are left out.
#
# Returns the result of _send_message, or nothing after a cluck warning when
# the arguments are unusable.
sub putnotif
{
	my $self = shift;
	my %args = @_;

	for my $arg (qw( message severity ))
	{
		# Test definedness and length rather than truth, so that a
		# value such as "0" is not mistaken for a missing argument.
		next if defined $args{$arg} && length $args{$arg};
		cluck ("Need argument `$arg'");
		return;
	}

	$args{severity} = lc $args{severity};
	if (!grep { $args{severity} eq $_ } qw( failure warning okay ))
	{
		cluck ("Invalid `severity': " . $args{severity});
		return;
	}

	$args{time} ||= time;

	# Undefined options carry no information; leaving them out avoids
	# "uninitialized value" warnings while escaping.
	my @options = map { $_ . '=' . _escape_argument ($args{$_}) }
		grep { defined $args{$_} }
		sort keys %args;

	return $self->_send_message ('PUTNOTIF ' . join (' ', @options) . "\n");
} # putnotif
561
562=item I<$self>-E<gt>B<flush> (B<timeout> =E<gt> I<$timeout>, B<plugins> =E<gt> [...], B<identifier>  =E<gt> [...]);
563
564Flush cached data.
565
566Valid options are:
567
568=over 4
569
570=item B<timeout>
571
572If this option is specified, only data older than I<$timeout> seconds is
573flushed.
574
575=item B<plugins>
576
577If this option is specified, only the selected plugins will be flushed. The
578argument is a reference to an array of strings.
579
580=item B<identifier>
581
582If this option is specified, only the given identifier(s) will be flushed. The
583argument is a reference to an array of identifiers. Identifiers, in this case,
584are hash references and have the members as outlined in L<VALUE IDENTIFIERS>.
585
586=back
587
588=cut
589
# Assemble and send one or more FLUSH commands.
#
# Options: `timeout' (scalar), `plugins' (array reference of plugin names)
# and `identifier' (array reference of identifier hash references). All
# option values are escaped before being sent. When the list of identifiers
# would make a command exceed the 1024 byte line buffer, it is spread over
# several FLUSH commands that share the same timeout/plugin prefix.
#
# Returns the result of the last _send_message call, or nothing when an
# argument is invalid or an intermediate command fails.
sub flush
{
	my $self  = shift;
	my %args = @_;

	my $msg = "FLUSH";

	$msg .= ' timeout=' . _escape_argument ($args{timeout}) if defined $args{timeout};

	if ($args{plugins})
	{
		foreach my $plugin (@{$args{plugins}})
		{
			$msg .= ' plugin=' . _escape_argument ($plugin);
		}
	}

	if ($args{identifier})
	{
		my $pre = $msg;
		for my $identifier (@{$args{identifier}})
		{
			my $ident_str;

			if (ref ($identifier) ne 'HASH')
			{
				cluck ("The argument of the `identifier' "
					. "option must be an array of hashrefs.");
				return;
			}

			$ident_str = _create_identifier ($identifier) or return;
			$ident_str = ' identifier=' . _escape_argument ($ident_str);

			# Start a new command when this identifier no longer
			# fits. Never send the bare prefix: without any
			# identifier it would flush far more than requested.
			if ($msg ne $pre
				&& length($msg)+length($ident_str) >= 1023) { #1024 - 1 byte for \n
				$self->_send_message($msg) or return;
				$msg = $pre;
			}

			$msg .= $ident_str;
		}
	}

	return $self->_send_message($msg);
}
635
# Accessor: return the content of the object's `error' member.
sub error
{
	my ($self) = @_;
	return $self->{error};
}
640
641=item I<$self>-E<gt>destroy ();
642
Closes the socket before the object is destroyed. This function is also
automatically called when the object goes out of scope.
645
646=back
647
648=cut
649
# Close the socket held in the `sock' member, if any, and remove that member
# from the object. Calling it on an object without a socket does nothing.
sub destroy
{
	my ($self) = @_;

	my $sock = $self->{sock};
	if ($sock)
	{
		close ($sock);
		delete $self->{sock};
	}
}
659
# Destructor: closes the socket via destroy() when the object is released.
sub DESTROY
{
	my $self = shift;

	# A destructor can run at arbitrary points, e.g. while an exception is
	# propagating. Localize the global status variables so that closing the
	# socket cannot clobber values the surrounding code still relies on.
	local ($., $@, $!, $^E, $?);

	$self->destroy ();
}
665
666=head1 SEE ALSO
667
668L<collectd(1)>,
669L<collectd.conf(5)>,
670L<collectd-unixsock(5)>
671
672=head1 AUTHOR
673
674Florian octo Forster E<lt>octo@collectd.orgE<gt>
675
676=cut
6771;
678# vim: set fdm=marker noexpandtab:
679