1package httpd
2
3import (
4	"container/list"
5	"fmt"
6	"net"
7	"net/http"
8	"sync"
9	"sync/atomic"
10
11	"github.com/influxdata/influxdb/services/meta"
12)
13
14type RequestInfo struct {
15	IPAddr   string
16	Username string
17}
18
19type RequestStats struct {
20	Writes  int64 `json:"writes"`
21	Queries int64 `json:"queries"`
22}
23
24func (r *RequestInfo) String() string {
25	if r.Username != "" {
26		return fmt.Sprintf("%s:%s", r.Username, r.IPAddr)
27	}
28	return r.IPAddr
29}
30
31type RequestProfile struct {
32	tracker *RequestTracker
33	elem    *list.Element
34
35	mu       sync.RWMutex
36	Requests map[RequestInfo]*RequestStats
37}
38
39func (p *RequestProfile) AddWrite(info RequestInfo) {
40	p.add(info, p.addWrite)
41}
42
43func (p *RequestProfile) AddQuery(info RequestInfo) {
44	p.add(info, p.addQuery)
45}
46
47func (p *RequestProfile) add(info RequestInfo, fn func(*RequestStats)) {
48	// Look for a request entry for this request.
49	p.mu.RLock()
50	st := p.Requests[info]
51	p.mu.RUnlock()
52	if st != nil {
53		fn(st)
54		return
55	}
56
57	// There is no entry in the request tracker. Create one.
58	p.mu.Lock()
59	if st := p.Requests[info]; st != nil {
60		// Something else created this entry while we were waiting for the lock.
61		p.mu.Unlock()
62		fn(st)
63		return
64	}
65
66	st = &RequestStats{}
67	p.Requests[info] = st
68	p.mu.Unlock()
69	fn(st)
70}
71
72func (p *RequestProfile) addWrite(st *RequestStats) {
73	atomic.AddInt64(&st.Writes, 1)
74}
75
76func (p *RequestProfile) addQuery(st *RequestStats) {
77	atomic.AddInt64(&st.Queries, 1)
78}
79
80// Stop informs the RequestTracker to stop collecting statistics for this
81// profile.
82func (p *RequestProfile) Stop() {
83	p.tracker.mu.Lock()
84	p.tracker.profiles.Remove(p.elem)
85	p.tracker.mu.Unlock()
86}
87
88type RequestTracker struct {
89	mu       sync.RWMutex
90	profiles *list.List
91}
92
93func NewRequestTracker() *RequestTracker {
94	return &RequestTracker{
95		profiles: list.New(),
96	}
97}
98
99func (rt *RequestTracker) TrackRequests() *RequestProfile {
100	// Perform the memory allocation outside of the lock.
101	profile := &RequestProfile{
102		Requests: make(map[RequestInfo]*RequestStats),
103		tracker:  rt,
104	}
105
106	rt.mu.Lock()
107	profile.elem = rt.profiles.PushBack(profile)
108	rt.mu.Unlock()
109	return profile
110}
111
112func (rt *RequestTracker) Add(req *http.Request, user meta.User) {
113	rt.mu.RLock()
114	if rt.profiles.Len() == 0 {
115		rt.mu.RUnlock()
116		return
117	}
118	defer rt.mu.RUnlock()
119
120	var info RequestInfo
121	host, _, err := net.SplitHostPort(req.RemoteAddr)
122	if err != nil {
123		return
124	}
125
126	info.IPAddr = host
127	if user != nil {
128		info.Username = user.ID()
129	}
130
131	// Add the request info to the profiles.
132	for p := rt.profiles.Front(); p != nil; p = p.Next() {
133		profile := p.Value.(*RequestProfile)
134		if req.URL.Path == "/query" {
135			profile.AddQuery(info)
136		} else if req.URL.Path == "/write" {
137			profile.AddWrite(info)
138		}
139	}
140}
141