1// This is a utility application that polls /stats for all the producers 2// of the specified topic/channel and displays aggregate stats 3 4package main 5 6import ( 7 "errors" 8 "flag" 9 "fmt" 10 "log" 11 "os" 12 "os/signal" 13 "strconv" 14 "strings" 15 "syscall" 16 "time" 17 18 "github.com/nsqio/nsq/internal/app" 19 "github.com/nsqio/nsq/internal/clusterinfo" 20 "github.com/nsqio/nsq/internal/http_api" 21 "github.com/nsqio/nsq/internal/version" 22) 23 24var ( 25 showVersion = flag.Bool("version", false, "print version") 26 topic = flag.String("topic", "", "NSQ topic") 27 channel = flag.String("channel", "", "NSQ channel") 28 interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output") 29 httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect") 30 httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request") 31 countNum = numValue{} 32 nsqdHTTPAddrs = app.StringArray{} 33 lookupdHTTPAddrs = app.StringArray{} 34) 35 36type numValue struct { 37 isSet bool 38 value int 39} 40 41func (nv *numValue) String() string { return "N" } 42 43func (nv *numValue) Set(s string) error { 44 value, err := strconv.ParseInt(s, 10, 32) 45 if err != nil { 46 return err 47 } 48 nv.value = int(value) 49 nv.isSet = true 50 return nil 51} 52 53func init() { 54 flag.Var(&nsqdHTTPAddrs, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)") 55 flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)") 56 flag.Var(&countNum, "count", "number of reports") 57} 58 59func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeout time.Duration, 60 topic string, channel string, nsqdTCPAddrs []string, lookupdHTTPAddrs []string) { 61 ci := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)) 62 var o *clusterinfo.ChannelStats 63 for i := 0; !countNum.isSet || countNum.value >= i; i++ { 64 var producers clusterinfo.Producers 65 var err error 66 67 if len(lookupdHTTPAddrs) != 0 { 68 producers, err = ci.GetLookupdTopicProducers(topic, lookupdHTTPAddrs) 69 } else { 70 producers, err = ci.GetNSQDTopicProducers(topic, nsqdHTTPAddrs) 71 } 72 if err != nil { 73 log.Fatalf("ERROR: failed to get topic producers - %s", err) 74 } 75 76 _, channelStats, err := ci.GetNSQDStats(producers, topic, channel, false) 77 if err != nil { 78 log.Fatalf("ERROR: failed to get nsqd stats - %s", err) 79 } 80 81 c, ok := channelStats[channel] 82 if !ok { 83 log.Fatalf("ERROR: failed to find channel(%s) in stats metadata for topic(%s)", channel, topic) 84 } 85 86 if i%25 == 0 { 87 fmt.Printf("%s+%s+%s\n", 88 "------rate------", 89 "----------------depth----------------", 90 "--------------metadata---------------") 91 fmt.Printf("%7s %7s | %7s %7s %7s %5s %5s | %7s %7s %12s %7s\n", 92 "ingress", "egress", 93 "total", "mem", "disk", "inflt", 94 "def", "req", "t-o", "msgs", "clients") 95 } 96 97 if o == nil { 98 o = c 99 time.Sleep(interval) 100 continue 101 } 102 103 // TODO: paused 104 fmt.Printf("%7d %7d | %7d %7d %7d %5d %5d | %7d %7d %12d %7d\n", 105 int64(float64(c.MessageCount-o.MessageCount)/interval.Seconds()), 106 int64(float64(c.MessageCount-o.MessageCount-(c.Depth-o.Depth))/interval.Seconds()), 107 c.Depth, 108 c.MemoryDepth, 109 c.BackendDepth, 110 c.InFlightCount, 111 c.DeferredCount, 112 c.RequeueCount, 113 c.TimeoutCount, 114 c.MessageCount, 115 c.ClientCount) 116 117 o = c 118 time.Sleep(interval) 119 } 120 os.Exit(0) 121} 122 123func checkAddrs(addrs []string) error { 124 for _, a := range addrs { 125 if strings.HasPrefix(a, "http") { 126 return errors.New("address should not contain scheme") 127 } 128 } 129 return nil 130} 131 132func main() { 133 flag.Parse() 134 135 if *showVersion { 136 fmt.Printf("nsq_stat v%s\n", version.Binary) 137 return 138 } 139 140 if *topic == "" || *channel == "" { 141 log.Fatal("--topic and --channel are required") 142 } 143 144 intvl := *interval 145 if int64(intvl) <= 0 { 146 log.Fatal("--interval should be positive") 147 } 148 149 connectTimeout := *httpConnectTimeout 150 if int64(connectTimeout) <= 0 { 151 log.Fatal("--http-client-connect-timeout should be positive") 152 } 153 154 requestTimeout := *httpRequestTimeout 155 if int64(requestTimeout) <= 0 { 156 log.Fatal("--http-client-request-timeout should be positive") 157 } 158 159 if countNum.isSet && countNum.value <= 0 { 160 log.Fatal("--count should be positive") 161 } 162 163 if len(nsqdHTTPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 { 164 log.Fatal("--nsqd-http-address or --lookupd-http-address required") 165 } 166 if len(nsqdHTTPAddrs) > 0 && len(lookupdHTTPAddrs) > 0 { 167 log.Fatal("use --nsqd-http-address or --lookupd-http-address not both") 168 } 169 170 if err := checkAddrs(nsqdHTTPAddrs); err != nil { 171 log.Fatalf("--nsqd-http-address error - %s", err) 172 } 173 174 if err := checkAddrs(lookupdHTTPAddrs); err != nil { 175 log.Fatalf("--lookupd-http-address error - %s", err) 176 } 177 178 termChan := make(chan os.Signal, 1) 179 signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) 180 181 go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs) 182 183 <-termChan 184} 185