1package main
2
3import (
4	"fmt"
5	"io"
6	"log"
7	"sync/atomic"
8	"time"
9
10	"github.com/buger/goreplay/proto"
11	"github.com/buger/goreplay/size"
12)
13
14const initialDynamicWorkers = 10
15
16type httpWorker struct {
17	output       *HTTPOutput
18	client       *HTTPClient
19	lastActivity time.Time
20	queue        chan []byte
21	stop         chan bool
22}
23
24func newHTTPWorker(output *HTTPOutput, queue chan []byte) *httpWorker {
25	client := NewHTTPClient(output.address, &HTTPClientConfig{
26		FollowRedirects:    output.config.RedirectLimit,
27		Debug:              output.config.Debug,
28		OriginalHost:       output.config.OriginalHost,
29		Timeout:            output.config.Timeout,
30		ResponseBufferSize: int(output.config.BufferSize),
31	})
32
33	w := &httpWorker{client: client}
34	if queue == nil {
35		w.queue = make(chan []byte, 100)
36	} else {
37		w.queue = queue
38	}
39	w.stop = make(chan bool)
40
41	go func() {
42		for {
43			select {
44			case payload := <-w.queue:
45				output.sendRequest(client, payload)
46			case <-w.stop:
47				return
48			}
49		}
50	}()
51
52	return w
53}
54
55type response struct {
56	payload       []byte
57	uuid          []byte
58	roundTripTime int64
59	startedAt     int64
60}
61
62// HTTPOutputConfig struct for holding http output configuration
63type HTTPOutputConfig struct {
64	RedirectLimit int `json:"output-http-redirect-limit"`
65
66	Stats      bool `json:"output-http-stats"`
67	WorkersMin int  `json:"output-http-workers-min"`
68	WorkersMax int  `json:"output-http-workers"`
69	StatsMs    int  `json:"output-http-stats-ms"`
70	Workers    int
71	QueueLen   int `json:"output-http-queue-len"`
72
73	ElasticSearch string `json:"output-http-elasticsearch"`
74
75	Timeout      time.Duration `json:"output-http-timeout"`
76	OriginalHost bool          `json:"output-http-original-host"`
77	BufferSize   size.Size     `json:"output-http-response-buffer"`
78
79	CompatibilityMode bool `json:"output-http-compatibility-mode"`
80
81	RequestGroup string
82
83	Debug bool `json:"output-http-debug"`
84
85	TrackResponses bool `json:"output-http-track-response"`
86}
87
88// HTTPOutput plugin manage pool of workers which send request to replayed server
89// By default workers pool is dynamic and starts with 10 workers
90// You can specify fixed number of workers using `--output-http-workers`
91type HTTPOutput struct {
92	// Keep this as first element of struct because it guarantees 64bit
93	// alignment. atomic.* functions crash on 32bit machines if operand is not
94	// aligned at 64bit. See https://github.com/golang/go/issues/599
95	activeWorkers int64
96
97	workerSessions map[string]*httpWorker
98
99	address string
100	limit   int
101	queue   chan []byte
102
103	responses chan response
104
105	needWorker chan int
106
107	config *HTTPOutputConfig
108
109	queueStats *GorStat
110
111	elasticSearch *ESPlugin
112
113	stop chan bool // Channel used only to indicate goroutine should shutdown
114}
115
116// NewHTTPOutput constructor for HTTPOutput
117// Initialize workers
118func NewHTTPOutput(address string, config *HTTPOutputConfig) io.Writer {
119	o := new(HTTPOutput)
120
121	o.address = address
122	o.config = config
123	o.stop = make(chan bool)
124
125	if o.config.Stats {
126		o.queueStats = NewGorStat("output_http", o.config.StatsMs)
127	}
128
129	o.queue = make(chan []byte, o.config.QueueLen)
130	o.responses = make(chan response, o.config.QueueLen)
131	o.needWorker = make(chan int, 1)
132
133	// Initial workers count
134	if o.config.WorkersMax == 0 {
135		o.needWorker <- initialDynamicWorkers
136	} else {
137		o.needWorker <- o.config.WorkersMax
138	}
139
140	if o.config.ElasticSearch != "" {
141		o.elasticSearch = new(ESPlugin)
142		o.elasticSearch.Init(o.config.ElasticSearch)
143	}
144
145	if Settings.RecognizeTCPSessions {
146		if !PRO {
147			log.Fatal("Detailed TCP sessions work only with PRO license")
148		}
149		o.workerSessions = make(map[string]*httpWorker, 100)
150		go o.sessionWorkerMaster()
151	} else {
152		go o.workerMaster()
153	}
154
155	return o
156}
157
158func (o *HTTPOutput) workerMaster() {
159	for {
160		newWorkers := <-o.needWorker
161		atomic.AddInt64(&o.activeWorkers, int64(newWorkers))
162		for i := 0; i < newWorkers; i++ {
163			go o.startWorker()
164		}
165	}
166}
167
168func (o *HTTPOutput) sessionWorkerMaster() {
169	gc := time.Tick(time.Second)
170
171	for {
172		select {
173		case p := <-o.queue:
174			id := payloadID(p)
175			sessionID := string(id[0:20])
176			worker, ok := o.workerSessions[sessionID]
177
178			if !ok {
179				atomic.AddInt64(&o.activeWorkers, 1)
180				worker = newHTTPWorker(o, nil)
181				o.workerSessions[sessionID] = worker
182			}
183
184			worker.queue <- p
185			worker.lastActivity = time.Now()
186		case <-gc:
187			now := time.Now()
188
189			for id, w := range o.workerSessions {
190				if !w.lastActivity.IsZero() && now.Sub(w.lastActivity) >= 120*time.Second {
191					w.stop <- true
192					delete(o.workerSessions, id)
193					atomic.AddInt64(&o.activeWorkers, -1)
194				}
195			}
196		}
197	}
198}
199
200func (o *HTTPOutput) startWorker() {
201	client := NewHTTPClient(o.address, &HTTPClientConfig{
202		FollowRedirects:    o.config.RedirectLimit,
203		Debug:              o.config.Debug,
204		OriginalHost:       o.config.OriginalHost,
205		Timeout:            o.config.Timeout,
206		ResponseBufferSize: int(o.config.BufferSize),
207		CompatibilityMode:  o.config.CompatibilityMode,
208	})
209
210	for {
211		select {
212		case <-o.stop:
213			return
214		case data := <-o.queue:
215			o.sendRequest(client, data)
216		case <-time.After(2 * time.Second):
217			// When dynamic scaling enabled workers die after 2s of inactivity
218			if o.config.WorkersMin == o.config.WorkersMax {
219				continue
220			}
221
222			workersCount := int(atomic.LoadInt64(&o.activeWorkers))
223
224			// At least 1 startWorker should be alive
225			if workersCount != 1 && workersCount > o.config.WorkersMin {
226				atomic.AddInt64(&o.activeWorkers, -1)
227				return
228			}
229		}
230	}
231}
232
233func (o *HTTPOutput) Write(data []byte) (n int, err error) {
234	if !isRequestPayload(data) {
235		return len(data), nil
236	}
237
238	buf := make([]byte, len(data))
239	copy(buf, data)
240
241	select {
242	case <-o.stop:
243		return 0, ErrorStopped
244	case o.queue <- buf:
245	}
246
247	if o.config.Stats {
248		o.queueStats.Write(len(o.queue))
249	}
250
251	if !Settings.RecognizeTCPSessions && o.config.WorkersMax != o.config.WorkersMin {
252		workersCount := int(atomic.LoadInt64(&o.activeWorkers))
253
254		if len(o.queue) > workersCount {
255			extraWorkersReq := len(o.queue) - workersCount + 1
256			maxWorkersAvailable := o.config.WorkersMax - workersCount
257			if extraWorkersReq > maxWorkersAvailable {
258				extraWorkersReq = maxWorkersAvailable
259			}
260			if extraWorkersReq > 0 {
261				o.needWorker <- extraWorkersReq
262			}
263		}
264	}
265
266	return len(data), nil
267}
268
269func (o *HTTPOutput) Read(data []byte) (int, error) {
270	var resp response
271	select {
272	case <-o.stop:
273		return 0, ErrorStopped
274	case resp = <-o.responses:
275	}
276
277	Debug(3, "[OUTPUT-HTTP] Received response:", string(resp.payload))
278
279	header := payloadHeader(ReplayedResponsePayload, resp.uuid, resp.roundTripTime, resp.startedAt)
280	n := copy(data, header)
281	if len(data) > len(header) {
282		n += copy(data[len(header):], resp.payload)
283	}
284	dis := len(header) + len(data) - n
285	if dis > 0 {
286		Debug(2, "[OUTPUT-HTTP] discarded", dis, "increase copy buffer size")
287	}
288
289	return n, nil
290}
291
292func (o *HTTPOutput) sendRequest(client *HTTPClient, request []byte) {
293	meta := payloadMeta(request)
294
295	Debug(2, fmt.Sprintf("[OUTPUT-HTTP] meta: %q", meta))
296
297	if len(meta) < 2 {
298		return
299	}
300	uuid := meta[1]
301
302	body := payloadBody(request)
303	if !proto.HasRequestTitle(body) {
304		return
305	}
306
307	start := time.Now()
308	resp, err := client.Send(body)
309	stop := time.Now()
310
311	if err != nil {
312		Debug(1, "Error when sending ", err)
313	}
314
315	if o.config.TrackResponses {
316		o.responses <- response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()}
317	}
318
319	if o.elasticSearch != nil {
320		o.elasticSearch.ResponseAnalyze(request, resp, start, stop)
321	}
322}
323
324func (o *HTTPOutput) String() string {
325	return "HTTP output: " + o.address
326}
327
328// Close closes the data channel so that data
329func (o *HTTPOutput) Close() error {
330	close(o.stop)
331	return nil
332}
333