1package eventstream 2 3import ( 4 "encoding/base64" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "strconv" 9 "time" 10) 11 12const maxHeaderValueLen = 1<<15 - 1 // 2^15-1 or 32KB - 1 13 14// valueType is the EventStream header value type. 15type valueType uint8 16 17// Header value types 18const ( 19 trueValueType valueType = iota 20 falseValueType 21 int8ValueType // Byte 22 int16ValueType // Short 23 int32ValueType // Integer 24 int64ValueType // Long 25 bytesValueType 26 stringValueType 27 timestampValueType 28 uuidValueType 29) 30 31func (t valueType) String() string { 32 switch t { 33 case trueValueType: 34 return "bool" 35 case falseValueType: 36 return "bool" 37 case int8ValueType: 38 return "int8" 39 case int16ValueType: 40 return "int16" 41 case int32ValueType: 42 return "int32" 43 case int64ValueType: 44 return "int64" 45 case bytesValueType: 46 return "byte_array" 47 case stringValueType: 48 return "string" 49 case timestampValueType: 50 return "timestamp" 51 case uuidValueType: 52 return "uuid" 53 default: 54 return fmt.Sprintf("unknown value type %d", uint8(t)) 55 } 56} 57 58type rawValue struct { 59 Type valueType 60 Len uint16 // Only set for variable length slices 61 Value []byte // byte representation of value, BigEndian encoding. 62} 63 64func (r rawValue) encodeScalar(w io.Writer, v interface{}) error { 65 return binaryWriteFields(w, binary.BigEndian, 66 r.Type, 67 v, 68 ) 69} 70 71func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error { 72 binary.Write(w, binary.BigEndian, r.Type) 73 74 _, err := w.Write(v) 75 return err 76} 77 78func (r rawValue) encodeBytes(w io.Writer, v []byte) error { 79 if len(v) > maxHeaderValueLen { 80 return LengthError{ 81 Part: "header value", 82 Want: maxHeaderValueLen, Have: len(v), 83 Value: v, 84 } 85 } 86 r.Len = uint16(len(v)) 87 88 err := binaryWriteFields(w, binary.BigEndian, 89 r.Type, 90 r.Len, 91 ) 92 if err != nil { 93 return err 94 } 95 96 _, err = w.Write(v) 97 return err 98} 99 100func (r rawValue) encodeString(w io.Writer, v string) error { 101 if len(v) > maxHeaderValueLen { 102 return LengthError{ 103 Part: "header value", 104 Want: maxHeaderValueLen, Have: len(v), 105 Value: v, 106 } 107 } 108 r.Len = uint16(len(v)) 109 110 type stringWriter interface { 111 WriteString(string) (int, error) 112 } 113 114 err := binaryWriteFields(w, binary.BigEndian, 115 r.Type, 116 r.Len, 117 ) 118 if err != nil { 119 return err 120 } 121 122 if sw, ok := w.(stringWriter); ok { 123 _, err = sw.WriteString(v) 124 } else { 125 _, err = w.Write([]byte(v)) 126 } 127 128 return err 129} 130 131func decodeFixedBytesValue(r io.Reader, buf []byte) error { 132 _, err := io.ReadFull(r, buf) 133 return err 134} 135 136func decodeBytesValue(r io.Reader) ([]byte, error) { 137 var raw rawValue 138 var err error 139 raw.Len, err = decodeUint16(r) 140 if err != nil { 141 return nil, err 142 } 143 144 buf := make([]byte, raw.Len) 145 _, err = io.ReadFull(r, buf) 146 if err != nil { 147 return nil, err 148 } 149 150 return buf, nil 151} 152 153func decodeStringValue(r io.Reader) (string, error) { 154 v, err := decodeBytesValue(r) 155 return string(v), err 156} 157 158// Value represents the abstract header value. 159type Value interface { 160 Get() interface{} 161 String() string 162 valueType() valueType 163 encode(io.Writer) error 164} 165 166// An BoolValue provides eventstream encoding, and representation 167// of a Go bool value. 168type BoolValue bool 169 170// Get returns the underlying type 171func (v BoolValue) Get() interface{} { 172 return bool(v) 173} 174 175// valueType returns the EventStream header value type value. 176func (v BoolValue) valueType() valueType { 177 if v { 178 return trueValueType 179 } 180 return falseValueType 181} 182 183func (v BoolValue) String() string { 184 return strconv.FormatBool(bool(v)) 185} 186 187// encode encodes the BoolValue into an eventstream binary value 188// representation. 189func (v BoolValue) encode(w io.Writer) error { 190 return binary.Write(w, binary.BigEndian, v.valueType()) 191} 192 193// An Int8Value provides eventstream encoding, and representation of a Go 194// int8 value. 195type Int8Value int8 196 197// Get returns the underlying value. 198func (v Int8Value) Get() interface{} { 199 return int8(v) 200} 201 202// valueType returns the EventStream header value type value. 203func (Int8Value) valueType() valueType { 204 return int8ValueType 205} 206 207func (v Int8Value) String() string { 208 return fmt.Sprintf("0x%02x", int8(v)) 209} 210 211// encode encodes the Int8Value into an eventstream binary value 212// representation. 213func (v Int8Value) encode(w io.Writer) error { 214 raw := rawValue{ 215 Type: v.valueType(), 216 } 217 218 return raw.encodeScalar(w, v) 219} 220 221func (v *Int8Value) decode(r io.Reader) error { 222 n, err := decodeUint8(r) 223 if err != nil { 224 return err 225 } 226 227 *v = Int8Value(n) 228 return nil 229} 230 231// An Int16Value provides eventstream encoding, and representation of a Go 232// int16 value. 233type Int16Value int16 234 235// Get returns the underlying value. 236func (v Int16Value) Get() interface{} { 237 return int16(v) 238} 239 240// valueType returns the EventStream header value type value. 241func (Int16Value) valueType() valueType { 242 return int16ValueType 243} 244 245func (v Int16Value) String() string { 246 return fmt.Sprintf("0x%04x", int16(v)) 247} 248 249// encode encodes the Int16Value into an eventstream binary value 250// representation. 251func (v Int16Value) encode(w io.Writer) error { 252 raw := rawValue{ 253 Type: v.valueType(), 254 } 255 return raw.encodeScalar(w, v) 256} 257 258func (v *Int16Value) decode(r io.Reader) error { 259 n, err := decodeUint16(r) 260 if err != nil { 261 return err 262 } 263 264 *v = Int16Value(n) 265 return nil 266} 267 268// An Int32Value provides eventstream encoding, and representation of a Go 269// int32 value. 270type Int32Value int32 271 272// Get returns the underlying value. 273func (v Int32Value) Get() interface{} { 274 return int32(v) 275} 276 277// valueType returns the EventStream header value type value. 278func (Int32Value) valueType() valueType { 279 return int32ValueType 280} 281 282func (v Int32Value) String() string { 283 return fmt.Sprintf("0x%08x", int32(v)) 284} 285 286// encode encodes the Int32Value into an eventstream binary value 287// representation. 288func (v Int32Value) encode(w io.Writer) error { 289 raw := rawValue{ 290 Type: v.valueType(), 291 } 292 return raw.encodeScalar(w, v) 293} 294 295func (v *Int32Value) decode(r io.Reader) error { 296 n, err := decodeUint32(r) 297 if err != nil { 298 return err 299 } 300 301 *v = Int32Value(n) 302 return nil 303} 304 305// An Int64Value provides eventstream encoding, and representation of a Go 306// int64 value. 307type Int64Value int64 308 309// Get returns the underlying value. 310func (v Int64Value) Get() interface{} { 311 return int64(v) 312} 313 314// valueType returns the EventStream header value type value. 315func (Int64Value) valueType() valueType { 316 return int64ValueType 317} 318 319func (v Int64Value) String() string { 320 return fmt.Sprintf("0x%016x", int64(v)) 321} 322 323// encode encodes the Int64Value into an eventstream binary value 324// representation. 325func (v Int64Value) encode(w io.Writer) error { 326 raw := rawValue{ 327 Type: v.valueType(), 328 } 329 return raw.encodeScalar(w, v) 330} 331 332func (v *Int64Value) decode(r io.Reader) error { 333 n, err := decodeUint64(r) 334 if err != nil { 335 return err 336 } 337 338 *v = Int64Value(n) 339 return nil 340} 341 342// An BytesValue provides eventstream encoding, and representation of a Go 343// byte slice. 344type BytesValue []byte 345 346// Get returns the underlying value. 347func (v BytesValue) Get() interface{} { 348 return []byte(v) 349} 350 351// valueType returns the EventStream header value type value. 352func (BytesValue) valueType() valueType { 353 return bytesValueType 354} 355 356func (v BytesValue) String() string { 357 return base64.StdEncoding.EncodeToString([]byte(v)) 358} 359 360// encode encodes the BytesValue into an eventstream binary value 361// representation. 362func (v BytesValue) encode(w io.Writer) error { 363 raw := rawValue{ 364 Type: v.valueType(), 365 } 366 367 return raw.encodeBytes(w, []byte(v)) 368} 369 370func (v *BytesValue) decode(r io.Reader) error { 371 buf, err := decodeBytesValue(r) 372 if err != nil { 373 return err 374 } 375 376 *v = BytesValue(buf) 377 return nil 378} 379 380// An StringValue provides eventstream encoding, and representation of a Go 381// string. 382type StringValue string 383 384// Get returns the underlying value. 385func (v StringValue) Get() interface{} { 386 return string(v) 387} 388 389// valueType returns the EventStream header value type value. 390func (StringValue) valueType() valueType { 391 return stringValueType 392} 393 394func (v StringValue) String() string { 395 return string(v) 396} 397 398// encode encodes the StringValue into an eventstream binary value 399// representation. 400func (v StringValue) encode(w io.Writer) error { 401 raw := rawValue{ 402 Type: v.valueType(), 403 } 404 405 return raw.encodeString(w, string(v)) 406} 407 408func (v *StringValue) decode(r io.Reader) error { 409 s, err := decodeStringValue(r) 410 if err != nil { 411 return err 412 } 413 414 *v = StringValue(s) 415 return nil 416} 417 418// An TimestampValue provides eventstream encoding, and representation of a Go 419// timestamp. 420type TimestampValue time.Time 421 422// Get returns the underlying value. 423func (v TimestampValue) Get() interface{} { 424 return time.Time(v) 425} 426 427// valueType returns the EventStream header value type value. 428func (TimestampValue) valueType() valueType { 429 return timestampValueType 430} 431 432func (v TimestampValue) epochMilli() int64 { 433 nano := time.Time(v).UnixNano() 434 msec := nano / int64(time.Millisecond) 435 return msec 436} 437 438func (v TimestampValue) String() string { 439 msec := v.epochMilli() 440 return strconv.FormatInt(msec, 10) 441} 442 443// encode encodes the TimestampValue into an eventstream binary value 444// representation. 445func (v TimestampValue) encode(w io.Writer) error { 446 raw := rawValue{ 447 Type: v.valueType(), 448 } 449 450 msec := v.epochMilli() 451 return raw.encodeScalar(w, msec) 452} 453 454func (v *TimestampValue) decode(r io.Reader) error { 455 n, err := decodeUint64(r) 456 if err != nil { 457 return err 458 } 459 460 *v = TimestampValue(timeFromEpochMilli(int64(n))) 461 return nil 462} 463 464// MarshalJSON implements the json.Marshaler interface 465func (v TimestampValue) MarshalJSON() ([]byte, error) { 466 return []byte(v.String()), nil 467} 468 469func timeFromEpochMilli(t int64) time.Time { 470 secs := t / 1e3 471 msec := t % 1e3 472 return time.Unix(secs, msec*int64(time.Millisecond)).UTC() 473} 474 475// An UUIDValue provides eventstream encoding, and representation of a UUID 476// value. 477type UUIDValue [16]byte 478 479// Get returns the underlying value. 480func (v UUIDValue) Get() interface{} { 481 return v[:] 482} 483 484// valueType returns the EventStream header value type value. 485func (UUIDValue) valueType() valueType { 486 return uuidValueType 487} 488 489func (v UUIDValue) String() string { 490 return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:]) 491} 492 493// encode encodes the UUIDValue into an eventstream binary value 494// representation. 495func (v UUIDValue) encode(w io.Writer) error { 496 raw := rawValue{ 497 Type: v.valueType(), 498 } 499 500 return raw.encodeFixedSlice(w, v[:]) 501} 502 503func (v *UUIDValue) decode(r io.Reader) error { 504 tv := (*v)[:] 505 return decodeFixedBytesValue(r, tv) 506} 507