// Package splunk provides the log driver for forwarding server logs to
// a Splunk HTTP Event Collector endpoint.
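//
// A minimal usage sketch (values are illustrative, not taken from this
// package): the driver is selected with --log-driver=splunk and configured
// through the --log-opt keys defined below, for example:
//
//	docker run --log-driver=splunk \
//		--log-opt splunk-url=https://splunkhost:8088 \
//		--log-opt splunk-token=<token> \
//		<image>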
package splunk // import "github.com/docker/docker/daemon/logger/splunk"

import (
	"bytes"
	"compress/gzip"
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/pkg/pools"
	"github.com/docker/docker/pkg/urlutil"
	"github.com/sirupsen/logrus"
)

const (
	driverName                    = "splunk"
	splunkURLKey                  = "splunk-url"
	splunkTokenKey                = "splunk-token"
	splunkSourceKey               = "splunk-source"
	splunkSourceTypeKey           = "splunk-sourcetype"
	splunkIndexKey                = "splunk-index"
	splunkCAPathKey               = "splunk-capath"
	splunkCANameKey               = "splunk-caname"
	splunkInsecureSkipVerifyKey   = "splunk-insecureskipverify"
	splunkFormatKey               = "splunk-format"
	splunkVerifyConnectionKey     = "splunk-verify-connection"
	splunkGzipCompressionKey      = "splunk-gzip"
	splunkGzipCompressionLevelKey = "splunk-gzip-level"
	envKey                        = "env"
	envRegexKey                   = "env-regex"
	labelsKey                     = "labels"
	tagKey                        = "tag"
)

const (
	// How often we send messages if we do not reach the batch size
	defaultPostMessagesFrequency = 5 * time.Second
	// How big a batch of messages can be
	defaultPostMessagesBatchSize = 1000
	// Maximum number of messages we can store in the buffer
	defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
	// Number of messages allowed to be queued in the channel
	defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
	// maxResponseSize is the max amount that will be read from an http response
	maxResponseSize = 1024
)

const (
	envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
	envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
	envVarBufferMaximum         = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
	envVarStreamChannelSize     = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
)
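
// These environment variables override the batching defaults above. An
// illustrative example (durations use time.ParseDuration syntax; see the
// helpers at the bottom of this file):
//
//	SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY=1s
//	SPLUNK_LOGGING_DRIVER_BUFFER_MAX=10000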

var batchSendTimeout = 30 * time.Second

type splunkLoggerInterface interface {
	logger.Logger
	worker()
}

type splunkLogger struct {
	client    *http.Client
	transport *http.Transport

	url         string
	auth        string
	nullMessage *splunkMessage

	// http compression
	gzipCompression      bool
	gzipCompressionLevel int

	// Advanced options
	postMessagesFrequency time.Duration
	postMessagesBatchSize int
	bufferMaximum         int

	// For synchronization between the background worker and the logger:
	// we use a channel to send messages to the worker goroutine, and the
	// remaining fields block the Close call until all messages are flushed to HEC
	stream     chan *splunkMessage
	lock       sync.RWMutex
	closed     bool
	closedCond *sync.Cond
}

type splunkLoggerInline struct {
	*splunkLogger

	nullEvent *splunkMessageEvent
}

type splunkLoggerJSON struct {
	*splunkLoggerInline
}

type splunkLoggerRaw struct {
	*splunkLogger

	prefix []byte
}

type splunkMessage struct {
	Event      interface{} `json:"event"`
	Time       string      `json:"time"`
	Host       string      `json:"host"`
	Source     string      `json:"source,omitempty"`
	SourceType string      `json:"sourcetype,omitempty"`
	Index      string      `json:"index,omitempty"`
}
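
// A marshaled splunkMessage for the inline format looks roughly like the
// following (values are illustrative; the event shape is the
// splunkMessageEvent defined below, and Time is seconds since the epoch as
// produced by createSplunkMessage):
//
//	{"event":{"line":"hello","source":"stdout","tag":"someTag"},
//	 "time":"1536171580.123456","host":"myhost"}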

type splunkMessageEvent struct {
	Line   interface{}       `json:"line"`
	Source string            `json:"source"`
	Tag    string            `json:"tag,omitempty"`
	Attrs  map[string]string `json:"attrs,omitempty"`
}

const (
	splunkFormatRaw    = "raw"
	splunkFormatJSON   = "json"
	splunkFormatInline = "inline"
)
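
// The formats differ only in how the container line is embedded in the
// event: inline sends the line as a string field, json first tries to parse
// the line as JSON and falls back to a string, and raw sends the
// tag/attribute prefix concatenated with the line as the whole event (see
// the Log methods below).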

func init() {
	if err := logger.RegisterLogDriver(driverName, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}

// New creates a splunk logger driver using the configuration passed in info
func New(info logger.Info) (logger.Logger, error) {
	hostname, err := info.Hostname()
	if err != nil {
		return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName)
	}

	// Parse and validate Splunk URL
	splunkURL, err := parseURL(info)
	if err != nil {
		return nil, err
	}

	// The Splunk token is a required parameter
	splunkToken, ok := info.Config[splunkTokenKey]
	if !ok {
		return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey)
	}

	tlsConfig := &tls.Config{}

	// Splunk uses autogenerated certificates by default,
	// so allow users to trust them by skipping verification
	if insecureSkipVerifyStr, ok := info.Config[splunkInsecureSkipVerifyKey]; ok {
		insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr)
		if err != nil {
			return nil, err
		}
		tlsConfig.InsecureSkipVerify = insecureSkipVerify
	}

	// If a path to the root certificate is provided, load it
	if caPath, ok := info.Config[splunkCAPathKey]; ok {
		caCert, err := ioutil.ReadFile(caPath)
		if err != nil {
			return nil, err
		}
		caPool := x509.NewCertPool()
		caPool.AppendCertsFromPEM(caCert)
		tlsConfig.RootCAs = caPool
	}

	if caName, ok := info.Config[splunkCANameKey]; ok {
		tlsConfig.ServerName = caName
	}

	gzipCompression := false
	if gzipCompressionStr, ok := info.Config[splunkGzipCompressionKey]; ok {
		gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
		if err != nil {
			return nil, err
		}
	}

	gzipCompressionLevel := gzip.DefaultCompression
	if gzipCompressionLevelStr, ok := info.Config[splunkGzipCompressionLevelKey]; ok {
		var err error
		gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
		if err != nil {
			return nil, err
		}
		gzipCompressionLevel = int(gzipCompressionLevel64)
		if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
			err := fmt.Errorf("not supported level '%s' for %s (supported values between %d and %d)",
				gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
			return nil, err
		}
	}

	transport := &http.Transport{
		TLSClientConfig: tlsConfig,
		Proxy:           http.ProxyFromEnvironment,
	}
	client := &http.Client{
		Transport: transport,
	}

	source := info.Config[splunkSourceKey]
	sourceType := info.Config[splunkSourceTypeKey]
	index := info.Config[splunkIndexKey]

	var nullMessage = &splunkMessage{
		Host:       hostname,
		Source:     source,
		SourceType: sourceType,
		Index:      index,
	}

	// Allow the user to remove the tag from messages by setting it to an empty string
	tag := ""
	if tagTemplate, ok := info.Config[tagKey]; !ok || tagTemplate != "" {
		tag, err = loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
		if err != nil {
			return nil, err
		}
	}

	attrs, err := info.ExtraAttributes(nil)
	if err != nil {
		return nil, err
	}

	var (
		postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
		postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
		bufferMaximum         = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
		streamChannelSize     = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
	)

	logger := &splunkLogger{
		client:                client,
		transport:             transport,
		url:                   splunkURL.String(),
		auth:                  "Splunk " + splunkToken,
		nullMessage:           nullMessage,
		gzipCompression:       gzipCompression,
		gzipCompressionLevel:  gzipCompressionLevel,
		stream:                make(chan *splunkMessage, streamChannelSize),
		postMessagesFrequency: postMessagesFrequency,
		postMessagesBatchSize: postMessagesBatchSize,
		bufferMaximum:         bufferMaximum,
	}

	// By default we verify the connection, but we allow the user to skip that
	verifyConnection := true
	if verifyConnectionStr, ok := info.Config[splunkVerifyConnectionKey]; ok {
		var err error
		verifyConnection, err = strconv.ParseBool(verifyConnectionStr)
		if err != nil {
			return nil, err
		}
	}
	if verifyConnection {
		err = verifySplunkConnection(logger)
		if err != nil {
			return nil, err
		}
	}

	var splunkFormat string
	if splunkFormatParsed, ok := info.Config[splunkFormatKey]; ok {
		switch splunkFormatParsed {
		case splunkFormatInline, splunkFormatJSON, splunkFormatRaw:
		default:
			return nil, fmt.Errorf("unknown format specified %s, supported formats are inline, json and raw", splunkFormatParsed)
		}
		splunkFormat = splunkFormatParsed
	} else {
		splunkFormat = splunkFormatInline
	}

	var loggerWrapper splunkLoggerInterface

	switch splunkFormat {
	case splunkFormatInline:
		nullEvent := &splunkMessageEvent{
			Tag:   tag,
			Attrs: attrs,
		}

		loggerWrapper = &splunkLoggerInline{logger, nullEvent}
	case splunkFormatJSON:
		nullEvent := &splunkMessageEvent{
			Tag:   tag,
			Attrs: attrs,
		}

		loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
	case splunkFormatRaw:
		var prefix bytes.Buffer
		if tag != "" {
			prefix.WriteString(tag)
			prefix.WriteString(" ")
		}
		for key, value := range attrs {
			prefix.WriteString(key)
			prefix.WriteString("=")
			prefix.WriteString(value)
			prefix.WriteString(" ")
		}

		loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
	default:
		return nil, fmt.Errorf("unexpected format %s", splunkFormat)
	}

	go loggerWrapper.worker()

	return loggerWrapper, nil
}

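// Log for the inline format puts the raw line as a string into a copy of
// the preconfigured event and queues the message for asynchronous delivery.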
func (l *splunkLoggerInline) Log(msg *logger.Message) error {
	message := l.createSplunkMessage(msg)

	event := *l.nullEvent
	event.Line = string(msg.Line)
	event.Source = msg.Source

	message.Event = &event
	logger.PutMessage(msg)
	return l.queueMessageAsync(message)
}

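// Log for the json format tries to keep the line as raw JSON so Splunk can
// index its fields; if the line does not parse as JSON, it falls back to a
// plain string like the inline format.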
func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
	message := l.createSplunkMessage(msg)
	event := *l.nullEvent

	var rawJSONMessage json.RawMessage
	if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil {
		event.Line = &rawJSONMessage
	} else {
		event.Line = string(msg.Line)
	}

	event.Source = msg.Source

	message.Event = &event
	logger.PutMessage(msg)
	return l.queueMessageAsync(message)
}

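// Log for the raw format sends the precomputed tag/attribute prefix
// followed by the raw line as the whole event.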
func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
	// empty or whitespace-only messages are not accepted by HEC
	if strings.TrimSpace(string(msg.Line)) == "" {
		return nil
	}

	message := l.createSplunkMessage(msg)

	message.Event = string(append(l.prefix, msg.Line...))
	logger.PutMessage(msg)
	return l.queueMessageAsync(message)
}

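// queueMessageAsync hands the message to the worker goroutine via the
// stream channel; it fails if Close has already been called.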
func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if l.closedCond != nil {
		return fmt.Errorf("%s: driver is closed", driverName)
	}
	l.stream <- message
	return nil
}

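// worker batches messages from the stream channel and posts them to HEC,
// either when a full batch accumulates or when the post frequency timer
// fires; on channel close it flushes the remainder and signals Close.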
func (l *splunkLogger) worker() {
	timer := time.NewTicker(l.postMessagesFrequency)
	var messages []*splunkMessage
	for {
		select {
		case message, open := <-l.stream:
			if !open {
				l.postMessages(messages, true)
				l.lock.Lock()
				defer l.lock.Unlock()
				l.transport.CloseIdleConnections()
				l.closed = true
				l.closedCond.Signal()
				return
			}
			messages = append(messages, message)
			// Only send when we get to exactly the batch size.
			// This also avoids firing postMessages on every new message
			// when the previous try failed.
			if len(messages)%l.postMessagesBatchSize == 0 {
				messages = l.postMessages(messages, false)
			}
		case <-timer.C:
			messages = l.postMessages(messages, false)
		}
	}
}

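// postMessages sends the buffered messages in batches of
// postMessagesBatchSize and returns whatever could not be sent. If the
// buffer exceeds bufferMaximum (or lastChance is set), unsent messages are
// dumped to the daemon log instead of being retried.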
func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
	messagesLen := len(messages)

	ctx, cancel := context.WithTimeout(context.Background(), batchSendTimeout)
	defer cancel()

	for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
		upperBound := i + l.postMessagesBatchSize
		if upperBound > messagesLen {
			upperBound = messagesLen
		}

		if err := l.tryPostMessages(ctx, messages[i:upperBound]); err != nil {
			logrus.WithError(err).WithField("module", "logger/splunk").Warn("Error while sending logs")
			if messagesLen-i >= l.bufferMaximum || lastChance {
				// If this is the last chance, print them all to the daemon log
				if lastChance {
					upperBound = messagesLen
				}
				// Not all were sent, but the buffer has reached its maximum; log all
				// messages we could not send and return the buffer minus one batch size
				for j := i; j < upperBound; j++ {
					if jsonEvent, err := json.Marshal(messages[j]); err != nil {
						logrus.Error(err)
					} else {
						logrus.Error(fmt.Errorf("failed to send a message '%s'", string(jsonEvent)))
					}
				}
				return messages[upperBound:messagesLen]
			}
			// Not all were sent, so return the buffer from where we have not sent messages
			return messages[i:messagesLen]
		}
	}
	// All sent, return empty buffer
	return messages[:0]
}

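// tryPostMessages serializes one batch as concatenated JSON events
// (optionally gzip-compressed) and POSTs it to the HEC endpoint in a single
// request.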
func (l *splunkLogger) tryPostMessages(ctx context.Context, messages []*splunkMessage) error {
	if len(messages) == 0 {
		return nil
	}
	var buffer bytes.Buffer
	var writer io.Writer
	var gzipWriter *gzip.Writer
	var err error
	// If gzip compression is enabled, create a gzip writer with the specified
	// compression level. If gzip compression is disabled, use the plain buffer
	// as the writer
	if l.gzipCompression {
		gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
		if err != nil {
			return err
		}
		writer = gzipWriter
	} else {
		writer = &buffer
	}
	for _, message := range messages {
		jsonEvent, err := json.Marshal(message)
		if err != nil {
			return err
		}
		if _, err := writer.Write(jsonEvent); err != nil {
			return err
		}
	}
	// If gzip compression is enabled, flush the remaining data by closing the writer
	if l.gzipCompression {
		err = gzipWriter.Close()
		if err != nil {
			return err
		}
	}
	req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
	if err != nil {
		return err
	}
	req = req.WithContext(ctx)
	req.Header.Set("Authorization", l.auth)
	// Indicate that we are sending a gzip-compressed body
	if l.gzipCompression {
		req.Header.Set("Content-Encoding", "gzip")
	}
	resp, err := l.client.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		pools.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}()
	if resp.StatusCode != http.StatusOK {
		rdr := io.LimitReader(resp.Body, maxResponseSize)
		body, err := ioutil.ReadAll(rdr)
		if err != nil {
			return err
		}
		return fmt.Errorf("%s: failed to send event - %s - %s", driverName, resp.Status, string(body))
	}
	return nil
}

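// Close closes the stream channel exactly once and blocks until the worker
// has flushed all pending messages to HEC.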
func (l *splunkLogger) Close() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if l.closedCond == nil {
		l.closedCond = sync.NewCond(&l.lock)
		close(l.stream)
		for !l.closed {
			l.closedCond.Wait()
		}
	}
	return nil
}

func (l *splunkLogger) Name() string {
	return driverName
}

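// createSplunkMessage copies the preconfigured null message and stamps it
// with the message timestamp as fractional seconds since the epoch, the
// format HEC expects in the time field.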
func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage {
	message := *l.nullMessage
	message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second))
	return &message
}

// ValidateLogOpt looks for all options supported by the splunk driver
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case splunkURLKey,
			splunkTokenKey,
			splunkSourceKey,
			splunkSourceTypeKey,
			splunkIndexKey,
			splunkCAPathKey,
			splunkCANameKey,
			splunkInsecureSkipVerifyKey,
			splunkFormatKey,
			splunkVerifyConnectionKey,
			splunkGzipCompressionKey,
			splunkGzipCompressionLevelKey,
			envKey,
			envRegexKey,
			labelsKey,
			tagKey:
		default:
			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName)
		}
	}
	return nil
}

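// parseURL extracts and validates the splunk-url option, then fixes the
// path to the HEC event endpoint.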
func parseURL(info logger.Info) (*url.URL, error) {
	splunkURLStr, ok := info.Config[splunkURLKey]
	if !ok {
		return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey)
	}

	splunkURL, err := url.Parse(splunkURLStr)
	if err != nil {
		return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey)
	}

	if !urlutil.IsURL(splunkURLStr) ||
		!splunkURL.IsAbs() ||
		(splunkURL.Path != "" && splunkURL.Path != "/") ||
		splunkURL.RawQuery != "" ||
		splunkURL.Fragment != "" {
		return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey)
	}

	splunkURL.Path = "/services/collector/event/1.0"

	return splunkURL, nil
}

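// verifySplunkConnection probes the HEC endpoint with an OPTIONS request so
// that misconfiguration is reported at container start rather than on the
// first log write.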
func verifySplunkConnection(l *splunkLogger) error {
	req, err := http.NewRequest(http.MethodOptions, l.url, nil)
	if err != nil {
		return err
	}
	resp, err := l.client.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		pools.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		rdr := io.LimitReader(resp.Body, maxResponseSize)
		body, err := ioutil.ReadAll(rdr)
		if err != nil {
			return err
		}
		return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, resp.Status, string(body))
	}
	return nil
}

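// getAdvancedOptionDuration reads a duration from the environment, falling
// back to the default on a missing or unparsable value.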
func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
	valueStr := os.Getenv(envName)
	if valueStr == "" {
		return defaultValue
	}
	parsedValue, err := time.ParseDuration(valueStr)
	if err != nil {
		logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
		return defaultValue
	}
	return parsedValue
}

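// getAdvancedOptionInt reads an integer from the environment, falling back
// to the default on a missing or unparsable value.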
func getAdvancedOptionInt(envName string, defaultValue int) int {
	valueStr := os.Getenv(envName)
	if valueStr == "" {
		return defaultValue
	}
	parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
	if err != nil {
		logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
		return defaultValue
	}
	return int(parsedValue)
}