1package eventstream
2
3import (
4	"encoding/base64"
5	"encoding/binary"
6	"fmt"
7	"io"
8	"strconv"
9	"time"
10)
11
// maxHeaderValueLen is the largest byte length accepted when encoding a
// variable-length (bytes or string) header value: 2^15-1, i.e. 32 KiB - 1.
const maxHeaderValueLen = 1<<15 - 1
13
// valueType identifies the kind of an EventStream header value.
type valueType uint8

// Header value types. The constants are numbered consecutively from zero in
// the order listed.
const (
	trueValueType valueType = iota
	falseValueType
	int8ValueType  // Byte
	int16ValueType // Short
	int32ValueType // Integer
	int64ValueType // Long
	bytesValueType
	stringValueType
	timestampValueType
	uuidValueType
)

// String returns a short name for the value type. Both boolean types map to
// "bool"; a type outside the known set is reported as
// "unknown value type N".
func (t valueType) String() string {
	names := [...]string{
		trueValueType:      "bool",
		falseValueType:     "bool",
		int8ValueType:      "int8",
		int16ValueType:     "int16",
		int32ValueType:     "int32",
		int64ValueType:     "int64",
		bytesValueType:     "byte_array",
		stringValueType:    "string",
		timestampValueType: "timestamp",
		uuidValueType:      "uuid",
	}

	if int(t) < len(names) {
		return names[t]
	}
	return fmt.Sprintf("unknown value type %d", uint8(t))
}
57
// rawValue carries a header value's type and is the receiver for the
// encode* helpers, each of which writes the Type first and the value's
// payload after it.
type rawValue struct {
	Type  valueType
	Len   uint16 // Only set for variable length slices
	Value []byte // byte representation of value, BigEndian encoding.
}
63
64func (r rawValue) encodeScalar(w io.Writer, v interface{}) error {
65	return binaryWriteFields(w, binary.BigEndian,
66		r.Type,
67		v,
68	)
69}
70
71func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error {
72	binary.Write(w, binary.BigEndian, r.Type)
73
74	_, err := w.Write(v)
75	return err
76}
77
78func (r rawValue) encodeBytes(w io.Writer, v []byte) error {
79	if len(v) > maxHeaderValueLen {
80		return LengthError{
81			Part: "header value",
82			Want: maxHeaderValueLen, Have: len(v),
83			Value: v,
84		}
85	}
86	r.Len = uint16(len(v))
87
88	err := binaryWriteFields(w, binary.BigEndian,
89		r.Type,
90		r.Len,
91	)
92	if err != nil {
93		return err
94	}
95
96	_, err = w.Write(v)
97	return err
98}
99
100func (r rawValue) encodeString(w io.Writer, v string) error {
101	if len(v) > maxHeaderValueLen {
102		return LengthError{
103			Part: "header value",
104			Want: maxHeaderValueLen, Have: len(v),
105			Value: v,
106		}
107	}
108	r.Len = uint16(len(v))
109
110	type stringWriter interface {
111		WriteString(string) (int, error)
112	}
113
114	err := binaryWriteFields(w, binary.BigEndian,
115		r.Type,
116		r.Len,
117	)
118	if err != nil {
119		return err
120	}
121
122	if sw, ok := w.(stringWriter); ok {
123		_, err = sw.WriteString(v)
124	} else {
125		_, err = w.Write([]byte(v))
126	}
127
128	return err
129}
130
// decodeFixedBytesValue fills buf completely from r using io.ReadFull and
// returns any error that prevented buf from being filled.
func decodeFixedBytesValue(r io.Reader, buf []byte) error {
	if _, err := io.ReadFull(r, buf); err != nil {
		return err
	}
	return nil
}
135
136func decodeBytesValue(r io.Reader) ([]byte, error) {
137	var raw rawValue
138	var err error
139	raw.Len, err = decodeUint16(r)
140	if err != nil {
141		return nil, err
142	}
143
144	buf := make([]byte, raw.Len)
145	_, err = io.ReadFull(r, buf)
146	if err != nil {
147		return nil, err
148	}
149
150	return buf, nil
151}
152
153func decodeStringValue(r io.Reader) (string, error) {
154	v, err := decodeBytesValue(r)
155	return string(v), err
156}
157
// Value represents the abstract header value.
//
// The interface includes unexported methods, so it can only be implemented
// by types declared in this package.
type Value interface {
	// Get returns the underlying Go value.
	Get() interface{}
	// String returns a textual representation of the value.
	String() string
	// valueType returns the EventStream header value type of the value.
	valueType() valueType
	// encode writes the value's binary representation to the writer.
	encode(io.Writer) error
}
165
166// An BoolValue provides eventstream encoding, and representation
167// of a Go bool value.
168type BoolValue bool
169
170// Get returns the underlying type
171func (v BoolValue) Get() interface{} {
172	return bool(v)
173}
174
175// valueType returns the EventStream header value type value.
176func (v BoolValue) valueType() valueType {
177	if v {
178		return trueValueType
179	}
180	return falseValueType
181}
182
183func (v BoolValue) String() string {
184	return strconv.FormatBool(bool(v))
185}
186
187// encode encodes the BoolValue into an eventstream binary value
188// representation.
189func (v BoolValue) encode(w io.Writer) error {
190	return binary.Write(w, binary.BigEndian, v.valueType())
191}
192
193// An Int8Value provides eventstream encoding, and representation of a Go
194// int8 value.
195type Int8Value int8
196
197// Get returns the underlying value.
198func (v Int8Value) Get() interface{} {
199	return int8(v)
200}
201
202// valueType returns the EventStream header value type value.
203func (Int8Value) valueType() valueType {
204	return int8ValueType
205}
206
207func (v Int8Value) String() string {
208	return fmt.Sprintf("0x%02x", int8(v))
209}
210
211// encode encodes the Int8Value into an eventstream binary value
212// representation.
213func (v Int8Value) encode(w io.Writer) error {
214	raw := rawValue{
215		Type: v.valueType(),
216	}
217
218	return raw.encodeScalar(w, v)
219}
220
221func (v *Int8Value) decode(r io.Reader) error {
222	n, err := decodeUint8(r)
223	if err != nil {
224		return err
225	}
226
227	*v = Int8Value(n)
228	return nil
229}
230
231// An Int16Value provides eventstream encoding, and representation of a Go
232// int16 value.
233type Int16Value int16
234
235// Get returns the underlying value.
236func (v Int16Value) Get() interface{} {
237	return int16(v)
238}
239
240// valueType returns the EventStream header value type value.
241func (Int16Value) valueType() valueType {
242	return int16ValueType
243}
244
245func (v Int16Value) String() string {
246	return fmt.Sprintf("0x%04x", int16(v))
247}
248
249// encode encodes the Int16Value into an eventstream binary value
250// representation.
251func (v Int16Value) encode(w io.Writer) error {
252	raw := rawValue{
253		Type: v.valueType(),
254	}
255	return raw.encodeScalar(w, v)
256}
257
258func (v *Int16Value) decode(r io.Reader) error {
259	n, err := decodeUint16(r)
260	if err != nil {
261		return err
262	}
263
264	*v = Int16Value(n)
265	return nil
266}
267
268// An Int32Value provides eventstream encoding, and representation of a Go
269// int32 value.
270type Int32Value int32
271
272// Get returns the underlying value.
273func (v Int32Value) Get() interface{} {
274	return int32(v)
275}
276
277// valueType returns the EventStream header value type value.
278func (Int32Value) valueType() valueType {
279	return int32ValueType
280}
281
282func (v Int32Value) String() string {
283	return fmt.Sprintf("0x%08x", int32(v))
284}
285
286// encode encodes the Int32Value into an eventstream binary value
287// representation.
288func (v Int32Value) encode(w io.Writer) error {
289	raw := rawValue{
290		Type: v.valueType(),
291	}
292	return raw.encodeScalar(w, v)
293}
294
295func (v *Int32Value) decode(r io.Reader) error {
296	n, err := decodeUint32(r)
297	if err != nil {
298		return err
299	}
300
301	*v = Int32Value(n)
302	return nil
303}
304
305// An Int64Value provides eventstream encoding, and representation of a Go
306// int64 value.
307type Int64Value int64
308
309// Get returns the underlying value.
310func (v Int64Value) Get() interface{} {
311	return int64(v)
312}
313
314// valueType returns the EventStream header value type value.
315func (Int64Value) valueType() valueType {
316	return int64ValueType
317}
318
319func (v Int64Value) String() string {
320	return fmt.Sprintf("0x%016x", int64(v))
321}
322
323// encode encodes the Int64Value into an eventstream binary value
324// representation.
325func (v Int64Value) encode(w io.Writer) error {
326	raw := rawValue{
327		Type: v.valueType(),
328	}
329	return raw.encodeScalar(w, v)
330}
331
332func (v *Int64Value) decode(r io.Reader) error {
333	n, err := decodeUint64(r)
334	if err != nil {
335		return err
336	}
337
338	*v = Int64Value(n)
339	return nil
340}
341
342// An BytesValue provides eventstream encoding, and representation of a Go
343// byte slice.
344type BytesValue []byte
345
346// Get returns the underlying value.
347func (v BytesValue) Get() interface{} {
348	return []byte(v)
349}
350
351// valueType returns the EventStream header value type value.
352func (BytesValue) valueType() valueType {
353	return bytesValueType
354}
355
356func (v BytesValue) String() string {
357	return base64.StdEncoding.EncodeToString([]byte(v))
358}
359
360// encode encodes the BytesValue into an eventstream binary value
361// representation.
362func (v BytesValue) encode(w io.Writer) error {
363	raw := rawValue{
364		Type: v.valueType(),
365	}
366
367	return raw.encodeBytes(w, []byte(v))
368}
369
370func (v *BytesValue) decode(r io.Reader) error {
371	buf, err := decodeBytesValue(r)
372	if err != nil {
373		return err
374	}
375
376	*v = BytesValue(buf)
377	return nil
378}
379
380// An StringValue provides eventstream encoding, and representation of a Go
381// string.
382type StringValue string
383
384// Get returns the underlying value.
385func (v StringValue) Get() interface{} {
386	return string(v)
387}
388
389// valueType returns the EventStream header value type value.
390func (StringValue) valueType() valueType {
391	return stringValueType
392}
393
394func (v StringValue) String() string {
395	return string(v)
396}
397
398// encode encodes the StringValue into an eventstream binary value
399// representation.
400func (v StringValue) encode(w io.Writer) error {
401	raw := rawValue{
402		Type: v.valueType(),
403	}
404
405	return raw.encodeString(w, string(v))
406}
407
408func (v *StringValue) decode(r io.Reader) error {
409	s, err := decodeStringValue(r)
410	if err != nil {
411		return err
412	}
413
414	*v = StringValue(s)
415	return nil
416}
417
418// An TimestampValue provides eventstream encoding, and representation of a Go
419// timestamp.
420type TimestampValue time.Time
421
422// Get returns the underlying value.
423func (v TimestampValue) Get() interface{} {
424	return time.Time(v)
425}
426
427// valueType returns the EventStream header value type value.
428func (TimestampValue) valueType() valueType {
429	return timestampValueType
430}
431
432func (v TimestampValue) epochMilli() int64 {
433	nano := time.Time(v).UnixNano()
434	msec := nano / int64(time.Millisecond)
435	return msec
436}
437
438func (v TimestampValue) String() string {
439	msec := v.epochMilli()
440	return strconv.FormatInt(msec, 10)
441}
442
443// encode encodes the TimestampValue into an eventstream binary value
444// representation.
445func (v TimestampValue) encode(w io.Writer) error {
446	raw := rawValue{
447		Type: v.valueType(),
448	}
449
450	msec := v.epochMilli()
451	return raw.encodeScalar(w, msec)
452}
453
454func (v *TimestampValue) decode(r io.Reader) error {
455	n, err := decodeUint64(r)
456	if err != nil {
457		return err
458	}
459
460	*v = TimestampValue(timeFromEpochMilli(int64(n)))
461	return nil
462}
463
464// MarshalJSON implements the json.Marshaler interface
465func (v TimestampValue) MarshalJSON() ([]byte, error) {
466	return []byte(v.String()), nil
467}
468
469func timeFromEpochMilli(t int64) time.Time {
470	secs := t / 1e3
471	msec := t % 1e3
472	return time.Unix(secs, msec*int64(time.Millisecond)).UTC()
473}
474
475// An UUIDValue provides eventstream encoding, and representation of a UUID
476// value.
477type UUIDValue [16]byte
478
479// Get returns the underlying value.
480func (v UUIDValue) Get() interface{} {
481	return v[:]
482}
483
484// valueType returns the EventStream header value type value.
485func (UUIDValue) valueType() valueType {
486	return uuidValueType
487}
488
489func (v UUIDValue) String() string {
490	return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:])
491}
492
493// encode encodes the UUIDValue into an eventstream binary value
494// representation.
495func (v UUIDValue) encode(w io.Writer) error {
496	raw := rawValue{
497		Type: v.valueType(),
498	}
499
500	return raw.encodeFixedSlice(w, v[:])
501}
502
503func (v *UUIDValue) decode(r io.Reader) error {
504	tv := (*v)[:]
505	return decodeFixedBytesValue(r, tv)
506}
507