1package nsq 2 3import ( 4 "encoding/binary" 5 "errors" 6 "fmt" 7 "io" 8 "regexp" 9) 10 11// MagicV1 is the initial identifier sent when connecting for V1 clients 12var MagicV1 = []byte(" V1") 13 14// MagicV2 is the initial identifier sent when connecting for V2 clients 15var MagicV2 = []byte(" V2") 16 17// frame types 18const ( 19 FrameTypeResponse int32 = 0 20 FrameTypeError int32 = 1 21 FrameTypeMessage int32 = 2 22) 23 24var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`) 25 26// IsValidTopicName checks a topic name for correctness 27func IsValidTopicName(name string) bool { 28 return isValidName(name) 29} 30 31// IsValidChannelName checks a channel name for correctness 32func IsValidChannelName(name string) bool { 33 return isValidName(name) 34} 35 36func isValidName(name string) bool { 37 if len(name) > 64 || len(name) < 1 { 38 return false 39 } 40 return validTopicChannelNameRegex.MatchString(name) 41} 42 43// ReadResponse is a client-side utility function to read from the supplied Reader 44// according to the NSQ protocol spec: 45// 46// [x][x][x][x][x][x][x][x]... 47// | (int32) || (binary) 48// | 4-byte || N-byte 49// ------------------------... 50// size data 51func ReadResponse(r io.Reader) ([]byte, error) { 52 var msgSize int32 53 54 // message size 55 err := binary.Read(r, binary.BigEndian, &msgSize) 56 if err != nil { 57 return nil, err 58 } 59 60 if msgSize < 0 { 61 return nil, fmt.Errorf("response msg size is negative: %v", msgSize) 62 } 63 // message binary data 64 buf := make([]byte, msgSize) 65 _, err = io.ReadFull(r, buf) 66 if err != nil { 67 return nil, err 68 } 69 70 return buf, nil 71} 72 73// UnpackResponse is a client-side utility function that unpacks serialized data 74// according to NSQ protocol spec: 75// 76// [x][x][x][x][x][x][x][x]... 77// | (int32) || (binary) 78// | 4-byte || N-byte 79// ------------------------... 80// frame ID data 81// 82// Returns a triplicate of: frame type, data ([]byte), error 83func UnpackResponse(response []byte) (int32, []byte, error) { 84 if len(response) < 4 { 85 return -1, nil, errors.New("length of response is too small") 86 } 87 88 return int32(binary.BigEndian.Uint32(response)), response[4:], nil 89} 90 91// ReadUnpackedResponse reads and parses data from the underlying 92// TCP connection according to the NSQ TCP protocol spec and 93// returns the frameType, data or error 94func ReadUnpackedResponse(r io.Reader) (int32, []byte, error) { 95 resp, err := ReadResponse(r) 96 if err != nil { 97 return -1, nil, err 98 } 99 return UnpackResponse(resp) 100} 101