1// Package splunk provides the log driver for forwarding server logs to
2// Splunk HTTP Event Collector endpoint.
3package splunk
4
5import (
6	"bytes"
7	"compress/gzip"
8	"context"
9	"crypto/tls"
10	"crypto/x509"
11	"encoding/json"
12	"fmt"
13	"io"
14	"io/ioutil"
15	"net/http"
16	"net/url"
17	"os"
18	"strconv"
19	"strings"
20	"sync"
21	"time"
22
23	"github.com/docker/docker/daemon/logger"
24	"github.com/docker/docker/daemon/logger/loggerutils"
25	"github.com/docker/docker/pkg/urlutil"
26	"github.com/sirupsen/logrus"
27)
28
// Log-opt keys accepted by this driver (validated in ValidateLogOpt).
const (
	driverName                    = "splunk"
	splunkURLKey                  = "splunk-url"
	splunkTokenKey                = "splunk-token"
	splunkSourceKey               = "splunk-source"
	splunkSourceTypeKey           = "splunk-sourcetype"
	splunkIndexKey                = "splunk-index"
	splunkCAPathKey               = "splunk-capath"
	splunkCANameKey               = "splunk-caname"
	splunkInsecureSkipVerifyKey   = "splunk-insecureskipverify"
	splunkFormatKey               = "splunk-format"
	splunkVerifyConnectionKey     = "splunk-verify-connection"
	splunkGzipCompressionKey      = "splunk-gzip"
	splunkGzipCompressionLevelKey = "splunk-gzip-level"
	// Generic logging options shared with other drivers.
	envKey      = "env"
	envRegexKey = "env-regex"
	labelsKey   = "labels"
	tagKey      = "tag"
)
48
// Defaults for the background worker's batching behavior; each can be
// overridden through the environment variables declared below.
const (
	// How often we flush messages if the batch size is not reached.
	defaultPostMessagesFrequency = 5 * time.Second
	// Maximum number of messages sent in a single HTTP request.
	defaultPostMessagesBatchSize = 1000
	// Maximum number of messages kept in the retry buffer before dropping.
	defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
	// Number of messages allowed to be queued in the channel.
	defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
)
59
// Environment variables that override the batching defaults above
// (read once per logger instance in New via getAdvancedOption*).
const (
	envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
	envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
	envVarBufferMaximum         = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
	envVarStreamChannelSize     = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
)
66
// batchSendTimeout bounds all HTTP round trips of one postMessages call.
// Declared as a variable (not const) presumably so it can be overridden,
// e.g. in tests — confirm before relying on that.
var batchSendTimeout = 30 * time.Second
68
// splunkLoggerInterface extends logger.Logger with the background worker
// goroutine that New starts for every format variant.
type splunkLoggerInterface interface {
	logger.Logger
	worker()
}
73
// splunkLogger is the shared core of all three format variants. It owns the
// HTTP client, the outgoing message template, and the channel feeding the
// background worker.
type splunkLogger struct {
	client    *http.Client
	transport *http.Transport

	url         string         // HEC event endpoint URL
	auth        string         // Authorization header value ("Splunk <token>")
	nullMessage *splunkMessage // template copied for every outgoing message

	// http compression
	gzipCompression      bool
	gzipCompressionLevel int

	// Advanced options
	postMessagesFrequency time.Duration
	postMessagesBatchSize int
	bufferMaximum         int

	// For synchronization between background worker and logger.
	// We use channel to send messages to worker go routine.
	// All other variables for blocking Close call before we flush all messages to HEC
	stream     chan *splunkMessage // queue consumed by worker
	lock       sync.RWMutex
	closed     bool       // set by worker once the final flush is done
	closedCond *sync.Cond // non-nil once Close has started; worker signals it
}
99
// splunkLoggerInline sends each log line as a field of a structured
// "inline" event (see splunkMessageEvent).
type splunkLoggerInline struct {
	*splunkLogger

	// nullEvent holds the per-container tag/attrs, copied for every message.
	nullEvent *splunkMessageEvent
}
105
// splunkLoggerJSON behaves like splunkLoggerInline, except lines that are
// valid JSON are embedded as raw JSON rather than as strings.
type splunkLoggerJSON struct {
	*splunkLoggerInline
}
109
// splunkLoggerRaw sends the raw log line as the event, prefixed with the
// pre-rendered tag and attributes.
type splunkLoggerRaw struct {
	*splunkLogger

	// prefix is the pre-built "tag key=value ... " bytes placed before each line.
	prefix []byte
}
115
// splunkMessage is the JSON wire format of one HEC event envelope.
type splunkMessage struct {
	Event      interface{} `json:"event"`
	Time       string      `json:"time"` // seconds since epoch with fraction, as string
	Host       string      `json:"host"`
	Source     string      `json:"source,omitempty"`
	SourceType string      `json:"sourcetype,omitempty"`
	Index      string      `json:"index,omitempty"`
}
124
// splunkMessageEvent is the structured event payload used by the inline and
// json formats; Line is either a string or raw JSON.
type splunkMessageEvent struct {
	Line   interface{}       `json:"line"`
	Source string            `json:"source"`
	Tag    string            `json:"tag,omitempty"`
	Attrs  map[string]string `json:"attrs,omitempty"`
}
131
// Supported values for the splunk-format option.
const (
	splunkFormatRaw    = "raw"
	splunkFormatJSON   = "json"
	splunkFormatInline = "inline"
)
137
// init registers the splunk driver and its option validator with the logger
// registry; a registration failure is fatal at daemon startup.
func init() {
	if err := logger.RegisterLogDriver(driverName, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}
146
147// New creates splunk logger driver using configuration passed in context
148func New(info logger.Info) (logger.Logger, error) {
149	hostname, err := info.Hostname()
150	if err != nil {
151		return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName)
152	}
153
154	// Parse and validate Splunk URL
155	splunkURL, err := parseURL(info)
156	if err != nil {
157		return nil, err
158	}
159
160	// Splunk Token is required parameter
161	splunkToken, ok := info.Config[splunkTokenKey]
162	if !ok {
163		return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey)
164	}
165
166	tlsConfig := &tls.Config{}
167
168	// Splunk is using autogenerated certificates by default,
169	// allow users to trust them with skipping verification
170	if insecureSkipVerifyStr, ok := info.Config[splunkInsecureSkipVerifyKey]; ok {
171		insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr)
172		if err != nil {
173			return nil, err
174		}
175		tlsConfig.InsecureSkipVerify = insecureSkipVerify
176	}
177
178	// If path to the root certificate is provided - load it
179	if caPath, ok := info.Config[splunkCAPathKey]; ok {
180		caCert, err := ioutil.ReadFile(caPath)
181		if err != nil {
182			return nil, err
183		}
184		caPool := x509.NewCertPool()
185		caPool.AppendCertsFromPEM(caCert)
186		tlsConfig.RootCAs = caPool
187	}
188
189	if caName, ok := info.Config[splunkCANameKey]; ok {
190		tlsConfig.ServerName = caName
191	}
192
193	gzipCompression := false
194	if gzipCompressionStr, ok := info.Config[splunkGzipCompressionKey]; ok {
195		gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
196		if err != nil {
197			return nil, err
198		}
199	}
200
201	gzipCompressionLevel := gzip.DefaultCompression
202	if gzipCompressionLevelStr, ok := info.Config[splunkGzipCompressionLevelKey]; ok {
203		var err error
204		gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
205		if err != nil {
206			return nil, err
207		}
208		gzipCompressionLevel = int(gzipCompressionLevel64)
209		if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
210			err := fmt.Errorf("not supported level '%s' for %s (supported values between %d and %d)",
211				gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
212			return nil, err
213		}
214	}
215
216	transport := &http.Transport{
217		TLSClientConfig: tlsConfig,
218	}
219	client := &http.Client{
220		Transport: transport,
221	}
222
223	source := info.Config[splunkSourceKey]
224	sourceType := info.Config[splunkSourceTypeKey]
225	index := info.Config[splunkIndexKey]
226
227	var nullMessage = &splunkMessage{
228		Host:       hostname,
229		Source:     source,
230		SourceType: sourceType,
231		Index:      index,
232	}
233
234	// Allow user to remove tag from the messages by setting tag to empty string
235	tag := ""
236	if tagTemplate, ok := info.Config[tagKey]; !ok || tagTemplate != "" {
237		tag, err = loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
238		if err != nil {
239			return nil, err
240		}
241	}
242
243	attrs, err := info.ExtraAttributes(nil)
244	if err != nil {
245		return nil, err
246	}
247
248	var (
249		postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
250		postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
251		bufferMaximum         = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
252		streamChannelSize     = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
253	)
254
255	logger := &splunkLogger{
256		client:                client,
257		transport:             transport,
258		url:                   splunkURL.String(),
259		auth:                  "Splunk " + splunkToken,
260		nullMessage:           nullMessage,
261		gzipCompression:       gzipCompression,
262		gzipCompressionLevel:  gzipCompressionLevel,
263		stream:                make(chan *splunkMessage, streamChannelSize),
264		postMessagesFrequency: postMessagesFrequency,
265		postMessagesBatchSize: postMessagesBatchSize,
266		bufferMaximum:         bufferMaximum,
267	}
268
269	// By default we verify connection, but we allow use to skip that
270	verifyConnection := true
271	if verifyConnectionStr, ok := info.Config[splunkVerifyConnectionKey]; ok {
272		var err error
273		verifyConnection, err = strconv.ParseBool(verifyConnectionStr)
274		if err != nil {
275			return nil, err
276		}
277	}
278	if verifyConnection {
279		err = verifySplunkConnection(logger)
280		if err != nil {
281			return nil, err
282		}
283	}
284
285	var splunkFormat string
286	if splunkFormatParsed, ok := info.Config[splunkFormatKey]; ok {
287		switch splunkFormatParsed {
288		case splunkFormatInline:
289		case splunkFormatJSON:
290		case splunkFormatRaw:
291		default:
292			return nil, fmt.Errorf("Unknown format specified %s, supported formats are inline, json and raw", splunkFormat)
293		}
294		splunkFormat = splunkFormatParsed
295	} else {
296		splunkFormat = splunkFormatInline
297	}
298
299	var loggerWrapper splunkLoggerInterface
300
301	switch splunkFormat {
302	case splunkFormatInline:
303		nullEvent := &splunkMessageEvent{
304			Tag:   tag,
305			Attrs: attrs,
306		}
307
308		loggerWrapper = &splunkLoggerInline{logger, nullEvent}
309	case splunkFormatJSON:
310		nullEvent := &splunkMessageEvent{
311			Tag:   tag,
312			Attrs: attrs,
313		}
314
315		loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
316	case splunkFormatRaw:
317		var prefix bytes.Buffer
318		if tag != "" {
319			prefix.WriteString(tag)
320			prefix.WriteString(" ")
321		}
322		for key, value := range attrs {
323			prefix.WriteString(key)
324			prefix.WriteString("=")
325			prefix.WriteString(value)
326			prefix.WriteString(" ")
327		}
328
329		loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
330	default:
331		return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
332	}
333
334	go loggerWrapper.worker()
335
336	return loggerWrapper, nil
337}
338
339func (l *splunkLoggerInline) Log(msg *logger.Message) error {
340	message := l.createSplunkMessage(msg)
341
342	event := *l.nullEvent
343	event.Line = string(msg.Line)
344	event.Source = msg.Source
345
346	message.Event = &event
347	logger.PutMessage(msg)
348	return l.queueMessageAsync(message)
349}
350
351func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
352	message := l.createSplunkMessage(msg)
353	event := *l.nullEvent
354
355	var rawJSONMessage json.RawMessage
356	if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil {
357		event.Line = &rawJSONMessage
358	} else {
359		event.Line = string(msg.Line)
360	}
361
362	event.Source = msg.Source
363
364	message.Event = &event
365	logger.PutMessage(msg)
366	return l.queueMessageAsync(message)
367}
368
369func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
370	// empty or whitespace-only messages are not accepted by HEC
371	if strings.TrimSpace(string(msg.Line)) == "" {
372		return nil
373	}
374
375	message := l.createSplunkMessage(msg)
376
377	message.Event = string(append(l.prefix, msg.Line...))
378	logger.PutMessage(msg)
379	return l.queueMessageAsync(message)
380}
381
382func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
383	l.lock.RLock()
384	defer l.lock.RUnlock()
385	if l.closedCond != nil {
386		return fmt.Errorf("%s: driver is closed", driverName)
387	}
388	l.stream <- message
389	return nil
390}
391
392func (l *splunkLogger) worker() {
393	timer := time.NewTicker(l.postMessagesFrequency)
394	var messages []*splunkMessage
395	for {
396		select {
397		case message, open := <-l.stream:
398			if !open {
399				l.postMessages(messages, true)
400				l.lock.Lock()
401				defer l.lock.Unlock()
402				l.transport.CloseIdleConnections()
403				l.closed = true
404				l.closedCond.Signal()
405				return
406			}
407			messages = append(messages, message)
408			// Only sending when we get exactly to the batch size,
409			// This also helps not to fire postMessages on every new message,
410			// when previous try failed.
411			if len(messages)%l.postMessagesBatchSize == 0 {
412				messages = l.postMessages(messages, false)
413			}
414		case <-timer.C:
415			messages = l.postMessages(messages, false)
416		}
417	}
418}
419
// postMessages tries to deliver the buffered messages to HEC in batches of
// postMessagesBatchSize. On a send failure it returns the unsent tail so the
// caller can retry later; once the unsent portion reaches bufferMaximum, or
// when lastChance is set (shutdown), undeliverable messages are dumped to the
// daemon log and dropped. Returns the slice of messages still to be sent.
func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
	messagesLen := len(messages)

	// One shared timeout covers every batch sent in this call.
	ctx, cancel := context.WithTimeout(context.Background(), batchSendTimeout)
	defer cancel()

	for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
		upperBound := i + l.postMessagesBatchSize
		if upperBound > messagesLen {
			upperBound = messagesLen
		}

		if err := l.tryPostMessages(ctx, messages[i:upperBound]); err != nil {
			logrus.WithError(err).WithField("module", "logger/splunk").Warn("Error while sending logs")
			if messagesLen-i >= l.bufferMaximum || lastChance {
				// If this is last chance - print them all to the daemon log
				if lastChance {
					upperBound = messagesLen
				}
				// Not all sent, but buffer has got to its maximum, let's log all messages
				// we could not send and return buffer minus one batch size
				for j := i; j < upperBound; j++ {
					if jsonEvent, err := json.Marshal(messages[j]); err != nil {
						logrus.Error(err)
					} else {
						logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent)))
					}
				}
				return messages[upperBound:messagesLen]
			}
			// Not all sent, returning buffer from where we have not sent messages
			return messages[i:messagesLen]
		}
	}
	// All sent, return empty buffer
	return messages[:0]
}
457
458func (l *splunkLogger) tryPostMessages(ctx context.Context, messages []*splunkMessage) error {
459	if len(messages) == 0 {
460		return nil
461	}
462	var buffer bytes.Buffer
463	var writer io.Writer
464	var gzipWriter *gzip.Writer
465	var err error
466	// If gzip compression is enabled - create gzip writer with specified compression
467	// level. If gzip compression is disabled, use standard buffer as a writer
468	if l.gzipCompression {
469		gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
470		if err != nil {
471			return err
472		}
473		writer = gzipWriter
474	} else {
475		writer = &buffer
476	}
477	for _, message := range messages {
478		jsonEvent, err := json.Marshal(message)
479		if err != nil {
480			return err
481		}
482		if _, err := writer.Write(jsonEvent); err != nil {
483			return err
484		}
485	}
486	// If gzip compression is enabled, tell it, that we are done
487	if l.gzipCompression {
488		err = gzipWriter.Close()
489		if err != nil {
490			return err
491		}
492	}
493	req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
494	if err != nil {
495		return err
496	}
497	req = req.WithContext(ctx)
498	req.Header.Set("Authorization", l.auth)
499	// Tell if we are sending gzip compressed body
500	if l.gzipCompression {
501		req.Header.Set("Content-Encoding", "gzip")
502	}
503	res, err := l.client.Do(req)
504	if err != nil {
505		return err
506	}
507	defer res.Body.Close()
508	if res.StatusCode != http.StatusOK {
509		var body []byte
510		body, err = ioutil.ReadAll(res.Body)
511		if err != nil {
512			return err
513		}
514		return fmt.Errorf("%s: failed to send event - %s - %s", driverName, res.Status, body)
515	}
516	io.Copy(ioutil.Discard, res.Body)
517	return nil
518}
519
// Close shuts the driver down: it closes the stream channel, which tells the
// worker to flush its remaining buffer, and blocks until the worker signals
// completion via closedCond. Safe to call more than once — only the first
// call (closedCond still nil) performs the shutdown. Always returns nil.
func (l *splunkLogger) Close() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if l.closedCond == nil {
		// Setting closedCond also acts as the "closing" flag checked by
		// queueMessageAsync, so no sends race with close(l.stream).
		l.closedCond = sync.NewCond(&l.lock)
		close(l.stream)
		for !l.closed {
			l.closedCond.Wait()
		}
	}
	return nil
}
532
// Name returns the name of this log driver ("splunk").
func (l *splunkLogger) Name() string {
	return driverName
}
536
537func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage {
538	message := *l.nullMessage
539	message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second))
540	return &message
541}
542
543// ValidateLogOpt looks for all supported by splunk driver options
544func ValidateLogOpt(cfg map[string]string) error {
545	for key := range cfg {
546		switch key {
547		case splunkURLKey:
548		case splunkTokenKey:
549		case splunkSourceKey:
550		case splunkSourceTypeKey:
551		case splunkIndexKey:
552		case splunkCAPathKey:
553		case splunkCANameKey:
554		case splunkInsecureSkipVerifyKey:
555		case splunkFormatKey:
556		case splunkVerifyConnectionKey:
557		case splunkGzipCompressionKey:
558		case splunkGzipCompressionLevelKey:
559		case envKey:
560		case envRegexKey:
561		case labelsKey:
562		case tagKey:
563		default:
564			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName)
565		}
566	}
567	return nil
568}
569
570func parseURL(info logger.Info) (*url.URL, error) {
571	splunkURLStr, ok := info.Config[splunkURLKey]
572	if !ok {
573		return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey)
574	}
575
576	splunkURL, err := url.Parse(splunkURLStr)
577	if err != nil {
578		return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey)
579	}
580
581	if !urlutil.IsURL(splunkURLStr) ||
582		!splunkURL.IsAbs() ||
583		(splunkURL.Path != "" && splunkURL.Path != "/") ||
584		splunkURL.RawQuery != "" ||
585		splunkURL.Fragment != "" {
586		return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey)
587	}
588
589	splunkURL.Path = "/services/collector/event/1.0"
590
591	return splunkURL, nil
592}
593
594func verifySplunkConnection(l *splunkLogger) error {
595	req, err := http.NewRequest(http.MethodOptions, l.url, nil)
596	if err != nil {
597		return err
598	}
599	res, err := l.client.Do(req)
600	if err != nil {
601		return err
602	}
603	if res.Body != nil {
604		defer res.Body.Close()
605	}
606	if res.StatusCode != http.StatusOK {
607		var body []byte
608		body, err = ioutil.ReadAll(res.Body)
609		if err != nil {
610			return err
611		}
612		return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, res.Status, body)
613	}
614	return nil
615}
616
617func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
618	valueStr := os.Getenv(envName)
619	if valueStr == "" {
620		return defaultValue
621	}
622	parsedValue, err := time.ParseDuration(valueStr)
623	if err != nil {
624		logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
625		return defaultValue
626	}
627	return parsedValue
628}
629
630func getAdvancedOptionInt(envName string, defaultValue int) int {
631	valueStr := os.Getenv(envName)
632	if valueStr == "" {
633		return defaultValue
634	}
635	parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
636	if err != nil {
637		logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
638		return defaultValue
639	}
640	return int(parsedValue)
641}
642