1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package ipvs
18
19import (
20	"bytes"
21	"errors"
22	"fmt"
23	"io"
24	"io/ioutil"
25	"net"
26	"os"
27	"reflect"
28	"regexp"
29	"strconv"
30	"strings"
31	"sync"
32	"sync/atomic"
33	"time"
34
35	"k8s.io/klog/v2"
36	utilexec "k8s.io/utils/exec"
37	utilnet "k8s.io/utils/net"
38
39	v1 "k8s.io/api/core/v1"
40	discovery "k8s.io/api/discovery/v1"
41	"k8s.io/apimachinery/pkg/types"
42	"k8s.io/apimachinery/pkg/util/sets"
43	"k8s.io/apimachinery/pkg/util/version"
44	"k8s.io/apimachinery/pkg/util/wait"
45	utilfeature "k8s.io/apiserver/pkg/util/feature"
46	"k8s.io/client-go/tools/events"
47	"k8s.io/kubernetes/pkg/features"
48	"k8s.io/kubernetes/pkg/proxy"
49	"k8s.io/kubernetes/pkg/proxy/healthcheck"
50	"k8s.io/kubernetes/pkg/proxy/metaproxier"
51	"k8s.io/kubernetes/pkg/proxy/metrics"
52	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
53	proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
54	"k8s.io/kubernetes/pkg/util/async"
55	"k8s.io/kubernetes/pkg/util/conntrack"
56	utilipset "k8s.io/kubernetes/pkg/util/ipset"
57	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
58	utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
59	utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
60)
61
62const (
63	// kubeServicesChain is the services portal chain
64	kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
65
66	// KubeFireWallChain is the kubernetes firewall chain.
67	KubeFireWallChain utiliptables.Chain = "KUBE-FIREWALL"
68
69	// kubePostroutingChain is the kubernetes postrouting chain
70	kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
71
72	// KubeMarkMasqChain is the mark-for-masquerade chain
73	KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
74
75	// KubeNodePortChain is the kubernetes node port chain
76	KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"
77
78	// KubeMarkDropChain is the mark-for-drop chain
79	KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
80
81	// KubeForwardChain is the kubernetes forward chain
82	KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
83
84	// KubeLoadBalancerChain is the kubernetes chain for loadbalancer type service
85	KubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"
86
87	// DefaultScheduler is the default ipvs scheduler algorithm - round robin.
88	DefaultScheduler = "rr"
89
90	// DefaultDummyDevice is the default dummy interface which ipvs service address will bind to it.
91	DefaultDummyDevice = "kube-ipvs0"
92
93	connReuseMinSupportedKernelVersion = "4.1"
94
95	// https://github.com/torvalds/linux/commit/35dfb013149f74c2be1ff9c78f14e6a3cd1539d1
96	connReuseFixedKernelVersion = "5.9"
97)
98
99// iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
100// `to` is the iptables chain we want to operate.
101// `from` is the source iptables chain
102var iptablesJumpChain = []struct {
103	table   utiliptables.Table
104	from    utiliptables.Chain
105	to      utiliptables.Chain
106	comment string
107}{
108	{utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
109	{utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
110	{utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
111	{utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"},
112	{utiliptables.TableFilter, utiliptables.ChainInput, KubeNodePortChain, "kubernetes health check rules"},
113}
114
115var iptablesChains = []struct {
116	table utiliptables.Table
117	chain utiliptables.Chain
118}{
119	{utiliptables.TableNAT, kubeServicesChain},
120	{utiliptables.TableNAT, kubePostroutingChain},
121	{utiliptables.TableNAT, KubeFireWallChain},
122	{utiliptables.TableNAT, KubeNodePortChain},
123	{utiliptables.TableNAT, KubeLoadBalancerChain},
124	{utiliptables.TableNAT, KubeMarkMasqChain},
125	{utiliptables.TableFilter, KubeForwardChain},
126	{utiliptables.TableFilter, KubeNodePortChain},
127}
128
129var iptablesEnsureChains = []struct {
130	table utiliptables.Table
131	chain utiliptables.Chain
132}{
133	{utiliptables.TableNAT, KubeMarkDropChain},
134}
135
136var iptablesCleanupChains = []struct {
137	table utiliptables.Table
138	chain utiliptables.Chain
139}{
140	{utiliptables.TableNAT, kubeServicesChain},
141	{utiliptables.TableNAT, kubePostroutingChain},
142	{utiliptables.TableNAT, KubeFireWallChain},
143	{utiliptables.TableNAT, KubeNodePortChain},
144	{utiliptables.TableNAT, KubeLoadBalancerChain},
145	{utiliptables.TableFilter, KubeForwardChain},
146}
147
148// ipsetInfo is all ipset we needed in ipvs proxier
149var ipsetInfo = []struct {
150	name    string
151	setType utilipset.Type
152	comment string
153}{
154	{kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
155	{kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
156	{kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
157	{kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment},
158	{kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
159	{kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment},
160	{kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
161	{kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
162	{kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
163	{kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
164	{kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
165	{kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
166	{kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
167	{kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
168	{kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
169	{kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment},
170}
171
172// ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
173// `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
174// example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
175// ipsets with other match rules will be created Individually.
176// Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
177var ipsetWithIptablesChain = []struct {
178	name          string
179	from          string
180	to            string
181	matchType     string
182	protocolMatch string
183}{
184	{kubeLoopBackIPSet, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
185	{kubeLoadBalancerSet, string(kubeServicesChain), string(KubeLoadBalancerChain), "dst,dst", ""},
186	{kubeLoadbalancerFWSet, string(KubeLoadBalancerChain), string(KubeFireWallChain), "dst,dst", ""},
187	{kubeLoadBalancerSourceCIDRSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
188	{kubeLoadBalancerSourceIPSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
189	{kubeLoadBalancerLocalSet, string(KubeLoadBalancerChain), "RETURN", "dst,dst", ""},
190	{kubeNodePortLocalSetTCP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolTCP},
191	{kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
192	{kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
193	{kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
194	{kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
195	{kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
196}
197
// Kernel sysctl keys consulted by the IPVS proxier. NewProxier only reads
// sysctlBridgeCallIPTables (logging when it is not 1) and sets the others.
// sysctlConnReuse is set only on some kernel versions, and
// sysctlArpIgnore/sysctlArpAnnounce only when strictARP is enabled.
const (
	// General networking settings.
	sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"
	sysctlForward            = "net/ipv4/ip_forward"
	sysctlArpIgnore          = "net/ipv4/conf/all/arp_ignore"
	sysctlArpAnnounce        = "net/ipv4/conf/all/arp_announce"

	// IPVS-specific settings under net/ipv4/vs.
	sysctlVSConnTrack             = "net/ipv4/vs/conntrack"
	sysctlConnReuse               = "net/ipv4/vs/conn_reuse_mode"
	sysctlExpireNoDestConn        = "net/ipv4/vs/expire_nodest_conn"
	sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template"
)
209
// Proxier is an ipvs based proxy for connections between a localhost:lport
// and services that provide the actual backends.
//
// A Proxier serves a single IP family (ipFamily). NewDualStackProxier combines
// two of them behind a meta-proxier for dual-stack hosts.
type Proxier struct {
	// ipFamily is the IP family this proxier operates on.
	ipFamily v1.IPFamily
	// endpointsChanges and serviceChanges contain all changes to endpoints and
	// services that happened since the last syncProxyRules call. For a single object,
	// changes are accumulated, i.e. previous is state from before all of them,
	// current is state after applying all of those.
	endpointsChanges *proxy.EndpointChangeTracker
	serviceChanges   *proxy.ServiceChangeTracker

	// serviceMap and endpointsMap hold the current service/endpoint state;
	// portsMap holds local ports kept open, keyed by utilnet.LocalPort.
	mu           sync.Mutex // protects the following fields
	serviceMap   proxy.ServiceMap
	endpointsMap proxy.EndpointsMap
	portsMap     map[utilnet.LocalPort]utilnet.Closeable
	nodeLabels   map[string]string
	// endpointSlicesSynced, and servicesSynced are set to true when
	// corresponding objects are synced after startup. This is used to avoid updating
	// ipvs rules with some partial data after kube-proxy restart.
	// NOTE(review): initialized is an int32, presumably accessed via sync/atomic
	// (imported by this file); confirm. syncRunner is the BoundedFrequencyRunner
	// created in NewProxier around syncProxyRules.
	endpointSlicesSynced bool
	servicesSynced       bool
	initialized          int32
	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules

	// These are effectively const and do not need the mutex to be held.
	// syncPeriod and minSyncPeriod are the max/min intervals handed to the
	// BoundedFrequencyRunner in NewProxier.
	syncPeriod    time.Duration
	minSyncPeriod time.Duration
	// Values are CIDRs to exclude when cleaning up IPVS rules.
	excludeCIDRs []*net.IPNet
	// strictARP: set to true to set sysctls arp_ignore and arp_announce.
	// The rest of this group holds the handles and settings injected by
	// NewProxier; masqueradeMark is the hex string of 1<<masqueradeBit.
	strictARP      bool
	iptables       utiliptables.Interface
	ipvs           utilipvs.Interface
	ipset          utilipset.Interface
	exec           utilexec.Interface
	masqueradeAll  bool
	masqueradeMark string
	localDetector  proxyutiliptables.LocalTrafficDetector
	hostname       string
	nodeIP         net.IP
	portMapper     utilnet.PortOpener
	recorder       events.EventRecorder

	// Per-service health-check server and the proxier's own health updater
	// (the latter may be nil; see NewDualStackProxier).
	serviceHealthServer healthcheck.ServiceHealthServer
	healthzServer       healthcheck.ProxierHealthUpdater

	// ipvsScheduler is the IPVS scheduler name (DefaultScheduler when unset).
	ipvsScheduler string
	// Added as a member to the struct to allow injection for testing.
	ipGetter IPGetter
	// The following buffers are used to reuse memory and avoid allocations
	// that are significantly impacting performance.
	iptablesData     *bytes.Buffer
	filterChainsData *bytes.Buffer
	natChains        *bytes.Buffer
	filterChains     *bytes.Buffer
	natRules         *bytes.Buffer
	filterRules      *bytes.Buffer
	// Added as a member to the struct to allow injection for testing.
	netlinkHandle NetLinkHandle
	// ipsetList maps each ipset name (see ipsetInfo) to the IPSet used by this proxier.
	ipsetList map[string]*IPSet
	// nodePortAddresses are the CIDRs (of this proxier's family only) selecting
	// the interfaces on which NodePorts are served.
	nodePortAddresses []string
	// networkInterfacer defines an interface for several net library functions.
	// Inject for test purpose.
	// gracefuldeleteManager is created and started (Run) by NewProxier.
	networkInterfacer     utilproxy.NetworkInterfacer
	gracefuldeleteManager *GracefulTerminationManager
}
279
280// IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface
281type IPGetter interface {
282	NodeIPs() ([]net.IP, error)
283	BindedIPs() (sets.String, error)
284}
285
286// realIPGetter is a real NodeIP handler, it implements IPGetter.
287type realIPGetter struct {
288	// nl is a handle for revoking netlink interface
289	nl NetLinkHandle
290}
291
// NodeIPs returns all LOCAL type IP addresses from host which are taken as the Node IPs of NodePort service.
// It will list source IP exists in local route table with `kernel` protocol type, and filter out IPVS proxier
// created dummy device `kube-ipvs0` For example,
// $ ip route show table local type local proto kernel
// 10.0.0.1 dev kube-ipvs0  scope host  src 10.0.0.1
// 10.0.0.10 dev kube-ipvs0  scope host  src 10.0.0.10
// 10.0.0.252 dev kube-ipvs0  scope host  src 10.0.0.252
// 100.106.89.164 dev eth0  scope host  src 100.106.89.164
// 127.0.0.0/8 dev lo  scope host  src 127.0.0.1
// 127.0.0.1 dev lo  scope host  src 127.0.0.1
// 172.17.0.1 dev docker0  scope host  src 172.17.0.1
// 192.168.122.1 dev virbr0  scope host  src 192.168.122.1
// Then filter out dev==kube-ipvs0, and cut the unique src IP fields,
// Node IP set: [100.106.89.164, 172.17.0.1, 192.168.122.1]
// Loopback addresses and address strings that do not parse as an IP are excluded.
// The order of the returned IPs is unspecified.
func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) {
	// Pass in empty filter device name for list all LOCAL type addresses.
	nodeAddress, err := r.nl.GetLocalAddresses("", DefaultDummyDevice)
	if err != nil {
		return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %w", err)
	}
	for _, ipStr := range nodeAddress.UnsortedList() {
		a := net.ParseIP(ipStr)
		// net.ParseIP returns nil for malformed input. nil.IsLoopback() is false,
		// so without this check a nil entry would be reported as a node IP.
		if a == nil {
			klog.V(4).InfoS("Ignoring local address that is not a valid IP", "address", ipStr)
			continue
		}
		if a.IsLoopback() {
			continue
		}
		ips = append(ips, a)
	}
	return ips, nil
}
323
324// BindedIPs returns all addresses that are binded to the IPVS dummy interface kube-ipvs0
325func (r *realIPGetter) BindedIPs() (sets.String, error) {
326	return r.nl.GetLocalAddresses(DefaultDummyDevice, "")
327}
328
329// Proxier implements proxy.Provider
330var _ proxy.Provider = &Proxier{}
331
// NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
// Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
// An error is returned if a required sysctl cannot be set, or if the running
// kernel version cannot be determined or parsed.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails.
//
// Side effects on the host: NewProxier writes IPVS-related sysctls, configures
// IPVS timeouts when any of tcpTimeout/tcpFinTimeout/udpTimeout is positive,
// and starts the graceful-termination manager.
func NewProxier(ipt utiliptables.Interface,
	ipvs utilipvs.Interface,
	ipset utilipset.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	excludeCIDRs []string,
	strictARP bool,
	tcpTimeout time.Duration,
	tcpFinTimeout time.Duration,
	udpTimeout time.Duration,
	masqueradeAll bool,
	masqueradeBit int,
	localDetector proxyutiliptables.LocalTrafficDetector,
	hostname string,
	nodeIP net.IP,
	recorder events.EventRecorder,
	healthzServer healthcheck.ProxierHealthUpdater,
	scheduler string,
	nodePortAddresses []string,
	kernelHandler KernelHandler,
) (*Proxier, error) {
	// Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
	// are connected to a Linux bridge (but not SDN bridges).  Until most
	// plugins handle this, log when config is missing
	if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
		klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
	}

	// Make IPVS maintain conntrack entries for the connections it handles, so
	// conntrack-based iptables rules (e.g. masquerading) also apply to them.
	if err := utilproxy.EnsureSysctl(sysctl, sysctlVSConnTrack, 1); err != nil {
		return nil, err
	}

	kernelVersionStr, err := kernelHandler.GetKernelVersion()
	if err != nil {
		return nil, fmt.Errorf("error determining kernel version to find required kernel modules for ipvs support: %w", err)
	}
	kernelVersion, err := version.ParseGeneric(kernelVersionStr)
	if err != nil {
		return nil, fmt.Errorf("error parsing kernel version %q: %w", kernelVersionStr, err)
	}
	switch {
	case kernelVersion.LessThan(version.MustParseGeneric(connReuseMinSupportedKernelVersion)):
		// Structured logging: constant message plus key/value pairs, not a
		// preformatted message.
		klog.ErrorS(nil, "Can't set sysctl, kernel version doesn't satisfy minimum version requirements",
			"sysctl", sysctlConnReuse, "minimumKernelVersion", connReuseMinSupportedKernelVersion)
	case kernelVersion.AtLeast(version.MustParseGeneric(connReuseFixedKernelVersion)):
		// https://github.com/kubernetes/kubernetes/issues/93297
		klog.V(2).InfoS("Left as-is", "sysctl", sysctlConnReuse)
	default:
		// Set the connection reuse mode
		if err := utilproxy.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil {
			return nil, err
		}
	}

	// Expire a connection as soon as a packet arrives for it and its real server
	// is no longer available, instead of silently dropping the packet.
	if err := utilproxy.EnsureSysctl(sysctl, sysctlExpireNoDestConn, 1); err != nil {
		return nil, err
	}

	// Expire persistence templates whose real server is quiescent (weight 0).
	if err := utilproxy.EnsureSysctl(sysctl, sysctlExpireQuiescentTemplate, 1); err != nil {
		return nil, err
	}

	// Enable IPv4 packet forwarding.
	if err := utilproxy.EnsureSysctl(sysctl, sysctlForward, 1); err != nil {
		return nil, err
	}

	if strictARP {
		// arp_ignore=1: only answer ARP requests for addresses configured on the
		// receiving interface, so addresses bound to the dummy device are not
		// answered for on other interfaces.
		if err := utilproxy.EnsureSysctl(sysctl, sysctlArpIgnore, 1); err != nil {
			return nil, err
		}

		// arp_announce=2: always use the best local address for the target as
		// the ARP source address.
		if err := utilproxy.EnsureSysctl(sysctl, sysctlArpAnnounce, 2); err != nil {
			return nil, err
		}
	}

	// Configure IPVS timeouts if any one of the timeout parameters have been set.
	// This is the equivalent to running ipvsadm --set, a value of 0 indicates the
	// current system timeout should be preserved
	if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
		if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
			klog.ErrorS(err, "failed to configure IPVS timeouts")
		}
	}

	// Generate the masquerade mark to use for SNAT rules.
	masqueradeValue := 1 << uint(masqueradeBit)
	masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)

	ipFamily := v1.IPv4Protocol
	if ipt.IsIPv6() {
		ipFamily = v1.IPv6Protocol
	}
	isIPv6 := ipFamily == v1.IPv6Protocol

	klog.V(2).InfoS("record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)

	if len(scheduler) == 0 {
		klog.InfoS("IPVS scheduler not specified, use rr by default")
		scheduler = DefaultScheduler
	}

	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)

	ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
	nodePortAddresses = ipFamilyMap[ipFamily]
	// Log the nodePortAddresses CIDRs that do not match this proxier's family;
	// they are ignored by this proxier.
	if cidrs, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 {
		klog.InfoS("Found nodePortAddresses of the wrong family", "ipFamily", ipFamily, "cidrs", strings.Join(cidrs, ","))
	}

	// excludeCIDRs has been validated before, here we just parse it to IPNet list
	parsedExcludeCIDRs, _ := utilnet.ParseCIDRs(excludeCIDRs)

	proxier := &Proxier{
		ipFamily:              ipFamily,
		portsMap:              make(map[utilnet.LocalPort]utilnet.Closeable),
		serviceMap:            make(proxy.ServiceMap),
		serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
		endpointsMap:          make(proxy.EndpointsMap),
		endpointsChanges:      proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
		syncPeriod:            syncPeriod,
		minSyncPeriod:         minSyncPeriod,
		excludeCIDRs:          parsedExcludeCIDRs,
		iptables:              ipt,
		masqueradeAll:         masqueradeAll,
		masqueradeMark:        masqueradeMark,
		exec:                  exec,
		localDetector:         localDetector,
		hostname:              hostname,
		nodeIP:                nodeIP,
		portMapper:            &utilnet.ListenPortOpener,
		recorder:              recorder,
		serviceHealthServer:   serviceHealthServer,
		healthzServer:         healthzServer,
		ipvs:                  ipvs,
		ipvsScheduler:         scheduler,
		ipGetter:              &realIPGetter{nl: NewNetLinkHandle(isIPv6)},
		iptablesData:          bytes.NewBuffer(nil),
		filterChainsData:      bytes.NewBuffer(nil),
		natChains:             bytes.NewBuffer(nil),
		natRules:              bytes.NewBuffer(nil),
		filterChains:          bytes.NewBuffer(nil),
		filterRules:           bytes.NewBuffer(nil),
		netlinkHandle:         NewNetLinkHandle(isIPv6),
		ipset:                 ipset,
		nodePortAddresses:     nodePortAddresses,
		networkInterfacer:     utilproxy.RealNetwork{},
		gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
	}
	// initialize ipsetList with all sets we need
	proxier.ipsetList = make(map[string]*IPSet, len(ipsetInfo))
	for _, is := range ipsetInfo {
		proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
	}
	burstSyncs := 2
	klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
	proxier.gracefuldeleteManager.Run()
	return proxier, nil
}
503
// NewDualStackProxier returns a new Proxier for dual-stack operation.
//
// It builds one single-stack Proxier per IP family. Index 0 of ipt,
// localDetectors and nodeIP is used for IPv4, and index 1 for IPv6. Both
// instances share the ipvs handle and a single newSafeIpset wrapper around
// ipset. excludeCIDRs and nodePortAddresses are split by family. Only the IPv4
// instance receives recorder and healthzServer; the IPv6 instance gets nil for
// both. The pair is returned wrapped in a meta-proxier.
func NewDualStackProxier(
	ipt [2]utiliptables.Interface,
	ipvs utilipvs.Interface,
	ipset utilipset.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	excludeCIDRs []string,
	strictARP bool,
	tcpTimeout time.Duration,
	tcpFinTimeout time.Duration,
	udpTimeout time.Duration,
	masqueradeAll bool,
	masqueradeBit int,
	localDetectors [2]proxyutiliptables.LocalTrafficDetector,
	hostname string,
	nodeIP [2]net.IP,
	recorder events.EventRecorder,
	healthzServer healthcheck.ProxierHealthUpdater,
	scheduler string,
	nodePortAddresses []string,
	kernelHandler KernelHandler,
) (proxy.Provider, error) {
	sharedIpset := newSafeIpset(ipset)
	cidrsByFamily := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)

	// forFamily builds the single-stack proxier that uses slot idx of the
	// per-family arguments.
	forFamily := func(idx int, family v1.IPFamily, rec events.EventRecorder, hz healthcheck.ProxierHealthUpdater) (*Proxier, error) {
		return NewProxier(ipt[idx], ipvs, sharedIpset, sysctl,
			exec, syncPeriod, minSyncPeriod, filterCIDRs(family == v1.IPv6Protocol, excludeCIDRs), strictARP,
			tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
			localDetectors[idx], hostname, nodeIP[idx],
			rec, hz, scheduler, cidrsByFamily[family], kernelHandler)
	}

	v4, err := forFamily(0, v1.IPv4Protocol, recorder, healthzServer)
	if err != nil {
		return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
	}

	v6, err := forFamily(1, v1.IPv6Protocol, nil, nil)
	if err != nil {
		return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
	}

	return metaproxier.NewMetaProxier(v4, v6), nil
}
557
558func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
559	var filteredCIDRs []string
560	for _, cidr := range cidrs {
561		if utilnet.IsIPv6CIDRString(cidr) == wantIPv6 {
562			filteredCIDRs = append(filteredCIDRs, cidr)
563		}
564	}
565	return filteredCIDRs
566}
567
568// internal struct for string service information
569type serviceInfo struct {
570	*proxy.BaseServiceInfo
571	// The following fields are computed and stored for performance reasons.
572	serviceNameString string
573}
574
575// returns a new proxy.ServicePort which abstracts a serviceInfo
576func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
577	info := &serviceInfo{BaseServiceInfo: baseInfo}
578
579	// Store the following for performance reasons.
580	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
581	svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
582	info.serviceNameString = svcPortName.String()
583
584	return info
585}
586
587// KernelHandler can handle the current installed kernel modules.
588type KernelHandler interface {
589	GetModules() ([]string, error)
590	GetKernelVersion() (string, error)
591}
592
593// LinuxKernelHandler implements KernelHandler interface.
594type LinuxKernelHandler struct {
595	executor utilexec.Interface
596}
597
598// NewLinuxKernelHandler initializes LinuxKernelHandler with exec.
599func NewLinuxKernelHandler() *LinuxKernelHandler {
600	return &LinuxKernelHandler{
601		executor: utilexec.New(),
602	}
603}
604
// GetModules returns the names of kernel modules available to the IPVS proxier.
//
// Normally the result contains three groups, in this order:
//   - every module listed in /proc/modules (first column);
//   - the required IPVS modules (utilipvs.GetRequiredIPVSModules for the running
//     kernel) listed in /lib/modules/<version>/modules.builtin;
//   - the remaining required modules that `modprobe` loaded successfully.
//
// If /proc/modules does not exist (a kernel without loadable-module support),
// the result contains only the required modules that /boot/config-<version>
// reports as built in ("CONFIG_<MODULE>=y").
func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
	// Check whether IPVS required kernel modules are built-in
	kernelVersionStr, err := handle.GetKernelVersion()
	if err != nil {
		return nil, err
	}
	kernelVersion, err := version.ParseGeneric(kernelVersionStr)
	if err != nil {
		return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
	}
	ipvsModules := utilipvs.GetRequiredIPVSModules(kernelVersion)

	var bmods, lmods []string

	// Find out loaded kernel modules. If this is a full static kernel it will try to verify if the module is compiled using /boot/config-KERNELVERSION
	modulesFile, err := os.Open("/proc/modules")
	// os.Open reports a missing file as *os.PathError wrapping os.ErrNotExist, so
	// comparing with == never matches; errors.Is unwraps it.
	if errors.Is(err, os.ErrNotExist) {
		klog.ErrorS(err, "Failed to read file /proc/modules. Assuming this is a kernel without loadable modules support enabled")
		kernelConfigFile := fmt.Sprintf("/boot/config-%s", kernelVersionStr)
		kConfig, err := ioutil.ReadFile(kernelConfigFile)
		if err != nil {
			return nil, fmt.Errorf("Failed to read Kernel Config file %s with error %v", kernelConfigFile, err)
		}
		for _, module := range ipvsModules {
			pattern := "CONFIG_" + regexp.QuoteMeta(strings.ToUpper(module)) + "=y"
			if match, _ := regexp.Match(pattern, kConfig); match {
				bmods = append(bmods, module)
			}
		}
		return bmods, nil
	}
	if err != nil {
		return nil, fmt.Errorf("Failed to read file /proc/modules with error %v", err)
	}
	defer modulesFile.Close()

	mods, err := getFirstColumn(modulesFile)
	if err != nil {
		return nil, fmt.Errorf("failed to find loaded kernel modules: %v", err)
	}

	builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersionStr)
	b, err := ioutil.ReadFile(builtinModsFilePath)
	if err != nil {
		// b stays nil, so no module is considered built in and all are modprobe'd.
		klog.ErrorS(err, "Failed to read builtin modules file. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "filePath", builtinModsFilePath)
	}

	for _, module := range ipvsModules {
		// QuoteMeta makes the "." literal so only "<module>.ko" matches.
		if match, _ := regexp.Match(regexp.QuoteMeta(module+".ko"), b); match {
			bmods = append(bmods, module)
			continue
		}
		// Try to load the required IPVS kernel modules if not built in
		if err := handle.executor.Command("modprobe", "--", module).Run(); err != nil {
			klog.InfoS("Failed to load kernel module with modprobe. "+
				"You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "moduleName", module, "err", err)
			continue
		}
		lmods = append(lmods, module)
	}

	mods = append(mods, bmods...)
	mods = append(mods, lmods...)
	return mods, nil
}
671
// getFirstColumn reads r to EOF and returns the first whitespace-separated
// field of every line, in order. Empty or whitespace-only lines contribute
// nothing. The result is non-nil even when no line has a field. A read
// error is returned unchanged.
func getFirstColumn(r io.Reader) ([]string, error) {
	content, err := ioutil.ReadAll(r)
	if err != nil {
		return nil, err
	}

	rows := strings.Split(string(content), "\n")
	firstWords := make([]string, 0, len(rows))
	for _, row := range rows {
		if cols := strings.Fields(row); len(cols) != 0 {
			firstWords = append(firstWords, cols[0])
		}
	}
	return firstWords, nil
}
690
691// GetKernelVersion returns currently running kernel version.
692func (handle *LinuxKernelHandler) GetKernelVersion() (string, error) {
693	kernelVersionFile := "/proc/sys/kernel/osrelease"
694	fileContent, err := ioutil.ReadFile(kernelVersionFile)
695	if err != nil {
696		return "", fmt.Errorf("error reading osrelease file %q: %v", kernelVersionFile, err)
697	}
698
699	return strings.TrimSpace(string(fileContent)), nil
700}
701
// CanUseIPVSProxier returns true if we can use the ipvs Proxier.
// It checks two things:
//   - every required kernel module is available, including the module for
//     scheduler (DefaultScheduler when empty);
//   - the installed ipset version is at least MinIPSetCheckVersion.
//
// When a check fails, or the needed information cannot be obtained, it returns
// false together with a descriptive error.
func CanUseIPVSProxier(handle KernelHandler, ipsetver IPSetVersioner, scheduler string) (bool, error) {
	mods, err := handle.GetModules()
	if err != nil {
		return false, fmt.Errorf("error getting installed ipvs required kernel modules: %w", err)
	}
	loadModules := sets.NewString(mods...)

	kernelVersionStr, err := handle.GetKernelVersion()
	if err != nil {
		return false, fmt.Errorf("error determining kernel version to find required kernel modules for ipvs support: %w", err)
	}
	kernelVersion, err := version.ParseGeneric(kernelVersionStr)
	if err != nil {
		return false, fmt.Errorf("error parsing kernel version %q: %w", kernelVersionStr, err)
	}

	// The scheduler's module is required too, so a missing scheduler is
	// reported together with the other missing modules.
	if scheduler == "" {
		scheduler = DefaultScheduler
	}
	wantModules := sets.NewString(utilipvs.GetRequiredIPVSModules(kernelVersion)...)
	wantModules.Insert("ip_vs_" + scheduler)

	// Any one of the wanted conntrack modules satisfies the conntrack
	// requirement (nf_conntrack_ipv4 on older kernels, nf_conntrack on newer).
	// Conntrack is therefore reported missing only when every wanted conntrack
	// module is absent. Comparing against the number actually wanted keeps the
	// check effective when only one conntrack module is required; a fixed
	// threshold of 2 can never be reached in that case.
	var missingMods []string
	wantedConntrack, missingConntrack := 0, 0
	for _, mod := range wantModules.UnsortedList() {
		isConntrack := strings.Contains(mod, "nf_conntrack")
		if isConntrack {
			wantedConntrack++
		}
		if loadModules.Has(mod) {
			continue
		}
		if isConntrack {
			missingConntrack++
		} else {
			missingMods = append(missingMods, mod)
		}
	}
	if wantedConntrack > 0 && missingConntrack == wantedConntrack {
		missingMods = append(missingMods, "nf_conntrack_ipv4(or nf_conntrack for Linux kernel 4.19 and later)")
	}

	if len(missingMods) != 0 {
		return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", missingMods)
	}

	// Check ipset version
	versionString, err := ipsetver.GetVersion()
	if err != nil {
		return false, fmt.Errorf("error getting ipset version, error: %w", err)
	}
	if !checkMinVersion(versionString) {
		return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
	}
	return true, nil
}
760
761// CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier
762// It returns true if an error was encountered. Errors are logged.
763func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
764	// Unlink the iptables chains created by ipvs Proxier
765	for _, jc := range iptablesJumpChain {
766		args := []string{
767			"-m", "comment", "--comment", jc.comment,
768			"-j", string(jc.to),
769		}
770		if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
771			if !utiliptables.IsNotFoundError(err) {
772				klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
773				encounteredError = true
774			}
775		}
776	}
777
778	// Flush and remove all of our chains. Flushing all chains before removing them also removes all links between chains first.
779	for _, ch := range iptablesCleanupChains {
780		if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
781			if !utiliptables.IsNotFoundError(err) {
782				klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
783				encounteredError = true
784			}
785		}
786	}
787
788	// Remove all of our chains.
789	for _, ch := range iptablesCleanupChains {
790		if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
791			if !utiliptables.IsNotFoundError(err) {
792				klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
793				encounteredError = true
794			}
795		}
796	}
797
798	return encounteredError
799}
800
801// CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
802func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) {
803	// Clear all ipvs rules
804	if ipvs != nil {
805		err := ipvs.Flush()
806		if err != nil {
807			klog.ErrorS(err, "Error flushing IPVS rules")
808			encounteredError = true
809		}
810	}
811	// Delete dummy interface created by ipvs Proxier.
812	nl := NewNetLinkHandle(false)
813	err := nl.DeleteDummyDevice(DefaultDummyDevice)
814	if err != nil {
815		klog.ErrorS(err, "Error deleting dummy device created by IPVS proxier", "device", DefaultDummyDevice)
816		encounteredError = true
817	}
818	// Clear iptables created by ipvs Proxier.
819	encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
820	// Destroy ip sets created by ipvs Proxier.  We should call it after cleaning up
821	// iptables since we can NOT delete ip set which is still referenced by iptables.
822	for _, set := range ipsetInfo {
823		err = ipset.DestroySet(set.name)
824		if err != nil {
825			if !utilipset.IsNotFoundError(err) {
826				klog.ErrorS(err, "Error removing ipset", "ipset", set.name)
827				encounteredError = true
828			}
829		}
830	}
831	return encounteredError
832}
833
834// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
835func (proxier *Proxier) Sync() {
836	if proxier.healthzServer != nil {
837		proxier.healthzServer.QueuedUpdate()
838	}
839	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
840	proxier.syncRunner.Run()
841}
842
843// SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
844func (proxier *Proxier) SyncLoop() {
845	// Update healthz timestamp at beginning in case Sync() never succeeds.
846	if proxier.healthzServer != nil {
847		proxier.healthzServer.Updated()
848	}
849	// synthesize "last change queued" time as the informers are syncing.
850	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
851	proxier.syncRunner.Loop(wait.NeverStop)
852}
853
854func (proxier *Proxier) setInitialized(value bool) {
855	var initialized int32
856	if value {
857		initialized = 1
858	}
859	atomic.StoreInt32(&proxier.initialized, initialized)
860}
861
862func (proxier *Proxier) isInitialized() bool {
863	return atomic.LoadInt32(&proxier.initialized) > 0
864}
865
866// OnServiceAdd is called whenever creation of new service object is observed.
867func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
868	proxier.OnServiceUpdate(nil, service)
869}
870
871// OnServiceUpdate is called whenever modification of an existing service object is observed.
872func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
873	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
874		proxier.Sync()
875	}
876}
877
878// OnServiceDelete is called whenever deletion of an existing service object is observed.
879func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
880	proxier.OnServiceUpdate(service, nil)
881}
882
883// OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
884func (proxier *Proxier) OnServiceSynced() {
885	proxier.mu.Lock()
886	proxier.servicesSynced = true
887	proxier.setInitialized(proxier.endpointSlicesSynced)
888	proxier.mu.Unlock()
889
890	// Sync unconditionally - this is called once per lifetime.
891	proxier.syncProxyRules()
892}
893
894// The following methods exist to implement the Proxier interface however
895// ipvs proxier only uses EndpointSlices so the following are noops
896
897// OnEndpointsAdd is called whenever creation of new endpoints object is observed.
898func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {}
899
900// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
901func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {}
902
903// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
904func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {}
905
906// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
907func (proxier *Proxier) OnEndpointsSynced() {}
908
909// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
910// is observed.
911func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
912	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
913		proxier.Sync()
914	}
915}
916
917// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
918// slice object is observed.
919func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
920	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
921		proxier.Sync()
922	}
923}
924
925// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
926// object is observed.
927func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
928	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
929		proxier.Sync()
930	}
931}
932
933// OnEndpointSlicesSynced is called once all the initial event handlers were
934// called and the state is fully propagated to local cache.
935func (proxier *Proxier) OnEndpointSlicesSynced() {
936	proxier.mu.Lock()
937	proxier.endpointSlicesSynced = true
938	proxier.setInitialized(proxier.servicesSynced)
939	proxier.mu.Unlock()
940
941	// Sync unconditionally - this is called once per lifetime.
942	proxier.syncProxyRules()
943}
944
945// OnNodeAdd is called whenever creation of new node object
946// is observed.
947func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
948	if node.Name != proxier.hostname {
949		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
950		return
951	}
952
953	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
954		return
955	}
956
957	proxier.mu.Lock()
958	proxier.nodeLabels = map[string]string{}
959	for k, v := range node.Labels {
960		proxier.nodeLabels[k] = v
961	}
962	proxier.mu.Unlock()
963	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
964
965	proxier.syncProxyRules()
966}
967
968// OnNodeUpdate is called whenever modification of an existing
969// node object is observed.
970func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
971	if node.Name != proxier.hostname {
972		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
973		return
974	}
975
976	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
977		return
978	}
979
980	proxier.mu.Lock()
981	proxier.nodeLabels = map[string]string{}
982	for k, v := range node.Labels {
983		proxier.nodeLabels[k] = v
984	}
985	proxier.mu.Unlock()
986	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
987
988	proxier.syncProxyRules()
989}
990
991// OnNodeDelete is called whenever deletion of an existing node
992// object is observed.
993func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
994	if node.Name != proxier.hostname {
995		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
996		return
997	}
998	proxier.mu.Lock()
999	proxier.nodeLabels = nil
1000	proxier.mu.Unlock()
1001
1002	proxier.syncProxyRules()
1003}
1004
1005// OnNodeSynced is called once all the initial event handlers were
1006// called and the state is fully propagated to local cache.
1007func (proxier *Proxier) OnNodeSynced() {
1008}
1009
// syncProxyRules is where all of the ipvs calls happen.
// It acquires proxier.mu itself, so callers must not already hold the lock.
1012func (proxier *Proxier) syncProxyRules() {
1013	proxier.mu.Lock()
1014	defer proxier.mu.Unlock()
1015
1016	// don't sync rules till we've received services and endpoints
1017	if !proxier.isInitialized() {
1018		klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
1019		return
1020	}
1021
1022	// Keep track of how long syncs take.
1023	start := time.Now()
1024	defer func() {
1025		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
1026		klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
1027	}()
1028
1029	// We assume that if this was called, we really want to sync them,
1030	// even if nothing changed in the meantime. In other words, callers are
1031	// responsible for detecting no-op changes and not calling this function.
1032	serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
1033	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
1034
1035	staleServices := serviceUpdateResult.UDPStaleClusterIP
1036	// merge stale services gathered from updateEndpointsMap
1037	for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
1038		if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
1039			klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
1040			staleServices.Insert(svcInfo.ClusterIP().String())
1041			for _, extIP := range svcInfo.ExternalIPStrings() {
1042				staleServices.Insert(extIP)
1043			}
1044		}
1045	}
1046
1047	klog.V(3).InfoS("Syncing ipvs Proxier rules")
1048
1049	// Begin install iptables
1050
1051	// Reset all buffers used later.
1052	// This is to avoid memory reallocations and thus improve performance.
1053	proxier.natChains.Reset()
1054	proxier.natRules.Reset()
1055	proxier.filterChains.Reset()
1056	proxier.filterRules.Reset()
1057
1058	// Write table headers.
1059	utilproxy.WriteLine(proxier.filterChains, "*filter")
1060	utilproxy.WriteLine(proxier.natChains, "*nat")
1061
1062	proxier.createAndLinkKubeChain()
1063
1064	// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
1065	_, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
1066	if err != nil {
1067		klog.ErrorS(err, "Failed to create dummy interface", "interface", DefaultDummyDevice)
1068		return
1069	}
1070
1071	// make sure ip sets exists in the system.
1072	for _, set := range proxier.ipsetList {
1073		if err := ensureIPSet(set); err != nil {
1074			return
1075		}
1076		set.resetEntries()
1077	}
1078
1079	// Accumulate the set of local ports that we will be holding open once this update is complete
1080	replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
1081	// activeIPVSServices represents IPVS service successfully created in this round of sync
1082	activeIPVSServices := map[string]bool{}
1083	// currentIPVSServices represent IPVS services listed from the system
1084	currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
1085	// activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
1086	activeBindAddrs := map[string]bool{}
1087
1088	bindedAddresses, err := proxier.ipGetter.BindedIPs()
1089	if err != nil {
1090		klog.ErrorS(err, "error listing addresses binded to dummy interface")
1091	}
1092
1093	hasNodePort := false
1094	for _, svc := range proxier.serviceMap {
1095		svcInfo, ok := svc.(*serviceInfo)
1096		if ok && svcInfo.NodePort() != 0 {
1097			hasNodePort = true
1098			break
1099		}
1100	}
1101
1102	// Both nodeAddresses and nodeIPs can be reused for all nodePort services
1103	// and only need to be computed if we have at least one nodePort service.
1104	var (
1105		// List of node addresses to listen on if a nodePort is set.
1106		nodeAddresses []string
1107		// List of node IP addresses to be used as IPVS services if nodePort is set.
1108		nodeIPs []net.IP
1109	)
1110
1111	if hasNodePort {
1112		nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
1113		if err != nil {
1114			klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr")
1115		} else {
1116			nodeAddresses = nodeAddrSet.List()
1117			for _, address := range nodeAddresses {
1118				a := net.ParseIP(address)
1119				if a.IsLoopback() {
1120					continue
1121				}
1122				if utilproxy.IsZeroCIDR(address) {
1123					nodeIPs, err = proxier.ipGetter.NodeIPs()
1124					if err != nil {
1125						klog.ErrorS(err, "Failed to list all node IPs from host")
1126					}
1127					break
1128				}
1129				nodeIPs = append(nodeIPs, a)
1130			}
1131		}
1132	}
1133
1134	// filter node IPs by proxier ipfamily
1135	idx := 0
1136	for _, nodeIP := range nodeIPs {
1137		if (proxier.ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6(nodeIP) {
1138			nodeIPs[idx] = nodeIP
1139			idx++
1140		}
1141	}
1142	// reset slice to filtered entries
1143	nodeIPs = nodeIPs[:idx]
1144
1145	localAddrSet := utilproxy.GetLocalAddrSet()
1146
1147	// Build IPVS rules for each service.
1148	for svcName, svc := range proxier.serviceMap {
1149		svcInfo, ok := svc.(*serviceInfo)
1150		if !ok {
1151			klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
1152			continue
1153		}
1154		isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
1155		localPortIPFamily := utilnet.IPv4
1156		if isIPv6 {
1157			localPortIPFamily = utilnet.IPv6
1158		}
1159		protocol := strings.ToLower(string(svcInfo.Protocol()))
1160		// Precompute svcNameString; with many services the many calls
1161		// to ServicePortName.String() show up in CPU profiles.
1162		svcNameString := svcName.String()
1163
1164		// Handle traffic that loops back to the originator with SNAT.
1165		for _, e := range proxier.endpointsMap[svcName] {
1166			ep, ok := e.(*proxy.BaseEndpointInfo)
1167			if !ok {
1168				klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String())
1169				continue
1170			}
1171			if !ep.IsLocal {
1172				continue
1173			}
1174			epIP := ep.IP()
1175			epPort, err := ep.Port()
1176			// Error parsing this endpoint has been logged. Skip to next endpoint.
1177			if epIP == "" || err != nil {
1178				continue
1179			}
1180			entry := &utilipset.Entry{
1181				IP:       epIP,
1182				Port:     epPort,
1183				Protocol: protocol,
1184				IP2:      epIP,
1185				SetType:  utilipset.HashIPPortIP,
1186			}
1187			if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
1188				klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
1189				continue
1190			}
1191			proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
1192		}
1193
1194		// Capture the clusterIP.
1195		// ipset call
1196		entry := &utilipset.Entry{
1197			IP:       svcInfo.ClusterIP().String(),
1198			Port:     svcInfo.Port(),
1199			Protocol: protocol,
1200			SetType:  utilipset.HashIPPort,
1201		}
1202		// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
1203		// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
1204		if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
1205			klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
1206			continue
1207		}
1208		proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
1209		// ipvs call
1210		serv := &utilipvs.VirtualServer{
1211			Address:   svcInfo.ClusterIP(),
1212			Port:      uint16(svcInfo.Port()),
1213			Protocol:  string(svcInfo.Protocol()),
1214			Scheduler: proxier.ipvsScheduler,
1215		}
1216		// Set session affinity flag and timeout for IPVS service
1217		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1218			serv.Flags |= utilipvs.FlagPersistent
1219			serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1220		}
1221		// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
1222		if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
1223			activeIPVSServices[serv.String()] = true
1224			activeBindAddrs[serv.Address.String()] = true
1225			// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
1226			// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
1227			internalNodeLocal := false
1228			if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
1229				internalNodeLocal = true
1230			}
1231			if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
1232				klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
1233			}
1234		} else {
1235			klog.ErrorS(err, "Failed to sync service", "service", serv.String())
1236		}
1237
1238		// Capture externalIPs.
1239		for _, externalIP := range svcInfo.ExternalIPStrings() {
1240			// If the "external" IP happens to be an IP that is local to this
1241			// machine, hold the local port open so no other process can open it
1242			// (because the socket might open but it would never work).
1243			if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
1244				// We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
1245				lp := utilnet.LocalPort{
1246					Description: "externalIP for " + svcNameString,
1247					IP:          externalIP,
1248					IPFamily:    localPortIPFamily,
1249					Port:        svcInfo.Port(),
1250					Protocol:    utilnet.Protocol(svcInfo.Protocol()),
1251				}
1252				if proxier.portsMap[lp] != nil {
1253					klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
1254					replacementPortsMap[lp] = proxier.portsMap[lp]
1255				} else {
1256					socket, err := proxier.portMapper.OpenLocalPort(&lp)
1257					if err != nil {
1258						msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
1259
1260						proxier.recorder.Eventf(
1261							&v1.ObjectReference{
1262								Kind:      "Node",
1263								Name:      proxier.hostname,
1264								UID:       types.UID(proxier.hostname),
1265								Namespace: "",
1266							}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
1267						klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
1268						continue
1269					}
1270					klog.V(2).InfoS("Opened local port", "port", lp.String())
1271					replacementPortsMap[lp] = socket
1272				}
1273			} // We're holding the port, so it's OK to install IPVS rules.
1274
1275			// ipset call
1276			entry := &utilipset.Entry{
1277				IP:       externalIP,
1278				Port:     svcInfo.Port(),
1279				Protocol: protocol,
1280				SetType:  utilipset.HashIPPort,
1281			}
1282
1283			if svcInfo.NodeLocalExternal() {
1284				if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
1285					klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
1286					continue
1287				}
1288				proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
1289			} else {
1290				// We have to SNAT packets to external IPs.
1291				if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
1292					klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
1293					continue
1294				}
1295				proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
1296			}
1297
1298			// ipvs call
1299			serv := &utilipvs.VirtualServer{
1300				Address:   net.ParseIP(externalIP),
1301				Port:      uint16(svcInfo.Port()),
1302				Protocol:  string(svcInfo.Protocol()),
1303				Scheduler: proxier.ipvsScheduler,
1304			}
1305			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1306				serv.Flags |= utilipvs.FlagPersistent
1307				serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1308			}
1309			if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
1310				activeIPVSServices[serv.String()] = true
1311				activeBindAddrs[serv.Address.String()] = true
1312
1313				if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
1314					klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
1315				}
1316			} else {
1317				klog.ErrorS(err, "Failed to sync service", "service", serv.String())
1318			}
1319		}
1320
1321		// Capture load-balancer ingress.
1322		for _, ingress := range svcInfo.LoadBalancerIPStrings() {
1323			if ingress != "" {
1324				// ipset call
1325				entry = &utilipset.Entry{
1326					IP:       ingress,
1327					Port:     svcInfo.Port(),
1328					Protocol: protocol,
1329					SetType:  utilipset.HashIPPort,
1330				}
1331				// add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
1332				// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
1333				// If we are proxying globally, we need to masquerade in case we cross nodes.
1334				// If we are proxying only locally, we can retain the source IP.
1335				if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
1336					klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
1337					continue
1338				}
1339				proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
1340				// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
1341				if svcInfo.NodeLocalExternal() {
1342					if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
1343						klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
1344						continue
1345					}
1346					proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
1347				}
1348				if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
1349					// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
1350					// This currently works for loadbalancers that preserves source ips.
1351					// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
1352					if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
1353						klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
1354						continue
1355					}
1356					proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
1357					allowFromNode := false
1358					for _, src := range svcInfo.LoadBalancerSourceRanges() {
1359						// ipset call
1360						entry = &utilipset.Entry{
1361							IP:       ingress,
1362							Port:     svcInfo.Port(),
1363							Protocol: protocol,
1364							Net:      src,
1365							SetType:  utilipset.HashIPPortNet,
1366						}
1367						// enumerate all white list source cidr
1368						if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
1369							klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
1370							continue
1371						}
1372						proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
1373
1374						// ignore error because it has been validated
1375						_, cidr, _ := net.ParseCIDR(src)
1376						if cidr.Contains(proxier.nodeIP) {
1377							allowFromNode = true
1378						}
1379					}
1380					// generally, ip route rule was added to intercept request to loadbalancer vip from the
1381					// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
1382					// Need to add the following rule to allow request on host.
1383					if allowFromNode {
1384						entry = &utilipset.Entry{
1385							IP:       ingress,
1386							Port:     svcInfo.Port(),
1387							Protocol: protocol,
1388							IP2:      ingress,
1389							SetType:  utilipset.HashIPPortIP,
1390						}
1391						// enumerate all white list source ip
1392						if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
1393							klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
1394							continue
1395						}
1396						proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
1397					}
1398				}
1399
1400				// ipvs call
1401				serv := &utilipvs.VirtualServer{
1402					Address:   net.ParseIP(ingress),
1403					Port:      uint16(svcInfo.Port()),
1404					Protocol:  string(svcInfo.Protocol()),
1405					Scheduler: proxier.ipvsScheduler,
1406				}
1407				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1408					serv.Flags |= utilipvs.FlagPersistent
1409					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1410				}
1411				if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
1412					activeIPVSServices[serv.String()] = true
1413					activeBindAddrs[serv.Address.String()] = true
1414					if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
1415						klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
1416					}
1417				} else {
1418					klog.ErrorS(err, "Failed to sync service", "service", serv)
1419				}
1420			}
1421		}
1422
1423		if svcInfo.NodePort() != 0 {
1424			if len(nodeAddresses) == 0 || len(nodeIPs) == 0 {
1425				// Skip nodePort configuration since an error occurred when
1426				// computing nodeAddresses or nodeIPs.
1427				continue
1428			}
1429
1430			var lps []utilnet.LocalPort
1431			for _, address := range nodeAddresses {
1432				lp := utilnet.LocalPort{
1433					Description: "nodePort for " + svcNameString,
1434					IP:          address,
1435					IPFamily:    localPortIPFamily,
1436					Port:        svcInfo.NodePort(),
1437					Protocol:    utilnet.Protocol(svcInfo.Protocol()),
1438				}
1439				if utilproxy.IsZeroCIDR(address) {
1440					// Empty IP address means all
1441					lp.IP = ""
1442					lps = append(lps, lp)
1443					// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
1444					break
1445				}
1446				lps = append(lps, lp)
1447			}
1448
1449			// For ports on node IPs, open the actual port and hold it.
1450			for _, lp := range lps {
1451				if proxier.portsMap[lp] != nil {
1452					klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
1453					replacementPortsMap[lp] = proxier.portsMap[lp]
1454					// We do not start listening on SCTP ports, according to our agreement in the
1455					// SCTP support KEP
1456				} else if svcInfo.Protocol() != v1.ProtocolSCTP {
1457					socket, err := proxier.portMapper.OpenLocalPort(&lp)
1458					if err != nil {
1459						msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
1460
1461						proxier.recorder.Eventf(
1462							&v1.ObjectReference{
1463								Kind:      "Node",
1464								Name:      proxier.hostname,
1465								UID:       types.UID(proxier.hostname),
1466								Namespace: "",
1467							}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
1468						klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
1469						continue
1470					}
1471					klog.V(2).InfoS("Opened local port", "port", lp.String())
1472
1473					if lp.Protocol == utilnet.UDP {
1474						conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
1475					}
1476					replacementPortsMap[lp] = socket
1477				} // We're holding the port, so it's OK to install ipvs rules.
1478			}
1479
1480			// Nodeports need SNAT, unless they're local.
1481			// ipset call
1482
1483			var (
1484				nodePortSet *IPSet
1485				entries     []*utilipset.Entry
1486			)
1487
1488			switch protocol {
1489			case utilipset.ProtocolTCP:
1490				nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
1491				entries = []*utilipset.Entry{{
1492					// No need to provide ip info
1493					Port:     svcInfo.NodePort(),
1494					Protocol: protocol,
1495					SetType:  utilipset.BitmapPort,
1496				}}
1497			case utilipset.ProtocolUDP:
1498				nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
1499				entries = []*utilipset.Entry{{
1500					// No need to provide ip info
1501					Port:     svcInfo.NodePort(),
1502					Protocol: protocol,
1503					SetType:  utilipset.BitmapPort,
1504				}}
1505			case utilipset.ProtocolSCTP:
1506				nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
1507				// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
1508				entries = []*utilipset.Entry{}
1509				for _, nodeIP := range nodeIPs {
1510					entries = append(entries, &utilipset.Entry{
1511						IP:       nodeIP.String(),
1512						Port:     svcInfo.NodePort(),
1513						Protocol: protocol,
1514						SetType:  utilipset.HashIPPort,
1515					})
1516				}
1517			default:
1518				// It should never hit
1519				klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
1520			}
1521			if nodePortSet != nil {
1522				entryInvalidErr := false
1523				for _, entry := range entries {
1524					if valid := nodePortSet.validateEntry(entry); !valid {
1525						klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
1526						entryInvalidErr = true
1527						break
1528					}
1529					nodePortSet.activeEntries.Insert(entry.String())
1530				}
1531				if entryInvalidErr {
1532					continue
1533				}
1534			}
1535
1536			// Add externaltrafficpolicy=local type nodeport entry
1537			if svcInfo.NodeLocalExternal() {
1538				var nodePortLocalSet *IPSet
1539				switch protocol {
1540				case utilipset.ProtocolTCP:
1541					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
1542				case utilipset.ProtocolUDP:
1543					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
1544				case utilipset.ProtocolSCTP:
1545					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
1546				default:
1547					// It should never hit
1548					klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
1549				}
1550				if nodePortLocalSet != nil {
1551					entryInvalidErr := false
1552					for _, entry := range entries {
1553						if valid := nodePortLocalSet.validateEntry(entry); !valid {
1554							klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortLocalSet.Name)
1555							entryInvalidErr = true
1556							break
1557						}
1558						nodePortLocalSet.activeEntries.Insert(entry.String())
1559					}
1560					if entryInvalidErr {
1561						continue
1562					}
1563				}
1564			}
1565
1566			// Build ipvs kernel routes for each node ip address
1567			for _, nodeIP := range nodeIPs {
1568				// ipvs call
1569				serv := &utilipvs.VirtualServer{
1570					Address:   nodeIP,
1571					Port:      uint16(svcInfo.NodePort()),
1572					Protocol:  string(svcInfo.Protocol()),
1573					Scheduler: proxier.ipvsScheduler,
1574				}
1575				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1576					serv.Flags |= utilipvs.FlagPersistent
1577					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1578				}
1579				// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
1580				if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
1581					activeIPVSServices[serv.String()] = true
1582					if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
1583						klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
1584					}
1585				} else {
1586					klog.ErrorS(err, "Failed to sync service", "service", serv)
1587				}
1588			}
1589		}
1590
1591		if svcInfo.HealthCheckNodePort() != 0 {
1592			nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
1593			entry := &utilipset.Entry{
1594				// No need to provide ip info
1595				Port:     svcInfo.HealthCheckNodePort(),
1596				Protocol: "tcp",
1597				SetType:  utilipset.BitmapPort,
1598			}
1599
1600			if valid := nodePortSet.validateEntry(entry); !valid {
1601				klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
1602				continue
1603			}
1604			nodePortSet.activeEntries.Insert(entry.String())
1605		}
1606	}
1607
1608	// sync ipset entries
1609	for _, set := range proxier.ipsetList {
1610		set.syncIPSetEntries()
1611	}
1612
1613	// Tail call iptables rules for ipset, make sure only call iptables once
1614	// in a single loop per ip set.
1615	proxier.writeIptablesRules()
1616
1617	// Sync iptables rules.
1618	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
1619	proxier.iptablesData.Reset()
1620	proxier.iptablesData.Write(proxier.natChains.Bytes())
1621	proxier.iptablesData.Write(proxier.natRules.Bytes())
1622	proxier.iptablesData.Write(proxier.filterChains.Bytes())
1623	proxier.iptablesData.Write(proxier.filterRules.Bytes())
1624
1625	klog.V(5).InfoS("Restoring iptables", "rules", string(proxier.iptablesData.Bytes()))
1626	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
1627	if err != nil {
1628		klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
1629		metrics.IptablesRestoreFailuresTotal.Inc()
1630		// Revert new local ports.
1631		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
1632		return
1633	}
1634	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
1635		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
1636			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
1637			metrics.NetworkProgrammingLatency.Observe(latency)
1638			klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
1639		}
1640	}
1641
1642	// Close old local ports and save new ones.
1643	for k, v := range proxier.portsMap {
1644		if replacementPortsMap[k] == nil {
1645			v.Close()
1646		}
1647	}
1648	proxier.portsMap = replacementPortsMap
1649
1650	// Get legacy bind address
1651	// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
1652	currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
1653	if err != nil {
1654		klog.ErrorS(err, "Failed to get bind address")
1655	}
1656	legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
1657
1658	// Clean up legacy IPVS services and unbind addresses
1659	appliedSvcs, err := proxier.ipvs.GetVirtualServers()
1660	if err == nil {
1661		for _, appliedSvc := range appliedSvcs {
1662			currentIPVSServices[appliedSvc.String()] = appliedSvc
1663		}
1664	} else {
1665		klog.ErrorS(err, "Failed to get ipvs service")
1666	}
1667	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
1668
1669	if proxier.healthzServer != nil {
1670		proxier.healthzServer.Updated()
1671	}
1672	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
1673
1674	// Update service healthchecks.  The endpoints list might include services that are
1675	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
1676	// will just drop those endpoints.
1677	if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
1678		klog.ErrorS(err, "Error syncing healthcheck services")
1679	}
1680	if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
1681		klog.ErrorS(err, "Error syncing healthcheck endpoints")
1682	}
1683
1684	// Finish housekeeping.
1685	// TODO: these could be made more consistent.
1686	for _, svcIP := range staleServices.UnsortedList() {
1687		if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
1688			klog.ErrorS(err, "Failed to delete stale service IP connections", "ip", svcIP)
1689		}
1690	}
1691	proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
1692}
1693
1694// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
1695// according to proxier.ipsetList information and the ipset match relationship that `ipsetWithIptablesChain` specified.
1696// some ipset(kubeClusterIPSet for example) have particular match rules and iptables jump relation should be sync separately.
1697func (proxier *Proxier) writeIptablesRules() {
1698	// We are creating those slices ones here to avoid memory reallocations
1699	// in every loop. Note that reuse the memory, instead of doing:
1700	//   slice = <some new slice>
1701	// you should always do one of the below:
1702	//   slice = slice[:0] // and then append to it
1703	//   slice = append(slice[:0], ...)
1704	// To avoid growing this slice, we arbitrarily set its size to 64,
1705	// there is never more than that many arguments for a single line.
1706	// Note that even if we go over 64, it will still be correct - it
1707	// is just for efficiency, not correctness.
1708	args := make([]string, 64)
1709
1710	for _, set := range ipsetWithIptablesChain {
1711		if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
1712			args = append(args[:0], "-A", set.from)
1713			if set.protocolMatch != "" {
1714				args = append(args, "-p", set.protocolMatch)
1715			}
1716			args = append(args,
1717				"-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
1718				"-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
1719				set.matchType,
1720			)
1721			utilproxy.WriteLine(proxier.natRules, append(args, "-j", set.to)...)
1722		}
1723	}
1724
1725	if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
1726		args = append(args[:0],
1727			"-A", string(kubeServicesChain),
1728			"-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
1729			"-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name,
1730		)
1731		if proxier.masqueradeAll {
1732			utilproxy.WriteLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...)
1733		} else if proxier.localDetector.IsImplemented() {
1734			// This masquerades off-cluster traffic to a service VIP.  The idea
1735			// is that you can establish a static route for your Service range,
1736			// routing to any node, and that node will bridge into the Service
1737			// for you.  Since that might bounce off-node, we masquerade here.
1738			// If/when we support "Local" policy for VIPs, we should update this.
1739			utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...)
1740		} else {
1741			// Masquerade all OUTPUT traffic coming from a service ip.
1742			// The kube dummy interface has all service VIPs assigned which
1743			// results in the service VIP being picked as the source IP to reach
1744			// a VIP. This leads to a connection from VIP:<random port> to
1745			// VIP:<service port>.
1746			// Always masquerading OUTPUT (node-originating) traffic with a VIP
1747			// source ip and service port destination fixes the outgoing connections.
1748			utilproxy.WriteLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...)
1749		}
1750	}
1751
1752	// externalIPRules adds iptables rules applies to Service ExternalIPs
1753	externalIPRules := func(args []string) {
1754		// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
1755		// nor from a local process to be forwarded to the service.
1756		// This rule roughly translates to "all traffic from off-machine".
1757		// This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
1758		externalTrafficOnlyArgs := append(args,
1759			"-m", "physdev", "!", "--physdev-is-in",
1760			"-m", "addrtype", "!", "--src-type", "LOCAL")
1761		utilproxy.WriteLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
1762		dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
1763		// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
1764		// This covers cases like GCE load-balancers which get added to the local routing table.
1765		utilproxy.WriteLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
1766	}
1767
1768	if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
1769		// Build masquerade rules for packets to external IPs.
1770		args = append(args[:0],
1771			"-A", string(kubeServicesChain),
1772			"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
1773			"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
1774			"dst,dst",
1775		)
1776		utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
1777		externalIPRules(args)
1778	}
1779
1780	if !proxier.ipsetList[kubeExternalIPLocalSet].isEmpty() {
1781		args = append(args[:0],
1782			"-A", string(kubeServicesChain),
1783			"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPLocalSet].getComment(),
1784			"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name,
1785			"dst,dst",
1786		)
1787		externalIPRules(args)
1788	}
1789
1790	// -A KUBE-SERVICES  -m addrtype  --dst-type LOCAL -j KUBE-NODE-PORT
1791	args = append(args[:0],
1792		"-A", string(kubeServicesChain),
1793		"-m", "addrtype", "--dst-type", "LOCAL",
1794	)
1795	utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
1796
1797	// mark drop for KUBE-LOAD-BALANCER
1798	utilproxy.WriteLine(proxier.natRules, []string{
1799		"-A", string(KubeLoadBalancerChain),
1800		"-j", string(KubeMarkMasqChain),
1801	}...)
1802
1803	// mark drop for KUBE-FIRE-WALL
1804	utilproxy.WriteLine(proxier.natRules, []string{
1805		"-A", string(KubeFireWallChain),
1806		"-j", string(KubeMarkDropChain),
1807	}...)
1808
1809	// Accept all traffic with destination of ipvs virtual service, in case other iptables rules
1810	// block the traffic, that may result in ipvs rules invalid.
1811	// Those rules must be in the end of KUBE-SERVICE chain
1812	proxier.acceptIPVSTraffic()
1813
1814	// If the masqueradeMark has been added then we want to forward that same
1815	// traffic, this allows NodePort traffic to be forwarded even if the default
1816	// FORWARD policy is not accept.
1817	utilproxy.WriteLine(proxier.filterRules,
1818		"-A", string(KubeForwardChain),
1819		"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
1820		"-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
1821		"-j", "ACCEPT",
1822	)
1823
1824	// The following two rules ensure the traffic after the initial packet
1825	// accepted by the "kubernetes forwarding rules" rule above will be
1826	// accepted.
1827	utilproxy.WriteLine(proxier.filterRules,
1828		"-A", string(KubeForwardChain),
1829		"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
1830		"-m", "conntrack",
1831		"--ctstate", "RELATED,ESTABLISHED",
1832		"-j", "ACCEPT",
1833	)
1834	utilproxy.WriteLine(proxier.filterRules,
1835		"-A", string(KubeForwardChain),
1836		"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
1837		"-m", "conntrack",
1838		"--ctstate", "RELATED,ESTABLISHED",
1839		"-j", "ACCEPT",
1840	)
1841
1842	// Add rule to accept traffic towards health check node port
1843	utilproxy.WriteLine(proxier.filterRules,
1844		"-A", string(KubeNodePortChain),
1845		"-m", "comment", "--comment", proxier.ipsetList[kubeHealthCheckNodePortSet].getComment(),
1846		"-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst",
1847		"-j", "ACCEPT",
1848	)
1849
1850	// Install the kubernetes-specific postrouting rules. We use a whole chain for
1851	// this so that it is easier to flush and change, for example if the mark
1852	// value should ever change.
1853	// NB: THIS MUST MATCH the corresponding code in the kubelet
1854	utilproxy.WriteLine(proxier.natRules, []string{
1855		"-A", string(kubePostroutingChain),
1856		"-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
1857		"-j", "RETURN",
1858	}...)
1859	// Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
1860	utilproxy.WriteLine(proxier.natRules, []string{
1861		"-A", string(kubePostroutingChain),
1862		// XOR proxier.masqueradeMark to unset it
1863		"-j", "MARK", "--xor-mark", proxier.masqueradeMark,
1864	}...)
1865	masqRule := []string{
1866		"-A", string(kubePostroutingChain),
1867		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
1868		"-j", "MASQUERADE",
1869	}
1870	if proxier.iptables.HasRandomFully() {
1871		masqRule = append(masqRule, "--random-fully")
1872	}
1873	utilproxy.WriteLine(proxier.natRules, masqRule...)
1874
1875	// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
1876	// this so that it is easier to flush and change, for example if the mark
1877	// value should ever change.
1878	utilproxy.WriteLine(proxier.natRules, []string{
1879		"-A", string(KubeMarkMasqChain),
1880		"-j", "MARK", "--or-mark", proxier.masqueradeMark,
1881	}...)
1882
1883	// Write the end-of-table markers.
1884	utilproxy.WriteLine(proxier.filterRules, "COMMIT")
1885	utilproxy.WriteLine(proxier.natRules, "COMMIT")
1886}
1887
1888func (proxier *Proxier) acceptIPVSTraffic() {
1889	sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
1890	for _, set := range sets {
1891		var matchType string
1892		if !proxier.ipsetList[set].isEmpty() {
1893			switch proxier.ipsetList[set].SetType {
1894			case utilipset.BitmapPort:
1895				matchType = "dst"
1896			default:
1897				matchType = "dst,dst"
1898			}
1899			utilproxy.WriteLine(proxier.natRules, []string{
1900				"-A", string(kubeServicesChain),
1901				"-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType,
1902				"-j", "ACCEPT",
1903			}...)
1904		}
1905	}
1906}
1907
1908// createAndLinkKubeChain create all kube chains that ipvs proxier need and write basic link.
1909func (proxier *Proxier) createAndLinkKubeChain() {
1910	existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
1911	existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
1912
1913	// ensure KUBE-MARK-DROP chain exist but do not change any rules
1914	for _, ch := range iptablesEnsureChains {
1915		if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
1916			klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
1917			return
1918		}
1919	}
1920
1921	// Make sure we keep stats for the top-level chains
1922	for _, ch := range iptablesChains {
1923		if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
1924			klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
1925			return
1926		}
1927		if ch.table == utiliptables.TableNAT {
1928			if chain, ok := existingNATChains[ch.chain]; ok {
1929				utilproxy.WriteBytesLine(proxier.natChains, chain)
1930			} else {
1931				utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(ch.chain))
1932			}
1933		} else {
1934			if chain, ok := existingFilterChains[ch.chain]; ok {
1935				utilproxy.WriteBytesLine(proxier.filterChains, chain)
1936			} else {
1937				utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(ch.chain))
1938			}
1939		}
1940	}
1941
1942	for _, jc := range iptablesJumpChain {
1943		args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
1944		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
1945			klog.ErrorS(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to)
1946		}
1947	}
1948
1949}
1950
1951// getExistingChains get iptables-save output so we can check for existing chains and rules.
1952// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
1953// Result may SHARE memory with contents of buffer.
1954func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte {
1955	buffer.Reset()
1956	err := proxier.iptables.SaveInto(table, buffer)
1957	if err != nil { // if we failed to get any rules
1958		klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
1959	} else { // otherwise parse the output
1960		return utiliptables.GetChainLines(table, buffer.Bytes())
1961	}
1962	return nil
1963}
1964
1965// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
1966// risk sending more traffic to it, all of which will be lost (because UDP).
1967// This assumes the proxier mutex is held
1968func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
1969	for _, epSvcPair := range connectionMap {
1970		if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
1971			endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
1972			svcProto := svcInfo.Protocol()
1973			err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
1974			if err != nil {
1975				klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName.String())
1976			}
1977			for _, extIP := range svcInfo.ExternalIPStrings() {
1978				err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
1979				if err != nil {
1980					klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", extIP)
1981				}
1982			}
1983			for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
1984				err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
1985				if err != nil {
1986					klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", lbIP)
1987				}
1988			}
1989		}
1990	}
1991}
1992
1993func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error {
1994	appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
1995	if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
1996		if appliedVirtualServer == nil {
1997			// IPVS service is not found, create a new service
1998			klog.V(3).InfoS("Adding new service", "svcName", svcName, "address", fmt.Sprintf("%s:%d/%s", vs.Address, vs.Port, vs.Protocol))
1999			if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
2000				klog.ErrorS(err, "Failed to add IPVS service", "svcName", svcName)
2001				return err
2002			}
2003		} else {
2004			// IPVS service was changed, update the existing one
2005			// During updates, service VIP will not go down
2006			klog.V(3).InfoS("IPVS service was changed", "svcName", svcName)
2007			if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
2008				klog.ErrorS(err, "Failed to update IPVS service")
2009				return err
2010			}
2011		}
2012	}
2013
2014	// bind service address to dummy interface
2015	if bindAddr {
2016		// always attempt to bind if bindedAddresses is nil,
2017		// otherwise check if it's already binded and return early
2018		if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) {
2019			return nil
2020		}
2021
2022		klog.V(4).InfoS("Bind addr", "address", vs.Address.String())
2023		_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
2024		if err != nil {
2025			klog.ErrorS(err, "Failed to bind service address to dummy device", "svcName", svcName)
2026			return err
2027		}
2028	}
2029
2030	return nil
2031}
2032
2033func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
2034	appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
2035	if err != nil {
2036		klog.Errorf("Failed to get IPVS service, error: %v", err)
2037		return err
2038	}
2039	if appliedVirtualServer == nil {
2040		return errors.New("IPVS virtual service does not exist")
2041	}
2042
2043	// curEndpoints represents IPVS destinations listed from current system.
2044	curEndpoints := sets.NewString()
2045	// readyEndpoints represents Endpoints watched from API Server.
2046	readyEndpoints := sets.NewString()
2047	// localReadyEndpoints represents local endpoints that are ready and NOT terminating.
2048	localReadyEndpoints := sets.NewString()
2049	// localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating.
2050	// Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic.
2051	localReadyTerminatingEndpoints := sets.NewString()
2052
2053	curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
2054	if err != nil {
2055		klog.ErrorS(err, "Failed to list IPVS destinations")
2056		return err
2057	}
2058	for _, des := range curDests {
2059		curEndpoints.Insert(des.String())
2060	}
2061
2062	endpoints := proxier.endpointsMap[svcPortName]
2063
2064	// Filtering for topology aware endpoints. This function will only
2065	// filter endpoints if appropriate feature gates are enabled and the
2066	// Service does not have conflicting configuration such as
2067	// externalTrafficPolicy=Local.
2068	svcInfo, ok := proxier.serviceMap[svcPortName]
2069	if !ok {
2070		klog.InfoS("Unable to filter endpoints due to missing Service info", "svcPortName", svcPortName)
2071	} else {
2072		endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels)
2073	}
2074
2075	for _, epInfo := range endpoints {
2076		if epInfo.IsReady() {
2077			readyEndpoints.Insert(epInfo.String())
2078		}
2079
2080		if onlyNodeLocalEndpoints && epInfo.GetIsLocal() {
2081			if epInfo.IsReady() {
2082				localReadyEndpoints.Insert(epInfo.String())
2083			} else if epInfo.IsServing() && epInfo.IsTerminating() {
2084				localReadyTerminatingEndpoints.Insert(epInfo.String())
2085			}
2086		}
2087	}
2088
2089	newEndpoints := readyEndpoints
2090	if onlyNodeLocalEndpoints {
2091		newEndpoints = localReadyEndpoints
2092
2093		if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
2094			if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 {
2095				newEndpoints = localReadyTerminatingEndpoints
2096			}
2097		}
2098	}
2099
2100	// Create new endpoints
2101	for _, ep := range newEndpoints.List() {
2102		ip, port, err := net.SplitHostPort(ep)
2103		if err != nil {
2104			klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
2105			continue
2106		}
2107		portNum, err := strconv.Atoi(port)
2108		if err != nil {
2109			klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
2110			continue
2111		}
2112
2113		newDest := &utilipvs.RealServer{
2114			Address: net.ParseIP(ip),
2115			Port:    uint16(portNum),
2116			Weight:  1,
2117		}
2118
2119		if curEndpoints.Has(ep) {
2120			// check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
2121			uniqueRS := GetUniqueRSName(vs, newDest)
2122			if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
2123				continue
2124			}
2125			klog.V(5).InfoS("new ep is in graceful delete list", "uniqueRS", uniqueRS)
2126			err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
2127			if err != nil {
2128				klog.ErrorS(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep)
2129				continue
2130			}
2131		}
2132		err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
2133		if err != nil {
2134			klog.ErrorS(err, "Failed to add destination", "newDest", newDest)
2135			continue
2136		}
2137	}
2138	// Delete old endpoints
2139	for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
2140		// if curEndpoint is in gracefulDelete, skip
2141		uniqueRS := vs.String() + "/" + ep
2142		if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
2143			continue
2144		}
2145		ip, port, err := net.SplitHostPort(ep)
2146		if err != nil {
2147			klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
2148			continue
2149		}
2150		portNum, err := strconv.Atoi(port)
2151		if err != nil {
2152			klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
2153			continue
2154		}
2155
2156		delDest := &utilipvs.RealServer{
2157			Address: net.ParseIP(ip),
2158			Port:    uint16(portNum),
2159		}
2160
2161		klog.V(5).InfoS("Using graceful delete", "uniqueRS", uniqueRS)
2162		err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
2163		if err != nil {
2164			klog.ErrorS(err, "Failed to delete destination", "uniqueRS", uniqueRS)
2165			continue
2166		}
2167	}
2168	return nil
2169}
2170
2171func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
2172	isIPv6 := utilnet.IsIPv6(proxier.nodeIP)
2173	for cs := range currentServices {
2174		svc := currentServices[cs]
2175		if proxier.isIPInExcludeCIDRs(svc.Address) {
2176			continue
2177		}
2178		if utilnet.IsIPv6(svc.Address) != isIPv6 {
2179			// Not our family
2180			continue
2181		}
2182		if _, ok := activeServices[cs]; !ok {
2183			klog.V(4).InfoS("Delete service", "service", svc.String())
2184			if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
2185				klog.ErrorS(err, "Failed to delete service", "service", svc.String())
2186			}
2187			addr := svc.Address.String()
2188			if _, ok := legacyBindAddrs[addr]; ok {
2189				klog.V(4).InfoS("Unbinding address", "address", addr)
2190				if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
2191					klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", DefaultDummyDevice, "address", addr)
2192				} else {
2193					// In case we delete a multi-port service, avoid trying to unbind multiple times
2194					delete(legacyBindAddrs, addr)
2195				}
2196			}
2197		}
2198	}
2199}
2200
2201func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool {
2202	// make sure it does not fall within an excluded CIDR range.
2203	for _, excludedCIDR := range proxier.excludeCIDRs {
2204		if excludedCIDR.Contains(ip) {
2205			return true
2206		}
2207	}
2208	return false
2209}
2210
2211func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool {
2212	legacyAddrs := make(map[string]bool)
2213	isIPv6 := utilnet.IsIPv6(proxier.nodeIP)
2214	for _, addr := range currentBindAddrs {
2215		addrIsIPv6 := utilnet.IsIPv6(net.ParseIP(addr))
2216		if addrIsIPv6 && !isIPv6 || !addrIsIPv6 && isIPv6 {
2217			continue
2218		}
2219		if _, ok := activeBindAddrs[addr]; !ok {
2220			legacyAddrs[addr] = true
2221		}
2222	}
2223	return legacyAddrs
2224}
2225
// The IPVS proxier falls back on iptables when it needs to SNAT egress packets.
// It only operates on the iptables *nat table.
// Create and link the kube postrouting chain for SNAT packets.
// Chain POSTROUTING (policy ACCEPT)
// target     prot opt source               destination
// KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
// Maintained by the kubelet network sync loop.
2233
2234// *nat
2235// :KUBE-POSTROUTING - [0:0]
2236// Chain KUBE-POSTROUTING (1 references)
2237// target     prot opt source               destination
2238// MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
2239
2240// :KUBE-MARK-MASQ - [0:0]
2241// Chain KUBE-MARK-MASQ (0 references)
2242// target     prot opt source               destination
2243// MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
2244