1package grpc 2 3import ( 4 "context" 5 "fmt" 6 "reflect" 7 8 "google.golang.org/grpc" 9 "google.golang.org/grpc/metadata" 10 11 "github.com/go-kit/kit/endpoint" 12) 13 14// Client wraps a gRPC connection and provides a method that implements 15// endpoint.Endpoint. 16type Client struct { 17 client *grpc.ClientConn 18 serviceName string 19 method string 20 enc EncodeRequestFunc 21 dec DecodeResponseFunc 22 grpcReply reflect.Type 23 before []ClientRequestFunc 24 after []ClientResponseFunc 25 finalizer []ClientFinalizerFunc 26} 27 28// NewClient constructs a usable Client for a single remote endpoint. 29// Pass an zero-value protobuf message of the RPC response type as 30// the grpcReply argument. 31func NewClient( 32 cc *grpc.ClientConn, 33 serviceName string, 34 method string, 35 enc EncodeRequestFunc, 36 dec DecodeResponseFunc, 37 grpcReply interface{}, 38 options ...ClientOption, 39) *Client { 40 c := &Client{ 41 client: cc, 42 method: fmt.Sprintf("/%s/%s", serviceName, method), 43 enc: enc, 44 dec: dec, 45 // We are using reflect.Indirect here to allow both reply structs and 46 // pointers to these reply structs. New consumers of the client should 47 // use structs directly, while existing consumers will not break if they 48 // remain to use pointers to structs. 49 grpcReply: reflect.TypeOf( 50 reflect.Indirect( 51 reflect.ValueOf(grpcReply), 52 ).Interface(), 53 ), 54 before: []ClientRequestFunc{}, 55 after: []ClientResponseFunc{}, 56 } 57 for _, option := range options { 58 option(c) 59 } 60 return c 61} 62 63// ClientOption sets an optional parameter for clients. 64type ClientOption func(*Client) 65 66// ClientBefore sets the RequestFuncs that are applied to the outgoing gRPC 67// request before it's invoked. 68func ClientBefore(before ...ClientRequestFunc) ClientOption { 69 return func(c *Client) { c.before = append(c.before, before...) } 70} 71 72// ClientAfter sets the ClientResponseFuncs that are applied to the incoming 73// gRPC response prior to it being decoded. This is useful for obtaining 74// response metadata and adding onto the context prior to decoding. 75func ClientAfter(after ...ClientResponseFunc) ClientOption { 76 return func(c *Client) { c.after = append(c.after, after...) } 77} 78 79// ClientFinalizer is executed at the end of every gRPC request. 80// By default, no finalizer is registered. 81func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { 82 return func(s *Client) { s.finalizer = append(s.finalizer, f...) } 83} 84 85// Endpoint returns a usable endpoint that will invoke the gRPC specified by the 86// client. 87func (c Client) Endpoint() endpoint.Endpoint { 88 return func(ctx context.Context, request interface{}) (response interface{}, err error) { 89 ctx, cancel := context.WithCancel(ctx) 90 defer cancel() 91 92 if c.finalizer != nil { 93 defer func() { 94 for _, f := range c.finalizer { 95 f(ctx, err) 96 } 97 }() 98 } 99 100 ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method) 101 102 req, err := c.enc(ctx, request) 103 if err != nil { 104 return nil, err 105 } 106 107 md := &metadata.MD{} 108 for _, f := range c.before { 109 ctx = f(ctx, md) 110 } 111 ctx = metadata.NewOutgoingContext(ctx, *md) 112 113 var header, trailer metadata.MD 114 grpcReply := reflect.New(c.grpcReply).Interface() 115 if err = c.client.Invoke( 116 ctx, c.method, req, grpcReply, grpc.Header(&header), 117 grpc.Trailer(&trailer), 118 ); err != nil { 119 return nil, err 120 } 121 122 for _, f := range c.after { 123 ctx = f(ctx, header, trailer) 124 } 125 126 response, err = c.dec(ctx, grpcReply) 127 if err != nil { 128 return nil, err 129 } 130 return response, nil 131 } 132} 133 134// ClientFinalizerFunc can be used to perform work at the end of a client gRPC 135// request, after the response is returned. The principal 136// intended use is for error logging. Additional response parameters are 137// provided in the context under keys with the ContextKeyResponse prefix. 138// Note: err may be nil. There maybe also no additional response parameters depending on 139// when an error occurs. 140type ClientFinalizerFunc func(ctx context.Context, err error) 141