1// Copyright 2018 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package papertrail
16
17import (
18	"fmt"
19	"html/template"
20	"log/syslog"
21	"net"
22	"regexp"
23	"runtime"
24	"strconv"
25	"strings"
26	"sync"
27	"time"
28
29	"istio.io/istio/mixer/adapter/solarwinds/config"
30	"istio.io/istio/mixer/pkg/adapter"
31	"istio.io/istio/mixer/template/logentry"
32	"istio.io/pkg/pool"
33)
34
35const (
36	defaultRetention = time.Hour
37	keyFormat        = "TS:%d-BODY:%s"
38	keyPattern       = "TS:(\\d+)-BODY:(.*)"
39	defaultTemplate  = `{{or (.originIp) "-"}} - {{or (.sourceUser) "-"}} ` +
40		`[{{or (.timestamp.Format "2006-01-02T15:04:05Z07:00") "-"}}] "{{or (.method) "-"}} {{or (.url) "-"}} ` +
41		`{{or (.protocol) "-"}}" {{or (.responseCode) "-"}} {{or (.responseSize) "-"}}`
42)
43
44var defaultWorkerCount = 10
45
46type logInfo struct {
47	tmpl *template.Template
48}
49
50// LoggerInterface is the interface for all Papertrail logger types
51type LoggerInterface interface {
52	Log(*logentry.Instance) error
53	Close() error
54}
55
56// Logger is a concrete type of LoggerInterface which collects and ships logs to Papertrail
57type Logger struct {
58	paperTrailURL string
59
60	retentionPeriod time.Duration
61
62	cmap *sync.Map
63
64	logInfos map[string]*logInfo
65
66	log adapter.Logger
67
68	env adapter.Env
69
70	maxWorkers int
71
72	loopFactor chan bool
73
74	loopWait chan struct{}
75}
76
77// NewLogger does some ground work and returns an instance of LoggerInterface
78func NewLogger(paperTrailURL string, retention time.Duration, logConfigs map[string]*config.Params_LogInfo,
79	env adapter.Env) (LoggerInterface, error) {
80	logger := env.Logger()
81	if retention.Seconds() <= float64(0) {
82		retention = defaultRetention
83	}
84
85	logger.Infof("Creating a new paper trail logger for url: %s", paperTrailURL)
86
87	p := &Logger{
88		paperTrailURL:   paperTrailURL,
89		retentionPeriod: retention,
90		cmap:            new(sync.Map),
91		log:             logger,
92		env:             env,
93		maxWorkers:      defaultWorkerCount * runtime.NumCPU(),
94		loopFactor:      make(chan bool),
95		loopWait:        make(chan struct{}),
96	}
97
98	p.logInfos = map[string]*logInfo{}
99
100	for inst, l := range logConfigs {
101		var templ string
102		if strings.TrimSpace(l.PayloadTemplate) != "" {
103			templ = l.PayloadTemplate
104		} else {
105			templ = defaultTemplate
106		}
107		tmpl, err := template.New(inst).Parse(templ)
108		if err != nil {
109			_ = logger.Errorf("failed to evaluate template for log instance: %s, skipping: %v", inst, err)
110			continue
111		}
112		p.logInfos[inst] = &logInfo{
113			tmpl: tmpl,
114		}
115	}
116
117	env.ScheduleDaemon(p.flushLogs)
118	return p, nil
119}
120
121// Log method receives log messages
122func (p *Logger) Log(msg *logentry.Instance) error {
123	linfo, ok := p.logInfos[msg.Name]
124	if !ok {
125		return p.log.Errorf("Got an unknown instance of log: %s. Hence Skipping.", msg.Name)
126	}
127	buf := pool.GetBuffer()
128	msg.Variables["timestamp"] = time.Now()
129	ipval, ok := msg.Variables["originIp"].([]byte)
130	if ok {
131		msg.Variables["originIp"] = net.IP(ipval).String()
132	}
133
134	if err := linfo.tmpl.Execute(buf, msg.Variables); err != nil {
135		_ = p.log.Errorf("failed to execute template for log '%s': %v", msg.Name, err)
136		// proceeding anyways
137	}
138	payload := buf.String()
139	pool.PutBuffer(buf)
140
141	if len(payload) > 0 {
142		ut := time.Now().UnixNano()
143		p.cmap.Store(fmt.Sprintf(keyFormat, ut, payload), struct{}{})
144	}
145	return nil
146}
147
148func (p *Logger) sendLogs(data string) error {
149	var err error
150	writer, err := syslog.Dial("udp", p.paperTrailURL, syslog.LOG_EMERG|syslog.LOG_KERN, "istio")
151	if err != nil {
152		return p.log.Errorf("Failed to dial syslog: %v", err)
153	}
154	defer func() { _ = writer.Close() }()
155	if err = writer.Info(data); err != nil {
156		return p.log.Errorf("failed to send log msg to papertrail: %v", err)
157	}
158	return nil
159}
160
161// This should be run in a routine
162func (p *Logger) flushLogs() {
163	var err error
164	defer func() {
165		p.loopWait <- struct{}{}
166	}()
167	re := regexp.MustCompile(keyPattern)
168	for {
169		select {
170		case <-p.loopFactor:
171			return
172		default:
173			hose := make(chan interface{}, p.maxWorkers)
174			var wg sync.WaitGroup
175
176			// workers
177			for i := 0; i < p.maxWorkers; i++ {
178				p.env.ScheduleDaemon(func() {
179					for keyI := range hose {
180						key, _ := keyI.(string)
181						match := re.FindStringSubmatch(key)
182						if len(match) > 2 {
183							if err = p.sendLogs(match[2]); err == nil {
184								p.cmap.Delete(key)
185								wg.Done()
186								continue
187							}
188
189							tsN, _ := strconv.ParseInt(match[1], 10, 64)
190							ts := time.Unix(0, tsN)
191
192							if time.Since(ts) > p.retentionPeriod {
193								p.cmap.Delete(key)
194							}
195						}
196						wg.Done()
197					}
198				})
199			}
200
201			p.cmap.Range(func(k, v interface{}) bool {
202				wg.Add(1)
203				hose <- k
204				return true
205			})
206			wg.Wait()
207			close(hose)
208			time.Sleep(500 * time.Millisecond)
209		}
210	}
211}
212
213// Close - closes the Logger instance
214func (p *Logger) Close() error {
215	p.loopFactor <- true
216	defer close(p.loopWait)
217	<-p.loopWait
218	return nil
219}
220