package analytics

import (
	"encoding/json"
	"time"
)

// Callback is implemented by values that analytics clients use to notify
// the application when a message send succeeded or failed.
//
// Callback methods are called by a client's internal goroutines; there are no
// guarantees on which goroutine will trigger the callbacks, the calls can be
// made sequentially or in parallel, and their order doesn't depend on the
// order in which messages were queued to the client.
//
// Callback methods must return quickly and not cause long blocking operations
// to avoid interfering with the client's internal work flow.
type Callback interface {

	// Success is called for every message that was successfully sent to
	// the API.
	Success(Message)

	// Failure is called for every message that failed to be sent to the
	// API and will be discarded by the client.
	Failure(Message, error)
}

// Message is the interface used to represent analytics objects that can be
// sent via a client.
//
// Types like analytics.Track, analytics.Page, etc... implement this interface
// and therefore can be passed to the analytics.Client.Send method.
type Message interface {

	// Validate validates the internal structure of the message, the method
	// must return nil if the message is valid, or an error describing what
	// went wrong.
	Validate() error

	// internal is an unexposed interface function to ensure only types
	// defined within this package can satisfy the Message interface.
	// Invoking this method will panic.
	internal()
}

// makeMessageId takes a message id as first argument and returns it, unless
// it's the zero-value, in that case the default id passed as second argument
// is returned.
46func makeMessageId(id string, def string) string { 47 if len(id) == 0 { 48 return def 49 } 50 return id 51} 52 53// Returns the time value passed as first argument, unless it's the zero-value, 54// in that case the default value passed as second argument is returned. 55func makeTimestamp(t time.Time, def time.Time) time.Time { 56 if t == (time.Time{}) { 57 return def 58 } 59 return t 60} 61 62// This structure represents objects sent to the /v1/batch endpoint. We don't 63// export this type because it's only meant to be used internally to send groups 64// of messages in one API call. 65type batch struct { 66 MessageId string `json:"messageId"` 67 SentAt time.Time `json:"sentAt"` 68 Messages []message `json:"batch"` 69 Context *Context `json:"context"` 70} 71 72type message struct { 73 msg Message 74 json []byte 75} 76 77func makeMessage(m Message, maxBytes int) (msg message, err error) { 78 if msg.json, err = json.Marshal(m); err == nil { 79 if len(msg.json) > maxBytes { 80 err = ErrMessageTooBig 81 } else { 82 msg.msg = m 83 } 84 } 85 return 86} 87 88func (m message) MarshalJSON() ([]byte, error) { 89 return m.json, nil 90} 91 92func (m message) size() int { 93 // The `+ 1` is for the comma that sits between each items of a JSON array. 94 return len(m.json) + 1 95} 96 97type messageQueue struct { 98 pending []message 99 bytes int 100 maxBatchSize int 101 maxBatchBytes int 102} 103 104func (q *messageQueue) push(m message) (b []message) { 105 if (q.bytes + m.size()) > q.maxBatchBytes { 106 b = q.flush() 107 } 108 109 if q.pending == nil { 110 q.pending = make([]message, 0, q.maxBatchSize) 111 } 112 113 q.pending = append(q.pending, m) 114 q.bytes += len(m.json) 115 116 if b == nil && len(q.pending) == q.maxBatchSize { 117 b = q.flush() 118 } 119 120 return 121} 122 123func (q *messageQueue) flush() (msgs []message) { 124 msgs, q.pending, q.bytes = q.pending, nil, 0 125 return 126} 127 128const ( 129 maxBatchBytes = 500000 130 maxMessageBytes = 32000 131) 132