1package ipam 2 3import ( 4 "context" 5 "fmt" 6 "math/rand" 7 "net" 8 "sort" 9 "sync" 10 "sync/atomic" 11 "testing" 12 "time" 13 14 "github.com/docker/libnetwork/ipamapi" 15 "github.com/stretchr/testify/assert" 16 "golang.org/x/sync/semaphore" 17) 18 19const ( 20 all = iota 21 even 22 odd 23) 24 25type releaseMode uint 26 27type testContext struct { 28 a *Allocator 29 opts map[string]string 30 ipList []*net.IPNet 31 ipMap map[string]bool 32 pid string 33 maxIP int 34} 35 36func newTestContext(t *testing.T, mask int, options map[string]string) *testContext { 37 a, err := getAllocator(false) 38 if err != nil { 39 t.Fatal(err) 40 } 41 a.addrSpaces["giallo"] = &addrSpace{ 42 id: dsConfigKey + "/" + "giallo", 43 ds: a.addrSpaces[localAddressSpace].ds, 44 alloc: a.addrSpaces[localAddressSpace].alloc, 45 scope: a.addrSpaces[localAddressSpace].scope, 46 subnets: map[SubnetKey]*PoolData{}, 47 } 48 49 network := fmt.Sprintf("192.168.100.0/%d", mask) 50 // total ips 2^(32-mask) - 2 (network and broadcast) 51 totalIps := 1<<uint(32-mask) - 2 52 53 pid, _, _, err := a.RequestPool("giallo", network, "", nil, false) 54 if err != nil { 55 t.Fatal(err) 56 } 57 58 return &testContext{ 59 a: a, 60 opts: options, 61 ipList: make([]*net.IPNet, 0, totalIps), 62 ipMap: make(map[string]bool), 63 pid: pid, 64 maxIP: totalIps, 65 } 66} 67 68func TestDebug(t *testing.T) { 69 tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 70 tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 71 tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 72} 73 74type op struct { 75 id int32 76 add bool 77 name string 78} 79 80func (o *op) String() string { 81 return fmt.Sprintf("%+v", *o) 82} 83 84func TestRequestPoolParallel(t *testing.T) { 85 a, err := getAllocator(false) 86 if err != nil { 87 t.Fatal(err) 88 } 89 var operationIndex int32 90 ch := make(chan *op, 240) 91 for i := 0; i < 120; i++ { 92 go func(t *testing.T, a *Allocator, ch chan *op) { 93 name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false) 94 if err != nil { 95 t.Fatalf("request error %v", err) 96 } 97 idx := atomic.AddInt32(&operationIndex, 1) 98 ch <- &op{idx, true, name} 99 time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) 100 idx = atomic.AddInt32(&operationIndex, 1) 101 err = a.ReleasePool(name) 102 if err != nil { 103 t.Fatalf("relase error %v", err) 104 } 105 ch <- &op{idx, false, name} 106 }(t, a, ch) 107 } 108 109 // map of events 110 m := make(map[string][]*op) 111 for i := 0; i < 240; i++ { 112 x := <-ch 113 ops, ok := m[x.name] 114 if !ok { 115 ops = make([]*op, 0, 10) 116 } 117 ops = append(ops, x) 118 m[x.name] = ops 119 } 120 121 // Post processing to avoid event reordering on the channel 122 for pool, ops := range m { 123 sort.Slice(ops[:], func(i, j int) bool { 124 return ops[i].id < ops[j].id 125 }) 126 expected := true 127 for _, op := range ops { 128 if op.add != expected { 129 t.Fatalf("Operations for %v not valid %v, operations %v", pool, op, ops) 130 } 131 expected = !expected 132 } 133 } 134} 135 136func TestFullAllocateRelease(t *testing.T) { 137 for _, parallelism := range []int64{2, 4, 8} { 138 for _, mask := range []int{29, 25, 24, 21} { 139 tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 140 allocate(t, tctx, parallelism) 141 release(t, tctx, all, parallelism) 142 } 143 } 144} 145 146func TestOddAllocateRelease(t *testing.T) { 147 for _, parallelism := range []int64{2, 4, 8} { 148 for _, mask := range []int{29, 25, 24, 21} { 149 tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 150 allocate(t, tctx, parallelism) 151 release(t, tctx, odd, parallelism) 152 } 153 } 154} 155 156func TestFullAllocateSerialReleaseParallel(t *testing.T) { 157 for _, parallelism := range []int64{1, 4, 8} { 158 tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 159 allocate(t, tctx, 1) 160 release(t, tctx, all, parallelism) 161 } 162} 163 164func TestOddAllocateSerialReleaseParallel(t *testing.T) { 165 for _, parallelism := range []int64{1, 4, 8} { 166 tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 167 allocate(t, tctx, 1) 168 release(t, tctx, odd, parallelism) 169 } 170} 171 172func TestEvenAllocateSerialReleaseParallel(t *testing.T) { 173 for _, parallelism := range []int64{1, 4, 8} { 174 tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"}) 175 allocate(t, tctx, 1) 176 release(t, tctx, even, parallelism) 177 } 178} 179 180func allocate(t *testing.T, tctx *testContext, parallel int64) { 181 // Allocate the whole space 182 parallelExec := semaphore.NewWeighted(parallel) 183 routineNum := tctx.maxIP + 10 184 ch := make(chan *net.IPNet, routineNum) 185 var id int 186 var wg sync.WaitGroup 187 // routine loop 188 for { 189 wg.Add(1) 190 go func(id int) { 191 parallelExec.Acquire(context.Background(), 1) 192 ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts) 193 ch <- ip 194 parallelExec.Release(1) 195 wg.Done() 196 }(id) 197 id++ 198 if id == routineNum { 199 break 200 } 201 } 202 203 // give time to all the go routines to finish 204 wg.Wait() 205 206 // process results 207 for i := 0; i < routineNum; i++ { 208 ip := <-ch 209 if ip == nil { 210 continue 211 } 212 if there, ok := tctx.ipMap[ip.String()]; ok && there { 213 t.Fatalf("Got duplicate IP %s", ip.String()) 214 break 215 } 216 tctx.ipList = append(tctx.ipList, ip) 217 tctx.ipMap[ip.String()] = true 218 } 219 220 assert.Len(t, tctx.ipList, tctx.maxIP) 221 if len(tctx.ipList) != tctx.maxIP { 222 t.Fatal("missmatch number allocation") 223 } 224} 225 226func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) { 227 var startIndex, increment, stopIndex, length int 228 switch mode { 229 case all: 230 startIndex = 0 231 increment = 1 232 stopIndex = tctx.maxIP - 1 233 length = tctx.maxIP 234 case odd, even: 235 if mode == odd { 236 startIndex = 1 237 } 238 increment = 2 239 stopIndex = tctx.maxIP - 1 240 length = tctx.maxIP / 2 241 if tctx.maxIP%2 > 0 { 242 length++ 243 } 244 default: 245 t.Fatal("unsupported mode yet") 246 } 247 248 ipIndex := make([]int, 0, length) 249 // calculate the index to release from the ipList 250 for i := startIndex; ; i += increment { 251 ipIndex = append(ipIndex, i) 252 if i+increment > stopIndex { 253 break 254 } 255 } 256 257 var id int 258 parallelExec := semaphore.NewWeighted(parallel) 259 ch := make(chan *net.IPNet, len(ipIndex)) 260 wg := sync.WaitGroup{} 261 for index := range ipIndex { 262 wg.Add(1) 263 go func(id, index int) { 264 parallelExec.Acquire(context.Background(), 1) 265 // logrus.Errorf("index %v", index) 266 // logrus.Errorf("list %v", tctx.ipList) 267 err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP) 268 if err != nil { 269 t.Fatalf("routine %d got %v", id, err) 270 } 271 ch <- tctx.ipList[index] 272 parallelExec.Release(1) 273 wg.Done() 274 }(id, index) 275 id++ 276 } 277 wg.Wait() 278 279 for i := 0; i < len(ipIndex); i++ { 280 ip := <-ch 281 282 // check if it is really free 283 _, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil) 284 assert.NoError(t, err, "ip %v not properly released", ip) 285 if err != nil { 286 t.Fatalf("ip %v not properly released, error:%v", ip, err) 287 } 288 err = tctx.a.ReleaseAddress(tctx.pid, ip.IP) 289 assert.NoError(t, err) 290 291 if there, ok := tctx.ipMap[ip.String()]; !ok || !there { 292 t.Fatalf("ip %v got double deallocated", ip) 293 } 294 tctx.ipMap[ip.String()] = false 295 for j, v := range tctx.ipList { 296 if v == ip { 297 tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...) 298 break 299 } 300 } 301 } 302 303 assert.Len(t, tctx.ipList, tctx.maxIP-length) 304} 305