1// This is a utility application that polls /stats for all the producers
2// of the specified topic/channel and displays aggregate stats
3
4package main
5
6import (
7	"errors"
8	"flag"
9	"fmt"
10	"log"
11	"os"
12	"os/signal"
13	"strconv"
14	"strings"
15	"syscall"
16	"time"
17
18	"github.com/nsqio/nsq/internal/app"
19	"github.com/nsqio/nsq/internal/clusterinfo"
20	"github.com/nsqio/nsq/internal/http_api"
21	"github.com/nsqio/nsq/internal/version"
22)
23
// Command-line configuration.
//
// The flag.Bool/flag.String/flag.Duration entries are registered with the
// flag package as part of these declarations. countNum, nsqdHTTPAddrs and
// lookupdHTTPAddrs are plain values here and are bound to their flag names
// (--count, --nsqd-http-address, --lookupd-http-address) in init.
var (
	showVersion        = flag.Bool("version", false, "print version")
	topic              = flag.String("topic", "", "NSQ topic")
	channel            = flag.String("channel", "", "NSQ channel")
	interval           = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
	httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
	httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
	countNum           = numValue{}
	nsqdHTTPAddrs      = app.StringArray{}
	lookupdHTTPAddrs   = app.StringArray{}
)
35
// numValue is a flag value holding an optional integer. Besides the parsed
// number it remembers whether the flag was supplied at all, so that an
// explicit value can be told apart from the zero default.
type numValue struct {
	isSet bool // true once Set has succeeded at least once
	value int  // most recently parsed value
}

// String returns the fixed placeholder "N"; it does not reflect the stored
// value.
func (nv *numValue) String() string {
	return "N"
}

// Set parses s as a base-10 integer that fits in 32 bits. On success the
// value is stored and the receiver is marked as set; on failure the receiver
// is left untouched and the parse error is returned.
func (nv *numValue) Set(s string) error {
	parsed, err := strconv.ParseInt(s, 10, 32)
	if err == nil {
		nv.value, nv.isSet = int(parsed), true
	}
	return err
}
52
53func init() {
54	flag.Var(&nsqdHTTPAddrs, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)")
55	flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
56	flag.Var(&countNum, "count", "number of reports")
57}
58
59func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeout time.Duration,
60	topic string, channel string, nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
61	ci := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout))
62	var o *clusterinfo.ChannelStats
63	for i := 0; !countNum.isSet || countNum.value >= i; i++ {
64		var producers clusterinfo.Producers
65		var err error
66
67		if len(lookupdHTTPAddrs) != 0 {
68			producers, err = ci.GetLookupdTopicProducers(topic, lookupdHTTPAddrs)
69		} else {
70			producers, err = ci.GetNSQDTopicProducers(topic, nsqdHTTPAddrs)
71		}
72		if err != nil {
73			log.Fatalf("ERROR: failed to get topic producers - %s", err)
74		}
75
76		_, channelStats, err := ci.GetNSQDStats(producers, topic, channel, false)
77		if err != nil {
78			log.Fatalf("ERROR: failed to get nsqd stats - %s", err)
79		}
80
81		c, ok := channelStats[channel]
82		if !ok {
83			log.Fatalf("ERROR: failed to find channel(%s) in stats metadata for topic(%s)", channel, topic)
84		}
85
86		if i%25 == 0 {
87			fmt.Printf("%s+%s+%s\n",
88				"------rate------",
89				"----------------depth----------------",
90				"--------------metadata---------------")
91			fmt.Printf("%7s %7s | %7s %7s %7s %5s %5s | %7s %7s %12s %7s\n",
92				"ingress", "egress",
93				"total", "mem", "disk", "inflt",
94				"def", "req", "t-o", "msgs", "clients")
95		}
96
97		if o == nil {
98			o = c
99			time.Sleep(interval)
100			continue
101		}
102
103		// TODO: paused
104		fmt.Printf("%7d %7d | %7d %7d %7d %5d %5d | %7d %7d %12d %7d\n",
105			int64(float64(c.MessageCount-o.MessageCount)/interval.Seconds()),
106			int64(float64(c.MessageCount-o.MessageCount-(c.Depth-o.Depth))/interval.Seconds()),
107			c.Depth,
108			c.MemoryDepth,
109			c.BackendDepth,
110			c.InFlightCount,
111			c.DeferredCount,
112			c.RequeueCount,
113			c.TimeoutCount,
114			c.MessageCount,
115			c.ClientCount)
116
117		o = c
118		time.Sleep(interval)
119	}
120	os.Exit(0)
121}
122
// checkAddrs verifies that every entry in addrs is a bare address (for
// example "host:port") rather than a URL. It returns an error for the first
// entry containing a URL scheme separator ("://") and nil otherwise,
// including for an empty or nil slice.
//
// Only an actual scheme is rejected: a hostname that merely begins with the
// letters "http" (such as "httpd.internal:4151") is accepted.
func checkAddrs(addrs []string) error {
	for _, a := range addrs {
		if strings.Contains(a, "://") {
			return errors.New("address should not contain scheme")
		}
	}
	return nil
}
131
132func main() {
133	flag.Parse()
134
135	if *showVersion {
136		fmt.Printf("nsq_stat v%s\n", version.Binary)
137		return
138	}
139
140	if *topic == "" || *channel == "" {
141		log.Fatal("--topic and --channel are required")
142	}
143
144	intvl := *interval
145	if int64(intvl) <= 0 {
146		log.Fatal("--interval should be positive")
147	}
148
149	connectTimeout := *httpConnectTimeout
150	if int64(connectTimeout) <= 0 {
151		log.Fatal("--http-client-connect-timeout should be positive")
152	}
153
154	requestTimeout := *httpRequestTimeout
155	if int64(requestTimeout) <= 0 {
156		log.Fatal("--http-client-request-timeout should be positive")
157	}
158
159	if countNum.isSet && countNum.value <= 0 {
160		log.Fatal("--count should be positive")
161	}
162
163	if len(nsqdHTTPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
164		log.Fatal("--nsqd-http-address or --lookupd-http-address required")
165	}
166	if len(nsqdHTTPAddrs) > 0 && len(lookupdHTTPAddrs) > 0 {
167		log.Fatal("use --nsqd-http-address or --lookupd-http-address not both")
168	}
169
170	if err := checkAddrs(nsqdHTTPAddrs); err != nil {
171		log.Fatalf("--nsqd-http-address error - %s", err)
172	}
173
174	if err := checkAddrs(lookupdHTTPAddrs); err != nil {
175		log.Fatalf("--lookupd-http-address error - %s", err)
176	}
177
178	termChan := make(chan os.Signal, 1)
179	signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
180
181	go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
182
183	<-termChan
184}
185