1package httpd 2 3import ( 4 "container/list" 5 "fmt" 6 "net" 7 "net/http" 8 "sync" 9 "sync/atomic" 10 11 "github.com/influxdata/influxdb/services/meta" 12) 13 14type RequestInfo struct { 15 IPAddr string 16 Username string 17} 18 19type RequestStats struct { 20 Writes int64 `json:"writes"` 21 Queries int64 `json:"queries"` 22} 23 24func (r *RequestInfo) String() string { 25 if r.Username != "" { 26 return fmt.Sprintf("%s:%s", r.Username, r.IPAddr) 27 } 28 return r.IPAddr 29} 30 31type RequestProfile struct { 32 tracker *RequestTracker 33 elem *list.Element 34 35 mu sync.RWMutex 36 Requests map[RequestInfo]*RequestStats 37} 38 39func (p *RequestProfile) AddWrite(info RequestInfo) { 40 p.add(info, p.addWrite) 41} 42 43func (p *RequestProfile) AddQuery(info RequestInfo) { 44 p.add(info, p.addQuery) 45} 46 47func (p *RequestProfile) add(info RequestInfo, fn func(*RequestStats)) { 48 // Look for a request entry for this request. 49 p.mu.RLock() 50 st := p.Requests[info] 51 p.mu.RUnlock() 52 if st != nil { 53 fn(st) 54 return 55 } 56 57 // There is no entry in the request tracker. Create one. 58 p.mu.Lock() 59 if st := p.Requests[info]; st != nil { 60 // Something else created this entry while we were waiting for the lock. 61 p.mu.Unlock() 62 fn(st) 63 return 64 } 65 66 st = &RequestStats{} 67 p.Requests[info] = st 68 p.mu.Unlock() 69 fn(st) 70} 71 72func (p *RequestProfile) addWrite(st *RequestStats) { 73 atomic.AddInt64(&st.Writes, 1) 74} 75 76func (p *RequestProfile) addQuery(st *RequestStats) { 77 atomic.AddInt64(&st.Queries, 1) 78} 79 80// Stop informs the RequestTracker to stop collecting statistics for this 81// profile. 82func (p *RequestProfile) Stop() { 83 p.tracker.mu.Lock() 84 p.tracker.profiles.Remove(p.elem) 85 p.tracker.mu.Unlock() 86} 87 88type RequestTracker struct { 89 mu sync.RWMutex 90 profiles *list.List 91} 92 93func NewRequestTracker() *RequestTracker { 94 return &RequestTracker{ 95 profiles: list.New(), 96 } 97} 98 99func (rt *RequestTracker) TrackRequests() *RequestProfile { 100 // Perform the memory allocation outside of the lock. 101 profile := &RequestProfile{ 102 Requests: make(map[RequestInfo]*RequestStats), 103 tracker: rt, 104 } 105 106 rt.mu.Lock() 107 profile.elem = rt.profiles.PushBack(profile) 108 rt.mu.Unlock() 109 return profile 110} 111 112func (rt *RequestTracker) Add(req *http.Request, user meta.User) { 113 rt.mu.RLock() 114 if rt.profiles.Len() == 0 { 115 rt.mu.RUnlock() 116 return 117 } 118 defer rt.mu.RUnlock() 119 120 var info RequestInfo 121 host, _, err := net.SplitHostPort(req.RemoteAddr) 122 if err != nil { 123 return 124 } 125 126 info.IPAddr = host 127 if user != nil { 128 info.Username = user.ID() 129 } 130 131 // Add the request info to the profiles. 132 for p := rt.profiles.Front(); p != nil; p = p.Next() { 133 profile := p.Value.(*RequestProfile) 134 if req.URL.Path == "/query" { 135 profile.AddQuery(info) 136 } else if req.URL.Path == "/write" { 137 profile.AddWrite(info) 138 } 139 } 140} 141