1package collectors
2
3import (
4	"bytes"
5	"encoding/base64"
6	"encoding/gob"
7	"fmt"
8	"net/http"
9	"sort"
10	"strconv"
11	"time"
12
13	analytics "google.golang.org/api/analytics/v3"
14
15	"bosun.org/cmd/scollector/conf"
16	"bosun.org/metadata"
17	"bosun.org/opentsdb"
18	"golang.org/x/net/context"
19	"golang.org/x/oauth2"
20	"golang.org/x/oauth2/google"
21)
22
// descActiveUsers is the metadata description attached to both the
// google.analytics.realtime.activeusers metric and its by-dimension variants.
const descActiveUsers = "Number of unique users actively visiting the site."
24
25type multiError []error
26
27func (m multiError) Error() string {
28	var fullErr string
29	for _, err := range m {
30		fullErr = fmt.Sprintf("%s\n%s", fullErr, err)
31	}
32	return fullErr
33}
34
// init registers one c_google_analytics interval collector (run once per
// minute) for every GoogleAnalytics entry in the scollector configuration.
func init() {
	registerInit(func(c *conf.Conf) {
		for _, g := range c.GoogleAnalytics {
			// Copy the range variable: before Go 1.22 every closure below
			// would otherwise share a single g and all collectors would end
			// up using the last configuration entry.
			g := g
			collectors = append(collectors, &IntervalCollector{
				F: func() (opentsdb.MultiDataPoint, error) {
					return c_google_analytics(g.ClientID, g.Secret, g.Token, g.JSONToken, g.Sites)
				},
				name:     "c_google_analytics",
				Interval: time.Minute,
			})
		}
	})
}
48
49func c_google_analytics(clientid string, secret string, tokenstr string, jsonToken string, sites []conf.GoogleAnalyticsSite) (opentsdb.MultiDataPoint, error) {
50	var md opentsdb.MultiDataPoint
51	var mErr multiError
52
53	c, err := googleAPIClient(clientid, secret, tokenstr, jsonToken, []string{analytics.AnalyticsScope})
54	if err != nil {
55		return nil, err
56	}
57	svc, err := analytics.New(c)
58	if err != nil {
59		return nil, err
60	}
61
62	// dimension: max records we want to fetch
63	// "source" has a very long tail so we limit it to something sane
64	// TODO: Dimensions we want and associated attributes should eventually be
65	// setup in configuration.
66	dimensions := map[string]int{"browser": -1, "trafficType": -1, "source": 10, "deviceCategory": -1, "operatingSystem": -1}
67	for _, site := range sites {
68		getPageviews(&md, svc, site)
69		if site.Detailed {
70			if err = getActiveUsers(&md, svc, site); err != nil {
71				mErr = append(mErr, err)
72			}
73			for dimension, topN := range dimensions {
74				if err = getActiveUsersByDimension(&md, svc, site, dimension, topN); err != nil {
75					mErr = append(mErr, err)
76				}
77			}
78		}
79	}
80
81	if len(mErr) == 0 {
82		return md, nil
83	} else {
84		return md, mErr
85	}
86}
87
// kv pairs a dimension value (for example "Chrome" for the browser
// dimension) with its active-user count.
type kv struct {
	key   string
	value int
}

// kvList implements sort.Interface, ordering entries by ascending value.
type kvList []kv

// Len reports how many entries the list holds.
func (l kvList) Len() int {
	return len(l)
}

// Less reports whether entry a has a strictly smaller count than entry b.
func (l kvList) Less(a, b int) bool {
	return l[b].value > l[a].value
}

