/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipvs

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"reflect"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/klog/v2"
	utilexec "k8s.io/utils/exec"
	utilnet "k8s.io/utils/net"

	v1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/version"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/tools/events"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/proxy"
	"k8s.io/kubernetes/pkg/proxy/healthcheck"
	"k8s.io/kubernetes/pkg/proxy/metaproxier"
	"k8s.io/kubernetes/pkg/proxy/metrics"
	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
	proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
	"k8s.io/kubernetes/pkg/util/async"
	"k8s.io/kubernetes/pkg/util/conntrack"
	utilipset "k8s.io/kubernetes/pkg/util/ipset"
	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
	utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
	utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)

// Names of the iptables chains managed by the ipvs proxier, plus IPVS defaults
// and the kernel versions that decide how conn_reuse_mode is handled.
const (
	// kubeServicesChain is the services portal chain.
	kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"

	// KubeFireWallChain is the kubernetes firewall chain.
	KubeFireWallChain utiliptables.Chain = "KUBE-FIREWALL"

	// kubePostroutingChain is the kubernetes postrouting chain.
	kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"

	// KubeMarkMasqChain is the mark-for-masquerade chain.
	KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"

	// KubeNodePortChain is the kubernetes node port chain.
	KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"

	// KubeMarkDropChain is the mark-for-drop chain.
	KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"

	// KubeForwardChain is the kubernetes forward chain.
	KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"

	// KubeLoadBalancerChain is the kubernetes chain for loadbalancer type services.
	KubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"

	// DefaultScheduler is the default ipvs scheduler algorithm - round robin.
	DefaultScheduler = "rr"

	// DefaultDummyDevice is the default dummy interface that ipvs service addresses are bound to.
	DefaultDummyDevice = "kube-ipvs0"

	// connReuseMinSupportedKernelVersion is the oldest kernel on which NewProxier
	// attempts to set the conn_reuse_mode sysctl; on older kernels it only logs an error.
	connReuseMinSupportedKernelVersion = "4.1"

	// connReuseFixedKernelVersion is the first kernel on which NewProxier leaves
	// conn_reuse_mode untouched.
	// https://github.com/torvalds/linux/commit/35dfb013149f74c2be1ff9c78f14e6a3cd1539d1
	connReuseFixedKernelVersion = "5.9"
)

// iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
// `to` is the iptables chain we want to operate.
101// `from` is the source iptables chain 102var iptablesJumpChain = []struct { 103 table utiliptables.Table 104 from utiliptables.Chain 105 to utiliptables.Chain 106 comment string 107}{ 108 {utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"}, 109 {utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"}, 110 {utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"}, 111 {utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"}, 112 {utiliptables.TableFilter, utiliptables.ChainInput, KubeNodePortChain, "kubernetes health check rules"}, 113} 114 115var iptablesChains = []struct { 116 table utiliptables.Table 117 chain utiliptables.Chain 118}{ 119 {utiliptables.TableNAT, kubeServicesChain}, 120 {utiliptables.TableNAT, kubePostroutingChain}, 121 {utiliptables.TableNAT, KubeFireWallChain}, 122 {utiliptables.TableNAT, KubeNodePortChain}, 123 {utiliptables.TableNAT, KubeLoadBalancerChain}, 124 {utiliptables.TableNAT, KubeMarkMasqChain}, 125 {utiliptables.TableFilter, KubeForwardChain}, 126 {utiliptables.TableFilter, KubeNodePortChain}, 127} 128 129var iptablesEnsureChains = []struct { 130 table utiliptables.Table 131 chain utiliptables.Chain 132}{ 133 {utiliptables.TableNAT, KubeMarkDropChain}, 134} 135 136var iptablesCleanupChains = []struct { 137 table utiliptables.Table 138 chain utiliptables.Chain 139}{ 140 {utiliptables.TableNAT, kubeServicesChain}, 141 {utiliptables.TableNAT, kubePostroutingChain}, 142 {utiliptables.TableNAT, KubeFireWallChain}, 143 {utiliptables.TableNAT, KubeNodePortChain}, 144 {utiliptables.TableNAT, KubeLoadBalancerChain}, 145 {utiliptables.TableFilter, KubeForwardChain}, 146} 147 148// ipsetInfo is all ipset we needed in ipvs proxier 149var ipsetInfo = []struct { 150 name string 151 setType utilipset.Type 152 comment string 153}{ 154 {kubeLoopBackIPSet, 
utilipset.HashIPPortIP, kubeLoopBackIPSetComment}, 155 {kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment}, 156 {kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment}, 157 {kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment}, 158 {kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment}, 159 {kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment}, 160 {kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment}, 161 {kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment}, 162 {kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment}, 163 {kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment}, 164 {kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment}, 165 {kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment}, 166 {kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment}, 167 {kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment}, 168 {kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment}, 169 {kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment}, 170} 171 172// ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to 173// `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>` 174// example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT 175// ipsets with other match rules will be created Individually. 176// Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP. 
177var ipsetWithIptablesChain = []struct { 178 name string 179 from string 180 to string 181 matchType string 182 protocolMatch string 183}{ 184 {kubeLoopBackIPSet, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""}, 185 {kubeLoadBalancerSet, string(kubeServicesChain), string(KubeLoadBalancerChain), "dst,dst", ""}, 186 {kubeLoadbalancerFWSet, string(KubeLoadBalancerChain), string(KubeFireWallChain), "dst,dst", ""}, 187 {kubeLoadBalancerSourceCIDRSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""}, 188 {kubeLoadBalancerSourceIPSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""}, 189 {kubeLoadBalancerLocalSet, string(KubeLoadBalancerChain), "RETURN", "dst,dst", ""}, 190 {kubeNodePortLocalSetTCP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolTCP}, 191 {kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolTCP}, 192 {kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP}, 193 {kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolUDP}, 194 {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP}, 195 {kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP}, 196} 197 198// In IPVS proxy mode, the following flags need to be set 199const ( 200 sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables" 201 sysctlVSConnTrack = "net/ipv4/vs/conntrack" 202 sysctlConnReuse = "net/ipv4/vs/conn_reuse_mode" 203 sysctlExpireNoDestConn = "net/ipv4/vs/expire_nodest_conn" 204 sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template" 205 sysctlForward = "net/ipv4/ip_forward" 206 sysctlArpIgnore = "net/ipv4/conf/all/arp_ignore" 207 sysctlArpAnnounce = "net/ipv4/conf/all/arp_announce" 208) 209 210// Proxier is an ipvs based proxy for connections between a localhost:lport 211// and services that provide 
// the actual backends.
type Proxier struct {
	// ipFamily is the IP family (IPv4 or IPv6) this proxier instance operates on.
	ipFamily v1.IPFamily
	// endpointsChanges and serviceChanges contains all changes to endpoints and
	// services that happened since last syncProxyRules call. For a single object,
	// changes are accumulated, i.e. previous is state from before all of them,
	// current is state after applying all of those.
	endpointsChanges *proxy.EndpointChangeTracker
	serviceChanges   *proxy.ServiceChangeTracker

	mu           sync.Mutex // protects the following fields
	serviceMap   proxy.ServiceMap
	endpointsMap proxy.EndpointsMap
	portsMap     map[utilnet.LocalPort]utilnet.Closeable
	nodeLabels   map[string]string
	// endpointSlicesSynced, and servicesSynced are set to true when
	// corresponding objects are synced after startup. This is used to avoid updating
	// ipvs rules with some partial data after kube-proxy restart.
	endpointSlicesSynced bool
	servicesSynced       bool
	// initialized is 0 or 1; it is read and written atomically through
	// isInitialized and setInitialized.
	initialized int32
	syncRunner  *async.BoundedFrequencyRunner // governs calls to syncProxyRules

	// These are effectively const and do not need the mutex to be held.
	syncPeriod    time.Duration
	minSyncPeriod time.Duration
	// Values are CIDR's to exclude when cleaning up IPVS rules.
	excludeCIDRs []*net.IPNet
	// Set to true to set sysctls arp_ignore and arp_announce
	strictARP     bool
	iptables      utiliptables.Interface
	ipvs          utilipvs.Interface
	ipset         utilipset.Interface
	exec          utilexec.Interface
	masqueradeAll bool
	// masqueradeMark is the hexadecimal string form of 1<<masqueradeBit,
	// computed once in NewProxier.
	masqueradeMark string
	localDetector  proxyutiliptables.LocalTrafficDetector
	hostname       string
	nodeIP         net.IP
	portMapper     utilnet.PortOpener
	recorder       events.EventRecorder

	serviceHealthServer healthcheck.ServiceHealthServer
	// healthzServer may be nil; Sync and SyncLoop check for nil before using it.
	healthzServer healthcheck.ProxierHealthUpdater

	// ipvsScheduler is the scheduler name; NewProxier falls back to
	// DefaultScheduler when none is given.
	ipvsScheduler string
	// Added as a member to the struct to allow injection for testing.
	ipGetter IPGetter
	// The following buffers are used to reuse memory and avoid allocations
	// that are significantly impacting performance.
	iptablesData     *bytes.Buffer
	filterChainsData *bytes.Buffer
	natChains        *bytes.Buffer
	filterChains     *bytes.Buffer
	natRules         *bytes.Buffer
	filterRules      *bytes.Buffer
	// Added as a member to the struct to allow injection for testing.
	netlinkHandle NetLinkHandle
	// ipsetList is the list of ipsets that ipvs proxier used, keyed by set name.
	ipsetList map[string]*IPSet
	// Values are as a parameter to select the interfaces which nodeport works.
	nodePortAddresses []string
	// networkInterfacer defines an interface for several net library functions.
	// Inject for test purpose.
	networkInterfacer     utilproxy.NetworkInterfacer
	gracefuldeleteManager *GracefulTerminationManager
}

