// Copyright 2014 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package longtest_test holds slow end-to-end tests for the pubsub client:
// they pump many messages through real topics and subscriptions and verify
// delivery and bounded duplication.
package longtest_test

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"cloud.google.com/go/internal/testutil"
	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	// timeout bounds each test's overall context.
	timeout = time.Minute * 10
	// ackDeadline is the ack deadline configured on every subscription
	// created by these tests.
	ackDeadline = time.Second * 10
	// nMessages is the number of messages published in TestEndToEnd_Dupes.
	nMessages = 1e4
	// acceptableDupPercentage is the share of duplicate deliveries (relative
	// to nMessages) tolerated before a test fails.
	acceptableDupPercentage = 1
	numAcceptableDups       = int(nMessages * acceptableDupPercentage / 100)
	// resourcePrefix names every topic/subscription these tests create, so
	// the pre-test cleanup functions can recognize and delete stale ones.
	resourcePrefix = "endtoend"
)

// The end-to-end test pumps many messages into a topic and tests that they
// are all delivered to each subscription for the topic. It also tests that
// messages are not unexpectedly redelivered.
func TestEndToEnd_Dupes(t *testing.T) {
	t.Skip("https://github.com/googleapis/google-cloud-go/issues/1752")

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	client, topic, cleanup := prepareEndToEndTest(ctx, t)
	defer cleanup()
	subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())

	// Two subscriptions to the same topic.
	var err error
	var subs [2]*pubsub.Subscription
	for i := 0; i < len(subs); i++ {
		subs[i], err = client.CreateSubscription(ctx, fmt.Sprintf("%s-%d", subPrefix, i), pubsub.SubscriptionConfig{
			Topic:       topic,
			AckDeadline: ackDeadline,
		})
		if err != nil {
			t.Fatalf("CreateSub error: %v", err)
		}
		defer subs[i].Delete(ctx)
	}

	// Stop the topic even if publish failed, so its goroutines are released
	// before we check the error.
	err = publish(ctx, topic, nMessages)
	topic.Stop()
	if err != nil {
		t.Fatalf("publish: %v", err)
	}

	// recv provides an indication that messages are still arriving.
	recv := make(chan struct{})
	// We have two subscriptions to our topic.
	// Each subscription will get a copy of each published message.
	var wg sync.WaitGroup
	// cctx shadows cancel: from here on, cancel() stops the quiescence wait
	// below, and the earlier deferred cancel still releases the outer ctx.
	cctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Consumer 0 runs a single long Receive; consumer 1 churns through
	// several short Receives (shorter than the ack deadline) to exercise
	// redelivery of messages that were in flight when a Receive is canceled.
	consumers := []*consumer{
		{
			counts:    make(map[string]int),
			recv:      recv,
			durations: []time.Duration{time.Hour},
			done:      make(chan struct{}),
		},
		{
			counts:    make(map[string]int),
			recv:      recv,
			durations: []time.Duration{ackDeadline, ackDeadline, ackDeadline / 2, ackDeadline / 2, time.Hour},
			done:      make(chan struct{}),
		},
	}
	for i, con := range consumers {
		con := con // capture per-iteration values for the goroutine (pre-Go 1.22 semantics)
		sub := subs[i]
		wg.Add(1)
		go func() {
			defer wg.Done()
			con.consume(ctx, t, sub)
		}()
	}
	// Wait for a while after the last message before declaring quiescence.
	// We wait a multiple of the ack deadline, for two reasons:
	// 1. To detect if messages are redelivered after having their ack
	//    deadline extended.
	// 2. To wait for redelivery of messages that were en route when a Receive
	//    is canceled. This can take considerably longer than the ack deadline.
	quiescenceDur := ackDeadline * 6
	quiescenceTimer := time.NewTimer(quiescenceDur)