// Swap exchanges entries a and b in place.
func (l kvList) Swap(a, b int) {
	tmp := l[a]
	l[a] = l[b]
	l[b] = tmp
}
98
// getActiveUsersByDimension queries the GA realtime API for rt:activeusers
// split by the given dimension (e.g. "browser") and appends one datapoint per
// dimension value to md as google.analytics.realtime.activeusers.by_<dimension>,
// tagged with the site name and the cleaned dimension value.
//
// Rows are ordered by descending user count. When topN is non-negative only
// the topN largest rows are emitted; -1 (or any negative value) emits all rows.
// It returns an error if the API call fails or a row cannot be parsed.
func getActiveUsersByDimension(md *opentsdb.MultiDataPoint, svc *analytics.Service, site conf.GoogleAnalyticsSite, dimension string, topN int) error {
	call := svc.Data.Realtime.Get("ga:"+site.Profile, "rt:activeusers").Dimensions("rt:" + dimension)
	data, err := call.Do()
	if err != nil {
		return err
	}
	tags := opentsdb.TagSet{"site": site.Name}
	rows := make(kvList, 0, len(data.Rows))
	for _, row := range data.Rows {
		// Each row is expected to be [dimension value, active user count];
		// guard against short rows instead of panicking on the index.
		if len(row) < 2 {
			return fmt.Errorf("Error parsing GA data: unexpected row %v for dimension %s", row, dimension)
		}
		// key is the dimension value we care about, e.g. 'Chrome' for the
		// 'browser' dimension. Clean's error is ignored; an empty result
		// falls back to the __blank__ placeholder below.
		key, _ := opentsdb.Clean(row[0])
		if key == "" {
			key = "__blank__"
		}
		value, err := strconv.Atoi(row[1])
		if err != nil {
			return fmt.Errorf("Error parsing GA data: %s", err)
		}
		rows = append(rows, kv{key: key, value: value})
	}
	// Stable keeps GA's original order among equal counts, so truncating to
	// topN is deterministic when several values tie at the cut-off.
	sort.Stable(sort.Reverse(rows))
	if topN >= 0 && topN < len(rows) {
		rows = rows[:topN]
	}

	for _, row := range rows {
		Add(md, "google.analytics.realtime.activeusers.by_"+dimension, row.value, opentsdb.TagSet{dimension: row.key}.Merge(tags), metadata.Gauge, metadata.ActiveUsers, descActiveUsers)
	}
	return nil
}
132
// getActiveUsers fetches the site's current total of realtime active users
// (rt:activeusers with no dimensions) and appends it to md as
// google.analytics.realtime.activeusers, tagged with the site name.
// It returns an error when the API call fails, the response carries no rows,
// or the value cannot be parsed as an integer.
func getActiveUsers(md *opentsdb.MultiDataPoint, svc *analytics.Service, site conf.GoogleAnalyticsSite) error {
	resp, err := svc.Data.Realtime.Get("ga:"+site.Profile, "rt:activeusers").Do()
	if err != nil {
		return err
	}

	rows := resp.Rows
	if len(rows) == 0 || len(rows[0]) == 0 {
		return fmt.Errorf("no active user data in response for site %v", site.Name)
	}

	active, err := strconv.Atoi(rows[0][0])
	if err != nil {
		return fmt.Errorf("Error parsing GA data: %s", err)
	}

	Add(md, "google.analytics.realtime.activeusers", active, opentsdb.TagSet{"site": site.Name}, metadata.Gauge, metadata.ActiveUsers, descActiveUsers)
	return nil
}
151
// getPageviews queries GA realtime pageviews split by rt:minutesAgo and
// appends a single google.analytics.realtime.pageviews datapoint for the
// minute site.Offset minutes ago (default 1), timestamped at now minus that
// offset and tagged with the site name. If no row matches that minute, 0 is
// reported. It returns an error if the API call fails or a row is malformed.
func getPageviews(md *opentsdb.MultiDataPoint, svc *analytics.Service, site conf.GoogleAnalyticsSite) error {
	call := svc.Data.Realtime.Get("ga:"+site.Profile, "rt:pageviews").Dimensions("rt:minutesAgo")
	data, err := call.Do()
	if err != nil {
		return err
	}

	// If no offset was specified, the minute we care about is '1', the most
	// recently completed minute. Minute '0' is the current minute and is
	// therefore incomplete.
	offset := site.Offset
	if offset == 0 {
		offset = 1
	}
	// Named ts rather than time so the time package is not shadowed.
	ts := time.Now().Add(time.Duration(-1*offset) * time.Minute).Unix()
	pageviews := 0
	// Scan the response for the row describing the minute we want.
	for _, row := range data.Rows {
		// row == []string{"0", "123"}: first item is the minute, second is
		// the pageview count. Reject short rows rather than panicking.
		if len(row) < 2 {
			return fmt.Errorf("Error parsing GA data: unexpected row %v", row)
		}
		minute, err := strconv.Atoi(row[0])
		if err != nil {
			return fmt.Errorf("Error parsing GA data: %s", err)
		}
		if minute != offset {
			continue
		}
		if pageviews, err = strconv.Atoi(row[1]); err != nil {
			return fmt.Errorf("Error parsing GA data: %s", err)
		}
		break
	}
	AddTS(md, "google.analytics.realtime.pageviews", ts, pageviews, opentsdb.TagSet{"site": site.Name}, metadata.Gauge, metadata.Count, "Number of pageviews tracked by GA in one minute")
	return nil
}
187
// googleAPIClient builds an authenticated *http.Client for the given oauth
// scopes. Credentials must come from exactly one of two sources:
//
//   - jsonToken: a base64-encoded service account JSON key, or
//   - clientid, secret and tokenstr: oauth client credentials plus a
//     base64-encoded gob of a cached *oauth2.Token. Generating that token is
//     left as an exercise to the reader.
//
// Supplying both is an error, as is a token string that cannot be base64- or
// gob-decoded.
func googleAPIClient(clientid string, secret string, tokenstr string, jsonToken string, scopes []string) (*http.Client, error) {
	if jsonToken != "" && clientid+secret+tokenstr != "" {
		return nil, fmt.Errorf("For google, provide a json token OR oauth client info and token. Not both")
	}

	ctx := context.Background()
	if jsonToken != "" {
		by, err := base64.StdEncoding.DecodeString(jsonToken)
		if err != nil {
			return nil, err
		}
		config, err := google.JWTConfigFromJSON(by, scopes...)
		if err != nil {
			return nil, err
		}
		return config.Client(ctx), nil
	}

	config := &oauth2.Config{
		ClientID:     clientid,
		ClientSecret: secret,
		Endpoint:     google.Endpoint,
		Scopes:       scopes,
	}
	// Decode the base64'd gob holding the cached token.
	by, err := base64.StdEncoding.DecodeString(tokenstr)
	if err != nil {
		return nil, err
	}
	token := new(oauth2.Token)
	// The decode error used to be ignored, silently yielding a client with an
	// empty token; surface it instead.
	if err := gob.NewDecoder(bytes.NewReader(by)).Decode(token); err != nil {
		return nil, fmt.Errorf("decoding cached oauth token: %v", err)
	}
	return config.Client(ctx, token), nil
}
229