1// Copyright 2017 Michal Witkowski. All Rights Reserved.
2// See LICENSE for licensing terms.
3
4package proxy
5
6import (
7	"context"
8
9	"google.golang.org/grpc"
10)
11
12// StreamDirector returns a gRPC ClientConn to be used to forward the call to.
13//
14// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers).
15// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned.
16//
17// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
18// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
19// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
20//
21// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors
22// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
23//
24// See the rather rich example.
25type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamPeeker) (*StreamParameters, error)
26
27// StreamParameters encapsulates streaming parameters the praefect coordinator returns to the
28// proxy handler
29type StreamParameters struct {
30	primary      Destination
31	reqFinalizer func() error
32	callOptions  []grpc.CallOption
33	secondaries  []Destination
34}
35
36// Destination contains a client connection as well as a rewritten protobuf message
37type Destination struct {
38	// Ctx is the context used for the connection.
39	Ctx context.Context
40	// Conn is the GRPC client connection.
41	Conn *grpc.ClientConn
42	// Msg is the initial message which shall be sent to the destination. This is used in order
43	// to allow for re-writing the header message.
44	Msg []byte
45	// ErrHandler is invoked when proxying to the destination fails. It can be used to swallow
46	// errors in case proxying failures are considered to be non-fatal. If all errors are
47	// swallowed, the proxied RPC will be successful.
48	ErrHandler func(error) error
49}
50
51// NewStreamParameters returns a new instance of StreamParameters
52func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func() error, callOpts []grpc.CallOption) *StreamParameters {
53	return &StreamParameters{
54		primary:      primary,
55		secondaries:  secondaries,
56		reqFinalizer: reqFinalizer,
57		callOptions:  callOpts,
58	}
59}
60
61func (s *StreamParameters) Primary() Destination {
62	return s.primary
63}
64
65func (s *StreamParameters) Secondaries() []Destination {
66	return s.secondaries
67}
68
69// RequestFinalizer calls the request finalizer
70func (s *StreamParameters) RequestFinalizer() error {
71	if s.reqFinalizer != nil {
72		return s.reqFinalizer()
73	}
74	return nil
75}
76
77// CallOptions returns call options
78func (s *StreamParameters) CallOptions() []grpc.CallOption {
79	return s.callOptions
80}
81