1package main 2 3import ( 4 "encoding/json" 5 "log" 6 "net/url" 7 "strings" 8 "time" 9 10 "github.com/buger/goreplay/proto" 11 12 elastigo "github.com/mattbaird/elastigo/lib" 13) 14 15type ESUriErorr struct{} 16 17func (e *ESUriErorr) Error() string { 18 return "Wrong ElasticSearch URL format. Expected to be: scheme://host/index_name" 19} 20 21type ESPlugin struct { 22 Active bool 23 ApiPort string 24 eConn *elastigo.Conn 25 Host string 26 Index string 27 indexor *elastigo.BulkIndexer 28 done chan bool 29} 30 31type ESRequestResponse struct { 32 ReqURL string `json:"Req_URL"` 33 ReqMethod string `json:"Req_Method"` 34 ReqUserAgent string `json:"Req_User-Agent"` 35 ReqAcceptLanguage string `json:"Req_Accept-Language,omitempty"` 36 ReqAccept string `json:"Req_Accept,omitempty"` 37 ReqAcceptEncoding string `json:"Req_Accept-Encoding,omitempty"` 38 ReqIfModifiedSince string `json:"Req_If-Modified-Since,omitempty"` 39 ReqConnection string `json:"Req_Connection,omitempty"` 40 ReqCookies string `json:"Req_Cookies,omitempty"` 41 RespStatus string `json:"Resp_Status"` 42 RespStatusCode string `json:"Resp_Status-Code"` 43 RespProto string `json:"Resp_Proto,omitempty"` 44 RespContentLength string `json:"Resp_Content-Length,omitempty"` 45 RespContentType string `json:"Resp_Content-Type,omitempty"` 46 RespTransferEncoding string `json:"Resp_Transfer-Encoding,omitempty"` 47 RespContentEncoding string `json:"Resp_Content-Encoding,omitempty"` 48 RespExpires string `json:"Resp_Expires,omitempty"` 49 RespCacheControl string `json:"Resp_Cache-Control,omitempty"` 50 RespVary string `json:"Resp_Vary,omitempty"` 51 RespSetCookie string `json:"Resp_Set-Cookie,omitempty"` 52 Rtt int64 `json:"RTT"` 53 Timestamp time.Time 54} 55 56// Parse ElasticSearch URI 57// 58// Proper format is: scheme://[userinfo@]host/index_name 59// userinfo is: user[:password] 60// net/url.Parse() does not fail if scheme is not provided but actualy does not 61// handle URI properly. 62// So we must 'validate' URI format to match requirements to use net/url.Parse() 63func parseURI(URI string) (err error, index string) { 64 65 parsedUrl, parseErr := url.Parse(URI) 66 67 if parseErr != nil { 68 err = new(ESUriErorr) 69 return 70 } 71 72 // check URL validity by extracting host and index values. 73 host := parsedUrl.Host 74 urlPathParts := strings.Split(parsedUrl.Path, "/") 75 index = urlPathParts[len(urlPathParts)-1] 76 77 // force index specification in uri : ie no implicit index 78 if host == "" || index == "" { 79 err = new(ESUriErorr) 80 } 81 82 return 83} 84 85func (p *ESPlugin) Init(URI string) { 86 var err error 87 88 err, p.Index = parseURI(URI) 89 90 if err != nil { 91 log.Fatal("Can't initialize ElasticSearch plugin.", err) 92 } 93 94 p.eConn = elastigo.NewConn() 95 96 p.eConn.SetFromUrl(URI) 97 98 p.indexor = p.eConn.NewBulkIndexerErrors(50, 60) 99 p.done = make(chan bool) 100 p.indexor.Start() 101 102 go p.ErrorHandler() 103 104 Debug(1, "Initialized Elasticsearch Plugin") 105 return 106} 107 108func (p *ESPlugin) IndexerShutdown() { 109 p.indexor.Stop() 110 return 111} 112 113func (p *ESPlugin) ErrorHandler() { 114 for { 115 errBuf := <-p.indexor.ErrorChannel 116 Debug(1, "[ELASTICSEARCH]", errBuf.Err) 117 } 118} 119 120func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 { 121 sec := d / time.Second 122 nsec := d % time.Second 123 fl := float64(sec) + float64(nsec)*1e-6 124 return int64(fl) 125} 126 127func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) { 128 if len(resp) == 0 { 129 // nil http response - skipped elasticsearch export for this request 130 return 131 } 132 t := time.Now() 133 rtt := p.RttDurationToMs(stop.Sub(start)) 134 req = payloadBody(req) 135 136 esResp := ESRequestResponse{ 137 ReqURL: string(proto.Path(req)), 138 ReqMethod: string(proto.Method(req)), 139 ReqUserAgent: string(proto.Header(req, []byte("User-Agent"))), 140 ReqAcceptLanguage: string(proto.Header(req, []byte("Accept-Language"))), 141 ReqAccept: string(proto.Header(req, []byte("Accept"))), 142 ReqAcceptEncoding: string(proto.Header(req, []byte("Accept-Encoding"))), 143 ReqIfModifiedSince: string(proto.Header(req, []byte("If-Modified-Since"))), 144 ReqConnection: string(proto.Header(req, []byte("Connection"))), 145 ReqCookies: string(proto.Header(req, []byte("Cookie"))), 146 RespStatus: string(proto.Status(resp)), 147 RespStatusCode: string(proto.Status(resp)), 148 RespProto: string(proto.Method(resp)), 149 RespContentLength: string(proto.Header(resp, []byte("Content-Length"))), 150 RespContentType: string(proto.Header(resp, []byte("Content-Type"))), 151 RespTransferEncoding: string(proto.Header(resp, []byte("Transfer-Encoding"))), 152 RespContentEncoding: string(proto.Header(resp, []byte("Content-Encoding"))), 153 RespExpires: string(proto.Header(resp, []byte("Expires"))), 154 RespCacheControl: string(proto.Header(resp, []byte("Cache-Control"))), 155 RespVary: string(proto.Header(resp, []byte("Vary"))), 156 RespSetCookie: string(proto.Header(resp, []byte("Set-Cookie"))), 157 Rtt: rtt, 158 Timestamp: t, 159 } 160 j, err := json.Marshal(&esResp) 161 if err != nil { 162 Debug(0, "[ELASTIC-RESPONSE]", err) 163 } else { 164 p.indexor.Index(p.Index, "RequestResponse", "", "", "", &t, j) 165 } 166 return 167} 168