1package main
2
3import (
4	"bufio"
5	"flag"
6	"fmt"
7	"log"
8	"net"
9	"runtime"
10	"sync"
11	"sync/atomic"
12	"time"
13
14	"github.com/nsqio/go-nsq"
15)
16
17var (
18	runfor     = flag.Duration("runfor", 10*time.Second, "duration of time to run")
19	tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
20	topic      = flag.String("topic", "sub_bench", "topic to receive messages on")
21	size       = flag.Int("size", 200, "size of messages")
22	batchSize  = flag.Int("batch-size", 200, "batch size of messages")
23	deadline   = flag.String("deadline", "", "deadline to start the benchmark run")
24)
25
26var totalMsgCount int64
27
28func main() {
29	flag.Parse()
30	var wg sync.WaitGroup
31
32	log.SetPrefix("[bench_writer] ")
33
34	msg := make([]byte, *size)
35	batch := make([][]byte, *batchSize)
36	for i := range batch {
37		batch[i] = msg
38	}
39
40	goChan := make(chan int)
41	rdyChan := make(chan int)
42	for j := 0; j < runtime.GOMAXPROCS(0); j++ {
43		wg.Add(1)
44		go func() {
45			pubWorker(*runfor, *tcpAddress, *batchSize, batch, *topic, rdyChan, goChan)
46			wg.Done()
47		}()
48		<-rdyChan
49	}
50
51	if *deadline != "" {
52		t, err := time.Parse("2006-01-02 15:04:05", *deadline)
53		if err != nil {
54			log.Fatal(err)
55		}
56		d := t.Sub(time.Now())
57		log.Printf("sleeping until %s (%s)", t, d)
58		time.Sleep(d)
59	}
60
61	start := time.Now()
62	close(goChan)
63	wg.Wait()
64	end := time.Now()
65	duration := end.Sub(start)
66	tmc := atomic.LoadInt64(&totalMsgCount)
67	log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
68		duration,
69		float64(tmc*int64(*size))/duration.Seconds()/1024/1024,
70		float64(tmc)/duration.Seconds(),
71		float64(duration/time.Microsecond)/float64(tmc))
72}
73
74func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte, topic string, rdyChan chan int, goChan chan int) {
75	conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
76	if err != nil {
77		panic(err.Error())
78	}
79	conn.Write(nsq.MagicV2)
80	rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
81	ci := make(map[string]interface{})
82	ci["client_id"] = "writer"
83	ci["hostname"] = "writer"
84	ci["user_agent"] = fmt.Sprintf("bench_writer/%s", nsq.VERSION)
85	cmd, _ := nsq.Identify(ci)
86	cmd.WriteTo(rw)
87	rdyChan <- 1
88	<-goChan
89	rw.Flush()
90	nsq.ReadResponse(rw)
91	var msgCount int64
92	endTime := time.Now().Add(td)
93	for {
94		cmd, _ := nsq.MultiPublish(topic, batch)
95		_, err := cmd.WriteTo(rw)
96		if err != nil {
97			panic(err.Error())
98		}
99		err = rw.Flush()
100		if err != nil {
101			panic(err.Error())
102		}
103		resp, err := nsq.ReadResponse(rw)
104		if err != nil {
105			panic(err.Error())
106		}
107		frameType, data, err := nsq.UnpackResponse(resp)
108		if err != nil {
109			panic(err.Error())
110		}
111		if frameType == nsq.FrameTypeError {
112			panic(string(data))
113		}
114		msgCount += int64(len(batch))
115		if time.Now().After(endTime) {
116			break
117		}
118	}
119	atomic.AddInt64(&totalMsgCount, msgCount)
120}
121