1package main
2
3import (
4	"crypto/tls"
5	"fmt"
6	"hash/fnv"
7	"io"
8	"log"
9	"net"
10	"time"
11)
12
// TCPOutput is used for sending raw TCP payloads.
// Currently used for internal communication between listener and replay server.
// Can be used for transferring binary payloads like protocol buffers.
type TCPOutput struct {
	// address is the remote endpoint every worker dials.
	address  string
	// limit is only reported by String; nothing in this file assigns it.
	limit    int
	// buf holds the payload queues drained by the workers: ten queues (one
	// per worker) in sticky mode, otherwise a single queue shared by all
	// workers.
	buf      []chan []byte
	// bufStats is created only when Settings.OutputTCPStats is enabled at
	// construction time; Write records the queue length into it.
	bufStats *GorStat
	// config carries the Secure/Sticky switches supplied to NewTCPOutput.
	config   *TCPOutputConfig
}
23
// TCPOutputConfig holds the TCP output configuration.
type TCPOutputConfig struct {
	// Secure makes the output dial with TLS (tls.Dial) instead of plain TCP.
	Secure bool `json:"output-tcp-secure"`
	// Sticky gives every worker its own queue and routes each payload to a
	// queue chosen by hashing the payload's second meta field, instead of
	// letting all workers share one queue.
	Sticky bool `json:"output-tcp-sticky"`
}
29
30// NewTCPOutput constructor for TCPOutput
31// Initialize 10 workers which hold keep-alive connection
32func NewTCPOutput(address string, config *TCPOutputConfig) io.Writer {
33	o := new(TCPOutput)
34
35	o.address = address
36	o.config = config
37
38	if Settings.OutputTCPStats {
39		o.bufStats = NewGorStat("output_tcp", 5000)
40	}
41
42	if o.config.Sticky {
43		// create 10 buffers and send the buffer index to the worker
44		o.buf = make([]chan []byte, 10)
45		for i := 0; i < 10; i++ {
46			o.buf[i] = make(chan []byte, 100)
47			go o.worker(i)
48		}
49	} else {
50		// create 1 buffer and send its index (0) to all workers
51		o.buf = make([]chan []byte, 1)
52		o.buf[0] = make(chan []byte, 1000)
53		for i := 0; i < 10; i++ {
54			go o.worker(0)
55		}
56	}
57
58	return o
59}
60
61func (o *TCPOutput) worker(bufferIndex int) {
62	retries := 0
63	conn, err := o.connect(o.address)
64	for {
65		if err == nil {
66			break
67		}
68
69		log.Println("Can't connect to aggregator instance, reconnecting in 1 second. Retries:", retries)
70		time.Sleep(1 * time.Second)
71
72		conn, err = o.connect(o.address)
73		retries++
74	}
75
76	if retries > 0 {
77		log.Println("Connected to aggregator instance after ", retries, " retries")
78	}
79
80	defer conn.Close()
81
82	for {
83		data := <-o.buf[bufferIndex]
84		conn.Write(data)
85		_, err := conn.Write([]byte(payloadSeparator))
86
87		if err != nil {
88			log.Println("INFO: TCP output connection closed, reconnecting")
89			o.buf[bufferIndex] <- data
90			go o.worker(bufferIndex)
91			break
92		}
93	}
94}
95
96func (o *TCPOutput) getBufferIndex(data []byte) int {
97	if !o.config.Sticky {
98		return 0
99	}
100
101	hasher := fnv.New32a()
102	hasher.Write(payloadMeta(data)[1])
103	return int(hasher.Sum32()) % 10
104}
105
106func (o *TCPOutput) Write(data []byte) (n int, err error) {
107	if !isOriginPayload(data) {
108		return len(data), nil
109	}
110
111	// We have to copy, because sending data in multiple threads
112	newBuf := make([]byte, len(data))
113	copy(newBuf, data)
114
115	bufferIndex := o.getBufferIndex(data)
116	o.buf[bufferIndex] <- newBuf
117
118	if Settings.OutputTCPStats {
119		o.bufStats.Write(len(o.buf[bufferIndex]))
120	}
121
122	return len(data), nil
123}
124
125func (o *TCPOutput) connect(address string) (conn net.Conn, err error) {
126	if o.config.Secure {
127		conn, err = tls.Dial("tcp", address, &tls.Config{})
128	} else {
129		conn, err = net.Dial("tcp", address)
130	}
131
132	return
133}
134
135func (o *TCPOutput) String() string {
136	return fmt.Sprintf("TCP output %s, limit: %d", o.address, o.limit)
137}
138