1package rpc
2
3import (
4	"fmt"
5	"sync"
6
7	"golang.org/x/net/context"
8)
9
// ServeHandlerDescription bundles the two callbacks stored for a single
// method in Protocol.Methods.
type ServeHandlerDescription struct {
	// MakeArg returns a new argument value for the method; getArg hands the
	// result straight back to its caller.
	MakeArg func() interface{}
	// Handler is called with a context and an argument and yields the
	// method's result or an error.
	// NOTE(review): presumably arg is the value produced by MakeArg after the
	// request has been decoded into it - confirm against the dispatcher.
	Handler func(ctx context.Context, arg interface{}) (interface{}, error)
}
14
// MethodType is the integer tag that distinguishes the kinds of RPC
// messages named by the constants below.
type MethodType int

// Known MethodType values. The numbers are written out explicitly rather
// than derived with iota.
// NOTE(review): presumably these values appear on the wire - confirm with the
// framing code before renumbering any of them.
const (
	MethodInvalid        MethodType = -1
	MethodCall           MethodType = 0
	MethodResponse       MethodType = 1
	MethodNotify         MethodType = 2
	MethodCancel         MethodType = 3
	MethodCallCompressed MethodType = 4
)

// String returns the name of t, or "Method(<n>)" when t does not match any
// of the named constants.
func (t MethodType) String() string {
	var name string
	switch t {
	case MethodInvalid:
		name = "Invalid"
	case MethodCall:
		name = "Call"
	case MethodResponse:
		name = "Response"
	case MethodNotify:
		name = "Notify"
	case MethodCancel:
		name = "Cancel"
	case MethodCallCompressed:
		name = "CallCompressed"
	default:
		// %d is not a string verb, so fmt prints the raw number and does not
		// call String again.
		name = fmt.Sprintf("Method(%d)", t)
	}
	return name
}
44
// CompressionType selects one of the compression schemes named by the
// constants below.
type CompressionType int

// Known CompressionType values; CompressionNone is the zero value.
const (
	CompressionNone       CompressionType = 0
	CompressionGzip       CompressionType = 1
	CompressionMsgpackzip CompressionType = 2
)

// String returns the lower-case name of t, or "Compression(<n>)" when t does
// not match any of the named constants.
func (t CompressionType) String() string {
	var name string
	switch t {
	case CompressionNone:
		name = "none"
	case CompressionGzip:
		name = "gzip"
	case CompressionMsgpackzip:
		name = "msgpackzip"
	default:
		// %d is not a string verb, so fmt prints the raw number and does not
		// call String again.
		name = fmt.Sprintf("Compression(%d)", t)
	}
	return name
}
65
66func (t CompressionType) NewCompressor() compressor {
67	switch t {
68	case CompressionGzip:
69		return newGzipCompressor()
70	case CompressionMsgpackzip:
71		return newMsgpackzipCompressor()
72	default:
73		return nil
74	}
75}
76
// ErrorUnwrapper is implemented by types that can allocate an argument value
// and turn such a value into a pair of errors.
type ErrorUnwrapper interface {
	// MakeArg returns a new argument value.
	// NOTE(review): presumably an empty container that a received error
	// payload is decoded into - confirm with an implementation.
	MakeArg() interface{}
	// UnwrapError converts arg into an application-level error (appError)
	// and a dispatch-level error (dispatchError).
	UnwrapError(arg interface{}) (appError, dispatchError error)
}
81
// Protocol describes one named RPC protocol as handed to registerProtocol.
//
// Name is the key the protocol is stored under; findServeHandler looks it up
// using the protocol part returned by splitMethodName. Methods maps a method
// name to its ServeHandlerDescription. WrapError is the WrapErrorFunc that
// findServeHandler returns alongside a successfully resolved method.
type Protocol struct {
	Name      string
	Methods   map[string]ServeHandlerDescription
	WrapError WrapErrorFunc
}
87
// protocolMap indexes registered protocols by their Protocol.Name.
type protocolMap map[string]Protocol
89
// SeqNumber is an integer sequence number.
// NOTE(review): no use is visible in this part of the file; presumably it
// tags calls so that responses can be matched to them - confirm with callers.
type SeqNumber int
91
// protocolHandler keeps the set of registered protocols and resolves method
// names against it.
//
// wef is the WrapErrorFunc that findServeHandler returns when a lookup fails.
// mtx guards protocols: registerProtocol takes the write lock and
// findServeHandler takes the read lock.
type protocolHandler struct {
	wef       WrapErrorFunc
	mtx       sync.RWMutex
	protocols protocolMap
}
97
98func newProtocolHandler(wef WrapErrorFunc) *protocolHandler {
99	return &protocolHandler{
100		wef:       wef,
101		protocols: make(protocolMap),
102	}
103}
104
105func (h *protocolHandler) registerProtocol(p Protocol) error {
106	h.mtx.Lock()
107	defer h.mtx.Unlock()
108
109	if _, found := h.protocols[p.Name]; found {
110		return newAlreadyRegisteredError(p.Name)
111	}
112	h.protocols[p.Name] = p
113	return nil
114}
115
116func (h *protocolHandler) findServeHandler(name string) (*ServeHandlerDescription, WrapErrorFunc, error) {
117	h.mtx.RLock()
118	defer h.mtx.RUnlock()
119
120	p, m := splitMethodName(name)
121	prot, found := h.protocols[p]
122	if !found {
123		return nil, h.wef, newProtocolNotFoundError(p)
124	}
125	srv, found := prot.Methods[m]
126	if !found {
127		return nil, h.wef, newMethodNotFoundError(p, m)
128	}
129	return &srv, prot.WrapError, nil
130}
131
132func (h *protocolHandler) getArg(name string) (interface{}, error) {
133	handler, _, err := h.findServeHandler(name)
134	if err != nil {
135		return nil, err
136	}
137	return handler.MakeArg(), nil
138}
139