1package api 2 3import ( 4 "sync" 5 6 "github.com/prometheus/common/model" 7 8 "github.com/grafana/loki/pkg/logproto" 9) 10 11// Entry is a log entry with labels. 12type Entry struct { 13 Labels model.LabelSet 14 logproto.Entry 15} 16 17type InstrumentedEntryHandler interface { 18 EntryHandler 19 UnregisterLatencyMetric(labels model.LabelSet) 20} 21 22// EntryHandler is something that can "handle" entries via a channel. 23// Stop must be called to gracefully shutdown the EntryHandler 24type EntryHandler interface { 25 Chan() chan<- Entry 26 Stop() 27} 28 29// EntryMiddleware takes an EntryHandler and returns another one that will intercept and forward entries. 30// The newly created EntryHandler should be Stopped independently from the original one. 31type EntryMiddleware interface { 32 Wrap(EntryHandler) EntryHandler 33} 34 35// EntryMiddlewareFunc allows to create EntryMiddleware via a function. 36type EntryMiddlewareFunc func(EntryHandler) EntryHandler 37 38func (e EntryMiddlewareFunc) Wrap(next EntryHandler) EntryHandler { 39 return e(next) 40} 41 42// EntryMutatorFunc is a function that can mutate an entry 43type EntryMutatorFunc func(Entry) Entry 44 45type entryHandler struct { 46 stop func() 47 entries chan<- Entry 48} 49 50func (e entryHandler) Chan() chan<- Entry { 51 return e.entries 52} 53 54func (e entryHandler) Stop() { 55 e.stop() 56} 57 58// NewEntryHandler creates a new EntryHandler using a input channel and a stop function. 59func NewEntryHandler(entries chan<- Entry, stop func()) EntryHandler { 60 return entryHandler{ 61 stop: stop, 62 entries: entries, 63 } 64} 65 66// NewEntryMutatorHandler creates a EntryHandler that mutates incoming entries from another EntryHandler. 67func NewEntryMutatorHandler(next EntryHandler, f EntryMutatorFunc) EntryHandler { 68 in, wg, once := make(chan Entry), sync.WaitGroup{}, sync.Once{} 69 nextChan := next.Chan() 70 wg.Add(1) 71 go func() { 72 defer wg.Done() 73 for e := range in { 74 nextChan <- f(e) 75 } 76 }() 77 return NewEntryHandler(in, func() { 78 once.Do(func() { close(in) }) 79 wg.Wait() 80 }) 81} 82 83// AddLabelsMiddleware is an EntryMiddleware that adds some labels. 84func AddLabelsMiddleware(additionalLabels model.LabelSet) EntryMiddleware { 85 return EntryMiddlewareFunc(func(eh EntryHandler) EntryHandler { 86 return NewEntryMutatorHandler(eh, func(e Entry) Entry { 87 e.Labels = additionalLabels.Merge(e.Labels) 88 return e 89 }) 90 }) 91} 92