1package main
2
3import (
4	"encoding/json"
5	"log"
6	"net/url"
7	"strings"
8	"time"
9
10	"github.com/buger/goreplay/proto"
11
12	elastigo "github.com/mattbaird/elastigo/lib"
13)
14
// ESUriErorr is the error reported when an ElasticSearch URI does not follow
// the expected scheme://host/index_name layout.
type ESUriErorr struct{}

// Error implements the error interface with a fixed description of the
// expected URI format.
func (*ESUriErorr) Error() string {
	const msg = "Wrong ElasticSearch URL format. Expected to be: scheme://host/index_name"
	return msg
}
20
21type ESPlugin struct {
22	Active  bool
23	ApiPort string
24	eConn   *elastigo.Conn
25	Host    string
26	Index   string
27	indexor *elastigo.BulkIndexer
28	done    chan bool
29}
30
// ESRequestResponse is the JSON document stored in ElasticSearch for one
// request/response pair. Fields tagged omitempty are left out of the document
// when they are empty.
type ESRequestResponse struct {
	// Values taken from the request.
	ReqURL             string `json:"Req_URL"`
	ReqMethod          string `json:"Req_Method"`
	ReqUserAgent       string `json:"Req_User-Agent"`
	ReqAcceptLanguage  string `json:"Req_Accept-Language,omitempty"`
	ReqAccept          string `json:"Req_Accept,omitempty"`
	ReqAcceptEncoding  string `json:"Req_Accept-Encoding,omitempty"`
	ReqIfModifiedSince string `json:"Req_If-Modified-Since,omitempty"`
	ReqConnection      string `json:"Req_Connection,omitempty"`
	ReqCookies         string `json:"Req_Cookies,omitempty"`

	// Values taken from the response.
	RespStatus           string `json:"Resp_Status"`
	RespStatusCode       string `json:"Resp_Status-Code"`
	RespProto            string `json:"Resp_Proto,omitempty"`
	RespContentLength    string `json:"Resp_Content-Length,omitempty"`
	RespContentType      string `json:"Resp_Content-Type,omitempty"`
	RespTransferEncoding string `json:"Resp_Transfer-Encoding,omitempty"`
	RespContentEncoding  string `json:"Resp_Content-Encoding,omitempty"`
	RespExpires          string `json:"Resp_Expires,omitempty"`
	RespCacheControl     string `json:"Resp_Cache-Control,omitempty"`
	RespVary             string `json:"Resp_Vary,omitempty"`
	RespSetCookie        string `json:"Resp_Set-Cookie,omitempty"`

	// Rtt is the round-trip time as produced by ESPlugin.RttDurationToMs;
	// Timestamp is serialized under its Go field name because it has no tag.
	Rtt       int64 `json:"RTT"`
	Timestamp time.Time
}
55
56// Parse ElasticSearch URI
57//
58// Proper format is: scheme://[userinfo@]host/index_name
59// userinfo is: user[:password]
60// net/url.Parse() does not fail if scheme is not provided but actualy does not
61// handle URI properly.
62// So we must 'validate' URI format to match requirements to use net/url.Parse()
63func parseURI(URI string) (err error, index string) {
64
65	parsedUrl, parseErr := url.Parse(URI)
66
67	if parseErr != nil {
68		err = new(ESUriErorr)
69		return
70	}
71
72	//	check URL validity by extracting host and index values.
73	host := parsedUrl.Host
74	urlPathParts := strings.Split(parsedUrl.Path, "/")
75	index = urlPathParts[len(urlPathParts)-1]
76
77	// force index specification in uri : ie no implicit index
78	if host == "" || index == "" {
79		err = new(ESUriErorr)
80	}
81
82	return
83}
84
85func (p *ESPlugin) Init(URI string) {
86	var err error
87
88	err, p.Index = parseURI(URI)
89
90	if err != nil {
91		log.Fatal("Can't initialize ElasticSearch plugin.", err)
92	}
93
94	p.eConn = elastigo.NewConn()
95
96	p.eConn.SetFromUrl(URI)
97
98	p.indexor = p.eConn.NewBulkIndexerErrors(50, 60)
99	p.done = make(chan bool)
100	p.indexor.Start()
101
102	go p.ErrorHandler()
103
104	Debug(1, "Initialized Elasticsearch Plugin")
105	return
106}
107
108func (p *ESPlugin) IndexerShutdown() {
109	p.indexor.Stop()
110	return
111}
112
113func (p *ESPlugin) ErrorHandler() {
114	for {
115		errBuf := <-p.indexor.ErrorChannel
116		Debug(1, "[ELASTICSEARCH]", errBuf.Err)
117	}
118}
119
120func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 {
121	sec := d / time.Second
122	nsec := d % time.Second
123	fl := float64(sec) + float64(nsec)*1e-6
124	return int64(fl)
125}
126
127func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) {
128	if len(resp) == 0 {
129		// nil http response - skipped elasticsearch export for this request
130		return
131	}
132	t := time.Now()
133	rtt := p.RttDurationToMs(stop.Sub(start))
134	req = payloadBody(req)
135
136	esResp := ESRequestResponse{
137		ReqURL:               string(proto.Path(req)),
138		ReqMethod:            string(proto.Method(req)),
139		ReqUserAgent:         string(proto.Header(req, []byte("User-Agent"))),
140		ReqAcceptLanguage:    string(proto.Header(req, []byte("Accept-Language"))),
141		ReqAccept:            string(proto.Header(req, []byte("Accept"))),
142		ReqAcceptEncoding:    string(proto.Header(req, []byte("Accept-Encoding"))),
143		ReqIfModifiedSince:   string(proto.Header(req, []byte("If-Modified-Since"))),
144		ReqConnection:        string(proto.Header(req, []byte("Connection"))),
145		ReqCookies:           string(proto.Header(req, []byte("Cookie"))),
146		RespStatus:           string(proto.Status(resp)),
147		RespStatusCode:       string(proto.Status(resp)),
148		RespProto:            string(proto.Method(resp)),
149		RespContentLength:    string(proto.Header(resp, []byte("Content-Length"))),
150		RespContentType:      string(proto.Header(resp, []byte("Content-Type"))),
151		RespTransferEncoding: string(proto.Header(resp, []byte("Transfer-Encoding"))),
152		RespContentEncoding:  string(proto.Header(resp, []byte("Content-Encoding"))),
153		RespExpires:          string(proto.Header(resp, []byte("Expires"))),
154		RespCacheControl:     string(proto.Header(resp, []byte("Cache-Control"))),
155		RespVary:             string(proto.Header(resp, []byte("Vary"))),
156		RespSetCookie:        string(proto.Header(resp, []byte("Set-Cookie"))),
157		Rtt:                  rtt,
158		Timestamp:            t,
159	}
160	j, err := json.Marshal(&esResp)
161	if err != nil {
162		Debug(0, "[ELASTIC-RESPONSE]", err)
163	} else {
164		p.indexor.Index(p.Index, "RequestResponse", "", "", "", &t, j)
165	}
166	return
167}
168