1package analytics
2
3import (
4	"encoding/json"
5	"time"
6)
7
// Callback is implemented by values that analytics clients use to notify the
// application when a message send succeeded or failed.
//
// Callback methods are called by a client's internal goroutines. There are no
// guarantees on which goroutine will trigger the callbacks: the calls can be
// made sequentially or in parallel, and their order doesn't depend on the
// order in which messages were queued to the client.
//
// Callback methods must return quickly and not cause long blocking operations
// to avoid interfering with the client's internal work flow.
type Callback interface {

	// Success is called for every message that was successfully sent to
	// the API.
	Success(Message)

	// Failure is called for every message that failed to be sent to the
	// API and will be discarded by the client.
	Failure(Message, error)
}
28
// Message is the interface used to represent analytics objects that can be
// sent via a client.
//
// Types like analytics.Track, analytics.Page, etc... implement this interface
// and therefore can be passed to the analytics.Client.Send method.
type Message interface {

	// Validate validates the internal structure of the message. The method
	// must return nil if the message is valid, or an error describing what
	// went wrong.
	Validate() error

	// internal is an unexported method that ensures only types defined
	// within this package can satisfy the Message interface. Invoking this
	// method will panic.
	internal()
}
43
// makeMessageId picks the message id to use: id itself when it is non-empty,
// otherwise the fallback def.
func makeMessageId(id string, def string) string {
	if id != "" {
		return id
	}
	return def
}
52
// makeTimestamp returns t, unless t is the zero time instant, in which case
// the default value def is returned.
//
// The check uses t.IsZero rather than comparing against time.Time{} with ==,
// because == also compares the Location: a zero instant that carries a
// non-nil Location would otherwise not be recognized as unset.
func makeTimestamp(t time.Time, def time.Time) time.Time {
	if t.IsZero() {
		return def
	}
	return t
}
61
// batch represents objects sent to the /v1/batch endpoint. The type is not
// exported because it's only meant to be used internally to send groups of
// messages in one API call.
type batch struct {
	MessageId string    `json:"messageId"`
	SentAt    time.Time `json:"sentAt"`
	// Messages is encoded under the "batch" key of the JSON object.
	Messages []message `json:"batch"`
	Context  *Context  `json:"context"`
}
71
// message pairs a Message with its JSON encoding.
//
// makeMessage fills json with the output of json.Marshal for the message, and
// MarshalJSON returns those bytes as-is.
type message struct {
	msg  Message // the original message; set by makeMessage on success
	json []byte  // JSON encoding of the message as produced by json.Marshal
}
76
77func makeMessage(m Message, maxBytes int) (msg message, err error) {
78	if msg.json, err = json.Marshal(m); err == nil {
79		if len(msg.json) > maxBytes {
80			err = ErrMessageTooBig
81		} else {
82			msg.msg = m
83		}
84	}
85	return
86}
87
// MarshalJSON returns the JSON bytes already stored in the message instead of
// encoding the wrapped Message again. The returned error is always nil.
func (m message) MarshalJSON() ([]byte, error) {
	return m.json, nil
}
91
// size returns the number of bytes the message is counted as taking in a
// batch: the length of its JSON encoding plus one separator byte.
func (m message) size() int {
	// The `+ 1` is for the comma that sits between each item of a JSON array.
	return len(m.json) + 1
}
96
// messageQueue accumulates messages until they form a batch.
//
// push flushes the queue when the number of pending messages reaches
// maxBatchSize, or when adding a message would take bytes past maxBatchBytes.
type messageQueue struct {
	pending       []message // messages queued since the last flush
	bytes         int       // running byte count for pending; reset to 0 by flush
	maxBatchSize  int       // number of pending messages that triggers a flush
	maxBatchBytes int       // byte budget checked by push before appending
}
103
104func (q *messageQueue) push(m message) (b []message) {
105	if (q.bytes + m.size()) > q.maxBatchBytes {
106		b = q.flush()
107	}
108
109	if q.pending == nil {
110		q.pending = make([]message, 0, q.maxBatchSize)
111	}
112
113	q.pending = append(q.pending, m)
114	q.bytes += len(m.json)
115
116	if b == nil && len(q.pending) == q.maxBatchSize {
117		b = q.flush()
118	}
119
120	return
121}
122
123func (q *messageQueue) flush() (msgs []message) {
124	msgs, q.pending, q.bytes = q.pending, nil, 0
125	return
126}
127
// Size limits, expressed in bytes.
//
// NOTE(review): neither constant is referenced in the visible part of this
// file. Presumably maxBatchBytes is the value given to
// messageQueue.maxBatchBytes and maxMessageBytes the maxBytes argument of
// makeMessage — confirm against callers.
const (
	maxBatchBytes   = 500000
	maxMessageBytes = 32000
)
132