1package main
2
3import (
4	"bytes"
5	"crypto/tls"
6	"crypto/x509"
7	"fmt"
8	"io/ioutil"
9	"log"
10
11	"github.com/Shopify/sarama"
12	"github.com/buger/goreplay/proto"
13)
14
15// KafkaConfig should contains required information to
16// build producers.
17
18type InputKafkaConfig struct {
19	producer sarama.AsyncProducer
20	consumer sarama.Consumer
21	Host     string `json:"input-kafka-host"`
22	Topic    string `json:"input-kafka-topic"`
23	UseJSON  bool   `json:"input-kafka-json-format"`
24}
25
26type OutputKafkaConfig struct {
27	producer sarama.AsyncProducer
28	consumer sarama.Consumer
29	Host     string `json:"output-kafka-host"`
30	Topic    string `json:"output-kafka-topic"`
31	UseJSON  bool   `json:"output-kafka-json-format"`
32}
33
34// KafkaTLSConfig should contains TLS certificates for connecting to secured Kafka clusters
35type KafkaTLSConfig struct {
36	CACert     string `json:"kafka-tls-ca-cert"`
37	clientCert string `json:"kafka-tls-client-cert"`
38	clientKey  string `json:"kafka-tls-client-key"`
39}
40
// KafkaMessage holds the captured request information that is passed as
// JSON to Apache Kafka.
type KafkaMessage struct {
	// Request line and payload metadata; these keys are always present in
	// the JSON encoding.
	ReqURL    string `json:"Req_URL"`
	ReqType   string `json:"Req_Type"`
	ReqID     string `json:"Req_ID"`
	ReqTs     string `json:"Req_Ts"`
	ReqMethod string `json:"Req_Method"`

	// Body and headers are left out of the JSON encoding when empty
	// (omitempty).
	ReqBody    string            `json:"Req_Body,omitempty"`
	ReqHeaders map[string]string `json:"Req_Headers,omitempty"`
}
52
// NewTLSConfig builds a TLS client configuration from certificate files.
//
// clientCertFile and clientKeyFile are the paths of a PEM-encoded
// certificate/key pair that is placed in Certificates; caCertFile is the
// path of a PEM file whose certificates become the RootCAs pool.
//
// On failure the returned config is nil. Errors from loading the key pair or
// reading the CA file are returned unchanged; a CA file from which no
// certificate could be parsed yields a new error naming that file.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return nil, err
	}

	caPEM, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return nil, err
	}

	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when it parsed no certificate. An
	// empty, non-nil RootCAs pool would make every server verification fail
	// later with a far less obvious error, so reject the file here.
	if !caCertPool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no valid PEM certificates found in CA file %q", caCertFile)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
	}, nil
}
75
76// NewKafkaConfig returns Kafka config with or without TLS
77func NewKafkaConfig(tlsConfig *KafkaTLSConfig) *sarama.Config {
78	config := sarama.NewConfig()
79	// Configuration options go here
80	if (tlsConfig != nil) && (tlsConfig.CACert != "") && (tlsConfig.clientCert != "") && (tlsConfig.clientKey != "") {
81		config.Net.TLS.Enable = true
82		tlsConfig, err := NewTLSConfig(tlsConfig.clientCert, tlsConfig.clientKey, tlsConfig.CACert)
83		if err != nil {
84			log.Fatal(err)
85		}
86		config.Net.TLS.Config = tlsConfig
87	}
88	return config
89}
90
91// Dump returns the given request in its HTTP/1.x wire
92// representation.
93func (m KafkaMessage) Dump() ([]byte, error) {
94	var b bytes.Buffer
95
96	b.WriteString(fmt.Sprintf("%s %s %s\n", m.ReqType, m.ReqID, m.ReqTs))
97	b.WriteString(fmt.Sprintf("%s %s HTTP/1.1", m.ReqMethod, m.ReqURL))
98	b.Write(proto.CRLF)
99	for key, value := range m.ReqHeaders {
100		b.WriteString(fmt.Sprintf("%s: %s", key, value))
101		b.Write(proto.CRLF)
102	}
103
104	b.Write(proto.CRLF)
105	b.WriteString(m.ReqBody)
106
107	return b.Bytes(), nil
108}
109