// IPGetter helps get node network interface IPs and the IPs bound to the IPVS dummy interface.
type IPGetter interface {
	// NodeIPs returns the node's own addresses.
	NodeIPs() ([]net.IP, error)
	// BindedIPs returns the addresses bound to the IPVS dummy interface.
	BindedIPs() (sets.String, error)
}

// realIPGetter is a real NodeIP handler, it implements IPGetter.
type realIPGetter struct {
	// nl is the netlink handle used to list local addresses.
	nl NetLinkHandle
}

// NodeIPs returns all LOCAL type IP addresses from host which are taken as the Node IPs of NodePort service.
293// It will list source IP exists in local route table with `kernel` protocol type, and filter out IPVS proxier 294// created dummy device `kube-ipvs0` For example, 295// $ ip route show table local type local proto kernel 296// 10.0.0.1 dev kube-ipvs0 scope host src 10.0.0.1 297// 10.0.0.10 dev kube-ipvs0 scope host src 10.0.0.10 298// 10.0.0.252 dev kube-ipvs0 scope host src 10.0.0.252 299// 100.106.89.164 dev eth0 scope host src 100.106.89.164 300// 127.0.0.0/8 dev lo scope host src 127.0.0.1 301// 127.0.0.1 dev lo scope host src 127.0.0.1 302// 172.17.0.1 dev docker0 scope host src 172.17.0.1 303// 192.168.122.1 dev virbr0 scope host src 192.168.122.1 304// Then filter out dev==kube-ipvs0, and cut the unique src IP fields, 305// Node IP set: [100.106.89.164, 172.17.0.1, 192.168.122.1] 306// Note that loopback addresses are excluded. 307func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) { 308 // Pass in empty filter device name for list all LOCAL type addresses. 309 nodeAddress, err := r.nl.GetLocalAddresses("", DefaultDummyDevice) 310 if err != nil { 311 return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %v", err) 312 } 313 // translate ip string to IP 314 for _, ipStr := range nodeAddress.UnsortedList() { 315 a := net.ParseIP(ipStr) 316 if a.IsLoopback() { 317 continue 318 } 319 ips = append(ips, a) 320 } 321 return ips, nil 322} 323 324// BindedIPs returns all addresses that are binded to the IPVS dummy interface kube-ipvs0 325func (r *realIPGetter) BindedIPs() (sets.String, error) { 326 return r.nl.GetLocalAddresses(DefaultDummyDevice, "") 327} 328 329// Proxier implements proxy.Provider 330var _ proxy.Provider = &Proxier{} 331 332// NewProxier returns a new Proxier given an iptables and ipvs Interface instance. 333// Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine. 334// An error will be returned if it fails to update or acquire the initial lock. 
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails.
//
// NewProxier applies the sysctls IPVS mode needs, returning an error if any
// of them cannot be ensured, optionally configures the IPVS timeouts, builds
// the Proxier and its ipsets, and starts the graceful termination manager.
// The IP family is taken from ipt.IsIPv6().
func NewProxier(ipt utiliptables.Interface,
	ipvs utilipvs.Interface,
	ipset utilipset.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	excludeCIDRs []string,
	strictARP bool,
	tcpTimeout time.Duration,
	tcpFinTimeout time.Duration,
	udpTimeout time.Duration,
	masqueradeAll bool,
	masqueradeBit int,
	localDetector proxyutiliptables.LocalTrafficDetector,
	hostname string,
	nodeIP net.IP,
	recorder events.EventRecorder,
	healthzServer healthcheck.ProxierHealthUpdater,
	scheduler string,
	nodePortAddresses []string,
	kernelHandler KernelHandler,
) (*Proxier, error) {
	// Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
	// are connected to a Linux bridge (but not SDN bridges).  Until most
	// plugins handle this, log when config is missing
	if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
		klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
	}

	// Enable IPVS connection tracking (net/ipv4/vs/conntrack = 1).
	if err := utilproxy.EnsureSysctl(sysctl, sysctlVSConnTrack, 1); err != nil {
		return nil, err
	}

	// conn_reuse_mode handling depends on the running kernel version:
	//   - below connReuseMinSupportedKernelVersion: only an error is logged;
	//   - at or above connReuseFixedKernelVersion: the sysctl is left as-is;
	//   - otherwise: it is set to 0.
	kernelVersionStr, err := kernelHandler.GetKernelVersion()
	if err != nil {
		return nil, fmt.Errorf("error determining kernel version to find required kernel modules for ipvs support: %v", err)
	}
	kernelVersion, err := version.ParseGeneric(kernelVersionStr)
	if err != nil {
		return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
	}
	if kernelVersion.LessThan(version.MustParseGeneric(connReuseMinSupportedKernelVersion)) {
		klog.ErrorS(nil, fmt.Sprintf("can't set sysctl %s, kernel version must be at least %s", sysctlConnReuse, connReuseMinSupportedKernelVersion))
	} else if kernelVersion.AtLeast(version.MustParseGeneric(connReuseFixedKernelVersion)) {
		// https://github.com/kubernetes/kubernetes/issues/93297
		klog.V(2).InfoS("Left as-is", "sysctl", sysctlConnReuse)
	} else {
		// Set the connection reuse mode
		if err := utilproxy.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil {
			return nil, err
		}
	}

	// Set net/ipv4/vs/expire_nodest_conn = 1.
	if err := utilproxy.EnsureSysctl(sysctl, sysctlExpireNoDestConn, 1); err != nil {
		return nil, err
	}

	// Set net/ipv4/vs/expire_quiescent_template = 1.
	if err := utilproxy.EnsureSysctl(sysctl, sysctlExpireQuiescentTemplate, 1); err != nil {
		return nil, err
	}

	// Enable IP forwarding (net/ipv4/ip_forward = 1).
	if err := utilproxy.EnsureSysctl(sysctl, sysctlForward, 1); err != nil {
		return nil, err
	}

	if strictARP {
		// Set net/ipv4/conf/all/arp_ignore = 1.
		if err := utilproxy.EnsureSysctl(sysctl, sysctlArpIgnore, 1); err != nil {
			return nil, err
		}

		// Set net/ipv4/conf/all/arp_announce = 2.
		if err := utilproxy.EnsureSysctl(sysctl, sysctlArpAnnounce, 2); err != nil {
			return nil, err
		}
	}

	// Configure IPVS timeouts if any one of the timeout parameters have been set.
	// This is the equivalent to running ipvsadm --set, a value of 0 indicates the
	// current system timeout should be preserved.
	// A failure here is logged but does not abort construction.
	if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
		if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
			klog.ErrorS(err, "failed to configure IPVS timeouts")
		}
	}

	// Generate the masquerade mark to use for SNAT rules.
	masqueradeValue := 1 << uint(masqueradeBit)
	masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)

	ipFamily := v1.IPv4Protocol
	if ipt.IsIPv6() {
		ipFamily = v1.IPv6Protocol
	}

	klog.V(2).InfoS("record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)

	if len(scheduler) == 0 {
		klog.InfoS("IPVS scheduler not specified, use rr by default")
		scheduler = DefaultScheduler
	}

	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)

	// Keep only the node port CIDRs that belong to this proxier's IP family.
	ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
	nodePortAddresses = ipFamilyMap[ipFamily]
	// Log the IPs not matching the ipFamily
	if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
		klog.InfoS("found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ","))
	}

	// excludeCIDRs has been validated before, here we just parse it to IPNet list
	parsedExcludeCIDRs, _ := utilnet.ParseCIDRs(excludeCIDRs)

	// NOTE(review): the strictARP argument is applied to the sysctls above but
	// is not copied into the Proxier.strictARP field below - confirm intended.
	proxier := &Proxier{
		ipFamily:              ipFamily,
		portsMap:              make(map[utilnet.LocalPort]utilnet.Closeable),
		serviceMap:            make(proxy.ServiceMap),
		serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
		endpointsMap:          make(proxy.EndpointsMap),
		endpointsChanges:      proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
		syncPeriod:            syncPeriod,
		minSyncPeriod:         minSyncPeriod,
		excludeCIDRs:          parsedExcludeCIDRs,
		iptables:              ipt,
		masqueradeAll:         masqueradeAll,
		masqueradeMark:        masqueradeMark,
		exec:                  exec,
		localDetector:         localDetector,
		hostname:              hostname,
		nodeIP:                nodeIP,
		portMapper:            &utilnet.ListenPortOpener,
		recorder:              recorder,
		serviceHealthServer:   serviceHealthServer,
		healthzServer:         healthzServer,
		ipvs:                  ipvs,
		ipvsScheduler:         scheduler,
		ipGetter:              &realIPGetter{nl: NewNetLinkHandle(ipFamily == v1.IPv6Protocol)},
		iptablesData:          bytes.NewBuffer(nil),
		filterChainsData:      bytes.NewBuffer(nil),
		natChains:             bytes.NewBuffer(nil),
		natRules:              bytes.NewBuffer(nil),
		filterChains:          bytes.NewBuffer(nil),
		filterRules:           bytes.NewBuffer(nil),
		netlinkHandle:         NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
		ipset:                 ipset,
		nodePortAddresses:     nodePortAddresses,
		networkInterfacer:     utilproxy.RealNetwork{},
		gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
	}
	// initialize ipsetList with all sets we needed
	proxier.ipsetList = make(map[string]*IPSet)
	for _, is := range ipsetInfo {
		proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment)
	}
	burstSyncs := 2
	klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
	proxier.gracefuldeleteManager.Run()
	return proxier, nil
}