loop:
	for {
		select {
		case <-recv:
			// Reset timer so we wait quiescenceDur after the last message.
			// See https://godoc.org/time#Timer.Reset for why the Stop
			// and channel drain are necessary.
			if !quiescenceTimer.Stop() {
				<-quiescenceTimer.C
			}
			quiescenceTimer.Reset(quiescenceDur)

		case <-quiescenceTimer.C:
			// No message for quiescenceDur: stop the consumers and tally.
			cancel()
			log.Println("quiesced")
			break loop

		case <-cctx.Done():
			t.Fatal("timed out")
		}
	}
	// Consumers have fully stopped before recv is closed, so process can no
	// longer send on it.
	wg.Wait()
	close(recv)
	for i, con := range consumers {
		var numDups int
		var zeroes int
		for _, v := range con.counts {
			if v == 0 {
				zeroes++
			}
			numDups += v - 1
		}

		// NOTE(review): counts entries are only created on first receipt
		// (counts[m.ID]++), so v is never 0 and the zeroes check cannot fire
		// as written; a never-delivered message shows up as a missing key
		// (len(con.counts) < nMessages), which is not checked here.
		if zeroes > 0 {
			t.Errorf("Consumer %d: %d messages never arrived", i, zeroes)
		} else if numDups > numAcceptableDups {
			t.Errorf("Consumer %d: Willing to accept %d dups (%v%% duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
		}
	}

	// Ensure each consumer's goroutine has actually exited (done is closed
	// at the top of consume via defer).
	for i, con := range consumers {
		select {
		case <-con.done:
		case <-time.After(15 * time.Second):
			t.Fatalf("timed out waiting for consumer %d to finish", i)
		}
	}
}

// TestEndToEnd_LongProcessingTime verifies that messages whose handler takes
// much longer than the ack deadline (up to two minutes here) are still acked
// rather than endlessly redelivered, with synchronous pull enabled.
func TestEndToEnd_LongProcessingTime(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	client, topic, cleanup := prepareEndToEndTest(ctx, t)
	defer cleanup()
	subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())

	// One subscription to the topic.
	sub, err := client.CreateSubscription(ctx, subPrefix+"-00", pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: ackDeadline,
	})
	if err != nil {
		t.Fatalf("CreateSub error: %v", err)
	}
	defer sub.Delete(ctx)

	// Tests the issue found in https://github.com/googleapis/google-cloud-go/issues/1247.
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 500

	err = publish(ctx, topic, 500)
	topic.Stop()
	if err != nil {
		t.Fatalf("publish: %v", err)
	}

	// recv provides an indication that messages are still arriving.
	recv := make(chan struct{})
	consumer := consumer{
		counts:    make(map[string]int),
		recv:      recv,
		durations: []time.Duration{time.Hour},
		// Each message takes 1..120 seconds to "process", far beyond the
		// 10-second ack deadline, forcing deadline extensions.
		processingDelay: func() time.Duration {
			return time.Duration(1+rand.Int63n(120)) * time.Second
		},
		done: make(chan struct{}),
	}
	go consumer.consume(ctx, t, sub)
	// Wait for a while after the last message before declaring quiescence.
	// We wait a multiple of the ack deadline, for two reasons:
	// 1. To detect if messages are redelivered after having their ack
	//    deadline extended.
	// 2. To wait for redelivery of messages that were en route when a Receive
	//    is canceled. This can take considerably longer than the ack deadline.
	quiescenceDur := 12 * ackDeadline
	quiescenceTimer := time.NewTimer(quiescenceDur)
