1package main 2 3import ( 4 "fmt" 5 "io" 6 "log" 7 "sync/atomic" 8 "time" 9 10 "github.com/buger/goreplay/proto" 11 "github.com/buger/goreplay/size" 12) 13 14const initialDynamicWorkers = 10 15 16type httpWorker struct { 17 output *HTTPOutput 18 client *HTTPClient 19 lastActivity time.Time 20 queue chan []byte 21 stop chan bool 22} 23 24func newHTTPWorker(output *HTTPOutput, queue chan []byte) *httpWorker { 25 client := NewHTTPClient(output.address, &HTTPClientConfig{ 26 FollowRedirects: output.config.RedirectLimit, 27 Debug: output.config.Debug, 28 OriginalHost: output.config.OriginalHost, 29 Timeout: output.config.Timeout, 30 ResponseBufferSize: int(output.config.BufferSize), 31 }) 32 33 w := &httpWorker{client: client} 34 if queue == nil { 35 w.queue = make(chan []byte, 100) 36 } else { 37 w.queue = queue 38 } 39 w.stop = make(chan bool) 40 41 go func() { 42 for { 43 select { 44 case payload := <-w.queue: 45 output.sendRequest(client, payload) 46 case <-w.stop: 47 return 48 } 49 } 50 }() 51 52 return w 53} 54 55type response struct { 56 payload []byte 57 uuid []byte 58 roundTripTime int64 59 startedAt int64 60} 61 62// HTTPOutputConfig struct for holding http output configuration 63type HTTPOutputConfig struct { 64 RedirectLimit int `json:"output-http-redirect-limit"` 65 66 Stats bool `json:"output-http-stats"` 67 WorkersMin int `json:"output-http-workers-min"` 68 WorkersMax int `json:"output-http-workers"` 69 StatsMs int `json:"output-http-stats-ms"` 70 Workers int 71 QueueLen int `json:"output-http-queue-len"` 72 73 ElasticSearch string `json:"output-http-elasticsearch"` 74 75 Timeout time.Duration `json:"output-http-timeout"` 76 OriginalHost bool `json:"output-http-original-host"` 77 BufferSize size.Size `json:"output-http-response-buffer"` 78 79 CompatibilityMode bool `json:"output-http-compatibility-mode"` 80 81 RequestGroup string 82 83 Debug bool `json:"output-http-debug"` 84 85 TrackResponses bool `json:"output-http-track-response"` 86} 87 88// HTTPOutput plugin manage pool of workers which send request to replayed server 89// By default workers pool is dynamic and starts with 10 workers 90// You can specify fixed number of workers using `--output-http-workers` 91type HTTPOutput struct { 92 // Keep this as first element of struct because it guarantees 64bit 93 // alignment. atomic.* functions crash on 32bit machines if operand is not 94 // aligned at 64bit. See https://github.com/golang/go/issues/599 95 activeWorkers int64 96 97 workerSessions map[string]*httpWorker 98 99 address string 100 limit int 101 queue chan []byte 102 103 responses chan response 104 105 needWorker chan int 106 107 config *HTTPOutputConfig 108 109 queueStats *GorStat 110 111 elasticSearch *ESPlugin 112 113 stop chan bool // Channel used only to indicate goroutine should shutdown 114} 115 116// NewHTTPOutput constructor for HTTPOutput 117// Initialize workers 118func NewHTTPOutput(address string, config *HTTPOutputConfig) io.Writer { 119 o := new(HTTPOutput) 120 121 o.address = address 122 o.config = config 123 o.stop = make(chan bool) 124 125 if o.config.Stats { 126 o.queueStats = NewGorStat("output_http", o.config.StatsMs) 127 } 128 129 o.queue = make(chan []byte, o.config.QueueLen) 130 o.responses = make(chan response, o.config.QueueLen) 131 o.needWorker = make(chan int, 1) 132 133 // Initial workers count 134 if o.config.WorkersMax == 0 { 135 o.needWorker <- initialDynamicWorkers 136 } else { 137 o.needWorker <- o.config.WorkersMax 138 } 139 140 if o.config.ElasticSearch != "" { 141 o.elasticSearch = new(ESPlugin) 142 o.elasticSearch.Init(o.config.ElasticSearch) 143 } 144 145 if Settings.RecognizeTCPSessions { 146 if !PRO { 147 log.Fatal("Detailed TCP sessions work only with PRO license") 148 } 149 o.workerSessions = make(map[string]*httpWorker, 100) 150 go o.sessionWorkerMaster() 151 } else { 152 go o.workerMaster() 153 } 154 155 return o 156} 157 158func (o *HTTPOutput) workerMaster() { 159 for { 160 newWorkers := <-o.needWorker 161 atomic.AddInt64(&o.activeWorkers, int64(newWorkers)) 162 for i := 0; i < newWorkers; i++ { 163 go o.startWorker() 164 } 165 } 166} 167 168func (o *HTTPOutput) sessionWorkerMaster() { 169 gc := time.Tick(time.Second) 170 171 for { 172 select { 173 case p := <-o.queue: 174 id := payloadID(p) 175 sessionID := string(id[0:20]) 176 worker, ok := o.workerSessions[sessionID] 177 178 if !ok { 179 atomic.AddInt64(&o.activeWorkers, 1) 180 worker = newHTTPWorker(o, nil) 181 o.workerSessions[sessionID] = worker 182 } 183 184 worker.queue <- p 185 worker.lastActivity = time.Now() 186 case <-gc: 187 now := time.Now() 188 189 for id, w := range o.workerSessions { 190 if !w.lastActivity.IsZero() && now.Sub(w.lastActivity) >= 120*time.Second { 191 w.stop <- true 192 delete(o.workerSessions, id) 193 atomic.AddInt64(&o.activeWorkers, -1) 194 } 195 } 196 } 197 } 198} 199 200func (o *HTTPOutput) startWorker() { 201 client := NewHTTPClient(o.address, &HTTPClientConfig{ 202 FollowRedirects: o.config.RedirectLimit, 203 Debug: o.config.Debug, 204 OriginalHost: o.config.OriginalHost, 205 Timeout: o.config.Timeout, 206 ResponseBufferSize: int(o.config.BufferSize), 207 CompatibilityMode: o.config.CompatibilityMode, 208 }) 209 210 for { 211 select { 212 case <-o.stop: 213 return 214 case data := <-o.queue: 215 o.sendRequest(client, data) 216 case <-time.After(2 * time.Second): 217 // When dynamic scaling enabled workers die after 2s of inactivity 218 if o.config.WorkersMin == o.config.WorkersMax { 219 continue 220 } 221 222 workersCount := int(atomic.LoadInt64(&o.activeWorkers)) 223 224 // At least 1 startWorker should be alive 225 if workersCount != 1 && workersCount > o.config.WorkersMin { 226 atomic.AddInt64(&o.activeWorkers, -1) 227 return 228 } 229 } 230 } 231} 232 233func (o *HTTPOutput) Write(data []byte) (n int, err error) { 234 if !isRequestPayload(data) { 235 return len(data), nil 236 } 237 238 buf := make([]byte, len(data)) 239 copy(buf, data) 240 241 select { 242 case <-o.stop: 243 return 0, ErrorStopped 244 case o.queue <- buf: 245 } 246 247 if o.config.Stats { 248 o.queueStats.Write(len(o.queue)) 249 } 250 251 if !Settings.RecognizeTCPSessions && o.config.WorkersMax != o.config.WorkersMin { 252 workersCount := int(atomic.LoadInt64(&o.activeWorkers)) 253 254 if len(o.queue) > workersCount { 255 extraWorkersReq := len(o.queue) - workersCount + 1 256 maxWorkersAvailable := o.config.WorkersMax - workersCount 257 if extraWorkersReq > maxWorkersAvailable { 258 extraWorkersReq = maxWorkersAvailable 259 } 260 if extraWorkersReq > 0 { 261 o.needWorker <- extraWorkersReq 262 } 263 } 264 } 265 266 return len(data), nil 267} 268 269func (o *HTTPOutput) Read(data []byte) (int, error) { 270 var resp response 271 select { 272 case <-o.stop: 273 return 0, ErrorStopped 274 case resp = <-o.responses: 275 } 276 277 Debug(3, "[OUTPUT-HTTP] Received response:", string(resp.payload)) 278 279 header := payloadHeader(ReplayedResponsePayload, resp.uuid, resp.roundTripTime, resp.startedAt) 280 n := copy(data, header) 281 if len(data) > len(header) { 282 n += copy(data[len(header):], resp.payload) 283 } 284 dis := len(header) + len(data) - n 285 if dis > 0 { 286 Debug(2, "[OUTPUT-HTTP] discarded", dis, "increase copy buffer size") 287 } 288 289 return n, nil 290} 291 292func (o *HTTPOutput) sendRequest(client *HTTPClient, request []byte) { 293 meta := payloadMeta(request) 294 295 Debug(2, fmt.Sprintf("[OUTPUT-HTTP] meta: %q", meta)) 296 297 if len(meta) < 2 { 298 return 299 } 300 uuid := meta[1] 301 302 body := payloadBody(request) 303 if !proto.HasRequestTitle(body) { 304 return 305 } 306 307 start := time.Now() 308 resp, err := client.Send(body) 309 stop := time.Now() 310 311 if err != nil { 312 Debug(1, "Error when sending ", err) 313 } 314 315 if o.config.TrackResponses { 316 o.responses <- response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()} 317 } 318 319 if o.elasticSearch != nil { 320 o.elasticSearch.ResponseAnalyze(request, resp, start, stop) 321 } 322} 323 324func (o *HTTPOutput) String() string { 325 return "HTTP output: " + o.address 326} 327 328// Close closes the data channel so that data 329func (o *HTTPOutput) Close() error { 330 close(o.stop) 331 return nil 332} 333