1// Copyright 2018 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package components
16
17import (
18	"fmt"
19	"net"
20	"strings"
21	"sync"
22	"time"
23
24	"golang.org/x/time/rate"
25	"google.golang.org/grpc"
26	"google.golang.org/grpc/keepalive"
27	grpcMetadata "google.golang.org/grpc/metadata"
28
29	mcp "istio.io/api/mcp/v1alpha1"
30
31	"istio.io/pkg/ctrlz/fw"
32	"istio.io/pkg/log"
33	"istio.io/pkg/version"
34
35	"istio.io/istio/galley/pkg/config/analysis/analyzers"
36	"istio.io/istio/galley/pkg/config/processing"
37	"istio.io/istio/galley/pkg/config/processing/snapshotter"
38	"istio.io/istio/galley/pkg/config/processor"
39	"istio.io/istio/galley/pkg/config/processor/groups"
40	"istio.io/istio/galley/pkg/config/processor/transforms"
41	"istio.io/istio/galley/pkg/config/source/kube"
42	"istio.io/istio/galley/pkg/config/source/kube/apiserver"
43	"istio.io/istio/galley/pkg/config/source/kube/apiserver/status"
44	"istio.io/istio/galley/pkg/config/util/kuberesource"
45	"istio.io/istio/galley/pkg/envvar"
46	"istio.io/istio/galley/pkg/server/process"
47	"istio.io/istio/galley/pkg/server/settings"
48	"istio.io/istio/pkg/config/event"
49	"istio.io/istio/pkg/config/schema"
50	"istio.io/istio/pkg/config/schema/collection"
51	"istio.io/istio/pkg/config/schema/snapshots"
52	configz "istio.io/istio/pkg/mcp/configz/server"
53	"istio.io/istio/pkg/mcp/creds"
54	"istio.io/istio/pkg/mcp/monitoring"
55	mcprate "istio.io/istio/pkg/mcp/rate"
56	"istio.io/istio/pkg/mcp/server"
57	"istio.io/istio/pkg/mcp/snapshot"
58	"istio.io/istio/pkg/mcp/source"
59)
60
// versionMetadataKey is the gRPC metadata key under which Start publishes
// this binary's version (version.Info.Version).
const versionMetadataKey = "config.source.version"
62
// Processing component is the main config processing component that will listen to a config source and publish
// resources through an MCP server, or a dialout connection.
type Processing struct {
	// args holds the settings handed to NewProcessing; Start reads all of its
	// configuration from here.
	args *settings.Args

	// mcpCache is the snapshot cache created in NewProcessing. Start uses it
	// both as the target of the MCP distributor and as the watcher of the MCP
	// source. configzTopic is the ctrlz topic created over that same cache and
	// is exposed through ConfigZTopic.
	mcpCache     *snapshot.Cache
	configzTopic fw.Topic

	// k caches the Kubernetes interfaces lazily created by getKubeInterfaces.
	k kube.Interfaces

	// Runtime state populated by Start and torn down by Stop:
	//   serveWG       - tracks the goroutines launched by Start (server and callout).
	//   grpcServer    - gRPC server the MCP source is registered on.
	//   runtime       - processing runtime returned by processorInitialize.
	//   mcpSource     - MCP source server registered on grpcServer.
	//   reporter      - MCP metrics reporter; closed in Stop.
	//   callOut       - dial-out connection; only set when a sink address is configured.
	//   listenerMutex - guards listener.
	//   listener      - network listener for the API address; nil until Start
	//                   binds it and again after Stop closes it.
	//   stopCh        - closed by Stop; passed to the access-list and
	//                   credential watchers in secure mode.
	serveWG       sync.WaitGroup
	grpcServer    *grpc.Server
	runtime       *processing.Runtime
	mcpSource     *source.Server
	reporter      monitoring.Reporter
	callOut       *callout
	listenerMutex sync.Mutex
	listener      net.Listener
	stopCh        chan struct{}
}
83
84var _ process.Component = &Processing{}
85
86// NewProcessing returns a new processing component.
87func NewProcessing(a *settings.Args) *Processing {
88	mcpCache := snapshot.New(groups.IndexFunction)
89	return &Processing{
90		args:         a,
91		mcpCache:     mcpCache,
92		configzTopic: configz.CreateTopic(mcpCache),
93	}
94}
95
96// Start implements process.Component
97func (p *Processing) Start() (err error) {
98	var mesh event.Source
99	var src event.Source
100	var updater snapshotter.StatusUpdater
101
102	if mesh, err = meshcfgNewFS(p.args.MeshConfigFile); err != nil {
103		return
104	}
105
106	m := schema.MustGet()
107
108	transformProviders := transforms.Providers(m)
109
110	// Disable any unnecessary resources, including resources not in configured snapshots
111	var colsInSnapshots collection.Names
112	for _, c := range m.AllCollectionsInSnapshots(p.args.Snapshots) {
113		colsInSnapshots = append(colsInSnapshots, collection.NewName(c))
114	}
115	kubeResources := kuberesource.DisableExcludedCollections(m.KubeCollections(), transformProviders,
116		colsInSnapshots, p.args.ExcludedResourceKinds, p.args.EnableServiceDiscovery)
117
118	if src, updater, err = p.createSourceAndStatusUpdater(kubeResources); err != nil {
119		return
120	}
121
122	var distributor snapshotter.Distributor = snapshotter.NewMCPDistributor(p.mcpCache)
123
124	if p.args.EnableConfigAnalysis {
125		combinedAnalyzer := analyzers.AllCombined()
126		combinedAnalyzer.RemoveSkipped(colsInSnapshots, kubeResources.DisabledCollectionNames(), transformProviders)
127
128		distributor = snapshotter.NewAnalyzingDistributor(snapshotter.AnalyzingDistributorSettings{
129			StatusUpdater:     updater,
130			Analyzer:          combinedAnalyzer,
131			Distributor:       distributor,
132			AnalysisSnapshots: p.args.Snapshots,
133			TriggerSnapshot:   p.args.TriggerSnapshot,
134		})
135	}
136
137	processorSettings := processor.Settings{
138		Metadata:           m,
139		DomainSuffix:       p.args.DomainSuffix,
140		Source:             event.CombineSources(mesh, src),
141		TransformProviders: transformProviders,
142		Distributor:        distributor,
143		EnabledSnapshots:   p.args.Snapshots,
144	}
145	if p.runtime, err = processorInitialize(processorSettings); err != nil {
146		return
147	}
148
149	grpcOptions := p.getServerGrpcOptions()
150
151	p.stopCh = make(chan struct{})
152	var checker source.AuthChecker = server.NewAllowAllChecker()
153	if !p.args.Insecure {
154		if checker, err = watchAccessList(p.stopCh, p.args.AccessListFile); err != nil {
155			return
156		}
157
158		var watcher creds.CertificateWatcher
159		if watcher, err = creds.PollFiles(p.stopCh, p.args.CredentialOptions); err != nil {
160			return
161		}
162		credentials := creds.CreateForServer(watcher)
163
164		grpcOptions = append(grpcOptions, grpc.Creds(credentials))
165	}
166	grpc.EnableTracing = p.args.EnableGRPCTracing
167	p.grpcServer = grpc.NewServer(grpcOptions...)
168
169	p.reporter = mcpMetricReporter("galley")
170
171	mcpSourceRateLimiter := mcprate.NewRateLimiter(envvar.MCPSourceReqFreq.Get(), envvar.MCPSourceReqBurstSize.Get())
172	options := &source.Options{
173		Watcher:            p.mcpCache,
174		Reporter:           p.reporter,
175		CollectionsOptions: source.CollectionOptionsFromSlice(m.AllCollectionsInSnapshots(snapshots.SnapshotNames())),
176		ConnRateLimiter:    mcpSourceRateLimiter,
177	}
178
179	// set incremental flag of all collections to true when incremental mcp enabled
180	if envvar.EnableIncrementalMCP.Get() {
181		for i := range options.CollectionsOptions {
182			options.CollectionsOptions[i].Incremental = true
183		}
184	}
185
186	md := grpcMetadata.MD{
187		versionMetadataKey: []string{version.Info.Version},
188	}
189	if err = parseSinkMeta(p.args.SinkMeta, md); err != nil {
190		return
191	}
192
193	if p.args.SinkAddress != "" {
194		p.callOut, err = newCallout(p.args.SinkAddress, p.args.SinkAuthMode, md, options)
195		if err != nil {
196			p.callOut = nil
197			err = fmt.Errorf("callout could not be initialized: %v", err)
198			return
199		}
200	}
201
202	sourceServerRateLimiter := rate.NewLimiter(rate.Every(envvar.SourceServerStreamFreq.Get()), envvar.SourceServerStreamBurstSize.Get())
203	serverOptions := &source.ServerOptions{
204		AuthChecker: checker,
205		RateLimiter: sourceServerRateLimiter,
206		Metadata:    md,
207	}
208
209	p.mcpSource = source.NewServer(options, serverOptions)
210
211	// get the network stuff setup
212	network := "tcp"
213	var address string
214	idx := strings.Index(p.args.APIAddress, "://")
215	if idx < 0 {
216		address = p.args.APIAddress
217	} else {
218		network = p.args.APIAddress[:idx]
219		address = p.args.APIAddress[idx+3:]
220	}
221
222	if p.listener, err = netListen(network, address); err != nil {
223		err = fmt.Errorf("unable to listen: %v", err)
224		return
225	}
226
227	mcp.RegisterResourceSourceServer(p.grpcServer, p.mcpSource)
228
229	var startWG sync.WaitGroup
230	startWG.Add(1)
231
232	p.serveWG.Add(1)
233	go func() {
234		defer p.serveWG.Done()
235		p.runtime.Start()
236
237		l := p.getListener()
238		if l != nil && p.args.EnableServer {
239			// start serving
240			gs := p.grpcServer
241			startWG.Done()
242			err = gs.Serve(l)
243			if err != nil {
244				scope.Errorf("Galley Server unexpectedly terminated: %v", err)
245			}
246		}
247	}()
248
249	if p.callOut != nil {
250		p.serveWG.Add(1)
251		go func() {
252			defer p.serveWG.Done()
253			p.callOut.run()
254		}()
255	}
256
257	if p.args.EnableServer {
258		startWG.Wait()
259	}
260
261	return nil
262}
263
// ConfigZTopic returns the ConfigZTopic for the processor.
// The topic is created once, in NewProcessing, over the component's MCP
// snapshot cache; this accessor returns that same value on every call.
func (p *Processing) ConfigZTopic() fw.Topic {
	return p.configzTopic
}
268
269func (p *Processing) getServerGrpcOptions() []grpc.ServerOption {
270	var grpcOptions []grpc.ServerOption
271	grpcOptions = append(grpcOptions,
272		grpc.MaxConcurrentStreams(uint32(p.args.MaxConcurrentStreams)),
273		grpc.MaxRecvMsgSize(int(p.args.MaxReceivedMessageSize)),
274		grpc.InitialWindowSize(int32(p.args.InitialWindowSize)),
275		grpc.InitialConnWindowSize(int32(p.args.InitialConnectionWindowSize)),
276		grpc.KeepaliveParams(keepalive.ServerParameters{
277			Timeout:               p.args.KeepAlive.Timeout,
278			Time:                  p.args.KeepAlive.Time,
279			MaxConnectionAge:      p.args.KeepAlive.MaxServerConnectionAge,
280			MaxConnectionAgeGrace: p.args.KeepAlive.MaxServerConnectionAgeGrace,
281		}),
282		// Relax keepalive enforcement policy requirements to avoid dropping connections due to too many pings.
283		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
284			MinTime:             30 * time.Second,
285			PermitWithoutStream: true,
286		}),
287	)
288
289	return grpcOptions
290}
291
292func (p *Processing) getKubeInterfaces() (k kube.Interfaces, err error) {
293	if p.args.KubeRestConfig != nil {
294		return kube.NewInterfaces(p.args.KubeRestConfig), nil
295	}
296	if p.k == nil {
297		p.k, err = newInterfaces(p.args.KubeConfig)
298	}
299	k = p.k
300	return
301}
302
303func (p *Processing) createSourceAndStatusUpdater(schemas collection.Schemas) (
304	src event.Source, updater snapshotter.StatusUpdater, err error) {
305
306	if p.args.ConfigPath != "" {
307		if src, err = fsNew(p.args.ConfigPath, schemas, p.args.WatchConfigFiles); err != nil {
308			return
309		}
310		updater = &snapshotter.InMemoryStatusUpdater{}
311	} else {
312		var k kube.Interfaces
313		if k, err = p.getKubeInterfaces(); err != nil {
314			return
315		}
316
317		var statusCtl status.Controller
318		if p.args.EnableConfigAnalysis {
319			statusCtl = status.NewController("validationMessages")
320		}
321
322		o := apiserver.Options{
323			Client:           k,
324			ResyncPeriod:     p.args.ResyncPeriod,
325			Schemas:          schemas,
326			StatusController: statusCtl,
327		}
328		s := apiserver.New(o)
329		src = s
330		updater = s
331	}
332	return
333}
334
335// Stop implements process.Component
336func (p *Processing) Stop() {
337	if p.stopCh != nil {
338		close(p.stopCh)
339		p.stopCh = nil
340	}
341
342	if p.grpcServer != nil {
343		p.grpcServer.GracefulStop()
344		p.grpcServer = nil
345	}
346
347	if p.runtime != nil {
348		p.runtime.Stop()
349		p.runtime = nil
350	}
351
352	p.listenerMutex.Lock()
353	if p.listener != nil {
354		_ = p.listener.Close()
355		p.listener = nil
356	}
357	p.listenerMutex.Unlock()
358
359	if p.reporter != nil {
360		_ = p.reporter.Close()
361		p.reporter = nil
362	}
363
364	if p.callOut != nil {
365		p.callOut.stop()
366		p.callOut = nil
367	}
368
369	if p.grpcServer != nil || p.callOut != nil {
370		p.serveWG.Wait()
371	}
372
373	// final attempt to purge buffered logs
374	_ = log.Sync()
375}
376
377func (p *Processing) getListener() net.Listener {
378	p.listenerMutex.Lock()
379	defer p.listenerMutex.Unlock()
380	return p.listener
381}
382
383// Address returns the Address of the MCP service.
384func (p *Processing) Address() net.Addr {
385	l := p.getListener()
386	if l == nil {
387		return nil
388	}
389	return l.Addr()
390}
391
392func parseSinkMeta(pairs []string, md grpcMetadata.MD) error {
393	for _, p := range pairs {
394		kv := strings.Split(p, "=")
395		if len(kv) != 2 || kv[0] == "" || kv[1] == "" {
396			return fmt.Errorf("sinkMeta not in key=value format: %v", p)
397		}
398		md[kv[0]] = append(md[kv[0]], kv[1])
399	}
400	return nil
401}
402