// NewDualStackProxier returns a new Proxier for dual-stack operation.
// It builds one single-stack proxier from index 0 of ipt, localDetectors and
// nodeIP (reported as "ipv4" in errors) and one from index 1 ("ipv6"), then
// wraps both in a meta-proxier. Both share the same ipset interface through
// newSafeIpset.
func NewDualStackProxier(
	ipt [2]utiliptables.Interface,
	ipvs utilipvs.Interface,
	ipset utilipset.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	excludeCIDRs []string,
	strictARP bool,
	tcpTimeout time.Duration,
	tcpFinTimeout time.Duration,
	udpTimeout time.Duration,
	masqueradeAll bool,
	masqueradeBit int,
	localDetectors [2]proxyutiliptables.LocalTrafficDetector,
	hostname string,
	nodeIP [2]net.IP,
	recorder events.EventRecorder,
	healthzServer healthcheck.ProxierHealthUpdater,
	scheduler string,
	nodePortAddresses []string,
	kernelHandler KernelHandler,
) (proxy.Provider, error) {

	safeIpset := newSafeIpset(ipset)

	ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)

	// Create an ipv4 instance of the single-stack proxier
	ipv4Proxier, err := NewProxier(ipt[0], ipvs, safeIpset, sysctl,
		exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
		tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
		localDetectors[0], hostname, nodeIP[0],
		recorder, healthzServer, scheduler, ipFamilyMap[v1.IPv4Protocol], kernelHandler)
	if err != nil {
		return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
	}

	// Create the ipv6 instance. Note that it is given a nil event recorder and
	// a nil healthz server; only the ipv4 instance receives the caller's.
	ipv6Proxier, err := NewProxier(ipt[1], ipvs, safeIpset, sysctl,
		exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
		tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
		localDetectors[1], hostname, nodeIP[1],
		nil, nil, scheduler, ipFamilyMap[v1.IPv6Protocol], kernelHandler)
	if err != nil {
		return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
	}

	// Return a meta-proxier that dispatch calls between the two
	// single-stack proxier instances
	return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
}

// filterCIDRs returns the entries of cidrs whose IP family matches wantIPv6
// (IPv6 CIDRs when true, all others when false), preserving input order.
// The result is nil when nothing matches.
func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
	var filteredCIDRs []string
	for _, cidr := range cidrs {
		if utilnet.IsIPv6CIDRString(cidr) == wantIPv6 {
			filteredCIDRs = append(filteredCIDRs, cidr)
		}
	}
	return filteredCIDRs
}

// serviceInfo is the ipvs proxier's per-service-port record: the shared base
// service info plus a precomputed string form of the service port name.
type serviceInfo struct {
	*proxy.BaseServiceInfo
	// The following fields are computed and stored for performance reasons.
	serviceNameString string
}

// newServiceInfo returns a new proxy.ServicePort which abstracts a serviceInfo.
// It wraps baseInfo and caches the string form of the ServicePortName built
// from the service's namespace/name and the port's name.
func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
	info := &serviceInfo{BaseServiceInfo: baseInfo}

	// Store the following for performance reasons.
	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
	info.serviceNameString = svcPortName.String()

	return info
}

// KernelHandler can handle the current installed kernel modules.
type KernelHandler interface {
	// GetModules returns the names of the available kernel modules.
	GetModules() ([]string, error)
	// GetKernelVersion returns the running kernel's version string.
	GetKernelVersion() (string, error)
}

// LinuxKernelHandler implements KernelHandler interface.
type LinuxKernelHandler struct {
	// executor runs external commands (modprobe in GetModules).
	executor utilexec.Interface
}

// NewLinuxKernelHandler initializes LinuxKernelHandler with exec.
func NewLinuxKernelHandler() *LinuxKernelHandler {
	return &LinuxKernelHandler{
		executor: utilexec.New(),
	}
}

