1package ipam
2
3import (
4	"context"
5	"fmt"
6	"math/rand"
7	"net"
8	"sort"
9	"sync"
10	"sync/atomic"
11	"testing"
12	"time"
13
14	"github.com/docker/libnetwork/ipamapi"
15	"github.com/stretchr/testify/assert"
16	"golang.org/x/sync/semaphore"
17)
18
19const (
20	all = iota
21	even
22	odd
23)
24
25type releaseMode uint
26
27type testContext struct {
28	a      *Allocator
29	opts   map[string]string
30	ipList []*net.IPNet
31	ipMap  map[string]bool
32	pid    string
33	maxIP  int
34}
35
36func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
37	a, err := getAllocator(false)
38	if err != nil {
39		t.Fatal(err)
40	}
41	a.addrSpaces["giallo"] = &addrSpace{
42		id:      dsConfigKey + "/" + "giallo",
43		ds:      a.addrSpaces[localAddressSpace].ds,
44		alloc:   a.addrSpaces[localAddressSpace].alloc,
45		scope:   a.addrSpaces[localAddressSpace].scope,
46		subnets: map[SubnetKey]*PoolData{},
47	}
48
49	network := fmt.Sprintf("192.168.100.0/%d", mask)
50	// total ips 2^(32-mask) - 2 (network and broadcast)
51	totalIps := 1<<uint(32-mask) - 2
52
53	pid, _, _, err := a.RequestPool("giallo", network, "", nil, false)
54	if err != nil {
55		t.Fatal(err)
56	}
57
58	return &testContext{
59		a:      a,
60		opts:   options,
61		ipList: make([]*net.IPNet, 0, totalIps),
62		ipMap:  make(map[string]bool),
63		pid:    pid,
64		maxIP:  totalIps,
65	}
66}
67
68func TestDebug(t *testing.T) {
69	tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
70	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
71	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
72}
73
74type op struct {
75	id   int32
76	add  bool
77	name string
78}
79
80func (o *op) String() string {
81	return fmt.Sprintf("%+v", *o)
82}
83
84func TestRequestPoolParallel(t *testing.T) {
85	a, err := getAllocator(false)
86	if err != nil {
87		t.Fatal(err)
88	}
89	var operationIndex int32
90	ch := make(chan *op, 240)
91	for i := 0; i < 120; i++ {
92		go func(t *testing.T, a *Allocator, ch chan *op) {
93			name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false)
94			if err != nil {
95				t.Fatalf("request error %v", err)
96			}
97			idx := atomic.AddInt32(&operationIndex, 1)
98			ch <- &op{idx, true, name}
99			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
100			idx = atomic.AddInt32(&operationIndex, 1)
101			err = a.ReleasePool(name)
102			if err != nil {
103				t.Fatalf("relase error %v", err)
104			}
105			ch <- &op{idx, false, name}
106		}(t, a, ch)
107	}
108
109	// map of events
110	m := make(map[string][]*op)
111	for i := 0; i < 240; i++ {
112		x := <-ch
113		ops, ok := m[x.name]
114		if !ok {
115			ops = make([]*op, 0, 10)
116		}
117		ops = append(ops, x)
118		m[x.name] = ops
119	}
120
121	// Post processing to avoid event reordering on the channel
122	for pool, ops := range m {
123		sort.Slice(ops[:], func(i, j int) bool {
124			return ops[i].id < ops[j].id
125		})
126		expected := true
127		for _, op := range ops {
128			if op.add != expected {
129				t.Fatalf("Operations for %v not valid %v, operations %v", pool, op, ops)
130			}
131			expected = !expected
132		}
133	}
134}
135
136func TestFullAllocateRelease(t *testing.T) {
137	for _, parallelism := range []int64{2, 4, 8} {
138		for _, mask := range []int{29, 25, 24, 21} {
139			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
140			allocate(t, tctx, parallelism)
141			release(t, tctx, all, parallelism)
142		}
143	}
144}
145
146func TestOddAllocateRelease(t *testing.T) {
147	for _, parallelism := range []int64{2, 4, 8} {
148		for _, mask := range []int{29, 25, 24, 21} {
149			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
150			allocate(t, tctx, parallelism)
151			release(t, tctx, odd, parallelism)
152		}
153	}
154}
155
156func TestFullAllocateSerialReleaseParallel(t *testing.T) {
157	for _, parallelism := range []int64{1, 4, 8} {
158		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
159		allocate(t, tctx, 1)
160		release(t, tctx, all, parallelism)
161	}
162}
163
164func TestOddAllocateSerialReleaseParallel(t *testing.T) {
165	for _, parallelism := range []int64{1, 4, 8} {
166		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
167		allocate(t, tctx, 1)
168		release(t, tctx, odd, parallelism)
169	}
170}
171
172func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
173	for _, parallelism := range []int64{1, 4, 8} {
174		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
175		allocate(t, tctx, 1)
176		release(t, tctx, even, parallelism)
177	}
178}
179
180func allocate(t *testing.T, tctx *testContext, parallel int64) {
181	// Allocate the whole space
182	parallelExec := semaphore.NewWeighted(parallel)
183	routineNum := tctx.maxIP + 10
184	ch := make(chan *net.IPNet, routineNum)
185	var id int
186	var wg sync.WaitGroup
187	// routine loop
188	for {
189		wg.Add(1)
190		go func(id int) {
191			parallelExec.Acquire(context.Background(), 1)
192			ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
193			ch <- ip
194			parallelExec.Release(1)
195			wg.Done()
196		}(id)
197		id++
198		if id == routineNum {
199			break
200		}
201	}
202
203	// give time to all the go routines to finish
204	wg.Wait()
205
206	// process results
207	for i := 0; i < routineNum; i++ {
208		ip := <-ch
209		if ip == nil {
210			continue
211		}
212		if there, ok := tctx.ipMap[ip.String()]; ok && there {
213			t.Fatalf("Got duplicate IP %s", ip.String())
214			break
215		}
216		tctx.ipList = append(tctx.ipList, ip)
217		tctx.ipMap[ip.String()] = true
218	}
219
220	assert.Len(t, tctx.ipList, tctx.maxIP)
221	if len(tctx.ipList) != tctx.maxIP {
222		t.Fatal("missmatch number allocation")
223	}
224}
225
226func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
227	var startIndex, increment, stopIndex, length int
228	switch mode {
229	case all:
230		startIndex = 0
231		increment = 1
232		stopIndex = tctx.maxIP - 1
233		length = tctx.maxIP
234	case odd, even:
235		if mode == odd {
236			startIndex = 1
237		}
238		increment = 2
239		stopIndex = tctx.maxIP - 1
240		length = tctx.maxIP / 2
241		if tctx.maxIP%2 > 0 {
242			length++
243		}
244	default:
245		t.Fatal("unsupported mode yet")
246	}
247
248	ipIndex := make([]int, 0, length)
249	// calculate the index to release from the ipList
250	for i := startIndex; ; i += increment {
251		ipIndex = append(ipIndex, i)
252		if i+increment > stopIndex {
253			break
254		}
255	}
256
257	var id int
258	parallelExec := semaphore.NewWeighted(parallel)
259	ch := make(chan *net.IPNet, len(ipIndex))
260	wg := sync.WaitGroup{}
261	for index := range ipIndex {
262		wg.Add(1)
263		go func(id, index int) {
264			parallelExec.Acquire(context.Background(), 1)
265			// logrus.Errorf("index %v", index)
266			// logrus.Errorf("list %v", tctx.ipList)
267			err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
268			if err != nil {
269				t.Fatalf("routine %d got %v", id, err)
270			}
271			ch <- tctx.ipList[index]
272			parallelExec.Release(1)
273			wg.Done()
274		}(id, index)
275		id++
276	}
277	wg.Wait()
278
279	for i := 0; i < len(ipIndex); i++ {
280		ip := <-ch
281
282		// check if it is really free
283		_, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
284		assert.NoError(t, err, "ip %v not properly released", ip)
285		if err != nil {
286			t.Fatalf("ip %v not properly released, error:%v", ip, err)
287		}
288		err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
289		assert.NoError(t, err)
290
291		if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
292			t.Fatalf("ip %v got double deallocated", ip)
293		}
294		tctx.ipMap[ip.String()] = false
295		for j, v := range tctx.ipList {
296			if v == ip {
297				tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
298				break
299			}
300		}
301	}
302
303	assert.Len(t, tctx.ipList, tctx.maxIP-length)
304}
305