loop:
	for {
		select {
		case <-recv:
			// Reset timer so we wait quiescenceDur after the last message.
			// See https://godoc.org/time#Timer.Reset for why the Stop
			// and channel drain are necessary.
			if !quiescenceTimer.Stop() {
				<-quiescenceTimer.C
			}
			quiescenceTimer.Reset(quiescenceDur)

		case <-quiescenceTimer.C:
			cancel()
			log.Println("quiesced")
			break loop

		case <-ctx.Done():
			t.Fatal("timed out")
		}
	}
	// NOTE(review): unlike TestEndToEnd_Dupes, recv is closed and counts is
	// read before waiting on consumer.done; if a straggler delivery invokes
	// process after this point, the send on the closed recv panics and the
	// counts read races with process — confirm Receive has fully drained by
	// the time cancel() returns control here.
	close(recv)
	var numDups int
	var zeroes int
	for _, v := range consumer.counts {
		if v == 0 {
			zeroes++
		}
		numDups += v - 1
	}

	if zeroes > 0 {
		t.Errorf("%d messages never arrived", zeroes)
	} else if numDups > numAcceptableDups {
		// NOTE(review): this message reports int(nMessages) (10000), but only
		// 500 messages were published in this test, and it lacks the %% that
		// the Dupes variant prints; numAcceptableDups is also derived from
		// nMessages, i.e. a 20% dup budget for 500 messages.
		t.Errorf("Willing to accept %d dups (%v duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
	}

	select {
	case <-consumer.done:
	case <-time.After(15 * time.Second):
		t.Fatal("timed out waiting for consumer to finish")
	}
}

// publish publishes n messages to topic and blocks until every publish
// result resolves, returning the first error encountered.
func publish(ctx context.Context, topic *pubsub.Topic, n int) error {
	var rs []*pubsub.PublishResult
	// Fire off all publishes first; results are awaited in a second pass so
	// the client can batch.
	for i := 0; i < n; i++ {
		m := &pubsub.Message{Data: []byte(fmt.Sprintf("msg %d", i))}
		rs = append(rs, topic.Publish(ctx, m))
	}
	for _, r := range rs {
		_, err := r.Get(ctx)
		if err != nil {
			return err
		}
	}
	return nil
}

// consumer consumes messages according to its configuration.
type consumer struct {
	// A consumer will spin out a Receive for each duration, which will be
	// canceled after each duration and the next one spun up. For example, if
	// there are 5 3 second durations, then there will be 5 3 second Receives.
	durations []time.Duration

	// A value is sent to recv each time process is called.
	recv chan struct{}

	// How long to wait for before acking. If nil, process picks a random
	// delay of up to 3x the ack deadline.
	processingDelay func() time.Duration

	// mu guards counts and totalRecvd, which are updated from Receive's
	// handler goroutines.
	mu         sync.Mutex
	counts     map[string]int // msgID: recvdAmt
	totalRecvd int

	// Done consuming; closed when consume returns.
	done chan struct{}
}

