1// Copyright 2018 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package components 16 17import ( 18 "fmt" 19 "net" 20 "strings" 21 "sync" 22 "time" 23 24 "golang.org/x/time/rate" 25 "google.golang.org/grpc" 26 "google.golang.org/grpc/keepalive" 27 grpcMetadata "google.golang.org/grpc/metadata" 28 29 mcp "istio.io/api/mcp/v1alpha1" 30 31 "istio.io/pkg/ctrlz/fw" 32 "istio.io/pkg/log" 33 "istio.io/pkg/version" 34 35 "istio.io/istio/galley/pkg/config/analysis/analyzers" 36 "istio.io/istio/galley/pkg/config/processing" 37 "istio.io/istio/galley/pkg/config/processing/snapshotter" 38 "istio.io/istio/galley/pkg/config/processor" 39 "istio.io/istio/galley/pkg/config/processor/groups" 40 "istio.io/istio/galley/pkg/config/processor/transforms" 41 "istio.io/istio/galley/pkg/config/source/kube" 42 "istio.io/istio/galley/pkg/config/source/kube/apiserver" 43 "istio.io/istio/galley/pkg/config/source/kube/apiserver/status" 44 "istio.io/istio/galley/pkg/config/util/kuberesource" 45 "istio.io/istio/galley/pkg/envvar" 46 "istio.io/istio/galley/pkg/server/process" 47 "istio.io/istio/galley/pkg/server/settings" 48 "istio.io/istio/pkg/config/event" 49 "istio.io/istio/pkg/config/schema" 50 "istio.io/istio/pkg/config/schema/collection" 51 "istio.io/istio/pkg/config/schema/snapshots" 52 configz "istio.io/istio/pkg/mcp/configz/server" 53 "istio.io/istio/pkg/mcp/creds" 54 "istio.io/istio/pkg/mcp/monitoring" 55 mcprate "istio.io/istio/pkg/mcp/rate" 56 "istio.io/istio/pkg/mcp/server" 57 "istio.io/istio/pkg/mcp/snapshot" 58 "istio.io/istio/pkg/mcp/source" 59) 60 61const versionMetadataKey = "config.source.version" 62 63// Processing component is the main config processing component that will listen to a config source and publish 64// resources through an MCP server, or a dialout connection. 65type Processing struct { 66 args *settings.Args 67 68 mcpCache *snapshot.Cache 69 configzTopic fw.Topic 70 71 k kube.Interfaces 72 73 serveWG sync.WaitGroup 74 grpcServer *grpc.Server 75 runtime *processing.Runtime 76 mcpSource *source.Server 77 reporter monitoring.Reporter 78 callOut *callout 79 listenerMutex sync.Mutex 80 listener net.Listener 81 stopCh chan struct{} 82} 83 84var _ process.Component = &Processing{} 85 86// NewProcessing returns a new processing component. 87func NewProcessing(a *settings.Args) *Processing { 88 mcpCache := snapshot.New(groups.IndexFunction) 89 return &Processing{ 90 args: a, 91 mcpCache: mcpCache, 92 configzTopic: configz.CreateTopic(mcpCache), 93 } 94} 95 96// Start implements process.Component 97func (p *Processing) Start() (err error) { 98 var mesh event.Source 99 var src event.Source 100 var updater snapshotter.StatusUpdater 101 102 if mesh, err = meshcfgNewFS(p.args.MeshConfigFile); err != nil { 103 return 104 } 105 106 m := schema.MustGet() 107 108 transformProviders := transforms.Providers(m) 109 110 // Disable any unnecessary resources, including resources not in configured snapshots 111 var colsInSnapshots collection.Names 112 for _, c := range m.AllCollectionsInSnapshots(p.args.Snapshots) { 113 colsInSnapshots = append(colsInSnapshots, collection.NewName(c)) 114 } 115 kubeResources := kuberesource.DisableExcludedCollections(m.KubeCollections(), transformProviders, 116 colsInSnapshots, p.args.ExcludedResourceKinds, p.args.EnableServiceDiscovery) 117 118 if src, updater, err = p.createSourceAndStatusUpdater(kubeResources); err != nil { 119 return 120 } 121 122 var distributor snapshotter.Distributor = snapshotter.NewMCPDistributor(p.mcpCache) 123 124 if p.args.EnableConfigAnalysis { 125 combinedAnalyzer := analyzers.AllCombined() 126 combinedAnalyzer.RemoveSkipped(colsInSnapshots, kubeResources.DisabledCollectionNames(), transformProviders) 127 128 distributor = snapshotter.NewAnalyzingDistributor(snapshotter.AnalyzingDistributorSettings{ 129 StatusUpdater: updater, 130 Analyzer: combinedAnalyzer, 131 Distributor: distributor, 132 AnalysisSnapshots: p.args.Snapshots, 133 TriggerSnapshot: p.args.TriggerSnapshot, 134 }) 135 } 136 137 processorSettings := processor.Settings{ 138 Metadata: m, 139 DomainSuffix: p.args.DomainSuffix, 140 Source: event.CombineSources(mesh, src), 141 TransformProviders: transformProviders, 142 Distributor: distributor, 143 EnabledSnapshots: p.args.Snapshots, 144 } 145 if p.runtime, err = processorInitialize(processorSettings); err != nil { 146 return 147 } 148 149 grpcOptions := p.getServerGrpcOptions() 150 151 p.stopCh = make(chan struct{}) 152 var checker source.AuthChecker = server.NewAllowAllChecker() 153 if !p.args.Insecure { 154 if checker, err = watchAccessList(p.stopCh, p.args.AccessListFile); err != nil { 155 return 156 } 157 158 var watcher creds.CertificateWatcher 159 if watcher, err = creds.PollFiles(p.stopCh, p.args.CredentialOptions); err != nil { 160 return 161 } 162 credentials := creds.CreateForServer(watcher) 163 164 grpcOptions = append(grpcOptions, grpc.Creds(credentials)) 165 } 166 grpc.EnableTracing = p.args.EnableGRPCTracing 167 p.grpcServer = grpc.NewServer(grpcOptions...) 168 169 p.reporter = mcpMetricReporter("galley") 170 171 mcpSourceRateLimiter := mcprate.NewRateLimiter(envvar.MCPSourceReqFreq.Get(), envvar.MCPSourceReqBurstSize.Get()) 172 options := &source.Options{ 173 Watcher: p.mcpCache, 174 Reporter: p.reporter, 175 CollectionsOptions: source.CollectionOptionsFromSlice(m.AllCollectionsInSnapshots(snapshots.SnapshotNames())), 176 ConnRateLimiter: mcpSourceRateLimiter, 177 } 178 179 // set incremental flag of all collections to true when incremental mcp enabled 180 if envvar.EnableIncrementalMCP.Get() { 181 for i := range options.CollectionsOptions { 182 options.CollectionsOptions[i].Incremental = true 183 } 184 } 185 186 md := grpcMetadata.MD{ 187 versionMetadataKey: []string{version.Info.Version}, 188 } 189 if err = parseSinkMeta(p.args.SinkMeta, md); err != nil { 190 return 191 } 192 193 if p.args.SinkAddress != "" { 194 p.callOut, err = newCallout(p.args.SinkAddress, p.args.SinkAuthMode, md, options) 195 if err != nil { 196 p.callOut = nil 197 err = fmt.Errorf("callout could not be initialized: %v", err) 198 return 199 } 200 } 201 202 sourceServerRateLimiter := rate.NewLimiter(rate.Every(envvar.SourceServerStreamFreq.Get()), envvar.SourceServerStreamBurstSize.Get()) 203 serverOptions := &source.ServerOptions{ 204 AuthChecker: checker, 205 RateLimiter: sourceServerRateLimiter, 206 Metadata: md, 207 } 208 209 p.mcpSource = source.NewServer(options, serverOptions) 210 211 // get the network stuff setup 212 network := "tcp" 213 var address string 214 idx := strings.Index(p.args.APIAddress, "://") 215 if idx < 0 { 216 address = p.args.APIAddress 217 } else { 218 network = p.args.APIAddress[:idx] 219 address = p.args.APIAddress[idx+3:] 220 } 221 222 if p.listener, err = netListen(network, address); err != nil { 223 err = fmt.Errorf("unable to listen: %v", err) 224 return 225 } 226 227 mcp.RegisterResourceSourceServer(p.grpcServer, p.mcpSource) 228 229 var startWG sync.WaitGroup 230 startWG.Add(1) 231 232 p.serveWG.Add(1) 233 go func() { 234 defer p.serveWG.Done() 235 p.runtime.Start() 236 237 l := p.getListener() 238 if l != nil && p.args.EnableServer { 239 // start serving 240 gs := p.grpcServer 241 startWG.Done() 242 err = gs.Serve(l) 243 if err != nil { 244 scope.Errorf("Galley Server unexpectedly terminated: %v", err) 245 } 246 } 247 }() 248 249 if p.callOut != nil { 250 p.serveWG.Add(1) 251 go func() { 252 defer p.serveWG.Done() 253 p.callOut.run() 254 }() 255 } 256 257 if p.args.EnableServer { 258 startWG.Wait() 259 } 260 261 return nil 262} 263 264// ConfigZTopic returns the ConfigZTopic for the processor. 265func (p *Processing) ConfigZTopic() fw.Topic { 266 return p.configzTopic 267} 268 269func (p *Processing) getServerGrpcOptions() []grpc.ServerOption { 270 var grpcOptions []grpc.ServerOption 271 grpcOptions = append(grpcOptions, 272 grpc.MaxConcurrentStreams(uint32(p.args.MaxConcurrentStreams)), 273 grpc.MaxRecvMsgSize(int(p.args.MaxReceivedMessageSize)), 274 grpc.InitialWindowSize(int32(p.args.InitialWindowSize)), 275 grpc.InitialConnWindowSize(int32(p.args.InitialConnectionWindowSize)), 276 grpc.KeepaliveParams(keepalive.ServerParameters{ 277 Timeout: p.args.KeepAlive.Timeout, 278 Time: p.args.KeepAlive.Time, 279 MaxConnectionAge: p.args.KeepAlive.MaxServerConnectionAge, 280 MaxConnectionAgeGrace: p.args.KeepAlive.MaxServerConnectionAgeGrace, 281 }), 282 // Relax keepalive enforcement policy requirements to avoid dropping connections due to too many pings. 283 grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ 284 MinTime: 30 * time.Second, 285 PermitWithoutStream: true, 286 }), 287 ) 288 289 return grpcOptions 290} 291 292func (p *Processing) getKubeInterfaces() (k kube.Interfaces, err error) { 293 if p.args.KubeRestConfig != nil { 294 return kube.NewInterfaces(p.args.KubeRestConfig), nil 295 } 296 if p.k == nil { 297 p.k, err = newInterfaces(p.args.KubeConfig) 298 } 299 k = p.k 300 return 301} 302 303func (p *Processing) createSourceAndStatusUpdater(schemas collection.Schemas) ( 304 src event.Source, updater snapshotter.StatusUpdater, err error) { 305 306 if p.args.ConfigPath != "" { 307 if src, err = fsNew(p.args.ConfigPath, schemas, p.args.WatchConfigFiles); err != nil { 308 return 309 } 310 updater = &snapshotter.InMemoryStatusUpdater{} 311 } else { 312 var k kube.Interfaces 313 if k, err = p.getKubeInterfaces(); err != nil { 314 return 315 } 316 317 var statusCtl status.Controller 318 if p.args.EnableConfigAnalysis { 319 statusCtl = status.NewController("validationMessages") 320 } 321 322 o := apiserver.Options{ 323 Client: k, 324 ResyncPeriod: p.args.ResyncPeriod, 325 Schemas: schemas, 326 StatusController: statusCtl, 327 } 328 s := apiserver.New(o) 329 src = s 330 updater = s 331 } 332 return 333} 334 335// Stop implements process.Component 336func (p *Processing) Stop() { 337 if p.stopCh != nil { 338 close(p.stopCh) 339 p.stopCh = nil 340 } 341 342 if p.grpcServer != nil { 343 p.grpcServer.GracefulStop() 344 p.grpcServer = nil 345 } 346 347 if p.runtime != nil { 348 p.runtime.Stop() 349 p.runtime = nil 350 } 351 352 p.listenerMutex.Lock() 353 if p.listener != nil { 354 _ = p.listener.Close() 355 p.listener = nil 356 } 357 p.listenerMutex.Unlock() 358 359 if p.reporter != nil { 360 _ = p.reporter.Close() 361 p.reporter = nil 362 } 363 364 if p.callOut != nil { 365 p.callOut.stop() 366 p.callOut = nil 367 } 368 369 if p.grpcServer != nil || p.callOut != nil { 370 p.serveWG.Wait() 371 } 372 373 // final attempt to purge buffered logs 374 _ = log.Sync() 375} 376 377func (p *Processing) getListener() net.Listener { 378 p.listenerMutex.Lock() 379 defer p.listenerMutex.Unlock() 380 return p.listener 381} 382 383// Address returns the Address of the MCP service. 384func (p *Processing) Address() net.Addr { 385 l := p.getListener() 386 if l == nil { 387 return nil 388 } 389 return l.Addr() 390} 391 392func parseSinkMeta(pairs []string, md grpcMetadata.MD) error { 393 for _, p := range pairs { 394 kv := strings.Split(p, "=") 395 if len(kv) != 2 || kv[0] == "" || kv[1] == "" { 396 return fmt.Errorf("sinkMeta not in key=value format: %v", p) 397 } 398 md[kv[0]] = append(md[kv[0]], kv[1]) 399 } 400 return nil 401} 402