1// Copyright 2018 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package tester 16 17import ( 18 "context" 19 "fmt" 20 "math/rand" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "go.etcd.io/etcd/clientv3" 26 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" 27 "go.etcd.io/etcd/functional/rpcpb" 28 29 "go.uber.org/zap" 30 "golang.org/x/time/rate" 31 "google.golang.org/grpc" 32) 33 34const ( 35 // time to live for lease 36 defaultTTL = 120 37 defaultTTLShort = 2 38) 39 40type leaseStresser struct { 41 stype rpcpb.StresserType 42 lg *zap.Logger 43 44 m *rpcpb.Member 45 cli *clientv3.Client 46 ctx context.Context 47 cancel func() 48 49 rateLimiter *rate.Limiter 50 // atomicModifiedKey records the number of keys created and deleted during a test case 51 atomicModifiedKey int64 52 numLeases int 53 keysPerLease int 54 55 aliveLeases *atomicLeases 56 revokedLeases *atomicLeases 57 shortLivedLeases *atomicLeases 58 59 runWg sync.WaitGroup 60 aliveWg sync.WaitGroup 61} 62 63type atomicLeases struct { 64 // rwLock is used to protect read/write access of leases map 65 // which are accessed and modified by different go routines. 66 rwLock sync.RWMutex 67 leases map[int64]time.Time 68} 69 70func (al *atomicLeases) add(leaseID int64, t time.Time) { 71 al.rwLock.Lock() 72 al.leases[leaseID] = t 73 al.rwLock.Unlock() 74} 75 76func (al *atomicLeases) update(leaseID int64, t time.Time) { 77 al.rwLock.Lock() 78 _, ok := al.leases[leaseID] 79 if ok { 80 al.leases[leaseID] = t 81 } 82 al.rwLock.Unlock() 83} 84 85func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) { 86 al.rwLock.RLock() 87 rv, ok = al.leases[leaseID] 88 al.rwLock.RUnlock() 89 return rv, ok 90} 91 92func (al *atomicLeases) remove(leaseID int64) { 93 al.rwLock.Lock() 94 delete(al.leases, leaseID) 95 al.rwLock.Unlock() 96} 97 98func (al *atomicLeases) getLeasesMap() map[int64]time.Time { 99 leasesCopy := make(map[int64]time.Time) 100 al.rwLock.RLock() 101 for k, v := range al.leases { 102 leasesCopy[k] = v 103 } 104 al.rwLock.RUnlock() 105 return leasesCopy 106} 107 108func (ls *leaseStresser) setupOnce() error { 109 if ls.aliveLeases != nil { 110 return nil 111 } 112 if ls.numLeases == 0 { 113 panic("expect numLeases to be set") 114 } 115 if ls.keysPerLease == 0 { 116 panic("expect keysPerLease to be set") 117 } 118 119 ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)} 120 return nil 121} 122 123func (ls *leaseStresser) Stress() error { 124 ls.lg.Info( 125 "stress START", 126 zap.String("stress-type", ls.stype.String()), 127 zap.String("endpoint", ls.m.EtcdClientEndpoint), 128 ) 129 130 if err := ls.setupOnce(); err != nil { 131 return err 132 } 133 134 ctx, cancel := context.WithCancel(context.Background()) 135 ls.ctx = ctx 136 ls.cancel = cancel 137 138 cli, err := ls.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second)) 139 if err != nil { 140 return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint) 141 } 142 ls.cli = cli 143 144 ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)} 145 ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)} 146 147 ls.runWg.Add(1) 148 go ls.run() 149 return nil 150} 151 152func (ls *leaseStresser) run() { 153 defer ls.runWg.Done() 154 ls.restartKeepAlives() 155 for { 156 // the number of keys created and deleted is roughly 2x the number of created keys for an iteration. 157 // the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key. 158 err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease) 159 if err == context.Canceled { 160 return 161 } 162 163 ls.lg.Debug( 164 "stress creating leases", 165 zap.String("stress-type", ls.stype.String()), 166 zap.String("endpoint", ls.m.EtcdClientEndpoint), 167 ) 168 ls.createLeases() 169 ls.lg.Debug( 170 "stress created leases", 171 zap.String("stress-type", ls.stype.String()), 172 zap.String("endpoint", ls.m.EtcdClientEndpoint), 173 ) 174 175 ls.lg.Debug( 176 "stress dropped leases", 177 zap.String("stress-type", ls.stype.String()), 178 zap.String("endpoint", ls.m.EtcdClientEndpoint), 179 ) 180 ls.randomlyDropLeases() 181 ls.lg.Debug( 182 "stress dropped leases", 183 zap.String("stress-type", ls.stype.String()), 184 zap.String("endpoint", ls.m.EtcdClientEndpoint), 185 ) 186 } 187} 188 189func (ls *leaseStresser) restartKeepAlives() { 190 for leaseID := range ls.aliveLeases.getLeasesMap() { 191 ls.aliveWg.Add(1) 192 go func(id int64) { 193 ls.keepLeaseAlive(id) 194 }(leaseID) 195 } 196} 197 198func (ls *leaseStresser) createLeases() { 199 ls.createAliveLeases() 200 ls.createShortLivedLeases() 201} 202 203func (ls *leaseStresser) createAliveLeases() { 204 neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap()) 205 var wg sync.WaitGroup 206 for i := 0; i < neededLeases; i++ { 207 wg.Add(1) 208 go func() { 209 defer wg.Done() 210 leaseID, err := ls.createLeaseWithKeys(defaultTTL) 211 if err != nil { 212 ls.lg.Debug( 213 "createLeaseWithKeys failed", 214 zap.String("endpoint", ls.m.EtcdClientEndpoint), 215 zap.Error(err), 216 ) 217 return 218 } 219 ls.aliveLeases.add(leaseID, time.Now()) 220 // keep track of all the keep lease alive go routines 221 ls.aliveWg.Add(1) 222 go ls.keepLeaseAlive(leaseID) 223 }() 224 } 225 wg.Wait() 226} 227 228func (ls *leaseStresser) createShortLivedLeases() { 229 // one round of createLeases() might not create all the short lived leases we want due to falures. 230 // thus, we want to create remaining short lived leases in the future round. 231 neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap()) 232 var wg sync.WaitGroup 233 for i := 0; i < neededLeases; i++ { 234 wg.Add(1) 235 go func() { 236 defer wg.Done() 237 leaseID, err := ls.createLeaseWithKeys(defaultTTLShort) 238 if err != nil { 239 return 240 } 241 ls.shortLivedLeases.add(leaseID, time.Now()) 242 }() 243 } 244 wg.Wait() 245} 246 247func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) { 248 leaseID, err := ls.createLease(ttl) 249 if err != nil { 250 ls.lg.Debug( 251 "createLease failed", 252 zap.String("stress-type", ls.stype.String()), 253 zap.String("endpoint", ls.m.EtcdClientEndpoint), 254 zap.Error(err), 255 ) 256 return -1, err 257 } 258 259 ls.lg.Debug( 260 "createLease created lease", 261 zap.String("stress-type", ls.stype.String()), 262 zap.String("endpoint", ls.m.EtcdClientEndpoint), 263 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 264 ) 265 if err := ls.attachKeysWithLease(leaseID); err != nil { 266 return -1, err 267 } 268 return leaseID, nil 269} 270 271func (ls *leaseStresser) randomlyDropLeases() { 272 var wg sync.WaitGroup 273 for l := range ls.aliveLeases.getLeasesMap() { 274 wg.Add(1) 275 go func(leaseID int64) { 276 defer wg.Done() 277 dropped, err := ls.randomlyDropLease(leaseID) 278 // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases 279 // because we can't tell whether the lease is dropped or not. 280 if err != nil { 281 ls.lg.Debug( 282 "randomlyDropLease failed", 283 zap.String("endpoint", ls.m.EtcdClientEndpoint), 284 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 285 zap.Error(err), 286 ) 287 ls.aliveLeases.remove(leaseID) 288 return 289 } 290 if !dropped { 291 return 292 } 293 ls.lg.Debug( 294 "randomlyDropLease dropped a lease", 295 zap.String("stress-type", ls.stype.String()), 296 zap.String("endpoint", ls.m.EtcdClientEndpoint), 297 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 298 ) 299 ls.revokedLeases.add(leaseID, time.Now()) 300 ls.aliveLeases.remove(leaseID) 301 }(l) 302 } 303 wg.Wait() 304} 305 306func (ls *leaseStresser) createLease(ttl int64) (int64, error) { 307 resp, err := ls.cli.Grant(ls.ctx, ttl) 308 if err != nil { 309 return -1, err 310 } 311 return int64(resp.ID), nil 312} 313 314func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { 315 defer ls.aliveWg.Done() 316 ctx, cancel := context.WithCancel(ls.ctx) 317 stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID)) 318 defer func() { cancel() }() 319 for { 320 select { 321 case <-time.After(500 * time.Millisecond): 322 case <-ls.ctx.Done(): 323 ls.lg.Debug( 324 "keepLeaseAlive context canceled", 325 zap.String("stress-type", ls.stype.String()), 326 zap.String("endpoint", ls.m.EtcdClientEndpoint), 327 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 328 zap.Error(ls.ctx.Err()), 329 ) 330 // it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase. 331 // this scenerio is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase. 332 // to circumvent that scenerio, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration. 333 // if it is renewed, this means that invariant checking have at least ttl/2 time before lease exipres which is long enough for the checking to finish. 334 // if it is not renewed, we remove the lease from the alive map so that the lease doesn't exipre during invariant checking 335 renewTime, ok := ls.aliveLeases.read(leaseID) 336 if ok && renewTime.Add(defaultTTL/2*time.Second).Before(time.Now()) { 337 ls.aliveLeases.remove(leaseID) 338 ls.lg.Debug( 339 "keepLeaseAlive lease has not been renewed, dropped it", 340 zap.String("stress-type", ls.stype.String()), 341 zap.String("endpoint", ls.m.EtcdClientEndpoint), 342 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 343 ) 344 } 345 return 346 } 347 348 if err != nil { 349 ls.lg.Debug( 350 "keepLeaseAlive lease creates stream error", 351 zap.String("stress-type", ls.stype.String()), 352 zap.String("endpoint", ls.m.EtcdClientEndpoint), 353 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 354 zap.Error(err), 355 ) 356 cancel() 357 ctx, cancel = context.WithCancel(ls.ctx) 358 stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID)) 359 cancel() 360 continue 361 } 362 if err != nil { 363 ls.lg.Debug( 364 "keepLeaseAlive failed to receive lease keepalive response", 365 zap.String("stress-type", ls.stype.String()), 366 zap.String("endpoint", ls.m.EtcdClientEndpoint), 367 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 368 zap.Error(err), 369 ) 370 continue 371 } 372 373 ls.lg.Debug( 374 "keepLeaseAlive waiting on lease stream", 375 zap.String("stress-type", ls.stype.String()), 376 zap.String("endpoint", ls.m.EtcdClientEndpoint), 377 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 378 ) 379 leaseRenewTime := time.Now() 380 respRC := <-stream 381 if respRC == nil { 382 ls.lg.Debug( 383 "keepLeaseAlive received nil lease keepalive response", 384 zap.String("stress-type", ls.stype.String()), 385 zap.String("endpoint", ls.m.EtcdClientEndpoint), 386 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 387 ) 388 continue 389 } 390 391 // lease expires after TTL become 0 392 // don't send keepalive if the lease has expired 393 if respRC.TTL <= 0 { 394 ls.lg.Debug( 395 "keepLeaseAlive stream received lease keepalive response TTL <= 0", 396 zap.String("stress-type", ls.stype.String()), 397 zap.String("endpoint", ls.m.EtcdClientEndpoint), 398 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 399 zap.Int64("ttl", respRC.TTL), 400 ) 401 ls.aliveLeases.remove(leaseID) 402 return 403 } 404 // renew lease timestamp only if lease is present 405 ls.lg.Debug( 406 "keepLeaseAlive renewed a lease", 407 zap.String("stress-type", ls.stype.String()), 408 zap.String("endpoint", ls.m.EtcdClientEndpoint), 409 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 410 ) 411 ls.aliveLeases.update(leaseID, leaseRenewTime) 412 } 413} 414 415// attachKeysWithLease function attaches keys to the lease. 416// the format of key is the concat of leaseID + '_' + '<order of key creation>' 417// e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key 418func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error { 419 var txnPuts []clientv3.Op 420 for j := 0; j < ls.keysPerLease; j++ { 421 txnput := clientv3.OpPut( 422 fmt.Sprintf("%d%s%d", leaseID, "_", j), 423 fmt.Sprintf("bar"), 424 clientv3.WithLease(clientv3.LeaseID(leaseID)), 425 ) 426 txnPuts = append(txnPuts, txnput) 427 } 428 // keep retrying until lease is not found or ctx is being canceled 429 for ls.ctx.Err() == nil { 430 _, err := ls.cli.Txn(ls.ctx).Then(txnPuts...).Commit() 431 if err == nil { 432 // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys 433 atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease)) 434 return nil 435 } 436 if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { 437 return err 438 } 439 } 440 return ls.ctx.Err() 441} 442 443// randomlyDropLease drops the lease only when the rand.Int(2) returns 1. 444// This creates a 50/50 percents chance of dropping a lease 445func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { 446 if rand.Intn(2) != 0 { 447 return false, nil 448 } 449 450 // keep retrying until a lease is dropped or ctx is being canceled 451 for ls.ctx.Err() == nil { 452 _, err := ls.cli.Revoke(ls.ctx, clientv3.LeaseID(leaseID)) 453 if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { 454 return true, nil 455 } 456 } 457 458 ls.lg.Debug( 459 "randomlyDropLease error", 460 zap.String("stress-type", ls.stype.String()), 461 zap.String("endpoint", ls.m.EtcdClientEndpoint), 462 zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), 463 zap.Error(ls.ctx.Err()), 464 ) 465 return false, ls.ctx.Err() 466} 467 468func (ls *leaseStresser) Pause() map[string]int { 469 return ls.Close() 470} 471 472func (ls *leaseStresser) Close() map[string]int { 473 ls.cancel() 474 ls.runWg.Wait() 475 ls.aliveWg.Wait() 476 ls.cli.Close() 477 ls.lg.Info( 478 "stress STOP", 479 zap.String("stress-type", ls.stype.String()), 480 zap.String("endpoint", ls.m.EtcdClientEndpoint), 481 ) 482 return nil 483} 484 485func (ls *leaseStresser) ModifiedKeys() int64 { 486 return atomic.LoadInt64(&ls.atomicModifiedKey) 487} 488