1package clusterinfo
2
3import (
4	"encoding/json"
5	"net"
6	"sort"
7	"strconv"
8	"time"
9
10	"github.com/blang/semver"
11	"github.com/nsqio/nsq/internal/quantile"
12)
13
14type ProducerTopic struct {
15	Topic      string `json:"topic"`
16	Tombstoned bool   `json:"tombstoned"`
17}
18
19type ProducerTopics []ProducerTopic
20
21func (pt ProducerTopics) Len() int           { return len(pt) }
22func (pt ProducerTopics) Swap(i, j int)      { pt[i], pt[j] = pt[j], pt[i] }
23func (pt ProducerTopics) Less(i, j int) bool { return pt[i].Topic < pt[j].Topic }
24
25type Producer struct {
26	RemoteAddresses  []string       `json:"remote_addresses"`
27	RemoteAddress    string         `json:"remote_address"`
28	Hostname         string         `json:"hostname"`
29	BroadcastAddress string         `json:"broadcast_address"`
30	TCPPort          int            `json:"tcp_port"`
31	HTTPPort         int            `json:"http_port"`
32	Version          string         `json:"version"`
33	VersionObj       semver.Version `json:"-"`
34	Topics           ProducerTopics `json:"topics"`
35	OutOfDate        bool           `json:"out_of_date"`
36}
37
38// UnmarshalJSON implements json.Unmarshaler and postprocesses of ProducerTopics and VersionObj
39func (p *Producer) UnmarshalJSON(b []byte) error {
40	var r struct {
41		RemoteAddress    string   `json:"remote_address"`
42		Hostname         string   `json:"hostname"`
43		BroadcastAddress string   `json:"broadcast_address"`
44		TCPPort          int      `json:"tcp_port"`
45		HTTPPort         int      `json:"http_port"`
46		Version          string   `json:"version"`
47		Topics           []string `json:"topics"`
48		Tombstoned       []bool   `json:"tombstones"`
49	}
50	if err := json.Unmarshal(b, &r); err != nil {
51		return err
52	}
53	*p = Producer{
54		RemoteAddress:    r.RemoteAddress,
55		Hostname:         r.Hostname,
56		BroadcastAddress: r.BroadcastAddress,
57		TCPPort:          r.TCPPort,
58		HTTPPort:         r.HTTPPort,
59		Version:          r.Version,
60	}
61	for i, t := range r.Topics {
62		p.Topics = append(p.Topics, ProducerTopic{Topic: t, Tombstoned: r.Tombstoned[i]})
63	}
64	version, err := semver.Parse(p.Version)
65	if err != nil {
66		version, _ = semver.Parse("0.0.0")
67	}
68	p.VersionObj = version
69	return nil
70}
71
72func (p *Producer) Address() string {
73	if p.RemoteAddress == "" {
74		return "N/A"
75	}
76	return p.RemoteAddress
77}
78
79func (p *Producer) HTTPAddress() string {
80	return net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.HTTPPort))
81}
82
83func (p *Producer) TCPAddress() string {
84	return net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort))
85}
86
87// IsInconsistent checks for cases where an unexpected number of nsqd connections are
88// reporting the same information to nsqlookupd (ie: multiple instances are using the
89// same broadcast address), or cases where some nsqd are not reporting to all nsqlookupd.
90func (p *Producer) IsInconsistent(numLookupd int) bool {
91	return len(p.RemoteAddresses) != numLookupd
92}
93
94type TopicStats struct {
95	Node         string          `json:"node"`
96	Hostname     string          `json:"hostname"`
97	TopicName    string          `json:"topic_name"`
98	Depth        int64           `json:"depth"`
99	MemoryDepth  int64           `json:"memory_depth"`
100	BackendDepth int64           `json:"backend_depth"`
101	MessageCount int64           `json:"message_count"`
102	NodeStats    []*TopicStats   `json:"nodes"`
103	Channels     []*ChannelStats `json:"channels"`
104	Paused       bool            `json:"paused"`
105
106	E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
107}
108
109func (t *TopicStats) Add(a *TopicStats) {
110	t.Node = "*"
111	t.Depth += a.Depth
112	t.MemoryDepth += a.MemoryDepth
113	t.BackendDepth += a.BackendDepth
114	t.MessageCount += a.MessageCount
115	if a.Paused {
116		t.Paused = a.Paused
117	}
118	for _, aChannelStats := range a.Channels {
119		found := false
120		for _, channelStats := range t.Channels {
121			if aChannelStats.ChannelName == channelStats.ChannelName {
122				found = true
123				channelStats.Add(aChannelStats)
124			}
125		}
126		if !found {
127			t.Channels = append(t.Channels, aChannelStats)
128		}
129	}
130	t.NodeStats = append(t.NodeStats, a)
131	sort.Sort(TopicStatsByHost{t.NodeStats})
132	if t.E2eProcessingLatency == nil {
133		t.E2eProcessingLatency = &quantile.E2eProcessingLatencyAggregate{
134			Addr:  t.Node,
135			Topic: t.TopicName,
136		}
137	}
138	t.E2eProcessingLatency.Add(a.E2eProcessingLatency)
139}
140
141type ChannelStats struct {
142	Node          string          `json:"node"`
143	Hostname      string          `json:"hostname"`
144	TopicName     string          `json:"topic_name"`
145	ChannelName   string          `json:"channel_name"`
146	Depth         int64           `json:"depth"`
147	MemoryDepth   int64           `json:"memory_depth"`
148	BackendDepth  int64           `json:"backend_depth"`
149	InFlightCount int64           `json:"in_flight_count"`
150	DeferredCount int64           `json:"deferred_count"`
151	RequeueCount  int64           `json:"requeue_count"`
152	TimeoutCount  int64           `json:"timeout_count"`
153	MessageCount  int64           `json:"message_count"`
154	ClientCount   int             `json:"client_count"`
155	Selected      bool            `json:"-"`
156	NodeStats     []*ChannelStats `json:"nodes"`
157	Clients       []*ClientStats  `json:"clients"`
158	Paused        bool            `json:"paused"`
159
160	E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
161}
162
163func (c *ChannelStats) Add(a *ChannelStats) {
164	c.Node = "*"
165	c.Depth += a.Depth
166	c.MemoryDepth += a.MemoryDepth
167	c.BackendDepth += a.BackendDepth
168	c.InFlightCount += a.InFlightCount
169	c.DeferredCount += a.DeferredCount
170	c.RequeueCount += a.RequeueCount
171	c.TimeoutCount += a.TimeoutCount
172	c.MessageCount += a.MessageCount
173	c.ClientCount += a.ClientCount
174	if a.Paused {
175		c.Paused = a.Paused
176	}
177	c.NodeStats = append(c.NodeStats, a)
178	sort.Sort(ChannelStatsByHost{c.NodeStats})
179	if c.E2eProcessingLatency == nil {
180		c.E2eProcessingLatency = &quantile.E2eProcessingLatencyAggregate{
181			Addr:    c.Node,
182			Topic:   c.TopicName,
183			Channel: c.ChannelName,
184		}
185	}
186	c.E2eProcessingLatency.Add(a.E2eProcessingLatency)
187	c.Clients = append(c.Clients, a.Clients...)
188	sort.Sort(ClientsByHost{c.Clients})
189}
190
191type ClientStats struct {
192	Node              string        `json:"node"`
193	RemoteAddress     string        `json:"remote_address"`
194	Version           string        `json:"version"`
195	ClientID          string        `json:"client_id"`
196	Hostname          string        `json:"hostname"`
197	UserAgent         string        `json:"user_agent"`
198	ConnectTs         int64         `json:"connect_ts"`
199	ConnectedDuration time.Duration `json:"connected"`
200	InFlightCount     int           `json:"in_flight_count"`
201	ReadyCount        int           `json:"ready_count"`
202	FinishCount       int64         `json:"finish_count"`
203	RequeueCount      int64         `json:"requeue_count"`
204	MessageCount      int64         `json:"message_count"`
205	SampleRate        int32         `json:"sample_rate"`
206	Deflate           bool          `json:"deflate"`
207	Snappy            bool          `json:"snappy"`
208	Authed            bool          `json:"authed"`
209	AuthIdentity      string        `json:"auth_identity"`
210	AuthIdentityURL   string        `json:"auth_identity_url"`
211
212	TLS                           bool   `json:"tls"`
213	CipherSuite                   string `json:"tls_cipher_suite"`
214	TLSVersion                    string `json:"tls_version"`
215	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
216	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
217}
218
219// UnmarshalJSON implements json.Unmarshaler and postprocesses ConnectedDuration
220func (s *ClientStats) UnmarshalJSON(b []byte) error {
221	type locaClientStats ClientStats // re-typed to prevent recursion from json.Unmarshal
222	var ss locaClientStats
223	if err := json.Unmarshal(b, &ss); err != nil {
224		return err
225	}
226	*s = ClientStats(ss)
227	s.ConnectedDuration = time.Now().Truncate(time.Second).Sub(time.Unix(s.ConnectTs, 0))
228	return nil
229}
230
231func (c *ClientStats) HasUserAgent() bool {
232	return c.UserAgent != ""
233}
234
235func (c *ClientStats) HasSampleRate() bool {
236	return c.SampleRate > 0
237}
238
239type ChannelStatsList []*ChannelStats
240
241func (c ChannelStatsList) Len() int      { return len(c) }
242func (c ChannelStatsList) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
243
244type ChannelStatsByHost struct {
245	ChannelStatsList
246}
247
248func (c ChannelStatsByHost) Less(i, j int) bool {
249	return c.ChannelStatsList[i].Hostname < c.ChannelStatsList[j].Hostname
250}
251
252type ClientStatsList []*ClientStats
253
254func (c ClientStatsList) Len() int      { return len(c) }
255func (c ClientStatsList) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
256
257type ClientsByHost struct {
258	ClientStatsList
259}
260
261func (c ClientsByHost) Less(i, j int) bool {
262	return c.ClientStatsList[i].Hostname < c.ClientStatsList[j].Hostname
263}
264
265type TopicStatsList []*TopicStats
266
267func (t TopicStatsList) Len() int      { return len(t) }
268func (t TopicStatsList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
269
270type TopicStatsByHost struct {
271	TopicStatsList
272}
273
274func (c TopicStatsByHost) Less(i, j int) bool {
275	return c.TopicStatsList[i].Hostname < c.TopicStatsList[j].Hostname
276}
277
278type Producers []*Producer
279
280func (t Producers) Len() int      { return len(t) }
281func (t Producers) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
282
283func (t Producers) HTTPAddrs() []string {
284	var addrs []string
285	for _, p := range t {
286		addrs = append(addrs, p.HTTPAddress())
287	}
288	return addrs
289}
290
291func (t Producers) Search(needle string) *Producer {
292	for _, producer := range t {
293		if needle == producer.HTTPAddress() {
294			return producer
295		}
296	}
297	return nil
298}
299
300type ProducersByHost struct {
301	Producers
302}
303
304func (c ProducersByHost) Less(i, j int) bool {
305	return c.Producers[i].Hostname < c.Producers[j].Hostname
306}
307