1// Copyright 2018 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package papertrail 16 17import ( 18 "fmt" 19 "html/template" 20 "log/syslog" 21 "net" 22 "regexp" 23 "runtime" 24 "strconv" 25 "strings" 26 "sync" 27 "time" 28 29 "istio.io/istio/mixer/adapter/solarwinds/config" 30 "istio.io/istio/mixer/pkg/adapter" 31 "istio.io/istio/mixer/template/logentry" 32 "istio.io/pkg/pool" 33) 34 35const ( 36 defaultRetention = time.Hour 37 keyFormat = "TS:%d-BODY:%s" 38 keyPattern = "TS:(\\d+)-BODY:(.*)" 39 defaultTemplate = `{{or (.originIp) "-"}} - {{or (.sourceUser) "-"}} ` + 40 `[{{or (.timestamp.Format "2006-01-02T15:04:05Z07:00") "-"}}] "{{or (.method) "-"}} {{or (.url) "-"}} ` + 41 `{{or (.protocol) "-"}}" {{or (.responseCode) "-"}} {{or (.responseSize) "-"}}` 42) 43 44var defaultWorkerCount = 10 45 46type logInfo struct { 47 tmpl *template.Template 48} 49 50// LoggerInterface is the interface for all Papertrail logger types 51type LoggerInterface interface { 52 Log(*logentry.Instance) error 53 Close() error 54} 55 56// Logger is a concrete type of LoggerInterface which collects and ships logs to Papertrail 57type Logger struct { 58 paperTrailURL string 59 60 retentionPeriod time.Duration 61 62 cmap *sync.Map 63 64 logInfos map[string]*logInfo 65 66 log adapter.Logger 67 68 env adapter.Env 69 70 maxWorkers int 71 72 loopFactor chan bool 73 74 loopWait chan struct{} 75} 76 77// NewLogger does some ground work and returns an instance of LoggerInterface 78func NewLogger(paperTrailURL string, retention time.Duration, logConfigs map[string]*config.Params_LogInfo, 79 env adapter.Env) (LoggerInterface, error) { 80 logger := env.Logger() 81 if retention.Seconds() <= float64(0) { 82 retention = defaultRetention 83 } 84 85 logger.Infof("Creating a new paper trail logger for url: %s", paperTrailURL) 86 87 p := &Logger{ 88 paperTrailURL: paperTrailURL, 89 retentionPeriod: retention, 90 cmap: new(sync.Map), 91 log: logger, 92 env: env, 93 maxWorkers: defaultWorkerCount * runtime.NumCPU(), 94 loopFactor: make(chan bool), 95 loopWait: make(chan struct{}), 96 } 97 98 p.logInfos = map[string]*logInfo{} 99 100 for inst, l := range logConfigs { 101 var templ string 102 if strings.TrimSpace(l.PayloadTemplate) != "" { 103 templ = l.PayloadTemplate 104 } else { 105 templ = defaultTemplate 106 } 107 tmpl, err := template.New(inst).Parse(templ) 108 if err != nil { 109 _ = logger.Errorf("failed to evaluate template for log instance: %s, skipping: %v", inst, err) 110 continue 111 } 112 p.logInfos[inst] = &logInfo{ 113 tmpl: tmpl, 114 } 115 } 116 117 env.ScheduleDaemon(p.flushLogs) 118 return p, nil 119} 120 121// Log method receives log messages 122func (p *Logger) Log(msg *logentry.Instance) error { 123 linfo, ok := p.logInfos[msg.Name] 124 if !ok { 125 return p.log.Errorf("Got an unknown instance of log: %s. Hence Skipping.", msg.Name) 126 } 127 buf := pool.GetBuffer() 128 msg.Variables["timestamp"] = time.Now() 129 ipval, ok := msg.Variables["originIp"].([]byte) 130 if ok { 131 msg.Variables["originIp"] = net.IP(ipval).String() 132 } 133 134 if err := linfo.tmpl.Execute(buf, msg.Variables); err != nil { 135 _ = p.log.Errorf("failed to execute template for log '%s': %v", msg.Name, err) 136 // proceeding anyways 137 } 138 payload := buf.String() 139 pool.PutBuffer(buf) 140 141 if len(payload) > 0 { 142 ut := time.Now().UnixNano() 143 p.cmap.Store(fmt.Sprintf(keyFormat, ut, payload), struct{}{}) 144 } 145 return nil 146} 147 148func (p *Logger) sendLogs(data string) error { 149 var err error 150 writer, err := syslog.Dial("udp", p.paperTrailURL, syslog.LOG_EMERG|syslog.LOG_KERN, "istio") 151 if err != nil { 152 return p.log.Errorf("Failed to dial syslog: %v", err) 153 } 154 defer func() { _ = writer.Close() }() 155 if err = writer.Info(data); err != nil { 156 return p.log.Errorf("failed to send log msg to papertrail: %v", err) 157 } 158 return nil 159} 160 161// This should be run in a routine 162func (p *Logger) flushLogs() { 163 var err error 164 defer func() { 165 p.loopWait <- struct{}{} 166 }() 167 re := regexp.MustCompile(keyPattern) 168 for { 169 select { 170 case <-p.loopFactor: 171 return 172 default: 173 hose := make(chan interface{}, p.maxWorkers) 174 var wg sync.WaitGroup 175 176 // workers 177 for i := 0; i < p.maxWorkers; i++ { 178 p.env.ScheduleDaemon(func() { 179 for keyI := range hose { 180 key, _ := keyI.(string) 181 match := re.FindStringSubmatch(key) 182 if len(match) > 2 { 183 if err = p.sendLogs(match[2]); err == nil { 184 p.cmap.Delete(key) 185 wg.Done() 186 continue 187 } 188 189 tsN, _ := strconv.ParseInt(match[1], 10, 64) 190 ts := time.Unix(0, tsN) 191 192 if time.Since(ts) > p.retentionPeriod { 193 p.cmap.Delete(key) 194 } 195 } 196 wg.Done() 197 } 198 }) 199 } 200 201 p.cmap.Range(func(k, v interface{}) bool { 202 wg.Add(1) 203 hose <- k 204 return true 205 }) 206 wg.Wait() 207 close(hose) 208 time.Sleep(500 * time.Millisecond) 209 } 210 } 211} 212 213// Close - closes the Logger instance 214func (p *Logger) Close() error { 215 p.loopFactor <- true 216 defer close(p.loopWait) 217 <-p.loopWait 218 return nil 219} 220