1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package endpoints
18
19import (
20	"bytes"
21	"crypto/md5"
22	"encoding/hex"
23	"hash"
24	"sort"
25
26	"k8s.io/api/core/v1"
27	"k8s.io/apimachinery/pkg/types"
28	hashutil "k8s.io/kubernetes/pkg/util/hash"
29)
30
31// RepackSubsets takes a slice of EndpointSubset objects, expands it to the full
32// representation, and then repacks that into the canonical layout.  This
33// ensures that code which operates on these objects can rely on the common
34// form for things like comparison.  The result is a newly allocated slice.
35func RepackSubsets(subsets []v1.EndpointSubset) []v1.EndpointSubset {
36	// First map each unique port definition to the sets of hosts that
37	// offer it.
38	allAddrs := map[addressKey]*v1.EndpointAddress{}
39	portToAddrReadyMap := map[v1.EndpointPort]addressSet{}
40	for i := range subsets {
41		if len(subsets[i].Ports) == 0 {
42			// Don't discard endpoints with no ports defined, use a sentinel.
43			mapAddressesByPort(&subsets[i], v1.EndpointPort{Port: -1}, allAddrs, portToAddrReadyMap)
44		} else {
45			for _, port := range subsets[i].Ports {
46				mapAddressesByPort(&subsets[i], port, allAddrs, portToAddrReadyMap)
47			}
48		}
49	}
50
51	// Next, map the sets of hosts to the sets of ports they offer.
52	// Go does not allow maps or slices as keys to maps, so we have
53	// to synthesize an artificial key and do a sort of 2-part
54	// associative entity.
55	type keyString string
56	keyToAddrReadyMap := map[keyString]addressSet{}
57	addrReadyMapKeyToPorts := map[keyString][]v1.EndpointPort{}
58	for port, addrs := range portToAddrReadyMap {
59		key := keyString(hashAddresses(addrs))
60		keyToAddrReadyMap[key] = addrs
61		if port.Port > 0 { // avoid sentinels
62			addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port)
63		} else {
64			if _, found := addrReadyMapKeyToPorts[key]; !found {
65				// Force it to be present in the map
66				addrReadyMapKeyToPorts[key] = nil
67			}
68		}
69	}
70
71	// Next, build the N-to-M association the API wants.
72	final := []v1.EndpointSubset{}
73	for key, ports := range addrReadyMapKeyToPorts {
74		var readyAddrs, notReadyAddrs []v1.EndpointAddress
75		for addr, ready := range keyToAddrReadyMap[key] {
76			if ready {
77				readyAddrs = append(readyAddrs, *addr)
78			} else {
79				notReadyAddrs = append(notReadyAddrs, *addr)
80			}
81		}
82		final = append(final, v1.EndpointSubset{Addresses: readyAddrs, NotReadyAddresses: notReadyAddrs, Ports: ports})
83	}
84
85	// Finally, sort it.
86	return SortSubsets(final)
87}
88
// addressKey identifies an endpoint address for de-duplication: the sets of
// hosts must be de-duped, using IP+UID as the key.  ip holds the address's IP
// string and uid holds the UID of the address's TargetRef; uid stays at its
// zero value when the address has no TargetRef.
type addressKey struct {
	ip  string
	uid types.UID
}
94
95// mapAddressesByPort adds all ready and not-ready addresses into a map by a single port.
96func mapAddressesByPort(subset *v1.EndpointSubset, port v1.EndpointPort, allAddrs map[addressKey]*v1.EndpointAddress, portToAddrReadyMap map[v1.EndpointPort]addressSet) {
97	for k := range subset.Addresses {
98		mapAddressByPort(&subset.Addresses[k], port, true, allAddrs, portToAddrReadyMap)
99	}
100	for k := range subset.NotReadyAddresses {
101		mapAddressByPort(&subset.NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap)
102	}
103}
104
105// mapAddressByPort adds one address into a map by port, registering the address with a unique pointer, and preserving
106// any existing ready state.
107func mapAddressByPort(addr *v1.EndpointAddress, port v1.EndpointPort, ready bool, allAddrs map[addressKey]*v1.EndpointAddress, portToAddrReadyMap map[v1.EndpointPort]addressSet) *v1.EndpointAddress {
108	// use addressKey to distinguish between two endpoints that are identical addresses
109	// but may have come from different hosts, for attribution. For instance, Mesos
110	// assigns pods the node IP, but the pods are distinct.
111	key := addressKey{ip: addr.IP}
112	if addr.TargetRef != nil {
113		key.uid = addr.TargetRef.UID
114	}
115
116	// Accumulate the address. The full EndpointAddress structure is preserved for use when
117	// we rebuild the subsets so that the final TargetRef has all of the necessary data.
118	existingAddress := allAddrs[key]
119	if existingAddress == nil {
120		// Make a copy so we don't write to the
121		// input args of this function.
122		existingAddress = &v1.EndpointAddress{}
123		*existingAddress = *addr
124		allAddrs[key] = existingAddress
125	}
126
127	// Remember that this port maps to this address.
128	if _, found := portToAddrReadyMap[port]; !found {
129		portToAddrReadyMap[port] = addressSet{}
130	}
131	// if we have not yet recorded this port for this address, or if the previous
132	// state was ready, write the current ready state. not ready always trumps
133	// ready.
134	if wasReady, found := portToAddrReadyMap[port][existingAddress]; !found || wasReady {
135		portToAddrReadyMap[port][existingAddress] = ready
136	}
137	return existingAddress
138}
139
// addressSet maps a canonical endpoint address pointer to its ready state:
// true when the address is ready, false when it is not ready.
type addressSet map[*v1.EndpointAddress]bool
141
// addrReady pairs an address pointer with its ready flag.  It is the
// flattened form of one addressSet entry, used by hashAddresses to build a
// sortable slice.
type addrReady struct {
	addr  *v1.EndpointAddress
	ready bool
}
146
147func hashAddresses(addrs addressSet) string {
148	// Flatten the list of addresses into a string so it can be used as a
149	// map key.  Unfortunately, DeepHashObject is implemented in terms of
150	// spew, and spew does not handle non-primitive map keys well.  So
151	// first we collapse it into a slice, sort the slice, then hash that.
152	slice := make([]addrReady, 0, len(addrs))
153	for k, ready := range addrs {
154		slice = append(slice, addrReady{k, ready})
155	}
156	sort.Sort(addrsReady(slice))
157	hasher := md5.New()
158	hashutil.DeepHashObject(hasher, slice)
159	return hex.EncodeToString(hasher.Sum(nil)[0:])
160}
161
162func lessAddrReady(a, b addrReady) bool {
163	// ready is not significant to hashing since we can't have duplicate addresses
164	return LessEndpointAddress(a.addr, b.addr)
165}
166
167type addrsReady []addrReady
168
169func (sl addrsReady) Len() int      { return len(sl) }
170func (sl addrsReady) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
171func (sl addrsReady) Less(i, j int) bool {
172	return lessAddrReady(sl[i], sl[j])
173}
174
175// LessEndpointAddress compares IP addresses lexicographically and returns true if first argument is lesser than second
176func LessEndpointAddress(a, b *v1.EndpointAddress) bool {
177	ipComparison := bytes.Compare([]byte(a.IP), []byte(b.IP))
178	if ipComparison != 0 {
179		return ipComparison < 0
180	}
181	if b.TargetRef == nil {
182		return false
183	}
184	if a.TargetRef == nil {
185		return true
186	}
187	return a.TargetRef.UID < b.TargetRef.UID
188}
189
190// SortSubsets sorts an array of EndpointSubset objects in place.  For ease of
191// use it returns the input slice.
192func SortSubsets(subsets []v1.EndpointSubset) []v1.EndpointSubset {
193	for i := range subsets {
194		ss := &subsets[i]
195		sort.Sort(addrsByIPAndUID(ss.Addresses))
196		sort.Sort(addrsByIPAndUID(ss.NotReadyAddresses))
197		sort.Sort(portsByHash(ss.Ports))
198	}
199	sort.Sort(subsetsByHash(subsets))
200	return subsets
201}
202
203func hashObject(hasher hash.Hash, obj interface{}) []byte {
204	hashutil.DeepHashObject(hasher, obj)
205	return hasher.Sum(nil)
206}
207
208type subsetsByHash []v1.EndpointSubset
209
210func (sl subsetsByHash) Len() int      { return len(sl) }
211func (sl subsetsByHash) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
212func (sl subsetsByHash) Less(i, j int) bool {
213	hasher := md5.New()
214	h1 := hashObject(hasher, sl[i])
215	h2 := hashObject(hasher, sl[j])
216	return bytes.Compare(h1, h2) < 0
217}
218
219type addrsByIPAndUID []v1.EndpointAddress
220
221func (sl addrsByIPAndUID) Len() int      { return len(sl) }
222func (sl addrsByIPAndUID) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
223func (sl addrsByIPAndUID) Less(i, j int) bool {
224	return LessEndpointAddress(&sl[i], &sl[j])
225}
226
227type portsByHash []v1.EndpointPort
228
229func (sl portsByHash) Len() int      { return len(sl) }
230func (sl portsByHash) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
231func (sl portsByHash) Less(i, j int) bool {
232	hasher := md5.New()
233	h1 := hashObject(hasher, sl[i])
234	h2 := hashObject(hasher, sl[j])
235	return bytes.Compare(h1, h2) < 0
236}
237