1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package resolver provides internal resolver-related functionality.
20package resolver
21
22import (
23	"context"
24	"sync"
25
26	"google.golang.org/grpc/internal/serviceconfig"
27	"google.golang.org/grpc/metadata"
28	"google.golang.org/grpc/resolver"
29)
30
31// ConfigSelector controls what configuration to use for every RPC.
32type ConfigSelector interface {
33	// Selects the configuration for the RPC, or terminates it using the error.
34	// This error will be converted by the gRPC library to a status error with
35	// code UNKNOWN if it is not returned as a status error.
36	SelectConfig(RPCInfo) (*RPCConfig, error)
37}
38
39// RPCInfo contains RPC information needed by a ConfigSelector.
40type RPCInfo struct {
41	// Context is the user's context for the RPC and contains headers and
42	// application timeout.  It is passed for interception purposes and for
43	// efficiency reasons.  SelectConfig should not be blocking.
44	Context context.Context
45	Method  string // i.e. "/Service/Method"
46}
47
48// RPCConfig describes the configuration to use for each RPC.
49type RPCConfig struct {
50	// The context to use for the remainder of the RPC; can pass info to LB
51	// policy or affect timeout or metadata.
52	Context      context.Context
53	MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
54	OnCommitted  func()                     // Called when the RPC has been committed (retries no longer possible)
55	Interceptor  ClientInterceptor
56}
57
58// ClientStream is the same as grpc.ClientStream, but defined here for circular
59// dependency reasons.
60type ClientStream interface {
61	// Header returns the header metadata received from the server if there
62	// is any. It blocks if the metadata is not ready to read.
63	Header() (metadata.MD, error)
64	// Trailer returns the trailer metadata from the server, if there is any.
65	// It must only be called after stream.CloseAndRecv has returned, or
66	// stream.Recv has returned a non-nil error (including io.EOF).
67	Trailer() metadata.MD
68	// CloseSend closes the send direction of the stream. It closes the stream
69	// when non-nil error is met. It is also not safe to call CloseSend
70	// concurrently with SendMsg.
71	CloseSend() error
72	// Context returns the context for this stream.
73	//
74	// It should not be called until after Header or RecvMsg has returned. Once
75	// called, subsequent client-side retries are disabled.
76	Context() context.Context
77	// SendMsg is generally called by generated code. On error, SendMsg aborts
78	// the stream. If the error was generated by the client, the status is
79	// returned directly; otherwise, io.EOF is returned and the status of
80	// the stream may be discovered using RecvMsg.
81	//
82	// SendMsg blocks until:
83	//   - There is sufficient flow control to schedule m with the transport, or
84	//   - The stream is done, or
85	//   - The stream breaks.
86	//
87	// SendMsg does not wait until the message is received by the server. An
88	// untimely stream closure may result in lost messages. To ensure delivery,
89	// users should ensure the RPC completed successfully using RecvMsg.
90	//
91	// It is safe to have a goroutine calling SendMsg and another goroutine
92	// calling RecvMsg on the same stream at the same time, but it is not safe
93	// to call SendMsg on the same stream in different goroutines. It is also
94	// not safe to call CloseSend concurrently with SendMsg.
95	SendMsg(m interface{}) error
96	// RecvMsg blocks until it receives a message into m or the stream is
97	// done. It returns io.EOF when the stream completes successfully. On
98	// any other error, the stream is aborted and the error contains the RPC
99	// status.
100	//
101	// It is safe to have a goroutine calling SendMsg and another goroutine
102	// calling RecvMsg on the same stream at the same time, but it is not
103	// safe to call RecvMsg on the same stream in different goroutines.
104	RecvMsg(m interface{}) error
105}
106
107// ClientInterceptor is an interceptor for gRPC client streams.
108type ClientInterceptor interface {
109	// NewStream produces a ClientStream for an RPC which may optionally use
110	// the provided function to produce a stream for delegation.  Note:
111	// RPCInfo.Context should not be used (will be nil).
112	//
113	// done is invoked when the RPC is finished using its connection, or could
114	// not be assigned a connection.  RPC operations may still occur on
115	// ClientStream after done is called, since the interceptor is invoked by
116	// application-layer operations.  done must never be nil when called.
117	NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
118}
119
120// ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
121type ServerInterceptor interface {
122	// AllowRPC checks if an incoming RPC is allowed to proceed based on
123	// information about connection RPC was received on, and HTTP Headers. This
124	// information will be piped into context.
125	AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
126}
127
128type csKeyType string
129
130const csKey = csKeyType("grpc.internal.resolver.configSelector")
131
132// SetConfigSelector sets the config selector in state and returns the new
133// state.
134func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
135	state.Attributes = state.Attributes.WithValue(csKey, cs)
136	return state
137}
138
139// GetConfigSelector retrieves the config selector from state, if present, and
140// returns it or nil if absent.
141func GetConfigSelector(state resolver.State) ConfigSelector {
142	cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
143	return cs
144}
145
146// SafeConfigSelector allows for safe switching of ConfigSelector
147// implementations such that previous values are guaranteed to not be in use
148// when UpdateConfigSelector returns.
149type SafeConfigSelector struct {
150	mu sync.RWMutex
151	cs ConfigSelector
152}
153
154// UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
155// all uses of the previous ConfigSelector have completed.
156func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
157	scs.mu.Lock()
158	defer scs.mu.Unlock()
159	scs.cs = cs
160}
161
162// SelectConfig defers to the current ConfigSelector in scs.
163func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
164	scs.mu.RLock()
165	defer scs.mu.RUnlock()
166	return scs.cs.SelectConfig(r)
167}
168