// consume runs one Receive per entry in c.durations, canceling each Receive
// when its duration elapses, and records what it receives in c.counts via
// c.process. It closes c.done on return and stops early if ctx is canceled.
func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscription) {
	defer close(c.done)
	for _, dur := range c.durations {
		ctx2, cancel := context.WithTimeout(ctx, dur)
		// Deferred in a loop: all cancels run when consume returns. Harmless
		// here because durations is short, but each iteration's context is
		// retained until then.
		defer cancel()
		// Tag log lines with the last character of the subscription name
		// (the index suffix given at creation).
		id := sub.String()[len(sub.String())-1:]
		t.Logf("%s: start receive", id)
		// NOTE(review): totalRecvd is read here without c.mu while process
		// may be incrementing it — only used for logging, but technically a
		// data race under -race.
		prev := c.totalRecvd
		err := sub.Receive(ctx2, c.process)
		t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
		// A Canceled status is the expected way each timed Receive ends;
		// any other error is a hard failure.
		if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled {
			panic(err)
		}
		// Bail out between Receives if the parent context is gone.
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}

// process handles a message: it records the delivery in c.counts, signals
// c.recv, and schedules the ack after a delay so that some messages must
// have their ack deadline extended.
func (c *consumer) process(_ context.Context, m *pubsub.Message) {
	c.mu.Lock()
	c.counts[m.ID]++
	c.totalRecvd++
	c.mu.Unlock()
	c.recv <- struct{}{}

	var delay time.Duration
	if c.processingDelay == nil {
		delay = time.Duration(rand.Intn(int(ackDeadline * 3)))
	} else {
		delay = c.processingDelay()
	}

	// Simulate time taken to process m, while continuing to process more messages.
	// Some messages will need to have their ack deadline extended due to this delay.
	time.AfterFunc(delay, func() {
		m.Ack()
	})
}

// prepareEndToEndTest creates a pubsub client and a fresh timestamped topic
// for an end-to-end test, after best-effort deletion of stale resources left
// by earlier runs. It skips the test in short mode or when no credentials
// are available. Remember to call cleanup!
func prepareEndToEndTest(ctx context.Context, t *testing.T) (*pubsub.Client, *pubsub.Topic, func()) {
	if testing.Short() {
		t.Skip("Integration tests skipped in short mode")
	}
	ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
	if ts == nil {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}

	now := time.Now()
	// Topic IDs embed a creation timestamp so cleanupTopic can expire them.
	topicName := fmt.Sprintf("%s-%d", resourcePrefix, now.UnixNano())

	client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
	if err != nil {
		t.Fatalf("Creating client error: %v", err)
	}

	// Don't stop the test if cleanup failed.
	if err := cleanupSubscription(ctx, client); err != nil {
		t.Logf("Pre-test subscription cleanup failed: %v", err)
	}
	if err := cleanupTopic(ctx, client); err != nil {
		t.Logf("Pre-test topic cleanup failed: %v", err)
	}

	var topic *pubsub.Topic
	if topic, err = client.CreateTopic(ctx, topicName); err != nil {
		t.Fatalf("CreateTopic error: %v", err)
	}

	return client, topic, func() {
		topic.Delete(ctx)
		client.Close()
	}
}

// cleanupTopic deletes stale testing topics: any topic whose ID starts with
// resourcePrefix and whose embedded creation timestamp is older than a day.
func cleanupTopic(ctx context.Context, client *pubsub.Client) error {
	if testing.Short() {
		return nil // Don't clean up in short mode.
	}
	// Delete topics which were created a while ago.
	const expireAge = 24 * time.Hour

	it := client.Topics(ctx)
	for {
		t, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		// Take timestamp from id. Topic IDs look like
		// "<resourcePrefix>-<unixNanoTimestamp>".
		tID := t.ID()
		p := strings.Split(tID, "-")

		// Only delete resources created from the endtoend test.
		// Otherwise, this will affect other tests running midflight.
		if p[0] == resourcePrefix {
			tCreated := p[len(p)-1]
			timestamp, err := strconv.ParseInt(tCreated, 10, 64)
			if err != nil {
				// Last component isn't a timestamp we wrote; leave it alone.
				continue
			}
			timeTCreated := time.Unix(0, timestamp)
			if time.Since(timeTCreated) > expireAge {
				log.Printf("deleting topic %q", tID)
				if err := t.Delete(ctx); err != nil {
					return fmt.Errorf("Delete topic: %v: %v", t.String(), err)
				}
			}
		}
	}
	return nil
}

// cleanupSubscription deletes stale testing subscriptions.
418func cleanupSubscription(ctx context.Context, client *pubsub.Client) error { 419 if testing.Short() { 420 return nil // Don't clean up in short mode. 421 } 422 // Delete subscriptions which were created a while ago. 423 const expireAge = 24 * time.Hour 424 425 it := client.Subscriptions(ctx) 426 for { 427 s, err := it.Next() 428 if err == iterator.Done { 429 break 430 } 431 if err != nil { 432 return err 433 } 434 sID := s.ID() 435 p := strings.Split(sID, "-") 436 437 // Only delete resources created from the endtoend test. 438 // Otherwise, this will affect other tests running midflight. 439 if p[0] == resourcePrefix { 440 sCreated := p[len(p)-2] 441 timestamp, err := strconv.ParseInt(sCreated, 10, 64) 442 if err != nil { 443 continue 444 } 445 timeSCreated := time.Unix(0, timestamp) 446 if time.Since(timeSCreated) > expireAge { 447 log.Printf("deleting subscription %q", sID) 448 if err := s.Delete(ctx); err != nil { 449 return fmt.Errorf("Delete subscription: %v: %v", s.String(), err) 450 } 451 } 452 } 453 } 454 return nil 455} 456