1// Copyright 2017 Michal Witkowski. All Rights Reserved. 2// See LICENSE for licensing terms. 3 4package proxy 5 6import ( 7 "context" 8 9 "google.golang.org/grpc" 10) 11 12// StreamDirector returns a gRPC ClientConn to be used to forward the call to. 13// 14// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers). 15// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned. 16// 17// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want 18// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you 19// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned. 20// 21// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors 22// are invoked. So decisions around authorization, monitoring etc. are better to be handled there. 23// 24// See the rather rich example. 25type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamPeeker) (*StreamParameters, error) 26 27// StreamParameters encapsulates streaming parameters the praefect coordinator returns to the 28// proxy handler 29type StreamParameters struct { 30 primary Destination 31 reqFinalizer func() error 32 callOptions []grpc.CallOption 33 secondaries []Destination 34} 35 36// Destination contains a client connection as well as a rewritten protobuf message 37type Destination struct { 38 // Ctx is the context used for the connection. 39 Ctx context.Context 40 // Conn is the GRPC client connection. 41 Conn *grpc.ClientConn 42 // Msg is the initial message which shall be sent to the destination. This is used in order 43 // to allow for re-writing the header message. 44 Msg []byte 45 // ErrHandler is invoked when proxying to the destination fails. It can be used to swallow 46 // errors in case proxying failures are considered to be non-fatal. If all errors are 47 // swallowed, the proxied RPC will be successful. 48 ErrHandler func(error) error 49} 50 51// NewStreamParameters returns a new instance of StreamParameters 52func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func() error, callOpts []grpc.CallOption) *StreamParameters { 53 return &StreamParameters{ 54 primary: primary, 55 secondaries: secondaries, 56 reqFinalizer: reqFinalizer, 57 callOptions: callOpts, 58 } 59} 60 61func (s *StreamParameters) Primary() Destination { 62 return s.primary 63} 64 65func (s *StreamParameters) Secondaries() []Destination { 66 return s.secondaries 67} 68 69// RequestFinalizer calls the request finalizer 70func (s *StreamParameters) RequestFinalizer() error { 71 if s.reqFinalizer != nil { 72 return s.reqFinalizer() 73 } 74 return nil 75} 76 77// CallOptions returns call options 78func (s *StreamParameters) CallOptions() []grpc.CallOption { 79 return s.callOptions 80} 81