package main

import (
	"bytes"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"

	"github.com/Shopify/sarama"
	"github.com/buger/goreplay/proto"
)

// InputKafkaConfig contains the settings for the Kafka input. The exported
// fields are mapped to the "input-kafka-*" JSON keys; producer and consumer
// are unexported and therefore never touched by encoding/json.
type InputKafkaConfig struct {
	producer sarama.AsyncProducer
	consumer sarama.Consumer
	Host     string `json:"input-kafka-host"`
	Topic    string `json:"input-kafka-topic"`
	UseJSON  bool   `json:"input-kafka-json-format"`
}

// OutputKafkaConfig contains the settings for the Kafka output. It mirrors
// InputKafkaConfig field for field, but is mapped to the "output-kafka-*"
// JSON keys.
type OutputKafkaConfig struct {
	producer sarama.AsyncProducer
	consumer sarama.Consumer
	Host     string `json:"output-kafka-host"`
	Topic    string `json:"output-kafka-topic"`
	UseJSON  bool   `json:"output-kafka-json-format"`
}

// KafkaTLSConfig contains the paths of the TLS certificate files used for
// connecting to secured Kafka clusters. NewKafkaConfig only enables TLS when
// all three fields are non-empty.
//
// NOTE(review): clientCert and clientKey are unexported, so encoding/json
// ignores them and their struct tags have no effect (go vet reports this).
// Exporting them requires updating every reference in the package.
type KafkaTLSConfig struct {
	CACert     string `json:"kafka-tls-ca-cert"`
	clientCert string `json:"kafka-tls-client-cert"`
	clientKey  string `json:"kafka-tls-client-key"`
}

// KafkaMessage contains the captured request information that is
// passed as JSON to Apache Kafka.
43type KafkaMessage struct { 44 ReqURL string `json:"Req_URL"` 45 ReqType string `json:"Req_Type"` 46 ReqID string `json:"Req_ID"` 47 ReqTs string `json:"Req_Ts"` 48 ReqMethod string `json:"Req_Method"` 49 ReqBody string `json:"Req_Body,omitempty"` 50 ReqHeaders map[string]string `json:"Req_Headers,omitempty"` 51} 52 53// NewTLSConfig loads TLS certificates 54func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) { 55 tlsConfig := tls.Config{} 56 57 // Load client cert 58 cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) 59 if err != nil { 60 return &tlsConfig, err 61 } 62 tlsConfig.Certificates = []tls.Certificate{cert} 63 64 // Load CA cert 65 caCert, err := ioutil.ReadFile(caCertFile) 66 if err != nil { 67 return &tlsConfig, err 68 } 69 caCertPool := x509.NewCertPool() 70 caCertPool.AppendCertsFromPEM(caCert) 71 tlsConfig.RootCAs = caCertPool 72 73 return &tlsConfig, err 74} 75 76// NewKafkaConfig returns Kafka config with or without TLS 77func NewKafkaConfig(tlsConfig *KafkaTLSConfig) *sarama.Config { 78 config := sarama.NewConfig() 79 // Configuration options go here 80 if (tlsConfig != nil) && (tlsConfig.CACert != "") && (tlsConfig.clientCert != "") && (tlsConfig.clientKey != "") { 81 config.Net.TLS.Enable = true 82 tlsConfig, err := NewTLSConfig(tlsConfig.clientCert, tlsConfig.clientKey, tlsConfig.CACert) 83 if err != nil { 84 log.Fatal(err) 85 } 86 config.Net.TLS.Config = tlsConfig 87 } 88 return config 89} 90 91// Dump returns the given request in its HTTP/1.x wire 92// representation. 
93func (m KafkaMessage) Dump() ([]byte, error) { 94 var b bytes.Buffer 95 96 b.WriteString(fmt.Sprintf("%s %s %s\n", m.ReqType, m.ReqID, m.ReqTs)) 97 b.WriteString(fmt.Sprintf("%s %s HTTP/1.1", m.ReqMethod, m.ReqURL)) 98 b.Write(proto.CRLF) 99 for key, value := range m.ReqHeaders { 100 b.WriteString(fmt.Sprintf("%s: %s", key, value)) 101 b.Write(proto.CRLF) 102 } 103 104 b.Write(proto.CRLF) 105 b.WriteString(m.ReqBody) 106 107 return b.Bytes(), nil 108} 109