// GetModules returns all installed kernel modules.
606func (handle *LinuxKernelHandler) GetModules() ([]string, error) { 607 // Check whether IPVS required kernel modules are built-in 608 kernelVersionStr, err := handle.GetKernelVersion() 609 if err != nil { 610 return nil, err 611 } 612 kernelVersion, err := version.ParseGeneric(kernelVersionStr) 613 if err != nil { 614 return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err) 615 } 616 ipvsModules := utilipvs.GetRequiredIPVSModules(kernelVersion) 617 618 var bmods, lmods []string 619 620 // Find out loaded kernel modules. If this is a full static kernel it will try to verify if the module is compiled using /boot/config-KERNELVERSION 621 modulesFile, err := os.Open("/proc/modules") 622 if err == os.ErrNotExist { 623 klog.ErrorS(err, "Failed to read file /proc/modules. Assuming this is a kernel without loadable modules support enabled") 624 kernelConfigFile := fmt.Sprintf("/boot/config-%s", kernelVersionStr) 625 kConfig, err := ioutil.ReadFile(kernelConfigFile) 626 if err != nil { 627 return nil, fmt.Errorf("Failed to read Kernel Config file %s with error %v", kernelConfigFile, err) 628 } 629 for _, module := range ipvsModules { 630 if match, _ := regexp.Match("CONFIG_"+strings.ToUpper(module)+"=y", kConfig); match { 631 bmods = append(bmods, module) 632 } 633 } 634 return bmods, nil 635 } 636 if err != nil { 637 return nil, fmt.Errorf("Failed to read file /proc/modules with error %v", err) 638 } 639 defer modulesFile.Close() 640 641 mods, err := getFirstColumn(modulesFile) 642 if err != nil { 643 return nil, fmt.Errorf("failed to find loaded kernel modules: %v", err) 644 } 645 646 builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersionStr) 647 b, err := ioutil.ReadFile(builtinModsFilePath) 648 if err != nil { 649 klog.ErrorS(err, "Failed to read builtin modules file. 
You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "filePath", builtinModsFilePath) 650 } 651 652 for _, module := range ipvsModules { 653 if match, _ := regexp.Match(module+".ko", b); match { 654 bmods = append(bmods, module) 655 } else { 656 // Try to load the required IPVS kernel modules if not built in 657 err := handle.executor.Command("modprobe", "--", module).Run() 658 if err != nil { 659 klog.InfoS("Failed to load kernel module with modprobe. "+ 660 "You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "moduleName", module) 661 } else { 662 lmods = append(lmods, module) 663 } 664 } 665 } 666 667 mods = append(mods, bmods...) 668 mods = append(mods, lmods...) 669 return mods, nil 670} 671 672// getFirstColumn reads all the content from r into memory and return a 673// slice which consists of the first word from each line. 674func getFirstColumn(r io.Reader) ([]string, error) { 675 b, err := ioutil.ReadAll(r) 676 if err != nil { 677 return nil, err 678 } 679 680 lines := strings.Split(string(b), "\n") 681 words := make([]string, 0, len(lines)) 682 for i := range lines { 683 fields := strings.Fields(lines[i]) 684 if len(fields) > 0 { 685 words = append(words, fields[0]) 686 } 687 } 688 return words, nil 689} 690 691// GetKernelVersion returns currently running kernel version. 692func (handle *LinuxKernelHandler) GetKernelVersion() (string, error) { 693 kernelVersionFile := "/proc/sys/kernel/osrelease" 694 fileContent, err := ioutil.ReadFile(kernelVersionFile) 695 if err != nil { 696 return "", fmt.Errorf("error reading osrelease file %q: %v", kernelVersionFile, err) 697 } 698 699 return strings.TrimSpace(string(fileContent)), nil 700} 701 702// CanUseIPVSProxier returns true if we can use the ipvs Proxier. 703// This is determined by checking if all the required kernel modules can be loaded. 
It may 704// return an error if it fails to get the kernel modules information without error, in which 705// case it will also return false. 706func CanUseIPVSProxier(handle KernelHandler, ipsetver IPSetVersioner, scheduler string) (bool, error) { 707 mods, err := handle.GetModules() 708 if err != nil { 709 return false, fmt.Errorf("error getting installed ipvs required kernel modules: %v", err) 710 } 711 loadModules := sets.NewString() 712 loadModules.Insert(mods...) 713 714 kernelVersionStr, err := handle.GetKernelVersion() 715 if err != nil { 716 return false, fmt.Errorf("error determining kernel version to find required kernel modules for ipvs support: %v", err) 717 } 718 kernelVersion, err := version.ParseGeneric(kernelVersionStr) 719 if err != nil { 720 return false, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err) 721 } 722 mods = utilipvs.GetRequiredIPVSModules(kernelVersion) 723 wantModules := sets.NewString() 724 // We check for the existence of the scheduler mod and will trigger a missingMods error if not found 725 if scheduler == "" { 726 scheduler = DefaultScheduler 727 } 728 schedulerMod := "ip_vs_" + scheduler 729 mods = append(mods, schedulerMod) 730 wantModules.Insert(mods...) 
731 732 modules := wantModules.Difference(loadModules).UnsortedList() 733 var missingMods []string 734 ConntrackiMissingCounter := 0 735 for _, mod := range modules { 736 if strings.Contains(mod, "nf_conntrack") { 737 ConntrackiMissingCounter++ 738 } else { 739 missingMods = append(missingMods, mod) 740 } 741 } 742 if ConntrackiMissingCounter == 2 { 743 missingMods = append(missingMods, "nf_conntrack_ipv4(or nf_conntrack for Linux kernel 4.19 and later)") 744 } 745 746 if len(missingMods) != 0 { 747 return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", missingMods) 748 } 749 750 // Check ipset version 751 versionString, err := ipsetver.GetVersion() 752 if err != nil { 753 return false, fmt.Errorf("error getting ipset version, error: %v", err) 754 } 755 if !checkMinVersion(versionString) { 756 return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion) 757 } 758 return true, nil 759} 760 761// CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier 762// It returns true if an error was encountered. Errors are logged. 763func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) { 764 // Unlink the iptables chains created by ipvs Proxier 765 for _, jc := range iptablesJumpChain { 766 args := []string{ 767 "-m", "comment", "--comment", jc.comment, 768 "-j", string(jc.to), 769 } 770 if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil { 771 if !utiliptables.IsNotFoundError(err) { 772 klog.ErrorS(err, "Error removing iptables rules in ipvs proxier") 773 encounteredError = true 774 } 775 } 776 } 777 778 // Flush and remove all of our chains. Flushing all chains before removing them also removes all links between chains first. 
779 for _, ch := range iptablesCleanupChains { 780 if err := ipt.FlushChain(ch.table, ch.chain); err != nil { 781 if !utiliptables.IsNotFoundError(err) { 782 klog.ErrorS(err, "Error removing iptables rules in ipvs proxier") 783 encounteredError = true 784 } 785 } 786 } 787 788 // Remove all of our chains. 789 for _, ch := range iptablesCleanupChains { 790 if err := ipt.DeleteChain(ch.table, ch.chain); err != nil { 791 if !utiliptables.IsNotFoundError(err) { 792 klog.ErrorS(err, "Error removing iptables rules in ipvs proxier") 793 encounteredError = true 794 } 795 } 796 } 797 798 return encounteredError 799} 800 801// CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier. 802func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) { 803 // Clear all ipvs rules 804 if ipvs != nil { 805 err := ipvs.Flush() 806 if err != nil { 807 klog.ErrorS(err, "Error flushing IPVS rules") 808 encounteredError = true 809 } 810 } 811 // Delete dummy interface created by ipvs Proxier. 812 nl := NewNetLinkHandle(false) 813 err := nl.DeleteDummyDevice(DefaultDummyDevice) 814 if err != nil { 815 klog.ErrorS(err, "Error deleting dummy device created by IPVS proxier", "device", DefaultDummyDevice) 816 encounteredError = true 817 } 818 // Clear iptables created by ipvs Proxier. 819 encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError 820 // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up 821 // iptables since we can NOT delete ip set which is still referenced by iptables. 822 for _, set := range ipsetInfo { 823 err = ipset.DestroySet(set.name) 824 if err != nil { 825 if !utilipset.IsNotFoundError(err) { 826 klog.ErrorS(err, "Error removing ipset", "ipset", set.name) 827 encounteredError = true 828 } 829 } 830 } 831 return encounteredError 832} 833 834// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible. 
835func (proxier *Proxier) Sync() { 836 if proxier.healthzServer != nil { 837 proxier.healthzServer.QueuedUpdate() 838 } 839 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() 840 proxier.syncRunner.Run() 841} 842 843// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. 844func (proxier *Proxier) SyncLoop() { 845 // Update healthz timestamp at beginning in case Sync() never succeeds. 846 if proxier.healthzServer != nil { 847 proxier.healthzServer.Updated() 848 } 849 // synthesize "last change queued" time as the informers are syncing. 850 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() 851 proxier.syncRunner.Loop(wait.NeverStop) 852} 853 854func (proxier *Proxier) setInitialized(value bool) { 855 var initialized int32 856 if value { 857 initialized = 1 858 } 859 atomic.StoreInt32(&proxier.initialized, initialized) 860} 861 862func (proxier *Proxier) isInitialized() bool { 863 return atomic.LoadInt32(&proxier.initialized) > 0 864} 865 866// OnServiceAdd is called whenever creation of new service object is observed. 867func (proxier *Proxier) OnServiceAdd(service *v1.Service) { 868 proxier.OnServiceUpdate(nil, service) 869} 870 871// OnServiceUpdate is called whenever modification of an existing service object is observed. 872func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { 873 if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { 874 proxier.Sync() 875 } 876} 877 878// OnServiceDelete is called whenever deletion of an existing service object is observed. 879func (proxier *Proxier) OnServiceDelete(service *v1.Service) { 880 proxier.OnServiceUpdate(service, nil) 881} 882 883// OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache. 
884func (proxier *Proxier) OnServiceSynced() { 885 proxier.mu.Lock() 886 proxier.servicesSynced = true 887 proxier.setInitialized(proxier.endpointSlicesSynced) 888 proxier.mu.Unlock() 889 890 // Sync unconditionally - this is called once per lifetime. 891 proxier.syncProxyRules() 892} 893 894// The following methods exist to implement the Proxier interface however 895// ipvs proxier only uses EndpointSlices so the following are noops 896 897// OnEndpointsAdd is called whenever creation of new endpoints object is observed. 898func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {} 899 900// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed. 901func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {} 902 903// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed. 904func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {} 905 906// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache. 907func (proxier *Proxier) OnEndpointsSynced() {} 908 909// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object 910// is observed. 911func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) { 912 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { 913 proxier.Sync() 914 } 915} 916 917// OnEndpointSliceUpdate is called whenever modification of an existing endpoint 918// slice object is observed. 919func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) { 920 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { 921 proxier.Sync() 922 } 923} 924 925// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice 926// object is observed. 
927func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) { 928 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() { 929 proxier.Sync() 930 } 931} 932 933// OnEndpointSlicesSynced is called once all the initial event handlers were 934// called and the state is fully propagated to local cache. 935func (proxier *Proxier) OnEndpointSlicesSynced() { 936 proxier.mu.Lock() 937 proxier.endpointSlicesSynced = true 938 proxier.setInitialized(proxier.servicesSynced) 939 proxier.mu.Unlock() 940 941 // Sync unconditionally - this is called once per lifetime. 942 proxier.syncProxyRules() 943} 944 945// OnNodeAdd is called whenever creation of new node object 946// is observed. 947func (proxier *Proxier) OnNodeAdd(node *v1.Node) { 948 if node.Name != proxier.hostname { 949 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname) 950 return 951 } 952 953 if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { 954 return 955 } 956 957 proxier.mu.Lock() 958 proxier.nodeLabels = map[string]string{} 959 for k, v := range node.Labels { 960 proxier.nodeLabels[k] = v 961 } 962 proxier.mu.Unlock() 963 klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels) 964 965 proxier.syncProxyRules() 966} 967 968// OnNodeUpdate is called whenever modification of an existing 969// node object is observed. 
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
	// Only events for the node this proxier runs on are handled.
	if node.Name != proxier.hostname {
		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
		return
	}

	// Nothing to do when the labels are unchanged.
	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
		return
	}

	// Replace the stored labels with a fresh copy of node.Labels.
	proxier.mu.Lock()
	proxier.nodeLabels = map[string]string{}
	for k, v := range node.Labels {
		proxier.nodeLabels[k] = v
	}
	proxier.mu.Unlock()
	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)

	// syncProxyRules takes proxier.mu itself, so it is called with the lock released.
	proxier.syncProxyRules()
}

// OnNodeDelete is called whenever deletion of an existing node
// object is observed. Events for nodes other than proxier.hostname are logged
// and dropped; otherwise the stored node labels are cleared and the proxy
// rules are synced.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
	if node.Name != proxier.hostname {
		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
		return
	}
	proxier.mu.Lock()
	proxier.nodeLabels = nil
	proxier.mu.Unlock()

	// syncProxyRules takes proxier.mu itself, so it is called with the lock released.
	proxier.syncProxyRules()
}

// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
// It is intentionally a noop.
func (proxier *Proxier) OnNodeSynced() {
}

