1package main 2 3import ( 4 "crypto/tls" 5 "fmt" 6 "hash/fnv" 7 "io" 8 "log" 9 "net" 10 "time" 11) 12 13// TCPOutput used for sending raw tcp payloads 14// Currently used for internal communication between listener and replay server 15// Can be used for transfering binary payloads like protocol buffers 16type TCPOutput struct { 17 address string 18 limit int 19 buf []chan []byte 20 bufStats *GorStat 21 config *TCPOutputConfig 22} 23 24// TCPOutputConfig tcp output configuration 25type TCPOutputConfig struct { 26 Secure bool `json:"output-tcp-secure"` 27 Sticky bool `json:"output-tcp-sticky"` 28} 29 30// NewTCPOutput constructor for TCPOutput 31// Initialize 10 workers which hold keep-alive connection 32func NewTCPOutput(address string, config *TCPOutputConfig) io.Writer { 33 o := new(TCPOutput) 34 35 o.address = address 36 o.config = config 37 38 if Settings.OutputTCPStats { 39 o.bufStats = NewGorStat("output_tcp", 5000) 40 } 41 42 if o.config.Sticky { 43 // create 10 buffers and send the buffer index to the worker 44 o.buf = make([]chan []byte, 10) 45 for i := 0; i < 10; i++ { 46 o.buf[i] = make(chan []byte, 100) 47 go o.worker(i) 48 } 49 } else { 50 // create 1 buffer and send its index (0) to all workers 51 o.buf = make([]chan []byte, 1) 52 o.buf[0] = make(chan []byte, 1000) 53 for i := 0; i < 10; i++ { 54 go o.worker(0) 55 } 56 } 57 58 return o 59} 60 61func (o *TCPOutput) worker(bufferIndex int) { 62 retries := 0 63 conn, err := o.connect(o.address) 64 for { 65 if err == nil { 66 break 67 } 68 69 log.Println("Can't connect to aggregator instance, reconnecting in 1 second. Retries:", retries) 70 time.Sleep(1 * time.Second) 71 72 conn, err = o.connect(o.address) 73 retries++ 74 } 75 76 if retries > 0 { 77 log.Println("Connected to aggregator instance after ", retries, " retries") 78 } 79 80 defer conn.Close() 81 82 for { 83 data := <-o.buf[bufferIndex] 84 conn.Write(data) 85 _, err := conn.Write([]byte(payloadSeparator)) 86 87 if err != nil { 88 log.Println("INFO: TCP output connection closed, reconnecting") 89 o.buf[bufferIndex] <- data 90 go o.worker(bufferIndex) 91 break 92 } 93 } 94} 95 96func (o *TCPOutput) getBufferIndex(data []byte) int { 97 if !o.config.Sticky { 98 return 0 99 } 100 101 hasher := fnv.New32a() 102 hasher.Write(payloadMeta(data)[1]) 103 return int(hasher.Sum32()) % 10 104} 105 106func (o *TCPOutput) Write(data []byte) (n int, err error) { 107 if !isOriginPayload(data) { 108 return len(data), nil 109 } 110 111 // We have to copy, because sending data in multiple threads 112 newBuf := make([]byte, len(data)) 113 copy(newBuf, data) 114 115 bufferIndex := o.getBufferIndex(data) 116 o.buf[bufferIndex] <- newBuf 117 118 if Settings.OutputTCPStats { 119 o.bufStats.Write(len(o.buf[bufferIndex])) 120 } 121 122 return len(data), nil 123} 124 125func (o *TCPOutput) connect(address string) (conn net.Conn, err error) { 126 if o.config.Secure { 127 conn, err = tls.Dial("tcp", address, &tls.Config{}) 128 } else { 129 conn, err = net.Dial("tcp", address) 130 } 131 132 return 133} 134 135func (o *TCPOutput) String() string { 136 return fmt.Sprintf("TCP output %s, limit: %d", o.address, o.limit) 137} 138