1package eventstream
2
3import (
4	"bytes"
5	"encoding/base64"
6	"encoding/json"
7	"fmt"
8	"strconv"
9)
10
11type decodedMessage struct {
12	rawMessage
13	Headers decodedHeaders `json:"headers"`
14}
15type jsonMessage struct {
16	Length     json.Number    `json:"total_length"`
17	HeadersLen json.Number    `json:"headers_length"`
18	PreludeCRC json.Number    `json:"prelude_crc"`
19	Headers    decodedHeaders `json:"headers"`
20	Payload    []byte         `json:"payload"`
21	CRC        json.Number    `json:"message_crc"`
22}
23
24func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) {
25	var jsonMsg jsonMessage
26	if err = json.Unmarshal(b, &jsonMsg); err != nil {
27		return err
28	}
29
30	d.Length, err = numAsUint32(jsonMsg.Length)
31	if err != nil {
32		return err
33	}
34	d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen)
35	if err != nil {
36		return err
37	}
38	d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC)
39	if err != nil {
40		return err
41	}
42	d.Headers = jsonMsg.Headers
43	d.Payload = jsonMsg.Payload
44	d.CRC, err = numAsUint32(jsonMsg.CRC)
45	if err != nil {
46		return err
47	}
48
49	return nil
50}
51
52func (d *decodedMessage) MarshalJSON() ([]byte, error) {
53	jsonMsg := jsonMessage{
54		Length:     json.Number(strconv.Itoa(int(d.Length))),
55		HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))),
56		PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))),
57		Headers:    d.Headers,
58		Payload:    d.Payload,
59		CRC:        json.Number(strconv.Itoa(int(d.CRC))),
60	}
61
62	return json.Marshal(jsonMsg)
63}
64
65func numAsUint32(n json.Number) (uint32, error) {
66	v, err := n.Int64()
67	if err != nil {
68		return 0, fmt.Errorf("failed to get int64 json number, %v", err)
69	}
70
71	return uint32(v), nil
72}
73
74func (d decodedMessage) Message() Message {
75	return Message{
76		Headers: Headers(d.Headers),
77		Payload: d.Payload,
78	}
79}
80
81type decodedHeaders Headers
82
83func (hs *decodedHeaders) UnmarshalJSON(b []byte) error {
84	var jsonHeaders []struct {
85		Name  string      `json:"name"`
86		Type  valueType   `json:"type"`
87		Value interface{} `json:"value"`
88	}
89
90	decoder := json.NewDecoder(bytes.NewReader(b))
91	decoder.UseNumber()
92	if err := decoder.Decode(&jsonHeaders); err != nil {
93		return err
94	}
95
96	var headers Headers
97	for _, h := range jsonHeaders {
98		value, err := valueFromType(h.Type, h.Value)
99		if err != nil {
100			return err
101		}
102		headers.Set(h.Name, value)
103	}
104	*hs = decodedHeaders(headers)
105
106	return nil
107}
108
109func valueFromType(typ valueType, val interface{}) (Value, error) {
110	switch typ {
111	case trueValueType:
112		return BoolValue(true), nil
113	case falseValueType:
114		return BoolValue(false), nil
115	case int8ValueType:
116		v, err := val.(json.Number).Int64()
117		return Int8Value(int8(v)), err
118	case int16ValueType:
119		v, err := val.(json.Number).Int64()
120		return Int16Value(int16(v)), err
121	case int32ValueType:
122		v, err := val.(json.Number).Int64()
123		return Int32Value(int32(v)), err
124	case int64ValueType:
125		v, err := val.(json.Number).Int64()
126		return Int64Value(v), err
127	case bytesValueType:
128		v, err := base64.StdEncoding.DecodeString(val.(string))
129		return BytesValue(v), err
130	case stringValueType:
131		v, err := base64.StdEncoding.DecodeString(val.(string))
132		return StringValue(string(v)), err
133	case timestampValueType:
134		v, err := val.(json.Number).Int64()
135		return TimestampValue(timeFromEpochMilli(v)), err
136	case uuidValueType:
137		v, err := base64.StdEncoding.DecodeString(val.(string))
138		var tv UUIDValue
139		copy(tv[:], v)
140		return tv, err
141	default:
142		panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val))
143	}
144}
145