1// Package splunk provides the log driver for forwarding server logs to 2// Splunk HTTP Event Collector endpoint. 3package splunk 4 5import ( 6 "bytes" 7 "compress/gzip" 8 "context" 9 "crypto/tls" 10 "crypto/x509" 11 "encoding/json" 12 "fmt" 13 "io" 14 "io/ioutil" 15 "net/http" 16 "net/url" 17 "os" 18 "strconv" 19 "strings" 20 "sync" 21 "time" 22 23 "github.com/docker/docker/daemon/logger" 24 "github.com/docker/docker/daemon/logger/loggerutils" 25 "github.com/docker/docker/pkg/urlutil" 26 "github.com/sirupsen/logrus" 27) 28 29const ( 30 driverName = "splunk" 31 splunkURLKey = "splunk-url" 32 splunkTokenKey = "splunk-token" 33 splunkSourceKey = "splunk-source" 34 splunkSourceTypeKey = "splunk-sourcetype" 35 splunkIndexKey = "splunk-index" 36 splunkCAPathKey = "splunk-capath" 37 splunkCANameKey = "splunk-caname" 38 splunkInsecureSkipVerifyKey = "splunk-insecureskipverify" 39 splunkFormatKey = "splunk-format" 40 splunkVerifyConnectionKey = "splunk-verify-connection" 41 splunkGzipCompressionKey = "splunk-gzip" 42 splunkGzipCompressionLevelKey = "splunk-gzip-level" 43 envKey = "env" 44 envRegexKey = "env-regex" 45 labelsKey = "labels" 46 tagKey = "tag" 47) 48 49const ( 50 // How often do we send messages (if we are not reaching batch size) 51 defaultPostMessagesFrequency = 5 * time.Second 52 // How big can be batch of messages 53 defaultPostMessagesBatchSize = 1000 54 // Maximum number of messages we can store in buffer 55 defaultBufferMaximum = 10 * defaultPostMessagesBatchSize 56 // Number of messages allowed to be queued in the channel 57 defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize 58) 59 60const ( 61 envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY" 62 envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE" 63 envVarBufferMaximum = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX" 64 envVarStreamChannelSize = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE" 65) 66 67var batchSendTimeout = 30 * time.Second 68 69type splunkLoggerInterface interface { 70 logger.Logger 71 worker() 72} 73 74type splunkLogger struct { 75 client *http.Client 76 transport *http.Transport 77 78 url string 79 auth string 80 nullMessage *splunkMessage 81 82 // http compression 83 gzipCompression bool 84 gzipCompressionLevel int 85 86 // Advanced options 87 postMessagesFrequency time.Duration 88 postMessagesBatchSize int 89 bufferMaximum int 90 91 // For synchronization between background worker and logger. 92 // We use channel to send messages to worker go routine. 93 // All other variables for blocking Close call before we flush all messages to HEC 94 stream chan *splunkMessage 95 lock sync.RWMutex 96 closed bool 97 closedCond *sync.Cond 98} 99 100type splunkLoggerInline struct { 101 *splunkLogger 102 103 nullEvent *splunkMessageEvent 104} 105 106type splunkLoggerJSON struct { 107 *splunkLoggerInline 108} 109 110type splunkLoggerRaw struct { 111 *splunkLogger 112 113 prefix []byte 114} 115 116type splunkMessage struct { 117 Event interface{} `json:"event"` 118 Time string `json:"time"` 119 Host string `json:"host"` 120 Source string `json:"source,omitempty"` 121 SourceType string `json:"sourcetype,omitempty"` 122 Index string `json:"index,omitempty"` 123} 124 125type splunkMessageEvent struct { 126 Line interface{} `json:"line"` 127 Source string `json:"source"` 128 Tag string `json:"tag,omitempty"` 129 Attrs map[string]string `json:"attrs,omitempty"` 130} 131 132const ( 133 splunkFormatRaw = "raw" 134 splunkFormatJSON = "json" 135 splunkFormatInline = "inline" 136) 137 138func init() { 139 if err := logger.RegisterLogDriver(driverName, New); err != nil { 140 logrus.Fatal(err) 141 } 142 if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil { 143 logrus.Fatal(err) 144 } 145} 146 147// New creates splunk logger driver using configuration passed in context 148func New(info logger.Info) (logger.Logger, error) { 149 hostname, err := info.Hostname() 150 if err != nil { 151 return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName) 152 } 153 154 // Parse and validate Splunk URL 155 splunkURL, err := parseURL(info) 156 if err != nil { 157 return nil, err 158 } 159 160 // Splunk Token is required parameter 161 splunkToken, ok := info.Config[splunkTokenKey] 162 if !ok { 163 return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey) 164 } 165 166 tlsConfig := &tls.Config{} 167 168 // Splunk is using autogenerated certificates by default, 169 // allow users to trust them with skipping verification 170 if insecureSkipVerifyStr, ok := info.Config[splunkInsecureSkipVerifyKey]; ok { 171 insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr) 172 if err != nil { 173 return nil, err 174 } 175 tlsConfig.InsecureSkipVerify = insecureSkipVerify 176 } 177 178 // If path to the root certificate is provided - load it 179 if caPath, ok := info.Config[splunkCAPathKey]; ok { 180 caCert, err := ioutil.ReadFile(caPath) 181 if err != nil { 182 return nil, err 183 } 184 caPool := x509.NewCertPool() 185 caPool.AppendCertsFromPEM(caCert) 186 tlsConfig.RootCAs = caPool 187 } 188 189 if caName, ok := info.Config[splunkCANameKey]; ok { 190 tlsConfig.ServerName = caName 191 } 192 193 gzipCompression := false 194 if gzipCompressionStr, ok := info.Config[splunkGzipCompressionKey]; ok { 195 gzipCompression, err = strconv.ParseBool(gzipCompressionStr) 196 if err != nil { 197 return nil, err 198 } 199 } 200 201 gzipCompressionLevel := gzip.DefaultCompression 202 if gzipCompressionLevelStr, ok := info.Config[splunkGzipCompressionLevelKey]; ok { 203 var err error 204 gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32) 205 if err != nil { 206 return nil, err 207 } 208 gzipCompressionLevel = int(gzipCompressionLevel64) 209 if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression { 210 err := fmt.Errorf("not supported level '%s' for %s (supported values between %d and %d)", 211 gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression) 212 return nil, err 213 } 214 } 215 216 transport := &http.Transport{ 217 TLSClientConfig: tlsConfig, 218 } 219 client := &http.Client{ 220 Transport: transport, 221 } 222 223 source := info.Config[splunkSourceKey] 224 sourceType := info.Config[splunkSourceTypeKey] 225 index := info.Config[splunkIndexKey] 226 227 var nullMessage = &splunkMessage{ 228 Host: hostname, 229 Source: source, 230 SourceType: sourceType, 231 Index: index, 232 } 233 234 // Allow user to remove tag from the messages by setting tag to empty string 235 tag := "" 236 if tagTemplate, ok := info.Config[tagKey]; !ok || tagTemplate != "" { 237 tag, err = loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate) 238 if err != nil { 239 return nil, err 240 } 241 } 242 243 attrs, err := info.ExtraAttributes(nil) 244 if err != nil { 245 return nil, err 246 } 247 248 var ( 249 postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency) 250 postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize) 251 bufferMaximum = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum) 252 streamChannelSize = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize) 253 ) 254 255 logger := &splunkLogger{ 256 client: client, 257 transport: transport, 258 url: splunkURL.String(), 259 auth: "Splunk " + splunkToken, 260 nullMessage: nullMessage, 261 gzipCompression: gzipCompression, 262 gzipCompressionLevel: gzipCompressionLevel, 263 stream: make(chan *splunkMessage, streamChannelSize), 264 postMessagesFrequency: postMessagesFrequency, 265 postMessagesBatchSize: postMessagesBatchSize, 266 bufferMaximum: bufferMaximum, 267 } 268 269 // By default we verify connection, but we allow use to skip that 270 verifyConnection := true 271 if verifyConnectionStr, ok := info.Config[splunkVerifyConnectionKey]; ok { 272 var err error 273 verifyConnection, err = strconv.ParseBool(verifyConnectionStr) 274 if err != nil { 275 return nil, err 276 } 277 } 278 if verifyConnection { 279 err = verifySplunkConnection(logger) 280 if err != nil { 281 return nil, err 282 } 283 } 284 285 var splunkFormat string 286 if splunkFormatParsed, ok := info.Config[splunkFormatKey]; ok { 287 switch splunkFormatParsed { 288 case splunkFormatInline: 289 case splunkFormatJSON: 290 case splunkFormatRaw: 291 default: 292 return nil, fmt.Errorf("Unknown format specified %s, supported formats are inline, json and raw", splunkFormat) 293 } 294 splunkFormat = splunkFormatParsed 295 } else { 296 splunkFormat = splunkFormatInline 297 } 298 299 var loggerWrapper splunkLoggerInterface 300 301 switch splunkFormat { 302 case splunkFormatInline: 303 nullEvent := &splunkMessageEvent{ 304 Tag: tag, 305 Attrs: attrs, 306 } 307 308 loggerWrapper = &splunkLoggerInline{logger, nullEvent} 309 case splunkFormatJSON: 310 nullEvent := &splunkMessageEvent{ 311 Tag: tag, 312 Attrs: attrs, 313 } 314 315 loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}} 316 case splunkFormatRaw: 317 var prefix bytes.Buffer 318 if tag != "" { 319 prefix.WriteString(tag) 320 prefix.WriteString(" ") 321 } 322 for key, value := range attrs { 323 prefix.WriteString(key) 324 prefix.WriteString("=") 325 prefix.WriteString(value) 326 prefix.WriteString(" ") 327 } 328 329 loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()} 330 default: 331 return nil, fmt.Errorf("Unexpected format %s", splunkFormat) 332 } 333 334 go loggerWrapper.worker() 335 336 return loggerWrapper, nil 337} 338 339func (l *splunkLoggerInline) Log(msg *logger.Message) error { 340 message := l.createSplunkMessage(msg) 341 342 event := *l.nullEvent 343 event.Line = string(msg.Line) 344 event.Source = msg.Source 345 346 message.Event = &event 347 logger.PutMessage(msg) 348 return l.queueMessageAsync(message) 349} 350 351func (l *splunkLoggerJSON) Log(msg *logger.Message) error { 352 message := l.createSplunkMessage(msg) 353 event := *l.nullEvent 354 355 var rawJSONMessage json.RawMessage 356 if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil { 357 event.Line = &rawJSONMessage 358 } else { 359 event.Line = string(msg.Line) 360 } 361 362 event.Source = msg.Source 363 364 message.Event = &event 365 logger.PutMessage(msg) 366 return l.queueMessageAsync(message) 367} 368 369func (l *splunkLoggerRaw) Log(msg *logger.Message) error { 370 // empty or whitespace-only messages are not accepted by HEC 371 if strings.TrimSpace(string(msg.Line)) == "" { 372 return nil 373 } 374 375 message := l.createSplunkMessage(msg) 376 377 message.Event = string(append(l.prefix, msg.Line...)) 378 logger.PutMessage(msg) 379 return l.queueMessageAsync(message) 380} 381 382func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error { 383 l.lock.RLock() 384 defer l.lock.RUnlock() 385 if l.closedCond != nil { 386 return fmt.Errorf("%s: driver is closed", driverName) 387 } 388 l.stream <- message 389 return nil 390} 391 392func (l *splunkLogger) worker() { 393 timer := time.NewTicker(l.postMessagesFrequency) 394 var messages []*splunkMessage 395 for { 396 select { 397 case message, open := <-l.stream: 398 if !open { 399 l.postMessages(messages, true) 400 l.lock.Lock() 401 defer l.lock.Unlock() 402 l.transport.CloseIdleConnections() 403 l.closed = true 404 l.closedCond.Signal() 405 return 406 } 407 messages = append(messages, message) 408 // Only sending when we get exactly to the batch size, 409 // This also helps not to fire postMessages on every new message, 410 // when previous try failed. 411 if len(messages)%l.postMessagesBatchSize == 0 { 412 messages = l.postMessages(messages, false) 413 } 414 case <-timer.C: 415 messages = l.postMessages(messages, false) 416 } 417 } 418} 419 420func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage { 421 messagesLen := len(messages) 422 423 ctx, cancel := context.WithTimeout(context.Background(), batchSendTimeout) 424 defer cancel() 425 426 for i := 0; i < messagesLen; i += l.postMessagesBatchSize { 427 upperBound := i + l.postMessagesBatchSize 428 if upperBound > messagesLen { 429 upperBound = messagesLen 430 } 431 432 if err := l.tryPostMessages(ctx, messages[i:upperBound]); err != nil { 433 logrus.WithError(err).WithField("module", "logger/splunk").Warn("Error while sending logs") 434 if messagesLen-i >= l.bufferMaximum || lastChance { 435 // If this is last chance - print them all to the daemon log 436 if lastChance { 437 upperBound = messagesLen 438 } 439 // Not all sent, but buffer has got to its maximum, let's log all messages 440 // we could not send and return buffer minus one batch size 441 for j := i; j < upperBound; j++ { 442 if jsonEvent, err := json.Marshal(messages[j]); err != nil { 443 logrus.Error(err) 444 } else { 445 logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent))) 446 } 447 } 448 return messages[upperBound:messagesLen] 449 } 450 // Not all sent, returning buffer from where we have not sent messages 451 return messages[i:messagesLen] 452 } 453 } 454 // All sent, return empty buffer 455 return messages[:0] 456} 457 458func (l *splunkLogger) tryPostMessages(ctx context.Context, messages []*splunkMessage) error { 459 if len(messages) == 0 { 460 return nil 461 } 462 var buffer bytes.Buffer 463 var writer io.Writer 464 var gzipWriter *gzip.Writer 465 var err error 466 // If gzip compression is enabled - create gzip writer with specified compression 467 // level. If gzip compression is disabled, use standard buffer as a writer 468 if l.gzipCompression { 469 gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel) 470 if err != nil { 471 return err 472 } 473 writer = gzipWriter 474 } else { 475 writer = &buffer 476 } 477 for _, message := range messages { 478 jsonEvent, err := json.Marshal(message) 479 if err != nil { 480 return err 481 } 482 if _, err := writer.Write(jsonEvent); err != nil { 483 return err 484 } 485 } 486 // If gzip compression is enabled, tell it, that we are done 487 if l.gzipCompression { 488 err = gzipWriter.Close() 489 if err != nil { 490 return err 491 } 492 } 493 req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes())) 494 if err != nil { 495 return err 496 } 497 req = req.WithContext(ctx) 498 req.Header.Set("Authorization", l.auth) 499 // Tell if we are sending gzip compressed body 500 if l.gzipCompression { 501 req.Header.Set("Content-Encoding", "gzip") 502 } 503 res, err := l.client.Do(req) 504 if err != nil { 505 return err 506 } 507 defer res.Body.Close() 508 if res.StatusCode != http.StatusOK { 509 var body []byte 510 body, err = ioutil.ReadAll(res.Body) 511 if err != nil { 512 return err 513 } 514 return fmt.Errorf("%s: failed to send event - %s - %s", driverName, res.Status, body) 515 } 516 io.Copy(ioutil.Discard, res.Body) 517 return nil 518} 519 520func (l *splunkLogger) Close() error { 521 l.lock.Lock() 522 defer l.lock.Unlock() 523 if l.closedCond == nil { 524 l.closedCond = sync.NewCond(&l.lock) 525 close(l.stream) 526 for !l.closed { 527 l.closedCond.Wait() 528 } 529 } 530 return nil 531} 532 533func (l *splunkLogger) Name() string { 534 return driverName 535} 536 537func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage { 538 message := *l.nullMessage 539 message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second)) 540 return &message 541} 542 543// ValidateLogOpt looks for all supported by splunk driver options 544func ValidateLogOpt(cfg map[string]string) error { 545 for key := range cfg { 546 switch key { 547 case splunkURLKey: 548 case splunkTokenKey: 549 case splunkSourceKey: 550 case splunkSourceTypeKey: 551 case splunkIndexKey: 552 case splunkCAPathKey: 553 case splunkCANameKey: 554 case splunkInsecureSkipVerifyKey: 555 case splunkFormatKey: 556 case splunkVerifyConnectionKey: 557 case splunkGzipCompressionKey: 558 case splunkGzipCompressionLevelKey: 559 case envKey: 560 case envRegexKey: 561 case labelsKey: 562 case tagKey: 563 default: 564 return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName) 565 } 566 } 567 return nil 568} 569 570func parseURL(info logger.Info) (*url.URL, error) { 571 splunkURLStr, ok := info.Config[splunkURLKey] 572 if !ok { 573 return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey) 574 } 575 576 splunkURL, err := url.Parse(splunkURLStr) 577 if err != nil { 578 return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey) 579 } 580 581 if !urlutil.IsURL(splunkURLStr) || 582 !splunkURL.IsAbs() || 583 (splunkURL.Path != "" && splunkURL.Path != "/") || 584 splunkURL.RawQuery != "" || 585 splunkURL.Fragment != "" { 586 return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey) 587 } 588 589 splunkURL.Path = "/services/collector/event/1.0" 590 591 return splunkURL, nil 592} 593 594func verifySplunkConnection(l *splunkLogger) error { 595 req, err := http.NewRequest(http.MethodOptions, l.url, nil) 596 if err != nil { 597 return err 598 } 599 res, err := l.client.Do(req) 600 if err != nil { 601 return err 602 } 603 if res.Body != nil { 604 defer res.Body.Close() 605 } 606 if res.StatusCode != http.StatusOK { 607 var body []byte 608 body, err = ioutil.ReadAll(res.Body) 609 if err != nil { 610 return err 611 } 612 return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, res.Status, body) 613 } 614 return nil 615} 616 617func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration { 618 valueStr := os.Getenv(envName) 619 if valueStr == "" { 620 return defaultValue 621 } 622 parsedValue, err := time.ParseDuration(valueStr) 623 if err != nil { 624 logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err)) 625 return defaultValue 626 } 627 return parsedValue 628} 629 630func getAdvancedOptionInt(envName string, defaultValue int) int { 631 valueStr := os.Getenv(envName) 632 if valueStr == "" { 633 return defaultValue 634 } 635 parsedValue, err := strconv.ParseInt(valueStr, 10, 32) 636 if err != nil { 637 logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err)) 638 return defaultValue 639 } 640 return int(parsedValue) 641} 642