// This is where all of the ipvs calls happen.
// It acquires proxier.mu itself and holds it for the whole sync, so callers
// must NOT hold proxier.mu when calling it.
//
// The sync returns early, without programming anything, when the proxier is
// not yet initialized, when the dummy device or an ip set cannot be ensured,
// or when iptables-restore fails (newly opened local ports are reverted in
// that last case).
func (proxier *Proxier) syncProxyRules() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	// don't sync rules till we've received services and endpoints
	if !proxier.isInitialized() {
		klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
		return
	}

	// Keep track of how long syncs take.
	start := time.Now()
	defer func() {
		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
		klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
	}()

	// We assume that if this was called, we really want to sync them,
	// even if nothing changed in the meantime. In other words, callers are
	// responsible for detecting no-op changes and not calling this function.
	serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

	staleServices := serviceUpdateResult.UDPStaleClusterIP
	// merge stale services gathered from updateEndpointsMap
	for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
		if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
			klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
			staleServices.Insert(svcInfo.ClusterIP().String())
			for _, extIP := range svcInfo.ExternalIPStrings() {
				staleServices.Insert(extIP)
			}
		}
	}

	klog.V(3).InfoS("Syncing ipvs Proxier rules")

	// Begin install iptables

	// Reset all buffers used later.
	// This is to avoid memory reallocations and thus improve performance.
	proxier.natChains.Reset()
	proxier.natRules.Reset()
	proxier.filterChains.Reset()
	proxier.filterRules.Reset()

	// Write table headers.
	utilproxy.WriteLine(proxier.filterChains, "*filter")
	utilproxy.WriteLine(proxier.natChains, "*nat")

	proxier.createAndLinkKubeChain()

	// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
	_, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
	if err != nil {
		klog.ErrorS(err, "Failed to create dummy interface", "interface", DefaultDummyDevice)
		return
	}

	// make sure ip sets exists in the system.
	for _, set := range proxier.ipsetList {
		// The error is not logged here; the sync is simply abandoned.
		if err := ensureIPSet(set); err != nil {
			return
		}
		set.resetEntries()
	}

	// Accumulate the set of local ports that we will be holding open once this update is complete
	replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
	// activeIPVSServices represents IPVS service successfully created in this round of sync
	activeIPVSServices := map[string]bool{}
	// currentIPVSServices represent IPVS services listed from the system
	currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
	// activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
	activeBindAddrs := map[string]bool{}

	// A failure here is only logged; the sync continues with whatever BindedIPs returned.
	bindedAddresses, err := proxier.ipGetter.BindedIPs()
	if err != nil {
		klog.ErrorS(err, "error listing addresses binded to dummy interface")
	}

	hasNodePort := false
	for _, svc := range proxier.serviceMap {
		svcInfo, ok := svc.(*serviceInfo)
		if ok && svcInfo.NodePort() != 0 {
			hasNodePort = true
			break
		}
	}

	// Both nodeAddresses and nodeIPs can be reused for all nodePort services
	// and only need to be computed if we have at least one nodePort service.
	var (
		// List of node addresses to listen on if a nodePort is set.
		nodeAddresses []string
		// List of node IP addresses to be used as IPVS services if nodePort is set.
		nodeIPs []net.IP
	)

	if hasNodePort {
		nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
		if err != nil {
			klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr")
		} else {
			nodeAddresses = nodeAddrSet.List()
			for _, address := range nodeAddresses {
				a := net.ParseIP(address)
				if a.IsLoopback() {
					continue
				}
				// A zero CIDR replaces the list collected so far with all node IPs from the host.
				if utilproxy.IsZeroCIDR(address) {
					nodeIPs, err = proxier.ipGetter.NodeIPs()
					if err != nil {
						klog.ErrorS(err, "Failed to list all node IPs from host")
					}
					break
				}
				nodeIPs = append(nodeIPs, a)
			}
		}
	}

	// filter node IPs by proxier ipfamily
	idx := 0
	for _, nodeIP := range nodeIPs {
		if (proxier.ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6(nodeIP) {
			nodeIPs[idx] = nodeIP
			idx++
		}
	}
	// reset slice to filtered entries
	nodeIPs = nodeIPs[:idx]

	localAddrSet := utilproxy.GetLocalAddrSet()

	// Build IPVS rules for each service.
	for svcName, svc := range proxier.serviceMap {
		svcInfo, ok := svc.(*serviceInfo)
		if !ok {
			klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
			continue
		}
		isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
		localPortIPFamily := utilnet.IPv4
		if isIPv6 {
			localPortIPFamily = utilnet.IPv6
		}
		protocol := strings.ToLower(string(svcInfo.Protocol()))
		// Precompute svcNameString; with many services the many calls
		// to ServicePortName.String() show up in CPU profiles.
		svcNameString := svcName.String()

		// Handle traffic that loops back to the originator with SNAT.
		for _, e := range proxier.endpointsMap[svcName] {
			ep, ok := e.(*proxy.BaseEndpointInfo)
			if !ok {
				klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String())
				continue
			}
			if !ep.IsLocal {
				continue
			}
			epIP := ep.IP()
			epPort, err := ep.Port()
			// Error parsing this endpoint has been logged. Skip to next endpoint.
			if epIP == "" || err != nil {
				continue
			}
			entry := &utilipset.Entry{
				IP:       epIP,
				Port:     epPort,
				Protocol: protocol,
				IP2:      epIP,
				SetType:  utilipset.HashIPPortIP,
			}
			if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
				klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
				continue
			}
			proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
		}

		// Capture the clusterIP.
		// ipset call
		entry := &utilipset.Entry{
			IP:       svcInfo.ClusterIP().String(),
			Port:     svcInfo.Port(),
			Protocol: protocol,
			SetType:  utilipset.HashIPPort,
		}
		// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
		// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
		// NOTE: this `continue` skips the rest of this service (external IPs,
		// load balancer ingress, node ports and health check node port).
		if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
			klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
			continue
		}
		proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
		// ipvs call
		serv := &utilipvs.VirtualServer{
			Address:   svcInfo.ClusterIP(),
			Port:      uint16(svcInfo.Port()),
			Protocol:  string(svcInfo.Protocol()),
			Scheduler: proxier.ipvsScheduler,
		}
		// Set session affinity flag and timeout for IPVS service
		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
			serv.Flags |= utilipvs.FlagPersistent
			serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
		}
		// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
		if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
			activeIPVSServices[serv.String()] = true
			activeBindAddrs[serv.Address.String()] = true
			// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
			// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
			internalNodeLocal := false
			if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
				internalNodeLocal = true
			}
			if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
				klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
			}
		} else {
			klog.ErrorS(err, "Failed to sync service", "service", serv.String())
		}

		// Capture externalIPs.
		for _, externalIP := range svcInfo.ExternalIPStrings() {
			// If the "external" IP happens to be an IP that is local to this
			// machine, hold the local port open so no other process can open it
			// (because the socket might open but it would never work).
			if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
				// We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
				lp := utilnet.LocalPort{
					Description: "externalIP for " + svcNameString,
					IP:          externalIP,
					IPFamily:    localPortIPFamily,
					Port:        svcInfo.Port(),
					Protocol:    utilnet.Protocol(svcInfo.Protocol()),
				}
				if proxier.portsMap[lp] != nil {
					klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
					replacementPortsMap[lp] = proxier.portsMap[lp]
				} else {
					socket, err := proxier.portMapper.OpenLocalPort(&lp)
					if err != nil {
						msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())

						proxier.recorder.Eventf(
							&v1.ObjectReference{
								Kind:      "Node",
								Name:      proxier.hostname,
								UID:       types.UID(proxier.hostname),
								Namespace: "",
							}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
						klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
						continue
					}
					klog.V(2).InfoS("Opened local port", "port", lp.String())
					replacementPortsMap[lp] = socket
				}
			} // We're holding the port, so it's OK to install IPVS rules.

			// ipset call
			entry := &utilipset.Entry{
				IP:       externalIP,
				Port:     svcInfo.Port(),
				Protocol: protocol,
				SetType:  utilipset.HashIPPort,
			}

			if svcInfo.NodeLocalExternal() {
				if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
					klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
					continue
				}
				proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
			} else {
				// We have to SNAT packets to external IPs.
				if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
					klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
					continue
				}
				proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
			}

			// ipvs call
			serv := &utilipvs.VirtualServer{
				Address:   net.ParseIP(externalIP),
				Port:      uint16(svcInfo.Port()),
				Protocol:  string(svcInfo.Protocol()),
				Scheduler: proxier.ipvsScheduler,
			}
			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
				serv.Flags |= utilipvs.FlagPersistent
				serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
			}
			if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
				activeIPVSServices[serv.String()] = true
				activeBindAddrs[serv.Address.String()] = true

				if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
					klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
				}
			} else {
				klog.ErrorS(err, "Failed to sync service", "service", serv.String())
			}
		}

		// Capture load-balancer ingress.
		for _, ingress := range svcInfo.LoadBalancerIPStrings() {
			if ingress != "" {
				// ipset call
				entry = &utilipset.Entry{
					IP:       ingress,
					Port:     svcInfo.Port(),
					Protocol: protocol,
					SetType:  utilipset.HashIPPort,
				}
				// add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
				// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
				// If we are proxying globally, we need to masquerade in case we cross nodes.
				// If we are proxying only locally, we can retain the source IP.
				if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
					klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
					continue
				}
				proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
				// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
				if svcInfo.NodeLocalExternal() {
					if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
						klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
						continue
					}
					proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
				}
				if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
					// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
					// This currently works for loadbalancers that preserves source ips.
					// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
					if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
						klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
						continue
					}
					proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
					allowFromNode := false
					for _, src := range svcInfo.LoadBalancerSourceRanges() {
						// ipset call
						entry = &utilipset.Entry{
							IP:       ingress,
							Port:     svcInfo.Port(),
							Protocol: protocol,
							Net:      src,
							SetType:  utilipset.HashIPPortNet,
						}
						// enumerate all white list source cidr
						if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
							klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
							continue
						}
						proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())

						// ignore error because it has been validated
						_, cidr, _ := net.ParseCIDR(src)
						if cidr.Contains(proxier.nodeIP) {
							allowFromNode = true
						}
					}
					// generally, ip route rule was added to intercept request to loadbalancer vip from the
					// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
					// Need to add the following rule to allow request on host.
					if allowFromNode {
						entry = &utilipset.Entry{
							IP:       ingress,
							Port:     svcInfo.Port(),
							Protocol: protocol,
							IP2:      ingress,
							SetType:  utilipset.HashIPPortIP,
						}
						// enumerate all white list source ip
						if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
							klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
							continue
						}
						proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
					}
				}

				// ipvs call
				serv := &utilipvs.VirtualServer{
					Address:   net.ParseIP(ingress),
					Port:      uint16(svcInfo.Port()),
					Protocol:  string(svcInfo.Protocol()),
					Scheduler: proxier.ipvsScheduler,
				}
				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
					serv.Flags |= utilipvs.FlagPersistent
					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
				}
				if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
					activeIPVSServices[serv.String()] = true
					activeBindAddrs[serv.Address.String()] = true
					if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
						klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
					}
				} else {
					klog.ErrorS(err, "Failed to sync service", "service", serv)
				}
			}
		}

		if svcInfo.NodePort() != 0 {
			if len(nodeAddresses) == 0 || len(nodeIPs) == 0 {
				// Skip nodePort configuration since an error occurred when
				// computing nodeAddresses or nodeIPs.
				continue
			}

			var lps []utilnet.LocalPort
			for _, address := range nodeAddresses {
				lp := utilnet.LocalPort{
					Description: "nodePort for " + svcNameString,
					IP:          address,
					IPFamily:    localPortIPFamily,
					Port:        svcInfo.NodePort(),
					Protocol:    utilnet.Protocol(svcInfo.Protocol()),
				}
				if utilproxy.IsZeroCIDR(address) {
					// Empty IP address means all
					lp.IP = ""
					lps = append(lps, lp)
					// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
					break
				}
				lps = append(lps, lp)
			}

			// For ports on node IPs, open the actual port and hold it.
			for _, lp := range lps {
				if proxier.portsMap[lp] != nil {
					klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
					replacementPortsMap[lp] = proxier.portsMap[lp]
					// We do not start listening on SCTP ports, according to our agreement in the
					// SCTP support KEP
				} else if svcInfo.Protocol() != v1.ProtocolSCTP {
					socket, err := proxier.portMapper.OpenLocalPort(&lp)
					if err != nil {
						msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())

						proxier.recorder.Eventf(
							&v1.ObjectReference{
								Kind:      "Node",
								Name:      proxier.hostname,
								UID:       types.UID(proxier.hostname),
								Namespace: "",
							}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
						klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
						continue
					}
					klog.V(2).InfoS("Opened local port", "port", lp.String())

					if lp.Protocol == utilnet.UDP {
						conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
					}
					replacementPortsMap[lp] = socket
				} // We're holding the port, so it's OK to install ipvs rules.
			}

			// Nodeports need SNAT, unless they're local.
			// ipset call

			var (
				nodePortSet *IPSet
				entries     []*utilipset.Entry
			)

			switch protocol {
			case utilipset.ProtocolTCP:
				nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
				entries = []*utilipset.Entry{{
					// No need to provide ip info
					Port:     svcInfo.NodePort(),
					Protocol: protocol,
					SetType:  utilipset.BitmapPort,
				}}
			case utilipset.ProtocolUDP:
				nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
				entries = []*utilipset.Entry{{
					// No need to provide ip info
					Port:     svcInfo.NodePort(),
					Protocol: protocol,
					SetType:  utilipset.BitmapPort,
				}}
			case utilipset.ProtocolSCTP:
				nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
				// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
				entries = []*utilipset.Entry{}
				for _, nodeIP := range nodeIPs {
					entries = append(entries, &utilipset.Entry{
						IP:       nodeIP.String(),
						Port:     svcInfo.NodePort(),
						Protocol: protocol,
						SetType:  utilipset.HashIPPort,
					})
				}
			default:
				// It should never hit
				klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
			}
			if nodePortSet != nil {
				entryInvalidErr := false
				for _, entry := range entries {
					if valid := nodePortSet.validateEntry(entry); !valid {
						klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
						entryInvalidErr = true
						break
					}
					nodePortSet.activeEntries.Insert(entry.String())
				}
				// An invalid entry abandons the rest of this service.
				if entryInvalidErr {
					continue
				}
			}

			// Add externaltrafficpolicy=local type nodeport entry
			if svcInfo.NodeLocalExternal() {
				var nodePortLocalSet *IPSet
				switch protocol {
				case utilipset.ProtocolTCP:
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
				case utilipset.ProtocolUDP:
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
				case utilipset.ProtocolSCTP:
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
				default:
					// It should never hit
					klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
				}
				if nodePortLocalSet != nil {
					entryInvalidErr := false
					for _, entry := range entries {
						if valid := nodePortLocalSet.validateEntry(entry); !valid {
							klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortLocalSet.Name)
							entryInvalidErr = true
							break
						}
						nodePortLocalSet.activeEntries.Insert(entry.String())
					}
					if entryInvalidErr {
						continue
					}
				}
			}

			// Build ipvs kernel routes for each node ip address
			for _, nodeIP := range nodeIPs {
				// ipvs call
				serv := &utilipvs.VirtualServer{
					Address:   nodeIP,
					Port:      uint16(svcInfo.NodePort()),
					Protocol:  string(svcInfo.Protocol()),
					Scheduler: proxier.ipvsScheduler,
				}
				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
					serv.Flags |= utilipvs.FlagPersistent
					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
				}
				// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
				if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
					activeIPVSServices[serv.String()] = true
					if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
						klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
					}
				} else {
					klog.ErrorS(err, "Failed to sync service", "service", serv)
				}
			}
		}

		if svcInfo.HealthCheckNodePort() != 0 {
			nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
			entry := &utilipset.Entry{
				// No need to provide ip info
				Port:     svcInfo.HealthCheckNodePort(),
				Protocol: "tcp",
				SetType:  utilipset.BitmapPort,
			}

			if valid := nodePortSet.validateEntry(entry); !valid {
				klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
				continue
			}
			nodePortSet.activeEntries.Insert(entry.String())
		}
	}

	// sync ipset entries
	for _, set := range proxier.ipsetList {
		set.syncIPSetEntries()
	}

	// Tail call iptables rules for ipset, make sure only call iptables once
	// in a single loop per ip set.
	proxier.writeIptablesRules()

	// Sync iptables rules.
	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
	proxier.iptablesData.Reset()
	proxier.iptablesData.Write(proxier.natChains.Bytes())
	proxier.iptablesData.Write(proxier.natRules.Bytes())
	proxier.iptablesData.Write(proxier.filterChains.Bytes())
	proxier.iptablesData.Write(proxier.filterRules.Bytes())

	klog.V(5).InfoS("Restoring iptables", "rules", string(proxier.iptablesData.Bytes()))
	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
	if err != nil {
		klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
		metrics.IptablesRestoreFailuresTotal.Inc()
		// Revert new local ports.
		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
		return
	}
	// The restore succeeded: record the latency for each endpoint change trigger time.
	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
			metrics.NetworkProgrammingLatency.Observe(latency)
			klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
		}
	}

	// Close old local ports and save new ones.
	for k, v := range proxier.portsMap {
		if replacementPortsMap[k] == nil {
			v.Close()
		}
	}
	proxier.portsMap = replacementPortsMap

	// Get legacy bind address
	// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
	currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
	if err != nil {
		klog.ErrorS(err, "Failed to get bind address")
	}
	legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)

	// Clean up legacy IPVS services and unbind addresses
	appliedSvcs, err := proxier.ipvs.GetVirtualServers()
	if err == nil {
		for _, appliedSvc := range appliedSvcs {
			currentIPVSServices[appliedSvc.String()] = appliedSvc
		}
	} else {
		klog.ErrorS(err, "Failed to get ipvs service")
	}
	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)

	if proxier.healthzServer != nil {
		proxier.healthzServer.Updated()
	}
	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

	// Update service healthchecks. The endpoints list might include services that are
	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
	// will just drop those endpoints.
	if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck services")
	}
	if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck endpoints")
	}

	// Finish housekeeping.
	// TODO: these could be made more consistent.
	for _, svcIP := range staleServices.UnsortedList() {
		if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
			klog.ErrorS(err, "Failed to delete stale service IP connections", "ip", svcIP)
		}
	}
	proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}

// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
// according to proxier.ipsetList information and the ipset match relationship that `ipsetWithIptablesChain` specified.
// some ipset(kubeClusterIPSet for example) have particular match rules and iptables jump relation should be sync separately.
func (proxier *Proxier) writeIptablesRules() {
	// We are creating those slices ones here to avoid memory reallocations
	// in every loop. Note that reuse the memory, instead of doing:
	//   slice = <some new slice>
	// you should always do one of the below:
	//   slice = slice[:0] // and then append to it
	//   slice = append(slice[:0], ...)
	// To avoid growing this slice, we arbitrarily set its size to 64,
	// there is never more than that many arguments for a single line.
	// Note that even if we go over 64, it will still be correct - it
	// is just for efficiency, not correctness.
	args := make([]string, 64)

	for _, set := range ipsetWithIptablesChain {
		if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
			args = append(args[:0], "-A", set.from)
			if set.protocolMatch != "" {
				args = append(args, "-p", set.protocolMatch)
			}
			args = append(args,
				"-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
				"-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
				set.matchType,
			)
			utilproxy.WriteLine(proxier.natRules, append(args, "-j", set.to)...)
1722 } 1723 } 1724 1725 if !proxier.ipsetList[kubeClusterIPSet].isEmpty() { 1726 args = append(args[:0], 1727 "-A", string(kubeServicesChain), 1728 "-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(), 1729 "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, 1730 ) 1731 if proxier.masqueradeAll { 1732 utilproxy.WriteLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) 1733 } else if proxier.localDetector.IsImplemented() { 1734 // This masquerades off-cluster traffic to a service VIP. The idea 1735 // is that you can establish a static route for your Service range, 1736 // routing to any node, and that node will bridge into the Service 1737 // for you. Since that might bounce off-node, we masquerade here. 1738 // If/when we support "Local" policy for VIPs, we should update this. 1739 utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) 1740 } else { 1741 // Masquerade all OUTPUT traffic coming from a service ip. 1742 // The kube dummy interface has all service VIPs assigned which 1743 // results in the service VIP being picked as the source IP to reach 1744 // a VIP. This leads to a connection from VIP:<random port> to 1745 // VIP:<service port>. 1746 // Always masquerading OUTPUT (node-originating) traffic with a VIP 1747 // source ip and service port destination fixes the outgoing connections. 1748 utilproxy.WriteLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...) 1749 } 1750 } 1751 1752 // externalIPRules adds iptables rules applies to Service ExternalIPs 1753 externalIPRules := func(args []string) { 1754 // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) 1755 // nor from a local process to be forwarded to the service. 1756 // This rule roughly translates to "all traffic from off-machine". 
1757 // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. 1758 externalTrafficOnlyArgs := append(args, 1759 "-m", "physdev", "!", "--physdev-is-in", 1760 "-m", "addrtype", "!", "--src-type", "LOCAL") 1761 utilproxy.WriteLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) 1762 dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") 1763 // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. 1764 // This covers cases like GCE load-balancers which get added to the local routing table. 1765 utilproxy.WriteLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) 1766 } 1767 1768 if !proxier.ipsetList[kubeExternalIPSet].isEmpty() { 1769 // Build masquerade rules for packets to external IPs. 1770 args = append(args[:0], 1771 "-A", string(kubeServicesChain), 1772 "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(), 1773 "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, 1774 "dst,dst", 1775 ) 1776 utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) 1777 externalIPRules(args) 1778 } 1779 1780 if !proxier.ipsetList[kubeExternalIPLocalSet].isEmpty() { 1781 args = append(args[:0], 1782 "-A", string(kubeServicesChain), 1783 "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPLocalSet].getComment(), 1784 "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name, 1785 "dst,dst", 1786 ) 1787 externalIPRules(args) 1788 } 1789 1790 // -A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT 1791 args = append(args[:0], 1792 "-A", string(kubeServicesChain), 1793 "-m", "addrtype", "--dst-type", "LOCAL", 1794 ) 1795 utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) 
1796 1797 // mark drop for KUBE-LOAD-BALANCER 1798 utilproxy.WriteLine(proxier.natRules, []string{ 1799 "-A", string(KubeLoadBalancerChain), 1800 "-j", string(KubeMarkMasqChain), 1801 }...) 1802 1803 // mark drop for KUBE-FIRE-WALL 1804 utilproxy.WriteLine(proxier.natRules, []string{ 1805 "-A", string(KubeFireWallChain), 1806 "-j", string(KubeMarkDropChain), 1807 }...) 1808 1809 // Accept all traffic with destination of ipvs virtual service, in case other iptables rules 1810 // block the traffic, that may result in ipvs rules invalid. 1811 // Those rules must be in the end of KUBE-SERVICE chain 1812 proxier.acceptIPVSTraffic() 1813 1814 // If the masqueradeMark has been added then we want to forward that same 1815 // traffic, this allows NodePort traffic to be forwarded even if the default 1816 // FORWARD policy is not accept. 1817 utilproxy.WriteLine(proxier.filterRules, 1818 "-A", string(KubeForwardChain), 1819 "-m", "comment", "--comment", `"kubernetes forwarding rules"`, 1820 "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), 1821 "-j", "ACCEPT", 1822 ) 1823 1824 // The following two rules ensure the traffic after the initial packet 1825 // accepted by the "kubernetes forwarding rules" rule above will be 1826 // accepted. 
1827 utilproxy.WriteLine(proxier.filterRules, 1828 "-A", string(KubeForwardChain), 1829 "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, 1830 "-m", "conntrack", 1831 "--ctstate", "RELATED,ESTABLISHED", 1832 "-j", "ACCEPT", 1833 ) 1834 utilproxy.WriteLine(proxier.filterRules, 1835 "-A", string(KubeForwardChain), 1836 "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, 1837 "-m", "conntrack", 1838 "--ctstate", "RELATED,ESTABLISHED", 1839 "-j", "ACCEPT", 1840 ) 1841 1842 // Add rule to accept traffic towards health check node port 1843 utilproxy.WriteLine(proxier.filterRules, 1844 "-A", string(KubeNodePortChain), 1845 "-m", "comment", "--comment", proxier.ipsetList[kubeHealthCheckNodePortSet].getComment(), 1846 "-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst", 1847 "-j", "ACCEPT", 1848 ) 1849 1850 // Install the kubernetes-specific postrouting rules. We use a whole chain for 1851 // this so that it is easier to flush and change, for example if the mark 1852 // value should ever change. 1853 // NB: THIS MUST MATCH the corresponding code in the kubelet 1854 utilproxy.WriteLine(proxier.natRules, []string{ 1855 "-A", string(kubePostroutingChain), 1856 "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), 1857 "-j", "RETURN", 1858 }...) 1859 // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. 1860 utilproxy.WriteLine(proxier.natRules, []string{ 1861 "-A", string(kubePostroutingChain), 1862 // XOR proxier.masqueradeMark to unset it 1863 "-j", "MARK", "--xor-mark", proxier.masqueradeMark, 1864 }...) 
1865 masqRule := []string{ 1866 "-A", string(kubePostroutingChain), 1867 "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, 1868 "-j", "MASQUERADE", 1869 } 1870 if proxier.iptables.HasRandomFully() { 1871 masqRule = append(masqRule, "--random-fully") 1872 } 1873 utilproxy.WriteLine(proxier.natRules, masqRule...) 1874 1875 // Install the kubernetes-specific masquerade mark rule. We use a whole chain for 1876 // this so that it is easier to flush and change, for example if the mark 1877 // value should ever change. 1878 utilproxy.WriteLine(proxier.natRules, []string{ 1879 "-A", string(KubeMarkMasqChain), 1880 "-j", "MARK", "--or-mark", proxier.masqueradeMark, 1881 }...) 1882 1883 // Write the end-of-table markers. 1884 utilproxy.WriteLine(proxier.filterRules, "COMMIT") 1885 utilproxy.WriteLine(proxier.natRules, "COMMIT") 1886} 1887 1888func (proxier *Proxier) acceptIPVSTraffic() { 1889 sets := []string{kubeClusterIPSet, kubeLoadBalancerSet} 1890 for _, set := range sets { 1891 var matchType string 1892 if !proxier.ipsetList[set].isEmpty() { 1893 switch proxier.ipsetList[set].SetType { 1894 case utilipset.BitmapPort: 1895 matchType = "dst" 1896 default: 1897 matchType = "dst,dst" 1898 } 1899 utilproxy.WriteLine(proxier.natRules, []string{ 1900 "-A", string(kubeServicesChain), 1901 "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType, 1902 "-j", "ACCEPT", 1903 }...) 1904 } 1905 } 1906} 1907 1908// createAndLinkKubeChain create all kube chains that ipvs proxier need and write basic link. 
1909func (proxier *Proxier) createAndLinkKubeChain() { 1910 existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter) 1911 existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT) 1912 1913 // ensure KUBE-MARK-DROP chain exist but do not change any rules 1914 for _, ch := range iptablesEnsureChains { 1915 if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil { 1916 klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain) 1917 return 1918 } 1919 } 1920 1921 // Make sure we keep stats for the top-level chains 1922 for _, ch := range iptablesChains { 1923 if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil { 1924 klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain) 1925 return 1926 } 1927 if ch.table == utiliptables.TableNAT { 1928 if chain, ok := existingNATChains[ch.chain]; ok { 1929 utilproxy.WriteBytesLine(proxier.natChains, chain) 1930 } else { 1931 utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(ch.chain)) 1932 } 1933 } else { 1934 if chain, ok := existingFilterChains[ch.chain]; ok { 1935 utilproxy.WriteBytesLine(proxier.filterChains, chain) 1936 } else { 1937 utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(ch.chain)) 1938 } 1939 } 1940 } 1941 1942 for _, jc := range iptablesJumpChain { 1943 args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)} 1944 if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil { 1945 klog.ErrorS(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to) 1946 } 1947 } 1948 1949} 1950 1951// getExistingChains get iptables-save output so we can check for existing chains and rules. 
1952// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore 1953// Result may SHARE memory with contents of buffer. 1954func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte { 1955 buffer.Reset() 1956 err := proxier.iptables.SaveInto(table, buffer) 1957 if err != nil { // if we failed to get any rules 1958 klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules") 1959 } else { // otherwise parse the output 1960 return utiliptables.GetChainLines(table, buffer.Bytes()) 1961 } 1962 return nil 1963} 1964 1965// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we 1966// risk sending more traffic to it, all of which will be lost (because UDP). 1967// This assumes the proxier mutex is held 1968func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { 1969 for _, epSvcPair := range connectionMap { 1970 if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { 1971 endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) 1972 svcProto := svcInfo.Protocol() 1973 err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) 1974 if err != nil { 1975 klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName.String()) 1976 } 1977 for _, extIP := range svcInfo.ExternalIPStrings() { 1978 err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) 1979 if err != nil { 1980 klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", extIP) 1981 } 1982 } 1983 for _, lbIP := range svcInfo.LoadBalancerIPStrings() { 1984 err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) 1985 if err != nil { 1986 klog.ErrorS(err, 
"Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", lbIP) 1987 } 1988 } 1989 } 1990 } 1991} 1992 1993func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error { 1994 appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs) 1995 if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) { 1996 if appliedVirtualServer == nil { 1997 // IPVS service is not found, create a new service 1998 klog.V(3).InfoS("Adding new service", "svcName", svcName, "address", fmt.Sprintf("%s:%d/%s", vs.Address, vs.Port, vs.Protocol)) 1999 if err := proxier.ipvs.AddVirtualServer(vs); err != nil { 2000 klog.ErrorS(err, "Failed to add IPVS service", "svcName", svcName) 2001 return err 2002 } 2003 } else { 2004 // IPVS service was changed, update the existing one 2005 // During updates, service VIP will not go down 2006 klog.V(3).InfoS("IPVS service was changed", "svcName", svcName) 2007 if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil { 2008 klog.ErrorS(err, "Failed to update IPVS service") 2009 return err 2010 } 2011 } 2012 } 2013 2014 // bind service address to dummy interface 2015 if bindAddr { 2016 // always attempt to bind if bindedAddresses is nil, 2017 // otherwise check if it's already binded and return early 2018 if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) { 2019 return nil 2020 } 2021 2022 klog.V(4).InfoS("Bind addr", "address", vs.Address.String()) 2023 _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice) 2024 if err != nil { 2025 klog.ErrorS(err, "Failed to bind service address to dummy device", "svcName", svcName) 2026 return err 2027 } 2028 } 2029 2030 return nil 2031} 2032 2033func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error { 2034 appliedVirtualServer, err := 
proxier.ipvs.GetVirtualServer(vs) 2035 if err != nil { 2036 klog.Errorf("Failed to get IPVS service, error: %v", err) 2037 return err 2038 } 2039 if appliedVirtualServer == nil { 2040 return errors.New("IPVS virtual service does not exist") 2041 } 2042 2043 // curEndpoints represents IPVS destinations listed from current system. 2044 curEndpoints := sets.NewString() 2045 // readyEndpoints represents Endpoints watched from API Server. 2046 readyEndpoints := sets.NewString() 2047 // localReadyEndpoints represents local endpoints that are ready and NOT terminating. 2048 localReadyEndpoints := sets.NewString() 2049 // localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating. 2050 // Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic. 2051 localReadyTerminatingEndpoints := sets.NewString() 2052 2053 curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer) 2054 if err != nil { 2055 klog.ErrorS(err, "Failed to list IPVS destinations") 2056 return err 2057 } 2058 for _, des := range curDests { 2059 curEndpoints.Insert(des.String()) 2060 } 2061 2062 endpoints := proxier.endpointsMap[svcPortName] 2063 2064 // Filtering for topology aware endpoints. This function will only 2065 // filter endpoints if appropriate feature gates are enabled and the 2066 // Service does not have conflicting configuration such as 2067 // externalTrafficPolicy=Local. 
2068 svcInfo, ok := proxier.serviceMap[svcPortName] 2069 if !ok { 2070 klog.InfoS("Unable to filter endpoints due to missing Service info", "svcPortName", svcPortName) 2071 } else { 2072 endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels) 2073 } 2074 2075 for _, epInfo := range endpoints { 2076 if epInfo.IsReady() { 2077 readyEndpoints.Insert(epInfo.String()) 2078 } 2079 2080 if onlyNodeLocalEndpoints && epInfo.GetIsLocal() { 2081 if epInfo.IsReady() { 2082 localReadyEndpoints.Insert(epInfo.String()) 2083 } else if epInfo.IsServing() && epInfo.IsTerminating() { 2084 localReadyTerminatingEndpoints.Insert(epInfo.String()) 2085 } 2086 } 2087 } 2088 2089 newEndpoints := readyEndpoints 2090 if onlyNodeLocalEndpoints { 2091 newEndpoints = localReadyEndpoints 2092 2093 if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { 2094 if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 { 2095 newEndpoints = localReadyTerminatingEndpoints 2096 } 2097 } 2098 } 2099 2100 // Create new endpoints 2101 for _, ep := range newEndpoints.List() { 2102 ip, port, err := net.SplitHostPort(ep) 2103 if err != nil { 2104 klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep) 2105 continue 2106 } 2107 portNum, err := strconv.Atoi(port) 2108 if err != nil { 2109 klog.ErrorS(err, "Failed to parse endpoint port", "port", port) 2110 continue 2111 } 2112 2113 newDest := &utilipvs.RealServer{ 2114 Address: net.ParseIP(ip), 2115 Port: uint16(portNum), 2116 Weight: 1, 2117 } 2118 2119 if curEndpoints.Has(ep) { 2120 // check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately 2121 uniqueRS := GetUniqueRSName(vs, newDest) 2122 if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) { 2123 continue 2124 } 2125 klog.V(5).InfoS("new ep is in graceful delete list", "uniqueRS", uniqueRS) 2126 err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS) 2127 if err != nil { 2128 
klog.ErrorS(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep) 2129 continue 2130 } 2131 } 2132 err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest) 2133 if err != nil { 2134 klog.ErrorS(err, "Failed to add destination", "newDest", newDest) 2135 continue 2136 } 2137 } 2138 // Delete old endpoints 2139 for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() { 2140 // if curEndpoint is in gracefulDelete, skip 2141 uniqueRS := vs.String() + "/" + ep 2142 if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) { 2143 continue 2144 } 2145 ip, port, err := net.SplitHostPort(ep) 2146 if err != nil { 2147 klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep) 2148 continue 2149 } 2150 portNum, err := strconv.Atoi(port) 2151 if err != nil { 2152 klog.ErrorS(err, "Failed to parse endpoint port", "port", port) 2153 continue 2154 } 2155 2156 delDest := &utilipvs.RealServer{ 2157 Address: net.ParseIP(ip), 2158 Port: uint16(portNum), 2159 } 2160 2161 klog.V(5).InfoS("Using graceful delete", "uniqueRS", uniqueRS) 2162 err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest) 2163 if err != nil { 2164 klog.ErrorS(err, "Failed to delete destination", "uniqueRS", uniqueRS) 2165 continue 2166 } 2167 } 2168 return nil 2169} 2170 2171func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) { 2172 isIPv6 := utilnet.IsIPv6(proxier.nodeIP) 2173 for cs := range currentServices { 2174 svc := currentServices[cs] 2175 if proxier.isIPInExcludeCIDRs(svc.Address) { 2176 continue 2177 } 2178 if utilnet.IsIPv6(svc.Address) != isIPv6 { 2179 // Not our family 2180 continue 2181 } 2182 if _, ok := activeServices[cs]; !ok { 2183 klog.V(4).InfoS("Delete service", "service", svc.String()) 2184 if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil { 2185 klog.ErrorS(err, "Failed to delete service", "service", 
svc.String()) 2186 } 2187 addr := svc.Address.String() 2188 if _, ok := legacyBindAddrs[addr]; ok { 2189 klog.V(4).InfoS("Unbinding address", "address", addr) 2190 if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil { 2191 klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", DefaultDummyDevice, "address", addr) 2192 } else { 2193 // In case we delete a multi-port service, avoid trying to unbind multiple times 2194 delete(legacyBindAddrs, addr) 2195 } 2196 } 2197 } 2198 } 2199} 2200 2201func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool { 2202 // make sure it does not fall within an excluded CIDR range. 2203 for _, excludedCIDR := range proxier.excludeCIDRs { 2204 if excludedCIDR.Contains(ip) { 2205 return true 2206 } 2207 } 2208 return false 2209} 2210 2211func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool { 2212 legacyAddrs := make(map[string]bool) 2213 isIPv6 := utilnet.IsIPv6(proxier.nodeIP) 2214 for _, addr := range currentBindAddrs { 2215 addrIsIPv6 := utilnet.IsIPv6(net.ParseIP(addr)) 2216 if addrIsIPv6 && !isIPv6 || !addrIsIPv6 && isIPv6 { 2217 continue 2218 } 2219 if _, ok := activeBindAddrs[addr]; !ok { 2220 legacyAddrs[addr] = true 2221 } 2222 } 2223 return legacyAddrs 2224} 2225 2226// ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets 2227// It will only operate iptables *nat table. 2228// Create and link the kube postrouting chain for SNAT packets. 
// Chain POSTROUTING (policy ACCEPT)
// target     prot opt source               destination
// KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
// Maintained by kubelet network sync loop

// *nat
// :KUBE-POSTROUTING - [0:0]
// Chain KUBE-POSTROUTING (1 references)
// target     prot opt source               destination
// MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000

// :KUBE-MARK-MASQ - [0:0]
// Chain KUBE-MARK-MASQ (0 references)
// target     prot opt source               destination
// MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
