1// Copyright 2017 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pstest 16 17import ( 18 "context" 19 "fmt" 20 "io" 21 "sync" 22 "testing" 23 "time" 24 25 "cloud.google.com/go/internal/testutil" 26 "github.com/golang/protobuf/ptypes" 27 pb "google.golang.org/genproto/googleapis/pubsub/v1" 28 "google.golang.org/grpc" 29 "google.golang.org/grpc/codes" 30 "google.golang.org/grpc/status" 31) 32 33func TestTopics(t *testing.T) { 34 pclient, _, server, cleanup := newFake(context.TODO(), t) 35 defer cleanup() 36 37 ctx := context.Background() 38 var topics []*pb.Topic 39 for i := 1; i < 3; i++ { 40 topics = append(topics, mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{ 41 Name: fmt.Sprintf("projects/P/topics/T%d", i), 42 Labels: map[string]string{"num": fmt.Sprintf("%d", i)}, 43 })) 44 } 45 if got, want := len(server.GServer.topics), len(topics); got != want { 46 t.Fatalf("got %d topics, want %d", got, want) 47 } 48 for _, top := range topics { 49 got, err := pclient.GetTopic(ctx, &pb.GetTopicRequest{Topic: top.Name}) 50 if err != nil { 51 t.Fatal(err) 52 } 53 if !testutil.Equal(got, top) { 54 t.Errorf("\ngot %+v\nwant %+v", got, top) 55 } 56 } 57 58 res, err := pclient.ListTopics(ctx, &pb.ListTopicsRequest{Project: "projects/P"}) 59 if err != nil { 60 t.Fatal(err) 61 } 62 if got, want := res.Topics, topics; !testutil.Equal(got, want) { 63 t.Errorf("\ngot %+v\nwant %+v", got, want) 64 } 65 66 for _, top := range topics { 67 if _, err := pclient.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: top.Name}); err != nil { 68 t.Fatal(err) 69 } 70 } 71 if got, want := len(server.GServer.topics), 0; got != want { 72 t.Fatalf("got %d topics, want %d", got, want) 73 } 74} 75 76func TestSubscriptions(t *testing.T) { 77 pclient, sclient, server, cleanup := newFake(context.TODO(), t) 78 defer cleanup() 79 80 ctx := context.Background() 81 topic := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 82 var subs []*pb.Subscription 83 for i := 0; i < 3; i++ { 84 subs = append(subs, mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 85 Name: fmt.Sprintf("projects/P/subscriptions/S%d", i), 86 Topic: topic.Name, 87 AckDeadlineSeconds: int32(10 * (i + 1)), 88 })) 89 } 90 91 if got, want := len(server.GServer.subs), len(subs); got != want { 92 t.Fatalf("got %d subscriptions, want %d", got, want) 93 } 94 for _, s := range subs { 95 got, err := sclient.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.Name}) 96 if err != nil { 97 t.Fatal(err) 98 } 99 if !testutil.Equal(got, s) { 100 t.Errorf("\ngot %+v\nwant %+v", got, s) 101 } 102 } 103 104 res, err := sclient.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Project: "projects/P"}) 105 if err != nil { 106 t.Fatal(err) 107 } 108 if got, want := res.Subscriptions, subs; !testutil.Equal(got, want) { 109 t.Errorf("\ngot %+v\nwant %+v", got, want) 110 } 111 112 res2, err := pclient.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Topic: topic.Name}) 113 if err != nil { 114 t.Fatal(err) 115 } 116 if got, want := len(res2.Subscriptions), len(subs); got != want { 117 t.Fatalf("got %d subs, want %d", got, want) 118 } 119 for i, got := range res2.Subscriptions { 120 want := subs[i].Name 121 if !testutil.Equal(got, want) { 122 t.Errorf("\ngot %+v\nwant %+v", got, want) 123 } 124 } 125 126 for _, s := range subs { 127 if _, err := sclient.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.Name}); err != nil { 128 t.Fatal(err) 129 } 130 } 131 if got, want := len(server.GServer.subs), 0; got != want { 132 t.Fatalf("got %d subscriptions, want %d", got, want) 133 } 134} 135 136func TestSubscriptionErrors(t *testing.T) { 137 _, sclient, _, cleanup := newFake(context.TODO(), t) 138 defer cleanup() 139 140 ctx := context.Background() 141 142 checkCode := func(err error, want codes.Code) { 143 t.Helper() 144 if status.Code(err) != want { 145 t.Errorf("got %v, want code %s", err, want) 146 } 147 } 148 149 _, err := sclient.GetSubscription(ctx, &pb.GetSubscriptionRequest{}) 150 checkCode(err, codes.InvalidArgument) 151 _, err = sclient.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: "s"}) 152 checkCode(err, codes.NotFound) 153 _, err = sclient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{}) 154 checkCode(err, codes.InvalidArgument) 155 _, err = sclient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{Subscription: &pb.Subscription{}}) 156 checkCode(err, codes.InvalidArgument) 157 _, err = sclient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{Subscription: &pb.Subscription{Name: "s"}}) 158 checkCode(err, codes.NotFound) 159 _, err = sclient.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{}) 160 checkCode(err, codes.InvalidArgument) 161 _, err = sclient.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: "s"}) 162 checkCode(err, codes.NotFound) 163 _, err = sclient.Acknowledge(ctx, &pb.AcknowledgeRequest{}) 164 checkCode(err, codes.InvalidArgument) 165 _, err = sclient.Acknowledge(ctx, &pb.AcknowledgeRequest{Subscription: "s"}) 166 checkCode(err, codes.NotFound) 167 _, err = sclient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{}) 168 checkCode(err, codes.InvalidArgument) 169 _, err = sclient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{Subscription: "s"}) 170 checkCode(err, codes.NotFound) 171 _, err = sclient.Pull(ctx, &pb.PullRequest{}) 172 checkCode(err, codes.InvalidArgument) 173 _, err = sclient.Pull(ctx, &pb.PullRequest{Subscription: "s"}) 174 checkCode(err, codes.NotFound) 175 _, err = sclient.Seek(ctx, &pb.SeekRequest{}) 176 checkCode(err, codes.InvalidArgument) 177 srt := &pb.SeekRequest_Time{Time: ptypes.TimestampNow()} 178 _, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt}) 179 checkCode(err, codes.InvalidArgument) 180 _, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt, Subscription: "s"}) 181 checkCode(err, codes.NotFound) 182} 183 184func TestPublish(t *testing.T) { 185 s := NewServer() 186 defer s.Close() 187 188 var ids []string 189 for i := 0; i < 3; i++ { 190 ids = append(ids, s.Publish("projects/p/topics/t", []byte("hello"), nil)) 191 } 192 s.Wait() 193 ms := s.Messages() 194 if got, want := len(ms), len(ids); got != want { 195 t.Errorf("got %d messages, want %d", got, want) 196 } 197 for i, id := range ids { 198 if got, want := ms[i].ID, id; got != want { 199 t.Errorf("got %s, want %s", got, want) 200 } 201 } 202 203 m := s.Message(ids[1]) 204 if m == nil { 205 t.Error("got nil, want a message") 206 } 207} 208 209func TestClearMessages(t *testing.T) { 210 s := NewServer() 211 defer s.Close() 212 213 for i := 0; i < 3; i++ { 214 s.Publish("projects/p/topics/t", []byte("hello"), nil) 215 } 216 s.Wait() 217 msgs := s.Messages() 218 if got, want := len(msgs), 3; got != want { 219 t.Errorf("got %d messages, want %d", got, want) 220 } 221 s.ClearMessages() 222 msgs = s.Messages() 223 if got, want := len(msgs), 0; got != want { 224 t.Errorf("got %d messages, want %d", got, want) 225 } 226} 227 228// Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now". 229func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage { 230 pubTime := time.Now() 231 now.Store(func() time.Time { return pubTime }) 232 defer func() { now.Store(time.Now) }() 233 234 res, err := pclient.Publish(context.Background(), &pb.PublishRequest{ 235 Topic: topic.Name, 236 Messages: messages, 237 }) 238 if err != nil { 239 t.Fatal(err) 240 } 241 tsPubTime, err := ptypes.TimestampProto(pubTime) 242 if err != nil { 243 t.Fatal(err) 244 } 245 want := map[string]*pb.PubsubMessage{} 246 for i, id := range res.MessageIds { 247 want[id] = &pb.PubsubMessage{ 248 Data: messages[i].Data, 249 Attributes: messages[i].Attributes, 250 MessageId: id, 251 PublishTime: tsPubTime, 252 } 253 } 254 return want 255} 256 257func TestPull(t *testing.T) { 258 pclient, sclient, _, cleanup := newFake(context.TODO(), t) 259 defer cleanup() 260 261 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 262 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 263 Name: "projects/P/subscriptions/S", 264 Topic: top.Name, 265 AckDeadlineSeconds: 10, 266 }) 267 268 want := publish(t, pclient, top, []*pb.PubsubMessage{ 269 {Data: []byte("d1")}, 270 {Data: []byte("d2")}, 271 {Data: []byte("d3")}, 272 }) 273 got := pubsubMessages(pullN(context.TODO(), t, len(want), sclient, sub)) 274 if diff := testutil.Diff(got, want); diff != "" { 275 t.Error(diff) 276 } 277 278 res, err := sclient.Pull(context.Background(), &pb.PullRequest{Subscription: sub.Name}) 279 if err != nil { 280 t.Fatal(err) 281 } 282 if len(res.ReceivedMessages) != 0 { 283 t.Errorf("got %d messages, want zero", len(res.ReceivedMessages)) 284 } 285} 286 287func TestStreamingPull(t *testing.T) { 288 // A simple test of streaming pull. 289 pclient, sclient, _, cleanup := newFake(context.TODO(), t) 290 defer cleanup() 291 292 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 293 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 294 Name: "projects/P/subscriptions/S", 295 Topic: top.Name, 296 AckDeadlineSeconds: 10, 297 }) 298 299 want := publish(t, pclient, top, []*pb.PubsubMessage{ 300 {Data: []byte("d1")}, 301 {Data: []byte("d2")}, 302 {Data: []byte("d3")}, 303 }) 304 got := pubsubMessages(streamingPullN(context.TODO(), t, len(want), sclient, sub)) 305 if diff := testutil.Diff(got, want); diff != "" { 306 t.Error(diff) 307 } 308} 309 310// This test acks each message as it arrives and makes sure we don't see dups. 311func TestStreamingPullAck(t *testing.T) { 312 minAckDeadlineSecs = 1 313 pclient, sclient, _, cleanup := newFake(context.TODO(), t) 314 defer cleanup() 315 316 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 317 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 318 Name: "projects/P/subscriptions/S", 319 Topic: top.Name, 320 AckDeadlineSeconds: 1, 321 }) 322 323 _ = publish(t, pclient, top, []*pb.PubsubMessage{ 324 {Data: []byte("d1")}, 325 {Data: []byte("d2")}, 326 {Data: []byte("d3")}, 327 }) 328 329 got := map[string]bool{} 330 ctx, cancel := context.WithCancel(context.Background()) 331 spc := mustStartStreamingPull(ctx, t, sclient, sub) 332 time.AfterFunc(time.Duration(2*minAckDeadlineSecs)*time.Second, cancel) 333 334 for i := 0; i < 4; i++ { 335 res, err := spc.Recv() 336 if err == io.EOF { 337 break 338 } 339 if err != nil { 340 if status.Code(err) == codes.Canceled { 341 break 342 } 343 t.Fatal(err) 344 } 345 if i == 3 { 346 t.Fatal("expected to only see 3 messages, got 4") 347 } 348 req := &pb.StreamingPullRequest{} 349 for _, m := range res.ReceivedMessages { 350 if got[m.Message.MessageId] { 351 t.Fatal("duplicate message") 352 } 353 got[m.Message.MessageId] = true 354 req.AckIds = append(req.AckIds, m.AckId) 355 } 356 if err := spc.Send(req); err != nil { 357 t.Fatal(err) 358 } 359 } 360} 361 362func TestAcknowledge(t *testing.T) { 363 ctx := context.Background() 364 pclient, sclient, srv, cleanup := newFake(context.TODO(), t) 365 defer cleanup() 366 367 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 368 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 369 Name: "projects/P/subscriptions/S", 370 Topic: top.Name, 371 AckDeadlineSeconds: 10, 372 }) 373 374 publish(t, pclient, top, []*pb.PubsubMessage{ 375 {Data: []byte("d1")}, 376 {Data: []byte("d2")}, 377 {Data: []byte("d3")}, 378 }) 379 msgs := streamingPullN(context.TODO(), t, 3, sclient, sub) 380 var ackIDs []string 381 for _, m := range msgs { 382 ackIDs = append(ackIDs, m.AckId) 383 } 384 if _, err := sclient.Acknowledge(ctx, &pb.AcknowledgeRequest{ 385 Subscription: sub.Name, 386 AckIds: ackIDs, 387 }); err != nil { 388 t.Fatal(err) 389 } 390 smsgs := srv.Messages() 391 if got, want := len(smsgs), 3; got != want { 392 t.Fatalf("got %d messages, want %d", got, want) 393 } 394 for _, sm := range smsgs { 395 if sm.Acks != 1 { 396 t.Errorf("message %s: got %d acks, want 1", sm.ID, sm.Acks) 397 } 398 } 399} 400 401func TestModAck(t *testing.T) { 402 ctx := context.Background() 403 pclient, sclient, _, cleanup := newFake(context.TODO(), t) 404 defer cleanup() 405 406 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 407 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 408 Name: "projects/P/subscriptions/S", 409 Topic: top.Name, 410 AckDeadlineSeconds: 10, 411 }) 412 413 publish(t, pclient, top, []*pb.PubsubMessage{ 414 {Data: []byte("d1")}, 415 {Data: []byte("d2")}, 416 {Data: []byte("d3")}, 417 }) 418 msgs := streamingPullN(context.TODO(), t, 3, sclient, sub) 419 var ackIDs []string 420 for _, m := range msgs { 421 ackIDs = append(ackIDs, m.AckId) 422 } 423 if _, err := sclient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{ 424 Subscription: sub.Name, 425 AckIds: ackIDs, 426 AckDeadlineSeconds: 0, 427 }); err != nil { 428 t.Fatal(err) 429 } 430 // Having nacked all three messages, we should see them again. 431 msgs = streamingPullN(context.TODO(), t, 3, sclient, sub) 432 if got, want := len(msgs), 3; got != want { 433 t.Errorf("got %d messages, want %d", got, want) 434 } 435} 436 437func TestAckDeadline(t *testing.T) { 438 // Messages should be resent after they expire. 439 pclient, sclient, _, cleanup := newFake(context.TODO(), t) 440 defer cleanup() 441 442 minAckDeadlineSecs = 2 443 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 444 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 445 Name: "projects/P/subscriptions/S", 446 Topic: top.Name, 447 AckDeadlineSeconds: minAckDeadlineSecs, 448 }) 449 450 _ = publish(t, pclient, top, []*pb.PubsubMessage{ 451 {Data: []byte("d1")}, 452 {Data: []byte("d2")}, 453 {Data: []byte("d3")}, 454 }) 455 456 got := map[string]int{} 457 spc := mustStartStreamingPull(context.TODO(), t, sclient, sub) 458 // In 5 seconds the ack deadline will expire twice, so we should see each message 459 // exactly three times. 460 time.AfterFunc(5*time.Second, func() { 461 if err := spc.CloseSend(); err != nil { 462 t.Errorf("CloseSend: %v", err) 463 } 464 }) 465 for { 466 res, err := spc.Recv() 467 if err == io.EOF { 468 break 469 } 470 if err != nil { 471 t.Fatal(err) 472 } 473 for _, m := range res.ReceivedMessages { 474 got[m.Message.MessageId]++ 475 } 476 } 477 for id, n := range got { 478 if n != 3 { 479 t.Errorf("message %s: saw %d times, want 3", id, n) 480 } 481 } 482} 483 484func TestMultiSubs(t *testing.T) { 485 // Each subscription gets every message. 486 pclient, sclient, _, cleanup := newFake(context.TODO(), t) 487 defer cleanup() 488 489 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 490 sub1 := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 491 Name: "projects/P/subscriptions/S1", 492 Topic: top.Name, 493 AckDeadlineSeconds: 10, 494 }) 495 sub2 := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 496 Name: "projects/P/subscriptions/S2", 497 Topic: top.Name, 498 AckDeadlineSeconds: 10, 499 }) 500 501 want := publish(t, pclient, top, []*pb.PubsubMessage{ 502 {Data: []byte("d1")}, 503 {Data: []byte("d2")}, 504 {Data: []byte("d3")}, 505 }) 506 got1 := pubsubMessages(streamingPullN(context.TODO(), t, len(want), sclient, sub1)) 507 got2 := pubsubMessages(streamingPullN(context.TODO(), t, len(want), sclient, sub2)) 508 if diff := testutil.Diff(got1, want); diff != "" { 509 t.Error(diff) 510 } 511 if diff := testutil.Diff(got2, want); diff != "" { 512 t.Error(diff) 513 } 514} 515 516// Messages are handed out to all streams of a subscription in a best-effort 517// round-robin behavior. The fake server prefers to fail-fast onto another 518// stream when one stream is already busy, though, so we're unable to test 519// strict round robin behavior. 520func TestMultiStreams(t *testing.T) { 521 ctx, cancel := context.WithCancel(context.Background()) 522 defer cancel() 523 pclient, sclient, _, cleanup := newFake(ctx, t) 524 defer cleanup() 525 526 top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 527 sub := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{ 528 Name: "projects/P/subscriptions/S", 529 Topic: top.Name, 530 AckDeadlineSeconds: 10, 531 }) 532 st1 := mustStartStreamingPull(ctx, t, sclient, sub) 533 defer st1.CloseSend() 534 st1Received := make(chan struct{}) 535 go func() { 536 _, err := st1.Recv() 537 if err != nil { 538 t.Error(err) 539 } 540 close(st1Received) 541 }() 542 543 st2 := mustStartStreamingPull(ctx, t, sclient, sub) 544 defer st2.CloseSend() 545 st2Received := make(chan struct{}) 546 go func() { 547 _, err := st2.Recv() 548 if err != nil { 549 t.Error(err) 550 } 551 close(st2Received) 552 }() 553 554 publish(t, pclient, top, []*pb.PubsubMessage{ 555 {Data: []byte("d1")}, 556 {Data: []byte("d2")}, 557 }) 558 559 timeout := time.After(5 * time.Second) 560 select { 561 case <-timeout: 562 t.Fatal("timed out waiting for stream 1 to receive any message") 563 case <-st1Received: 564 } 565 select { 566 case <-timeout: 567 t.Fatal("timed out waiting for stream 1 to receive any message") 568 case <-st2Received: 569 } 570} 571 572func TestStreamingPullTimeout(t *testing.T) { 573 pclient, sclient, srv, cleanup := newFake(context.TODO(), t) 574 defer cleanup() 575 576 timeout := 200 * time.Millisecond 577 srv.SetStreamTimeout(timeout) 578 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 579 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 580 Name: "projects/P/subscriptions/S", 581 Topic: top.Name, 582 AckDeadlineSeconds: 10, 583 }) 584 stream := mustStartStreamingPull(context.TODO(), t, sclient, sub) 585 time.Sleep(2 * timeout) 586 _, err := stream.Recv() 587 if err != io.EOF { 588 t.Errorf("got %v, want io.EOF", err) 589 } 590} 591 592func TestSeek(t *testing.T) { 593 pclient, sclient, _, cleanup := newFake(context.TODO(), t) 594 defer cleanup() 595 596 top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"}) 597 sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{ 598 Name: "projects/P/subscriptions/S", 599 Topic: top.Name, 600 AckDeadlineSeconds: 10, 601 }) 602 ts := ptypes.TimestampNow() 603 _, err := sclient.Seek(context.Background(), &pb.SeekRequest{ 604 Subscription: sub.Name, 605 Target: &pb.SeekRequest_Time{Time: ts}, 606 }) 607 if err != nil { 608 t.Errorf("Seeking: %v", err) 609 } 610} 611 612func TestTryDeliverMessage(t *testing.T) { 613 for _, test := range []struct { 614 availStreamIdx int 615 expectedOutIdx int 616 }{ 617 {availStreamIdx: 0, expectedOutIdx: 0}, 618 // Stream 1 will always be marked for deletion. 619 {availStreamIdx: 2, expectedOutIdx: 1}, // s0, s1 (deleted), s2, s3 becomes s0, s2, s3. So we expect outIdx=1. 620 {availStreamIdx: 3, expectedOutIdx: 2}, // s0, s1 (deleted), s2, s3 becomes s0, s2, s3. So we expect outIdx=2. 621 } { 622 top := newTopic(&pb.Topic{Name: "some-topic"}) 623 sub := newSubscription(top, &sync.Mutex{}, &pb.Subscription{Name: "some-sub", Topic: "some-topic"}) 624 625 done := make(chan struct{}, 1) 626 done <- struct{}{} 627 sub.streams = []*stream{{}, {done: done}, {}, {}} 628 629 msgc := make(chan *pb.ReceivedMessage, 1) 630 sub.streams[test.availStreamIdx].msgc = msgc 631 632 var d int 633 idx, ok := sub.tryDeliverMessage(&message{deliveries: &d}, 0, time.Now()) 634 if !ok { 635 t.Fatalf("[avail=%d]: expected msg to be put on stream %d's channel, but it was not", test.availStreamIdx, test.expectedOutIdx) 636 } 637 if idx != test.expectedOutIdx { 638 t.Fatalf("[avail=%d]: expected msg to be put on stream %d, but it was put on %d", test.availStreamIdx, test.expectedOutIdx, idx) 639 } 640 select { 641 case <-msgc: 642 default: 643 t.Fatalf("[avail=%d]: expected msg to be put on stream %d's channel, but it was not", test.availStreamIdx, idx) 644 } 645 } 646} 647 648func mustStartStreamingPull(ctx context.Context, t *testing.T, sc pb.SubscriberClient, sub *pb.Subscription) pb.Subscriber_StreamingPullClient { 649 spc, err := sc.StreamingPull(ctx) 650 if err != nil { 651 t.Fatal(err) 652 } 653 if err := spc.Send(&pb.StreamingPullRequest{Subscription: sub.Name}); err != nil { 654 t.Fatal(err) 655 } 656 return spc 657} 658 659func pullN(ctx context.Context, t *testing.T, n int, sc pb.SubscriberClient, sub *pb.Subscription) map[string]*pb.ReceivedMessage { 660 got := map[string]*pb.ReceivedMessage{} 661 for i := 0; len(got) < n; i++ { 662 res, err := sc.Pull(ctx, &pb.PullRequest{Subscription: sub.Name, MaxMessages: int32(n - len(got))}) 663 if err != nil { 664 t.Fatal(err) 665 } 666 for _, m := range res.ReceivedMessages { 667 got[m.Message.MessageId] = m 668 } 669 } 670 return got 671} 672 673func streamingPullN(ctx context.Context, t *testing.T, n int, sc pb.SubscriberClient, sub *pb.Subscription) map[string]*pb.ReceivedMessage { 674 spc := mustStartStreamingPull(ctx, t, sc, sub) 675 got := map[string]*pb.ReceivedMessage{} 676 for i := 0; i < n; i++ { 677 res, err := spc.Recv() 678 if err != nil { 679 t.Fatal(err) 680 } 681 for _, m := range res.ReceivedMessages { 682 got[m.Message.MessageId] = m 683 } 684 } 685 if err := spc.CloseSend(); err != nil { 686 t.Fatal(err) 687 } 688 res, err := spc.Recv() 689 if err != io.EOF { 690 t.Fatalf("Recv returned <%v> instead of EOF; res = %v", err, res) 691 } 692 return got 693} 694 695func pubsubMessages(rms map[string]*pb.ReceivedMessage) map[string]*pb.PubsubMessage { 696 ms := map[string]*pb.PubsubMessage{} 697 for k, rm := range rms { 698 ms[k] = rm.Message 699 } 700 return ms 701} 702 703func mustCreateTopic(ctx context.Context, t *testing.T, pc pb.PublisherClient, topic *pb.Topic) *pb.Topic { 704 top, err := pc.CreateTopic(ctx, topic) 705 if err != nil { 706 t.Fatal(err) 707 } 708 return top 709} 710 711func mustCreateSubscription(ctx context.Context, t *testing.T, sc pb.SubscriberClient, sub *pb.Subscription) *pb.Subscription { 712 sub, err := sc.CreateSubscription(ctx, sub) 713 if err != nil { 714 t.Fatal(err) 715 } 716 return sub 717} 718 719// newFake creates a new fake server along with a publisher and subscriber 720// client. Its final return is a cleanup function. 721// 722// Note: be sure to call cleanup! 723func newFake(ctx context.Context, t *testing.T) (pb.PublisherClient, pb.SubscriberClient, *Server, func()) { 724 srv := NewServer() 725 conn, err := grpc.DialContext(ctx, srv.Addr, grpc.WithInsecure()) 726 if err != nil { 727 t.Fatal(err) 728 } 729 return pb.NewPublisherClient(conn), pb.NewSubscriberClient(conn), srv, func() { 730 srv.Close() 731 conn.Close() 732 } 733} 734