1package stats
2
3import (
4	"encoding/json"
5	"fmt"
6	"net"
7	"strconv"
8	"time"
9
10	"zabbix.com/pkg/conf"
11	"zabbix.com/pkg/plugin"
12	"zabbix.com/pkg/zbxcomms"
13)
14
// Options holds the plugin-specific settings that Configure and Validate
// decode with conf.Unmarshal.
//
// Timeout is multiplied by time.Second when talking to the remote instance;
// Configure replaces a zero value with the global timeout.
// Capacity is not read anywhere in this file.
// NOTE(review): presumably consumed by the plugin framework - confirm.
// SourceIP falls back to the global SourceIP in Configure when empty.
//
// NOTE(review): the `conf` tags (optional, range=min:max) are interpreted by
// the conf package; their exact semantics are not visible here.
type Options struct {
	Timeout  int    `conf:"optional,range=1:30"`
	Capacity int    `conf:"optional,range=1:100"`
	SourceIP string `conf:"optional"`
}
20
// Plugin implements the zabbix.stats metric. It embeds plugin.Base, keeps the
// decoded Options, and carries the local address whose pointer is handed to
// zbxcomms.Exchange for every request.
type Plugin struct {
	plugin.Base
	localAddr net.Addr
	options   Options
}
27
// queue is the optional "params" object of a request whose type is "queue".
// Export fills From and To from the fourth and fifth item parameters; empty
// values are omitted from the JSON.
// NOTE(review): presumably the lower/upper delay bounds for queued items -
// confirm against the Zabbix protocol documentation.
type queue struct {
	From string `json:"from,omitempty"`
	To   string `json:"to,omitempty"`
}
32
// message is the JSON request built by Export. Request is always set to
// "zabbix.stats"; Type and Params are only set (to "queue" and a *queue) when
// the third item parameter is given, and are omitted from the JSON otherwise.
type message struct {
	Request string `json:"request"`
	Type    string `json:"type,omitempty"`
	Params  *queue `json:"params,omitempty"`
}
38
// response captures the two reply fields inspected by getRemoteZabbixStats:
// Response must equal "success", otherwise Info (when non-empty) is reported
// as the failure reason.
type response struct {
	Response string `json:"response"`
	Info     string `json:"info,omitempty"`
}
43
// defaultServerPort is the port Export uses when the item key does not
// supply one.
const defaultServerPort = 10051
45
// impl is the package-level Plugin instance that init registers for the
// zabbix.stats metric.
var impl Plugin
47
48func (p *Plugin) getRemoteZabbixStats(addr string, req []byte) ([]byte, error) {
49	var parse response
50
51	resp, err := zbxcomms.Exchange(addr, &p.localAddr, time.Duration(p.options.Timeout)*time.Second, req)
52
53	if err != nil {
54		return nil, fmt.Errorf("Cannot obtain internal statistics: %s", err)
55	}
56
57	if len(resp) <= 0 {
58		return nil, fmt.Errorf("Cannot obtain internal statistics: received empty response.")
59	}
60
61	err = json.Unmarshal(resp, &parse)
62
63	if err != nil {
64		return nil, fmt.Errorf("Value should be a JSON object.")
65	}
66
67	if parse.Response != "success" {
68		if len(parse.Info) != 0 {
69			return nil, fmt.Errorf("Cannot obtain internal statistics: %s", parse.Info)
70		}
71
72		return nil, fmt.Errorf("Cannot find tag: info")
73	}
74
75	return resp, nil
76}
77
78// Export -
79func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (interface{}, error) {
80	var addr string
81	var m message
82	var q queue
83
84	if len(params) > 5 {
85		return nil, fmt.Errorf("Too many parameters.")
86	}
87
88	if len(params) < 1 || params[0] == "" {
89		addr = fmt.Sprintf("127.0.0.1:%d", defaultServerPort)
90	} else {
91		addr = params[0]
92		if len(params) > 1 && params[1] != "" {
93			port, err := strconv.ParseUint(params[1], 10, 16)
94
95			if err != nil {
96				return nil, fmt.Errorf("Invalid second parameter.")
97			}
98
99			addr = fmt.Sprintf("%s:%d", addr, port)
100		} else {
101			addr = fmt.Sprintf("%s:%d", addr, defaultServerPort)
102		}
103	}
104
105	if len(params) > 2 {
106		if params[2] != "queue" {
107			return nil, fmt.Errorf("Invalid third parameter.")
108		}
109
110		if len(params) > 3 {
111			if len(params) > 4 {
112				q.To = params[4]
113			}
114
115			q.From = params[3]
116		}
117
118		m.Params = &q
119		m.Type = "queue"
120	}
121
122	m.Request = "zabbix.stats"
123	req, err := json.Marshal(m)
124
125	if err != nil {
126		return nil, fmt.Errorf("Cannot obtain internal statistics: %s", err)
127	}
128
129	resp, err := p.getRemoteZabbixStats(addr, req)
130
131	if err != nil {
132		return nil, err
133	}
134
135	str := string(resp)
136
137	return str, nil
138}
139
140func (p *Plugin) Configure(global *plugin.GlobalOptions, options interface{}) {
141	if err := conf.Unmarshal(options, &p.options); err != nil {
142		p.Warningf("cannot unmarshal configuration options: %s", err)
143	}
144	if p.options.Timeout == 0 {
145		p.options.Timeout = global.Timeout
146	}
147	if p.options.SourceIP == "" {
148		p.options.SourceIP = global.SourceIP
149	}
150}
151
152func (p *Plugin) Validate(options interface{}) error {
153	var o Options
154	return conf.Unmarshal(options, &o)
155}
156
157func init() {
158	plugin.RegisterMetrics(&impl, "ZabbixStats", "zabbix.stats", "Return a set of Zabbix server or proxy internal "+
159		"metrics or return number of monitored items in the queue which are delayed on Zabbix server or proxy.")
160}
161