1package main
2
3import (
4	"bufio"
5	"flag"
6	"fmt"
7	"net"
8	"sync"
9	"time"
10
11	"github.com/nsqio/go-nsq"
12)
13
// Command-line flags, populated by flag.Parse in main.
var (
	// num is the number of subscriber goroutines main starts, one per topic
	// named "t<index>"; it is also passed to each subWorker as its n argument.
	num        = flag.Int("num", 10000, "num channels")
	// tcpAddress is the nsqd TCP endpoint every worker dials.
	tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
)
18
19func main() {
20	flag.Parse()
21	var wg sync.WaitGroup
22
23	goChan := make(chan int)
24	rdyChan := make(chan int)
25	for j := 0; j < *num; j++ {
26		wg.Add(1)
27		go func(id int) {
28			subWorker(*num, *tcpAddress, fmt.Sprintf("t%d", j), "ch", rdyChan, goChan, id)
29			wg.Done()
30		}(j)
31		<-rdyChan
32		time.Sleep(5 * time.Millisecond)
33	}
34
35	close(goChan)
36	wg.Wait()
37}
38
39func subWorker(n int, tcpAddr string,
40	topic string, channel string,
41	rdyChan chan int, goChan chan int, id int) {
42	conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
43	if err != nil {
44		panic(err.Error())
45	}
46	conn.Write(nsq.MagicV2)
47	rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
48	ci := make(map[string]interface{})
49	ci["client_id"] = "test"
50	cmd, _ := nsq.Identify(ci)
51	cmd.WriteTo(rw)
52	nsq.Subscribe(topic, channel).WriteTo(rw)
53	rdyCount := 1
54	rdy := rdyCount
55	rdyChan <- 1
56	<-goChan
57	nsq.Ready(rdyCount).WriteTo(rw)
58	rw.Flush()
59	nsq.ReadResponse(rw)
60	nsq.ReadResponse(rw)
61	for {
62		resp, err := nsq.ReadResponse(rw)
63		if err != nil {
64			panic(err.Error())
65		}
66		frameType, data, err := nsq.UnpackResponse(resp)
67		if err != nil {
68			panic(err.Error())
69		}
70		if frameType == nsq.FrameTypeError {
71			panic(string(data))
72		} else if frameType == nsq.FrameTypeResponse {
73			nsq.Nop().WriteTo(rw)
74			rw.Flush()
75			continue
76		}
77		msg, err := nsq.DecodeMessage(data)
78		if err != nil {
79			panic(err.Error())
80		}
81		nsq.Finish(msg.ID).WriteTo(rw)
82		rdy--
83		if rdy == 0 {
84			nsq.Ready(rdyCount).WriteTo(rw)
85			rdy = rdyCount
86			rw.Flush()
87		}
88	}
89}
90