1// Copyright 2017 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "reflect" 22 "sync" 23 "sync/atomic" 24 "testing" 25 "time" 26 27 "cloud.google.com/go/internal/testutil" 28 "cloud.google.com/go/pubsub/pstest" 29 "google.golang.org/api/option" 30 "google.golang.org/grpc" 31 "google.golang.org/grpc/codes" 32 "google.golang.org/grpc/status" 33) 34 35var ( 36 projName = "some-project" 37 topicName = "some-topic" 38 fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName) 39) 40 41func TestSplitRequestIDs(t *testing.T) { 42 t.Parallel() 43 ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"} 44 for _, test := range []struct { 45 ids []string 46 splitIndex int 47 }{ 48 {[]string{}, 0}, 49 {ids, 2}, 50 {ids[:2], 2}, 51 } { 52 got1, got2 := splitRequestIDs(test.ids, reqFixedOverhead+20) 53 want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:] 54 if !testutil.Equal(got1, want1) { 55 t.Errorf("%v, 1: got %v, want %v", test, got1, want1) 56 } 57 if !testutil.Equal(got2, want2) { 58 t.Errorf("%v, 2: got %v, want %v", test, got2, want2) 59 } 60 } 61} 62 63func TestAckDistribution(t *testing.T) { 64 if testing.Short() { 65 t.SkipNow() 66 } 67 t.Skip("broken") 68 69 ctx, cancel := context.WithCancel(context.Background()) 70 defer cancel() 71 72 minAckDeadline = 1 * time.Second 73 pstest.SetMinAckDeadline(minAckDeadline) 74 srv := pstest.NewServer() 75 defer srv.Close() 76 defer pstest.ResetMinAckDeadline() 77 78 // Create the topic via a Publish. It's convenient to do it here as opposed to client.CreateTopic because the client 79 // has not been established yet, and also because we want to create the topic once whereas the client is established 80 // below twice. 81 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) 82 83 queuedMsgs := make(chan int32, 1024) 84 go continuouslySend(ctx, srv, queuedMsgs) 85 86 for _, testcase := range []struct { 87 initialProcessSecs int32 88 finalProcessSecs int32 89 }{ 90 {initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up 91 {initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down 92 } { 93 t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs) 94 95 // processTimeSecs is used by the sender to coordinate with the receiver how long the receiver should 96 // pretend to process for. e.g. if we test 3s -> 5s, processTimeSecs will start at 3, causing receiver 97 // to process messages received for 3s while sender sends the first batch. Then, as sender begins to 98 // send the next batch, sender will swap processTimeSeconds to 5s and begin sending, and receiver will 99 // process each message for 5s. In this way we simulate a client whose time-to-ack (process time) changes. 100 processTimeSecs := testcase.initialProcessSecs 101 102 s, client, err := initConn(ctx, srv.Addr) 103 if err != nil { 104 t.Fatal(err) 105 } 106 107 // recvdWg increments for each message sent, and decrements for each message received. 108 recvdWg := &sync.WaitGroup{} 109 110 go startReceiving(ctx, t, s, recvdWg, &processTimeSecs) 111 startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg) 112 113 recvdWg.Wait() 114 time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up 115 err = client.Close() 116 if err != nil { 117 t.Fatal(err) 118 } 119 120 modacks := modacksByTime(srv.Messages()) 121 u := modackDeadlines(modacks) 122 initialDL := int32(minAckDeadline / time.Second) 123 if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) { 124 t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v", 125 initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u)) 126 } 127 } 128} 129 130// modacksByTime buckets modacks by time. 131func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack { 132 modacks := map[time.Time][]pstest.Modack{} 133 134 for _, msg := range msgs { 135 for _, m := range msg.Modacks { 136 modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m) 137 } 138 } 139 return modacks 140} 141 142// setsAreEqual reports whether a and b contain the same values, ignoring duplicates. 143func setsAreEqual(haystack, needles []int32) bool { 144 hMap := map[int32]bool{} 145 nMap := map[int32]bool{} 146 147 for _, n := range needles { 148 nMap[n] = true 149 } 150 151 for _, n := range haystack { 152 hMap[n] = true 153 } 154 155 return reflect.DeepEqual(nMap, hMap) 156} 157 158// startReceiving pretends to be a client. It calls s.Receive and acks messages after some random delay. It also 159// looks out for dupes - any message that arrives twice will cause a failure. 160func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) { 161 t.Log("Receiving..") 162 163 var recvdMu sync.Mutex 164 recvd := map[string]bool{} 165 166 err := s.Receive(ctx, func(ctx context.Context, msg *Message) { 167 msgData := string(msg.Data) 168 recvdMu.Lock() 169 _, ok := recvd[msgData] 170 if ok { 171 recvdMu.Unlock() 172 t.Fatalf("already saw \"%s\"\n", msgData) 173 return 174 } 175 recvd[msgData] = true 176 recvdMu.Unlock() 177 178 select { 179 case <-ctx.Done(): 180 msg.Nack() 181 recvdWg.Done() 182 case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second): 183 msg.Ack() 184 recvdWg.Done() 185 } 186 }) 187 if err != nil { 188 if status.Code(err) != codes.Canceled { 189 t.Error(err) 190 } 191 } 192} 193 194// startSending sends four batches of messages broken up by minDeadline, initialProcessSecs, and finalProcessSecs. 195func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) { 196 var msg int32 197 198 // We must send this block to force the receiver to send its initially-configured modack time. The time that 199 // gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages 200 // to create a distribution yet. 201 t.Log("minAckDeadlineSecsSending an initial message") 202 recvdWg.Add(1) 203 msg++ 204 queuedMsgs <- msg 205 <-time.After(minAckDeadline) 206 207 t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+ 208 "when the next batch of messages go out.", initialProcessSecs) 209 for i := 0; i < 10; i++ { 210 recvdWg.Add(1) 211 msg++ 212 queuedMsgs <- msg 213 } 214 atomic.SwapInt32(processTimeSecs, finalProcessSecs) 215 <-time.After(time.Duration(initialProcessSecs) * time.Second) 216 217 t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+ 218 "when the next batch of messages go out.", finalProcessSecs) 219 for i := 0; i < 100; i++ { 220 recvdWg.Add(1) 221 msg++ 222 queuedMsgs <- msg // Send many messages to drastically change distribution 223 } 224 <-time.After(time.Duration(finalProcessSecs) * time.Second) 225 226 t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs) 227 recvdWg.Add(1) 228 msg++ 229 queuedMsgs <- msg 230} 231 232// continuouslySend continuously sends messages that exist on the queuedMsgs chan. 233func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) { 234 for { 235 select { 236 case <-ctx.Done(): 237 return 238 case m := <-queuedMsgs: 239 srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil) 240 } 241 } 242} 243 244func toSet(arr []int32) []int32 { 245 var s []int32 246 m := map[int32]bool{} 247 248 for _, v := range arr { 249 _, ok := m[v] 250 if !ok { 251 s = append(s, v) 252 m[v] = true 253 } 254 } 255 256 return s 257 258} 259 260func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) { 261 conn, err := grpc.Dial(addr, grpc.WithInsecure()) 262 if err != nil { 263 return nil, nil, err 264 } 265 client, err := NewClient(ctx, projName, option.WithGRPCConn(conn)) 266 if err != nil { 267 return nil, nil, err 268 } 269 270 topic := client.Topic(topicName) 271 s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic}) 272 if err != nil { 273 return nil, nil, err 274 } 275 276 exists, err := s.Exists(ctx) 277 if !exists { 278 return nil, nil, errors.New("Subscription does not exist") 279 } 280 if err != nil { 281 return nil, nil, err 282 } 283 284 return s, client, nil 285} 286 287// modackDeadlines takes a map of time => Modack, gathers all the Modack.AckDeadlines, 288// and returns them as a slice 289func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 { 290 var u []int32 291 for _, vv := range m { 292 for _, v := range vv { 293 u = append(u, v.AckDeadline) 294 } 295 } 296 return u 297} 298