1package nsq
2
3import (
4	"encoding/binary"
5	"errors"
6	"fmt"
7	"io"
8	"regexp"
9)
10
11// MagicV1 is the initial identifier sent when connecting for V1 clients
12var MagicV1 = []byte("  V1")
13
14// MagicV2 is the initial identifier sent when connecting for V2 clients
15var MagicV2 = []byte("  V2")
16
17// frame types
18const (
19	FrameTypeResponse int32 = 0
20	FrameTypeError    int32 = 1
21	FrameTypeMessage  int32 = 2
22)
23
24var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)
25
26// IsValidTopicName checks a topic name for correctness
27func IsValidTopicName(name string) bool {
28	return isValidName(name)
29}
30
31// IsValidChannelName checks a channel name for correctness
32func IsValidChannelName(name string) bool {
33	return isValidName(name)
34}
35
36func isValidName(name string) bool {
37	if len(name) > 64 || len(name) < 1 {
38		return false
39	}
40	return validTopicChannelNameRegex.MatchString(name)
41}
42
43// ReadResponse is a client-side utility function to read from the supplied Reader
44// according to the NSQ protocol spec:
45//
46//    [x][x][x][x][x][x][x][x]...
47//    |  (int32) || (binary)
48//    |  4-byte  || N-byte
49//    ------------------------...
50//        size       data
51func ReadResponse(r io.Reader) ([]byte, error) {
52	var msgSize int32
53
54	// message size
55	err := binary.Read(r, binary.BigEndian, &msgSize)
56	if err != nil {
57		return nil, err
58	}
59
60	if msgSize < 0 {
61		return nil, fmt.Errorf("response msg size is negative: %v", msgSize)
62	}
63	// message binary data
64	buf := make([]byte, msgSize)
65	_, err = io.ReadFull(r, buf)
66	if err != nil {
67		return nil, err
68	}
69
70	return buf, nil
71}
72
73// UnpackResponse is a client-side utility function that unpacks serialized data
74// according to NSQ protocol spec:
75//
76//    [x][x][x][x][x][x][x][x]...
77//    |  (int32) || (binary)
78//    |  4-byte  || N-byte
79//    ------------------------...
80//      frame ID     data
81//
82// Returns a triplicate of: frame type, data ([]byte), error
83func UnpackResponse(response []byte) (int32, []byte, error) {
84	if len(response) < 4 {
85		return -1, nil, errors.New("length of response is too small")
86	}
87
88	return int32(binary.BigEndian.Uint32(response)), response[4:], nil
89}
90
91// ReadUnpackedResponse reads and parses data from the underlying
92// TCP connection according to the NSQ TCP protocol spec and
93// returns the frameType, data or error
94func ReadUnpackedResponse(r io.Reader) (int32, []byte, error) {
95	resp, err := ReadResponse(r)
96	if err != nil {
97		return -1, nil, err
98	}
99	return UnpackResponse(resp)
100}
101