1package eventstream 2 3import ( 4 "bytes" 5 "encoding/base64" 6 "encoding/json" 7 "fmt" 8 "strconv" 9) 10 11type decodedMessage struct { 12 rawMessage 13 Headers decodedHeaders `json:"headers"` 14} 15type jsonMessage struct { 16 Length json.Number `json:"total_length"` 17 HeadersLen json.Number `json:"headers_length"` 18 PreludeCRC json.Number `json:"prelude_crc"` 19 Headers decodedHeaders `json:"headers"` 20 Payload []byte `json:"payload"` 21 CRC json.Number `json:"message_crc"` 22} 23 24func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) { 25 var jsonMsg jsonMessage 26 if err = json.Unmarshal(b, &jsonMsg); err != nil { 27 return err 28 } 29 30 d.Length, err = numAsUint32(jsonMsg.Length) 31 if err != nil { 32 return err 33 } 34 d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen) 35 if err != nil { 36 return err 37 } 38 d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC) 39 if err != nil { 40 return err 41 } 42 d.Headers = jsonMsg.Headers 43 d.Payload = jsonMsg.Payload 44 d.CRC, err = numAsUint32(jsonMsg.CRC) 45 if err != nil { 46 return err 47 } 48 49 return nil 50} 51 52func (d *decodedMessage) MarshalJSON() ([]byte, error) { 53 jsonMsg := jsonMessage{ 54 Length: json.Number(strconv.Itoa(int(d.Length))), 55 HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))), 56 PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))), 57 Headers: d.Headers, 58 Payload: d.Payload, 59 CRC: json.Number(strconv.Itoa(int(d.CRC))), 60 } 61 62 return json.Marshal(jsonMsg) 63} 64 65func numAsUint32(n json.Number) (uint32, error) { 66 v, err := n.Int64() 67 if err != nil { 68 return 0, fmt.Errorf("failed to get int64 json number, %v", err) 69 } 70 71 return uint32(v), nil 72} 73 74func (d decodedMessage) Message() Message { 75 return Message{ 76 Headers: Headers(d.Headers), 77 Payload: d.Payload, 78 } 79} 80 81type decodedHeaders Headers 82 83func (hs *decodedHeaders) UnmarshalJSON(b []byte) error { 84 var jsonHeaders []struct { 85 Name string `json:"name"` 86 Type valueType `json:"type"` 87 Value interface{} `json:"value"` 88 } 89 90 decoder := json.NewDecoder(bytes.NewReader(b)) 91 decoder.UseNumber() 92 if err := decoder.Decode(&jsonHeaders); err != nil { 93 return err 94 } 95 96 var headers Headers 97 for _, h := range jsonHeaders { 98 value, err := valueFromType(h.Type, h.Value) 99 if err != nil { 100 return err 101 } 102 headers.Set(h.Name, value) 103 } 104 (*hs) = decodedHeaders(headers) 105 106 return nil 107} 108 109func valueFromType(typ valueType, val interface{}) (Value, error) { 110 switch typ { 111 case trueValueType: 112 return BoolValue(true), nil 113 case falseValueType: 114 return BoolValue(false), nil 115 case int8ValueType: 116 v, err := val.(json.Number).Int64() 117 return Int8Value(int8(v)), err 118 case int16ValueType: 119 v, err := val.(json.Number).Int64() 120 return Int16Value(int16(v)), err 121 case int32ValueType: 122 v, err := val.(json.Number).Int64() 123 return Int32Value(int32(v)), err 124 case int64ValueType: 125 v, err := val.(json.Number).Int64() 126 return Int64Value(v), err 127 case bytesValueType: 128 v, err := base64.StdEncoding.DecodeString(val.(string)) 129 return BytesValue(v), err 130 case stringValueType: 131 v, err := base64.StdEncoding.DecodeString(val.(string)) 132 return StringValue(string(v)), err 133 case timestampValueType: 134 v, err := val.(json.Number).Int64() 135 return TimestampValue(timeFromEpochMilli(v)), err 136 case uuidValueType: 137 v, err := base64.StdEncoding.DecodeString(val.(string)) 138 var tv UUIDValue 139 copy(tv[:], v) 140 return tv, err 141 default: 142 panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val)) 143 } 144} 145