1package rpc 2 3import ( 4 "fmt" 5 "sync" 6 7 "golang.org/x/net/context" 8) 9 10type ServeHandlerDescription struct { 11 MakeArg func() interface{} 12 Handler func(ctx context.Context, arg interface{}) (ret interface{}, err error) 13} 14 15type MethodType int 16 17const ( 18 MethodInvalid MethodType = -1 19 MethodCall MethodType = 0 20 MethodResponse MethodType = 1 21 MethodNotify MethodType = 2 22 MethodCancel MethodType = 3 23 MethodCallCompressed MethodType = 4 24) 25 26func (t MethodType) String() string { 27 switch t { 28 case MethodInvalid: 29 return "Invalid" 30 case MethodCall: 31 return "Call" 32 case MethodResponse: 33 return "Response" 34 case MethodNotify: 35 return "Notify" 36 case MethodCancel: 37 return "Cancel" 38 case MethodCallCompressed: 39 return "CallCompressed" 40 default: 41 return fmt.Sprintf("Method(%d)", t) 42 } 43} 44 45type CompressionType int 46 47const ( 48 CompressionNone CompressionType = 0 49 CompressionGzip CompressionType = 1 50 CompressionMsgpackzip CompressionType = 2 51) 52 53func (t CompressionType) String() string { 54 switch t { 55 case CompressionNone: 56 return "none" 57 case CompressionGzip: 58 return "gzip" 59 case CompressionMsgpackzip: 60 return "msgpackzip" 61 default: 62 return fmt.Sprintf("Compression(%d)", t) 63 } 64} 65 66func (t CompressionType) NewCompressor() compressor { 67 switch t { 68 case CompressionGzip: 69 return newGzipCompressor() 70 case CompressionMsgpackzip: 71 return newMsgpackzipCompressor() 72 default: 73 return nil 74 } 75} 76 77type ErrorUnwrapper interface { 78 MakeArg() interface{} 79 UnwrapError(arg interface{}) (appError error, dispatchError error) 80} 81 82type Protocol struct { 83 Name string 84 Methods map[string]ServeHandlerDescription 85 WrapError WrapErrorFunc 86} 87 88type protocolMap map[string]Protocol 89 90type SeqNumber int 91 92type protocolHandler struct { 93 wef WrapErrorFunc 94 mtx sync.RWMutex 95 protocols protocolMap 96} 97 98func newProtocolHandler(wef WrapErrorFunc) *protocolHandler { 99 return &protocolHandler{ 100 wef: wef, 101 protocols: make(protocolMap), 102 } 103} 104 105func (h *protocolHandler) registerProtocol(p Protocol) error { 106 h.mtx.Lock() 107 defer h.mtx.Unlock() 108 109 if _, found := h.protocols[p.Name]; found { 110 return newAlreadyRegisteredError(p.Name) 111 } 112 h.protocols[p.Name] = p 113 return nil 114} 115 116func (h *protocolHandler) findServeHandler(name string) (*ServeHandlerDescription, WrapErrorFunc, error) { 117 h.mtx.RLock() 118 defer h.mtx.RUnlock() 119 120 p, m := splitMethodName(name) 121 prot, found := h.protocols[p] 122 if !found { 123 return nil, h.wef, newProtocolNotFoundError(p) 124 } 125 srv, found := prot.Methods[m] 126 if !found { 127 return nil, h.wef, newMethodNotFoundError(p, m) 128 } 129 return &srv, prot.WrapError, nil 130} 131 132func (h *protocolHandler) getArg(name string) (interface{}, error) { 133 handler, _, err := h.findServeHandler(name) 134 if err != nil { 135 return nil, err 136 } 137 return handler.MakeArg(), nil 138} 139